<a href="https://colab.research.google.com/github/fellowship/web-agent/blob/main/Cohort%2034%20-%20Web%20Agent/Firoj's%20Code/Autinomous_web_agent_first_iteration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install fastapi pydantic google-generativeai playwright playwright-stealth python-ghost-cursor python-dotenv pillow -q
!playwright install --with-deps

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.2/95.2 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.2/45.2 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.0/72.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m20.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling dependencies...
Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes

In [2]:
# Set Gemini API key
# Read the Colab secret named 'Gemini_2' and expose it to this process as the
# GEMINI_API_KEY environment variable; a later cell reads that variable when it
# calls genai.configure().
from google.colab import userdata
import os
os.environ["GEMINI_API_KEY"] = userdata.get('Gemini_2')

In [3]:
# Import libraries
import asyncio
import base64
import datetime
import io
import json
import logging
import random
import sys
import textwrap
import re
from pydantic import BaseModel, Field
from typing import List, Optional
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
from python_ghost_cursor.playwright_async import create_cursor
from PIL import Image
import google.generativeai as genai

In [4]:
# Authenticate the google.generativeai client with the key stored in the environment.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# Module-level model handle; the extract_*/request_* coroutines below call
# model.generate_content() on this shared object.
model = genai.GenerativeModel('gemini-2.0-flash')

In [5]:
# Pydantic schemas
class UserPrompt(BaseModel):
    """Input wrapper for navigate(): one free-text instruction holding both the start URL and the task."""
    prompt: str = Field(..., description="The user prompt containing a starting URL and a task to be performed.")

class ExtractedTask(BaseModel):
    """Structured form of a user prompt, as produced by extract_task_from_prompt / extract_task_from_text."""
    # All three fields are required; the model is asked to return exactly these JSON keys.
    url: str = Field(..., description="The extracted URL from the user prompt.")
    task: str = Field(..., description="The task to be performed on the URL.")
    what_do_you_plan_to_do: str = Field(..., description="What do you plan to achieve with the task to be performed?")

class HealingCommand(BaseModel):
    """Model reply to a healing request: a replacement command for one that failed."""
    command: str = Field(..., description="The corrected Python Playwright command(s) to execute next.")
    task_completed: bool = Field(..., description="Indicates whether the task has been fully completed.")
    reason_for_fix: str = Field(..., description="Rationale for why the new command will resolve the issue.")
    # When non-empty, execute_playwright_command returns this text instead of running `command`.
    output: Optional[str] = Field(None, description="Result extracted from screenshots, if applicable.")

class NextStepCommand(BaseModel):
    """Model reply to a next-step request: the command to run and whether the task is finished."""
    command: str = Field(..., description="The next valid Python Playwright command(s) to be executed.")
    # perform_task stops looping once a reply arrives with this flag set to True.
    task_completed: bool = Field(..., description="Indicates whether the task has been fully completed.")
    what_do_you_plan_to_do: str = Field(..., description="Explanation of the intended result of the next step.")
    # When non-empty, perform_task records this text as the step result and skips executing `command`.
    output: Optional[str] = Field(None, description="Result extracted from screenshots, if applicable.")

In [6]:
# Playwright reference as a string
# Cheat-sheet of Playwright *Python async API* calls. It is pasted verbatim into the
# LLM prompts, so every line must be a call that really exists in playwright.async_api;
# JS-style names (goBack, page.cookies, keyboard.release, ...) make the model emit
# commands that fail at runtime. Coroutine calls carry `await` to match the prompts,
# which require asynchronous usage.
playwright_ref = """# BASIC NAVIGATION:
# Navigate to a URL
await page.goto('https://example.com')

# Reload the page
await page.reload()

# Go back and forward in history
await page.go_back()
await page.go_forward()

# INTERACTING WITH ELEMENTS:
# Click an element
await page.click('css selector')

# Type text into an input field
await page.type('input[name="username"]', 'your_username')

# Press Enter key
await page.press('input[name="password"]', 'Enter')

# Get text content of an element
text = await page.text_content('h1')

# Select an option in a dropdown
await page.select_option('select', label='Option 1')

# Waiting for elements to appear or become visible
await page.wait_for_selector('div#my-element', state='visible')

# Check if an element exists
assert await page.locator("button").is_visible()

# Check the page title
assert await page.title() == "Expected Title"

# SCREENSHOTS AND PDFS:
# Take a screenshot
await page.screenshot(path='screenshot.png')

# Generate a PDF
await page.pdf(path='document.pdf')

# HANDLING COOKIES (cookies live on the browser context, not on the page):
# Get all cookies
cookies = await page.context.cookies()

# Set a cookie
await page.context.add_cookies([{'name': 'my_cookie', 'value': 'cookie_value', 'url': 'https://example.com'}])

# Delete all cookies
await page.context.clear_cookies()

# HANDLING ALERTS AND DIALOGS:
# Accept the next JavaScript dialog (register the handler before triggering it)
page.once('dialog', lambda dialog: dialog.accept())

# ...or dismiss it instead
page.once('dialog', lambda dialog: dialog.dismiss())

# KEYBOARD AND MOUSE INPUT:
# Type text
await page.type('input', 'Hello, Playwright!')

# Press and release keyboard keys
await page.keyboard.press('Enter')
await page.keyboard.down('Shift')
await page.keyboard.up('Shift')

# Move the mouse and click
await page.mouse.move(100, 100)
await page.mouse.click(100, 100)

# EVALUATING JAVASCRIPT:
# Evaluate JavaScript in the context of the page
result = await page.evaluate('1 + 2')

# WORKING WITH FRAMES:
# Get a frame by name, or by position in page.frames
frame = page.frame(name='frameName')
frame = page.frames[0]

# Execute code in the context of a frame
frame = page.frames[0]
await frame.evaluate('console.log("Hello from frame!")')
"""

In [7]:
# JavaScript highlighter code
# An arrow function (as source text) that capture_screenshot_and_source passes to
# page.evaluate(). For every <a>, <button>, <input> and <textarea> it builds a label of
# "name=value" pairs and draws it as a small black box just above/left of the element:
#   * the `id` attribute if present;
#   * otherwise the first `aria-label`/`title` whose value differs from the element's innerText;
#   * otherwise the `class` attribute;
#   * elements with none of these get no label.
# The label <div>s are appended to document.body and are never removed by this script,
# so repeated evaluation on the same document adds another layer of labels.
highlight_js = """() => {
    document.querySelectorAll("a,button,input,textarea").forEach((e) => {
        const attrs = e.attributes;
        const attrPairsP1 = [];
        const attrPairsP2 = [];
        const attrPairsP3 = [];
        for (let i = 0; i < attrs.length; i++) {
            const attr = attrs[i];
            const attrName = attr.name.trim();
            const attrValue = attr.value.trim();
            if (attrName.length === 0 || attrValue.length === 0) continue;
            switch (attrName) {
                case "id":
                    attrPairsP1.push(`${attrName}=${attrValue}`);
                    break;
                case "aria-label":
                case "title":
                    if (attrPairsP2.length === 0) {
                        if (attrValue !== e.innerText) {
                            attrPairsP2.push(`${attrName}=${attrValue}`);
                        }
                    }
                    break;
                case "class":
                    attrPairsP3.push(`${attrName}=${attrValue}`);
                    break;
            }
        }
        const attrPairs = [];
        if (attrPairsP1.length > 0) {
            attrPairs.push(...attrPairsP1);
        } else if (attrPairsP2.length > 0) {
            attrPairs.push(...attrPairsP2);
        } else {
            attrPairs.push(...attrPairsP3);
        }
        if (attrPairs.length === 0) return;
        const rect = e.getBoundingClientRect();
        const left = rect.left + window.scrollX;
        const top = rect.top + window.scrollY;
        const div = document.createElement("div");
        div.innerText = attrPairs.join(" ");
        div.style.cssText = `
            background: black;
            color: white;
            font-size: 12px;
            position: absolute;
            top: ${top - 12}px;
            left: ${left - 6}px;
            z-index: 2147483647;
        `;
        document.body.appendChild(div);
    });
}
"""

In [8]:
# Fallback functions to parse plain text
def extract_task_from_text(response_text):
    """Best-effort parse of a task-extraction reply that is not valid JSON.

    Each field is looked up first in JSON-ish form (``"url": "value"``) and then in
    plain labelled form (``URL: value``). A labelled value ends at the end of its line,
    so ``URL: x`` followed by ``Task: y`` on the next line yields two separate fields
    instead of one URL that swallows the rest of the reply.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        ExtractedTask built from the recovered fields; ``what_do_you_plan_to_do`` falls
        back to a generic sentence when it cannot be found.

    Raises:
        ValueError: If no URL or no task could be recovered.
    """
    def _find_field(json_key, plain_label):
        # JSON-style `"key": "value"` wins when both spellings appear in the text.
        quoted = re.search(rf'"{json_key}"\s*:\s*"([^"]+)', response_text, re.IGNORECASE)
        if quoted:
            return quoted.group(1).strip()
        # Plain `Label: value`; stop at the line break (or a quote) so fields don't bleed together.
        labelled = re.search(rf'{plain_label}:\s*"?([^"\n]+)', response_text, re.IGNORECASE)
        return labelled.group(1).strip() if labelled else ""

    url = _find_field("url", "URL")
    task = _find_field("task", "Task")
    what_do_you_plan_to_do = (
        _find_field("what_do_you_plan_to_do", "what_do_you_plan_to_do")
        or "Obtain the requested information from the URL"
    )

    if not url or not task:
        logging.error(f"Failed to extract valid URL or task from response: {response_text}")
        raise ValueError("Could not extract valid URL or task from response")

    return ExtractedTask(
        url=url,
        task=task,
        what_do_you_plan_to_do=what_do_you_plan_to_do
    )

def _unwrap_json_fence(response_text):
    """Return the payload of the first ``` / ```json fence in the text, else the stripped text."""
    fenced = re.search(r'```(?:json)?\s*(.*?)```', response_text, re.DOTALL)
    return (fenced.group(1) if fenced else response_text).strip()


def _regex_string_field(response_text, key):
    """Pull the string value of `"key": "..."` out of JSON-ish text, or '' when absent.

    Backslash escapes are honoured so a command such as
    `await page.click(\\"text='Go'\\")` is not cut off at its first escaped quote.
    """
    match = re.search(rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)*)"', response_text, re.IGNORECASE | re.DOTALL)
    if not match:
        return ""
    raw_value = match.group(1)
    try:
        # Decode JSON escapes (\" \n \\ ...); strict=False tolerates raw newlines in the value.
        decoded = json.loads(f'"{raw_value}"', strict=False)
    except json.JSONDecodeError:
        decoded = raw_value
    return decoded.strip()


def _parse_step_response(response_text, schema, rationale_field, rationale_default):
    """Shared fallback parser behind extract_next_step_from_text / extract_healed_command_from_text.

    Tries strict JSON first (optionally wrapped in a code fence), then field-by-field regex.
    """
    candidate = _unwrap_json_fence(response_text)
    if candidate.startswith('{'):
        try:
            return schema(**json.loads(candidate))
        except (ValueError, TypeError):
            # ValueError covers both json.JSONDecodeError and pydantic's ValidationError
            # (e.g. a missing key); either way the regex pass below may still succeed.
            logging.warning("Direct JSON parsing in fallback failed, proceeding with regex")

    command = _regex_string_field(response_text, "command")
    if not command:
        logging.error(f"Failed to extract command from response: {response_text}")
        raise ValueError("Could not extract valid command from response")

    completed_match = re.search(r'"task_completed"\s*:\s*(true|false)', response_text, re.IGNORECASE)
    task_completed = completed_match.group(1).lower() == "true" if completed_match else False
    rationale = _regex_string_field(response_text, rationale_field) or rationale_default
    # `"output": null` has no quoted value, so it maps to None here.
    output = _regex_string_field(response_text, "output") or None

    return schema(
        command=command,
        task_completed=task_completed,
        output=output,
        **{rationale_field: rationale}
    )


def extract_next_step_from_text(response_text):
    """Recover a NextStepCommand from a model reply that failed strict JSON parsing.

    Args:
        response_text: Raw reply text (may be fenced, truncated or otherwise malformed JSON).

    Returns:
        NextStepCommand; `what_do_you_plan_to_do` defaults to a generic sentence and
        `task_completed` to False when they cannot be recovered.

    Raises:
        ValueError: If no non-empty `command` can be recovered.
    """
    return _parse_step_response(
        response_text, NextStepCommand, "what_do_you_plan_to_do", "Proceed with the next step"
    )


def extract_healed_command_from_text(response_text):
    """Recover a HealingCommand from a model reply that failed strict JSON parsing.

    Args:
        response_text: Raw reply text (may be fenced, truncated or otherwise malformed JSON).

    Returns:
        HealingCommand; `reason_for_fix` defaults to a generic sentence and
        `task_completed` to False when they cannot be recovered.

    Raises:
        ValueError: If no non-empty `command` can be recovered.
    """
    return _parse_step_response(
        response_text, HealingCommand, "reason_for_fix", "Fix the failed command"
    )

In [9]:
# GhostPage for custom click behavior
class GhostPage:
    """Thin proxy around a Playwright page whose click() is routed through a ghost cursor.

    Every attribute not defined here is forwarded to the wrapped page.
    """

    def __init__(self, page):
        self.page = page
        self.cursor = create_cursor(page)

    async def click(self, selector, **kwargs):
        """Click `selector` via the ghost cursor; kwargs are passed through to cursor.click."""
        await self.cursor.click(selector, **kwargs)

    def __getattr__(self, name):
        # __getattr__ only runs after normal lookup fails. If `page` itself is not set yet
        # (instance built without __init__, or copy/pickle probing attributes), evaluating
        # `self.page` would re-enter __getattr__ forever; read the instance dict directly.
        try:
            wrapped_page = self.__dict__["page"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(wrapped_page, name)

In [10]:
# Initialize browser
async def init_browser(base_url: str = ""):
    """Start Playwright, launch headless Chromium and return a stealth-patched page.

    Args:
        base_url: Accepted for call compatibility; not used by this function.

    Returns:
        (browser, page) tuple. The caller is responsible for closing the browser.

    Raises:
        Any exception raised while launching or configuring the browser. In that case
        the browser (if already launched) and the Playwright driver are shut down
        first, so a failed start does not leave a driver process behind.
    """
    userAgents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    ]
    playwright = await async_playwright().start()
    browser = None
    try:
        browser = await playwright.chromium.launch(
            headless=True,
            # NOTE(review): --disable-web-security and --ignore-certificate-errors turn off
            # same-origin and TLS checks for every page the agent visits; confirm this is intended.
            args=["--disable-web-security", "--ignore-certificate-errors", "--allow-external-pages", "--no-sandbox"]
        )
        agent = random.choice(userAgents)
        print(f"Chosen User Agent: {agent}")
        context = await browser.new_context(
            ignore_https_errors=True,
            viewport={"width": 1920, "height": 1080},
            # NOTE(review): 41.89 / 12.49 look like the latitude / longitude of central Rome
            # in swapped order; verify which location is meant to be spoofed.
            geolocation={"longitude": 41.890221, "latitude": 12.492348},
            permissions=["geolocation"],
            user_agent=agent
        )
        page = await context.new_page()
        await stealth_async(page)
        return browser, page
    except Exception:
        # Setup failed part-way: release what was started before propagating the error.
        if browser is not None:
            await browser.close()
        await playwright.stop()
        raise

# Fix URL
def fix_url(url: str):
    """Return `url` with an explicit scheme, defaulting to https.

    Surrounding whitespace is removed, and the scheme test is case-insensitive so
    inputs such as ``HTTPS://example.com`` are left alone instead of being turned
    into ``https://HTTPS://example.com``.

    Args:
        url: URL as extracted from the user prompt; may lack a scheme.

    Returns:
        The trimmed URL, prefixed with ``https://`` when it had no http(s) scheme.
    """
    url = url.strip()
    if not url.lower().startswith(("http://", "https://")):
        url = f"https://{url}"
    return url

# Navigate to URL
async def navigate_to_url(page, url: str):
    """Open `url` in `page` and wait until the network is idle.

    Args:
        page: Playwright page to navigate.
        url: Target address; a missing scheme is filled in by fix_url().

    Raises:
        Exception: If the navigation yields no response or a non-successful status.
            Any 2xx status counts as success, not only 200. Errors raised by
            Playwright itself (timeouts, DNS failures, ...) are logged and re-raised.
    """
    try:
        url = fix_url(url)
        response = await page.goto(url, timeout=30000)
        # Check the status first so a failed navigation is reported immediately rather
        # than after waiting for an error page to go idle.
        if response is None or not response.ok:
            raise Exception(f"Failed to navigate to {url}. Status code: {response.status if response else 'None'}")
        await page.wait_for_load_state('networkidle')
    except Exception as e:
        logging.error(f"Failed to navigate to URL {url}: {str(e)}", exc_info=True)
        raise

# Run Playwright command or script
async def run_playwright_script_or_command(page, command_or_script: str, state):
    """Execute one model-generated Playwright command or script against `page`.

    SECURITY: the text is run through exec()/eval() with full interpreter privileges.
    It comes from the language model, so only run this in a disposable environment.

    Two execution modes:
      * Text containing ``await`` or a newline is wrapped in a coroutine and executed as
        a script. If the script assigns ``results``, that value is returned as text
        (non-strings are converted with str()).
      * Anything else is evaluated as a single expression. An awaitable result is
        awaited; a plain value (e.g. ``page.url``) is used as is. A non-None result is
        stored in ``state['last_output']`` and returned unchanged.

    Whatever the command prints is captured and, when non-empty, stored (stripped) in
    ``state['last_output']``, overriding any value stored above.

    Args:
        page: Playwright page, exposed to the executed code as ``page``.
        command_or_script: Python source produced by the model.
        state: Mutable agent state dict, exposed to the executed code as ``state``.

    Returns:
        The script's ``results`` as text or None (script mode), or the expression's
        value (expression mode).

    Raises:
        Exception: Any error from compiling or running the code; an UnboundLocalError
            is re-raised as a plain Exception naming the undefined variable.
    """
    import inspect  # local import: only needed by this function

    exec_globals = {"page": page, "state": state, "results": None}
    exec_locals = {}
    old_stdout = sys.stdout
    redirected_output = io.StringIO()
    sys.stdout = redirected_output
    try:
        if 'await' in command_or_script or '\n' in command_or_script:
            # `results` assigned by the script is a local of the wrapper coroutine, so the
            # wrapper has to hand it back explicitly; it is not visible in exec_locals.
            wrapped_source = (
                "async def _async_exec_func():\n"
                + textwrap.indent(command_or_script, '    ')
                + "\n    return locals().get('results')\n"
            )
            try:
                exec(wrapped_source, exec_globals, exec_locals)
                result = await exec_locals["_async_exec_func"]()
            except UnboundLocalError as e:
                logging.error(f"UnboundLocalError in script execution: {str(e)}", exc_info=True)
                raise Exception(f"Script uses undefined variable: {str(e)}") from e
            if result is None:
                # Covers scripts that declared `global results` before assigning it.
                result = exec_globals.get('results')
            if inspect.isawaitable(result):
                # The script stored an un-awaited coroutine in `results`.
                result = await result
            if result is not None and not isinstance(result, str):
                result = str(result)
        else:
            result = eval(command_or_script.strip(), exec_globals, exec_locals)
            if inspect.isawaitable(result):
                # Only await real awaitables; sync expressions such as `page.url` are not.
                result = await result
            if result is not None:
                state['last_output'] = result
        output = redirected_output.getvalue()
    except Exception as e:
        logging.error(f"Failed to execute Playwright command/script: {str(e)}", exc_info=True)
        raise
    finally:
        sys.stdout = old_stdout
        redirected_output.close()
    if output:
        state['last_output'] = output.strip()
        # Printed after stdout is restored so the message is actually visible.
        print(f"Captured output: {output.strip()}")
    return result

In [11]:
# Execute Playwright command with retries
def _step_output_text(result):
    """Return `result` as stripped text for the step log, or None when it is empty/falsy."""
    if not result:
        return None
    return (result if isinstance(result, str) else str(result)).strip()


async def execute_playwright_command(page, command: str, state, main_task, screenshot, clean_screenshot, what_do_you_plan_to_do: str = '', max_retries=4):
    """Run `command`; on failure ask the model for a fix and retry.

    Every executed command that succeeds is appended to ``state['steps']`` with status
    ``success`` (first try), ``healed`` (a replacement command worked) or
    ``healed_vision`` (the model answered directly via its ``output`` field).

    Args:
        page: Playwright page the command runs against.
        command: Playwright command/script proposed for this step.
        state: Mutable agent state; must contain a ``steps`` list.
        main_task: Task description forwarded to the healing prompt.
        screenshot: Highlighted screenshot forwarded to the healing request.
        clean_screenshot: Clean screenshot forwarded to the healing request.
        what_do_you_plan_to_do: Rationale recorded with a successful first attempt.
        max_retries: Maximum number of healing attempts after the first failure.

    Returns:
        The command's result, or the healing reply's ``output`` text.

    Raises:
        Exception: If the command and all healing attempts fail, or the model returns
            the original command unchanged.
    """
    failed_attempts = []
    try:
        result = await run_playwright_script_or_command(page, command, state)
    except Exception as e:
        logging.error(f"Failed to execute Playwright command: {str(e)}", exc_info=True)
        failed_attempts.append({
            "attempt": 1,
            "command": command,
            "error": str(e)
        })
        for retry_count in range(1, max_retries + 1):
            # Until the model replies, the only command we can blame is the original one.
            attempted_command = command
            try:
                healed_command_or_script = await request_healed_command(state, main_task, screenshot, clean_screenshot, failed_attempts)
                attempted_command = healed_command_or_script.command
                if healed_command_or_script.command == command:
                    logging.warning(f"Healed command is the same as the original command: {command}. Skipping re-execution.")
                    break
                if healed_command_or_script.output:
                    state['steps'].append({
                        "command": healed_command_or_script.command,
                        "status": "healed_vision",
                        "reason": healed_command_or_script.reason_for_fix,
                        "output": healed_command_or_script.output.strip()
                    })
                    return healed_command_or_script.output
                result = await run_playwright_script_or_command(page, healed_command_or_script.command, state)
                state['steps'].append({
                    "command": healed_command_or_script.command,
                    "status": "healed",
                    "reason": healed_command_or_script.reason_for_fix,
                    "output": _step_output_text(result)
                })
                return result
            except Exception as healed_exception:
                logging.error(f"Failed to execute healed Playwright command on attempt {retry_count}/{max_retries}: {str(healed_exception)}", exc_info=True)
                failed_attempts.append({
                    "attempt": retry_count + 1,
                    # Record the command that actually failed, so the next healing prompt
                    # shows the model what it already tried.
                    "command": attempted_command,
                    "error": str(healed_exception)
                })
        raise Exception(f"Failed to execute healed Playwright commands after {max_retries} attempts.") from e

    # Logged outside the try block: a problem while recording the step must not be
    # mistaken for a failed command and trigger healing.
    state['steps'].append({
        "command": command,
        "status": "success",
        "reason": what_do_you_plan_to_do,
        "output": _step_output_text(result)
    })
    return result

In [12]:
# Perform task
async def perform_task(page, main_task, state, max_steps: int = 20):
    """Drive the agent loop: ask the model for the next step, run it, re-capture the page.

    The loop ends when a reply has ``task_completed`` set. It is capped at `max_steps`
    iterations (the prompts promise the model at most 20 steps), so a model that never
    reports completion cannot loop, and bill API calls, forever.

    Args:
        page: Playwright page being automated.
        main_task: Natural-language task description.
        state: Mutable agent state; must contain ``steps``, ``screenshot`` and
            ``clean_screenshot``. The screenshot and source entries are removed on success.
        max_steps: Upper bound on the number of model-proposed steps.

    Raises:
        Exception: If the task is not reported complete within `max_steps` steps, or
            any step fails irrecoverably.
    """
    last_screenshot = state["screenshot"]
    last_clean_screenshot = state["clean_screenshot"]
    print("Initial state captured")
    for _ in range(max_steps):
        print("Requesting next step as command")
        next_command = await request_next_step_as_command(state, main_task, last_screenshot, last_clean_screenshot)
        print(f"Received next command: {next_command}")
        if next_command.output:
            print(f"Task step answered directly using vision, with output: {next_command.output}")
            state['steps'].append({
                "command": None,
                "status": "success",
                "reason": next_command.what_do_you_plan_to_do,
                "output": next_command.output.strip()
            })
        else:
            print(f"Executing Playwright command: {next_command.command}")
            await execute_playwright_command(page, next_command.command, state, main_task, last_screenshot, last_clean_screenshot, next_command.what_do_you_plan_to_do)
        print("Updating state with new screenshot and source code")
        clean_screenshot, screenshot_data_url, source_code = await capture_screenshot_and_source(page, f"step-{len(state['steps']) + 1}")
        last_screenshot = screenshot_data_url
        last_clean_screenshot = clean_screenshot
        state["source_code"] = source_code
        print(f"State updated. Next command: {next_command.command}")
        if next_command.task_completed:
            break
    else:
        raise Exception(f"Task not completed within {max_steps} steps.")
    print("********** Task completed **********")
    # Drop the bulky entries before reporting; pop() tolerates keys that were never set.
    for transient_key in ("screenshot", "clean_screenshot", "source_code"):
        state.pop(transient_key, None)
    # default=str keeps the report from crashing on non-JSON values in state['last_output'].
    print(f"Task completed. Final state: {json.dumps(state, indent=2, default=str)}")

In [13]:
# Capture screenshot and source
async def capture_screenshot_and_source(page, step: str):
    """Capture a clean screenshot, the page HTML, and a screenshot with element labels.

    Order matters: the clean screenshot and the HTML are taken before highlight_js
    injects its label <div>s, so neither contains the labels added by this call.
    NOTE(review): labels injected by earlier calls on the same document are not
    removed anywhere, so they can still appear in later "clean" captures.

    The highlighted screenshot is also written to /content and offered as a Colab
    download. That export is best-effort: outside Colab, or if the download fails,
    a warning is logged and the capture still succeeds.

    Args:
        page: Playwright page to capture.
        step: Label used in progress messages and in the exported file name.

    Returns:
        (clean_screenshot_data_url, highlighted_screenshot_data_url, source_code).
    """
    print(f"Capturing CLEAN screenshot for step {step}")
    screenshot_buffer0 = await page.screenshot(full_page=True, type="jpeg", quality=60)
    source_code = await page.content()
    print(f"Highlighting elements on the page for step {step}")
    await page.evaluate(highlight_js)
    print(f"Capturing screenshot for step {step}")
    screenshot_buffer = await page.screenshot(full_page=True, type="jpeg", quality=80)
    print(f"Screenshot captured for step {step}")
    print("Converting screenshot to base64 data URL")
    screenshot_data_url0 = f"data:image/jpeg;base64,{base64.b64encode(screenshot_buffer0).decode('utf-8')}"
    screenshot_data_url = f"data:image/jpeg;base64,{base64.b64encode(screenshot_buffer).decode('utf-8')}"
    screenshot_path = f"/content/screenshot_{step}.jpg"
    try:
        with open(screenshot_path, "wb") as f:
            f.write(screenshot_buffer)
        from google.colab import files
        files.download(screenshot_path)
    except Exception as export_error:
        # The export is a debugging convenience; it must not abort the agent step.
        logging.warning(f"Could not export screenshot {screenshot_path}: {export_error}")
    return screenshot_data_url0, screenshot_data_url, source_code

In [14]:
# Capture initial screenshot
async def capture_initial_screenshot(page):
    """Capture the page under the step label "initial" and return only the two screenshots.

    Returns:
        (clean_screenshot_data_url, highlighted_screenshot_data_url); the page source
        that capture_screenshot_and_source also returns is discarded.
    """
    clean_data_url, highlighted_data_url, _page_source = await capture_screenshot_and_source(page, "initial")
    return (clean_data_url, highlighted_data_url)

In [15]:
async def request_healed_command(state, main_task, screenshot, clean_screenshot, failed_attempts):
    """Ask the model for a replacement for a Playwright command that failed.

    The prompt lists every failed attempt and, from the second failure onwards, the
    current page source. The highlighted screenshot is attached to the request when it
    is a base64 data URL, so the model can see the page it is fixing a command for.
    The prompt contains only task-independent guidance; the concrete task comes from
    `main_task`.

    Args:
        state: Agent state; ``steps`` is required, ``source_code`` is optional.
        main_task: Natural-language task description.
        screenshot: Highlighted screenshot as a ``data:<mime>;base64,...`` URL (or falsy).
        clean_screenshot: Clean screenshot; accepted for call compatibility, not sent.
        failed_attempts: List of dicts with ``attempt``, ``command`` and ``error`` keys.

    Returns:
        HealingCommand parsed from the model reply.

    Raises:
        Exception: Any error from the model call or from parsing its reply (logged first).
    """
    failed_attempts_info = "\n".join([f"Attempt {attempt['attempt']}: Command `{attempt['command']}` failed with error: {attempt['error']}" for attempt in failed_attempts])
    current_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    source_code_info = ""
    if len(failed_attempts) > 1:
        page_source_code = state.get('source_code', '')
        if page_source_code:
            source_code_info = f"\n# The current page source code is as follows:\n```html\n{page_source_code}\n```\n"

    # Updated prompt to ensure element visibility before interaction
    prompt = f"""
        # You are a professional automation quality engineer.
        # The current date and time is: {current_datetime}.
        # You read requirement and write Playwright in Python to automate the test.

        The following attempts to execute the command failed:
        {failed_attempts_info}

        {source_code_info}

        Please provide a new Playwright script or command that:
        1. Ensures the element is present and visible before interacting with it by using `await page.wait_for_selector(selector, state='visible')`.
        2. Uses a reliable selector for the target element. For example, for the search bar on Google (https://www.google.com/) the primary selector should be 'input[name="q"]', but if that fails, use a fallback like 'textarea[name="q"]' or 'input[title="Search"]'.
        3. Avoids searching for elements based on long text sentences. Use CSS selectors, ID attributes, or short, stable text fragments.
        4. Prioritizes CSS class or ID selectors for reliability.
        5. Completes the task within a maximum of 20 steps, avoiding repetitive actions.

        # Important:
        - Always use asynchronous Playwright commands with 'await'.
        - Ensure the element is visible using `await page.wait_for_selector()` before interacting.
        - Combine the wait and action into a single command string, separated by a semicolon (e.g., `await page.wait_for_selector('selector', state='visible'); await page.fill('selector', 'value')`).
        - Do not use undefined variables or assume prior assignments.
        - Avoid complex constructs like async functions or lists of commands in a single response.

        # Use the following reference for Playwright commands:
        ```playwright commands reference
        {playwright_ref}
        ```

        # Requirement:
        `{main_task}`

        # Here are the steps taken so far:
        `{json.dumps(state['steps'], indent=2)}`

        # Return the response strictly in the following JSON format:
        ```json
        {{
            "command": "string",
            "task_completed": false,
            "reason_for_fix": "string",
            "output": null
        }}
        ```
    """
    print(f"Healing Prompt: {prompt}")

    # Attach the highlighted screenshot as an inline image part when one is available.
    contents = [prompt]
    if isinstance(screenshot, str) and screenshot.startswith("data:") and ";base64," in screenshot:
        header, encoded = screenshot.split(";base64,", 1)
        contents.append({"mime_type": header[len("data:"):], "data": base64.b64decode(encoded)})

    try:
        response = model.generate_content(
            contents=contents,
            generation_config={'response_mime_type': 'application/json'}
        )
        response_text = response.text
        print(f"Raw healed response: {response_text}")
        try:
            response_dict = json.loads(response_text)
            healed_command = HealingCommand(**response_dict)
            print(f"Healed Response: {healed_command}")
            return healed_command
        except json.JSONDecodeError as e:
            logging.warning(f"JSON parsing failed for healed command: {str(e)}, attempting fallback parsing")
            return extract_healed_command_from_text(response_text)
    except Exception as e:
        logging.error(f"Failed to get healed command: {str(e)}", exc_info=True)
        raise

In [16]:
async def request_next_step_as_command(state, main_task, screenshot, clean_screenshot):
    """Ask the model for the next Playwright command towards completing `main_task`.

    The prompt tells the model that it will receive a screenshot with labelled
    elements, so the highlighted screenshot is attached to the request whenever it is a
    base64 data URL; otherwise only the text prompt is sent.

    Args:
        state: Agent state; ``steps`` is required, ``last_output`` is optional.
        main_task: Natural-language task description.
        screenshot: Highlighted screenshot as a ``data:<mime>;base64,...`` URL (or falsy).
        clean_screenshot: Clean screenshot; accepted for call compatibility, not sent.

    Returns:
        NextStepCommand parsed from the model reply.

    Raises:
        Exception: Any error from the model call or from parsing its reply (logged first).
    """
    action_prompt = "# Based on the output, take the necessary action to proceed with the task." if 'last_output' in state and isinstance(state['last_output'], list) and len(state['last_output']) > 0 else "# Proceed with the next step in the task."
    last_output_info = f"\n# The last command produced the following output: {state['last_output']}\n" if 'last_output' in state else ""
    current_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    prompt = f"""
        # You are a professional automation quality engineer.
        # The current date and time is: {current_datetime}.
        # You read requirement and write Playwright in Python to automate the test.

        You write one Playwright command at a time, using the asynchronous version of Playwright commands prefixed with 'await'. A screenshot will be provided after executing that command.
        Each element will be highlighted with a black box and its id, class attributes.
        For other elements, prefer text-based selector like await page.click("text='Button text'") to improve the stability of the script.
        Some website uses custom date or number picker, you should trigger the custom picker then click the buttons instead of filling textual value directly.
        You write one Playwright command at a time.

        Please provide a new Playwright script or command that:
        1. Avoids searching for elements based on long text sentences as these are unlikely to be found reliably. Instead, focus on using CSS selectors, ID attributes, or short, stable text fragments.
        2. Prioritizes the use of CSS class or ID selectors, which are more reliable for locating elements. For the Federal Reserve interest rates page (https://www.federalreserve.gov/releases/h15/), target the table containing interest rates using a selector like 'table.pubtables', 'table.data', or 'table' as a fallback.
        3. Completes the task within a maximum of 20 steps, avoiding repetitive approaches or actions. If an approach fails, try a different strategy instead of repeating the same action consecutively.
        4. Initializes the `results` variable explicitly with `await page.query_selector()` or `await page.locator()` before using it, and checks if `results` is not None before interacting.
        5. Accounts for visibility issues by ensuring elements are present, visible, and interactable before attempting actions. Use `await page.wait_for_selector()` if needed.

        # Important:
        - Always use asynchronous Playwright commands, prefixed with 'await'.
        - Initialize `results` with `await page.query_selector()` or `await page.locator()` before using it.
        - Check if `results` is not None before interacting (e.g., `if results:`).
        - If you need to assign data to a variable, always assign it to the variable `results`.
        - Ensure that assignments are done on a single line and avoid including print statements on the same line as an assignment.
        - Do not create multiline assignments or concatenate multiple statements on the same line.
        - After assigning data to `results`, use a separate `print(results)` statement on the following line to output the data.
        - Avoid chaining operations directly on coroutine results; always await the coroutine first, assign the result to `results`, then process it separately.

        # Use the following reference for Playwright commands as a guide to complete the task:
        ```playwright commands reference
        {playwright_ref}
        ```

        # Requirement:
        `{main_task}`

        {last_output_info}

        {action_prompt}

        # Here are the steps taken so far:
        `{json.dumps(state['steps'], indent=2)}`

        # Avoid using complex constructs like async functions. Prefer returning a sequence of Playwright commands that can be executed directly, one after another.

        # Return the response strictly in the following JSON format:
        ```json
        {{
            "command": "string",
            "task_completed": false,
            "what_do_you_plan_to_do": "string",
            "output": null
        }}
        ```
    """
    print(f"Prompt: {prompt}")

    # Attach the highlighted screenshot as an inline image part when one is available.
    contents = [prompt]
    if isinstance(screenshot, str) and screenshot.startswith("data:") and ";base64," in screenshot:
        header, encoded = screenshot.split(";base64,", 1)
        contents.append({"mime_type": header[len("data:"):], "data": base64.b64decode(encoded)})

    try:
        response = model.generate_content(
            contents=contents,
            generation_config={'response_mime_type': 'application/json'}
        )
        response_text = response.text
        print(f"Raw next step response: {response_text}")
        try:
            response_dict = json.loads(response_text)
            next_command = NextStepCommand(**response_dict)
            print(f"Response: {next_command}")
            return next_command
        except json.JSONDecodeError as e:
            logging.warning(f"JSON parsing failed for next step command: {str(e)}, attempting fallback parsing")
            return extract_next_step_from_text(response_text)
    except Exception as e:
        logging.error(f"Failed to get next step command: {str(e)}", exc_info=True)
        raise

In [17]:
# Close browser
async def close_browser(browser):
    """Close the given Playwright browser by awaiting its close() coroutine."""
    await browser.close()

In [18]:
# Extract task from prompt
async def extract_task_from_prompt(prompt: str):
    """Ask the model to split a free-text prompt into URL, task and plan.

    Args:
        prompt: The user's instruction, containing a start URL and a task.

    Returns:
        ExtractedTask built from the model's JSON reply, or recovered by
        extract_task_from_text() when the reply is not valid JSON.

    Raises:
        Exception: Any error from the model call or from parsing (logged, then re-raised).
    """
    user_prompt = f"""
        Extract the URL and the task from the following prompt: {prompt}.
        Return the response strictly as a JSON object with the following structure:
        ```json
        {{
            "url": "string",
            "task": "string",
            "what_do_you_plan_to_do": "string"
        }}
        ```
        Example:
        ```json
        {{
            "url": "https://www.federalreserve.gov/releases/h15/",
            "task": "provide the latest interest rates",
            "what_do_you_plan_to_do": "Obtain the interest rates from the provided URL"
        }}
        ```
    """
    json_mode = {'response_mime_type': 'application/json'}
    try:
        reply = model.generate_content(contents=user_prompt, generation_config=json_mode)
        raw_reply = reply.text
        print(f"Raw task extraction response: {raw_reply}")
        try:
            parsed_reply = json.loads(raw_reply)
        except json.JSONDecodeError as decode_error:
            # Not strict JSON: fall back to the regex-based text parser.
            logging.warning(f"JSON parsing failed for task extraction: {str(decode_error)}, attempting fallback parsing")
            return extract_task_from_text(raw_reply)
        task_spec = ExtractedTask(**parsed_reply)
        print(f"Extracted Task: {task_spec}")
        return task_spec
    except Exception as failure:
        logging.error(f"Failed to extract task: {str(failure)}", exc_info=True)
        raise

In [21]:
# Main navigate function
async def navigate(user_prompt: UserPrompt):
    """Run one browsing task described by a natural-language prompt.

    The flow is: extract the URL and task from the prompt, open a browser,
    navigate to the URL, capture the initial screenshots, hand control to
    ``perform_task``, and close the browser whether or not the task succeeded.

    Args:
        user_prompt: Object whose ``prompt`` attribute holds the instruction text.

    Returns:
        ``{"message": "Task completed successfully", "state": state}``. ``state``
        is created here with the keys ``steps``, ``screenshot`` and
        ``clean_screenshot`` and is then handed to ``perform_task``.

    Raises:
        Exception: Errors raised while navigating, capturing screenshots or
            performing the task are logged and re-raised. Errors from task
            extraction or browser start-up propagate without being logged here.
    """
    extracted_task = await extract_task_from_prompt(user_prompt.prompt)
    print(f"Extracted task: {extracted_task}")
    browser, page = await init_browser(extracted_task.url)
    print("Browser and page initialized")
    state = {"steps": []}
    try:
        print(f"Extracted URL: {extracted_task.url}")
        await navigate_to_url(page, extracted_task.url)
        # repr() quotes and escapes the URL, so the recorded command remains a
        # valid Python expression even if the URL contains quotes or backslashes.
        # For ordinary URLs the text is the same as with hard-coded single quotes.
        goto_command = f"await page.goto({str(extracted_task.url)!r}, timeout=300000)"
        state['steps'].append({
            "command": goto_command,
            "status": "success",
            "reason": "Initial navigation to the extracted URL"
        })
        clean, high = await capture_initial_screenshot(page)
        state["screenshot"] = high
        state["clean_screenshot"] = clean
        print(f"Performing task: {extracted_task.task}")
        await perform_task(page, extracted_task.task, state)
        print("Task completed successfully")
        return {"message": "Task completed successfully", "state": state}
    except Exception as e:
        # Report failures through logging with a traceback, matching the other
        # error paths in this notebook, then let the caller see the exception.
        logging.error(f"Failed to perform task: {str(e)}", exc_info=True)
        raise
    finally:
        await close_browser(browser)

In [28]:
# Test the code
async def run_test():
    """Smoke test: run ``navigate`` on a fixed Wikipedia prompt and print the result."""
    prompt = UserPrompt(prompt="Navigate to https://www.wikipedia.org/, fill in 'WWW' and hit Enter.")
    result = await navigate(prompt)
    print(result)

# Top-level `await` relies on the notebook kernel (IPython autoawait); a plain
# Python script would need asyncio.run(run_test()) instead.
await run_test()

Raw task extraction response: {
    "url": "https://www.wikipedia.org/",
    "task": "fill in 'WWW' and hit Enter",
    "what_do_you_plan_to_do": "Navigate to the URL, input 'WWW', submit the form, and present the resulting page content."
}
Extracted Task: url='https://www.wikipedia.org/' task="fill in 'WWW' and hit Enter" what_do_you_plan_to_do="Navigate to the URL, input 'WWW', submit the form, and present the resulting page content."
Extracted task: url='https://www.wikipedia.org/' task="fill in 'WWW' and hit Enter" what_do_you_plan_to_do="Navigate to the URL, input 'WWW', submit the form, and present the resulting page content."
Chosen User Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36
Browser and page initialized
Extracted URL: https://www.wikipedia.org/
Capturing CLEAN screenshot for step initial
Highlighting elements on the page for step initial
Capturing screenshot for step initial
Screenshot captured for 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Performing task: fill in 'WWW' and hit Enter
Initial state captured
Requesting next step as command
Prompt: 
        # You are a professional automation quality engineer.
        # The current date and time is: 2025-04-28 18:04:37.
        # You read requirement and write Playwright in Python to automate the test.

        You write one Playwright command at a time, using the asynchronous version of Playwright commands prefixed with 'await'. A screenshot will be provided after executing that command.
        Each element will be highlighted with a black box and its id, class attributes.
        For other elements, prefer text-based selector like await page.click("text='Button text'") to improve the stability of the script.
        Some website uses custom date or number picker, you should trigger the custom picker then click the buttons instead of filling textual value directly.
        You write one Playwright command at a time.

        Please provide a new Playwright script or comma

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

State updated. Next command: await page.type('#searchInput', 'WWW')
Requesting next step as command
Prompt: 
        # You are a professional automation quality engineer.
        # The current date and time is: 2025-04-28 18:04:40.
        # You read requirement and write Playwright in Python to automate the test.

        You write one Playwright command at a time, using the asynchronous version of Playwright commands prefixed with 'await'. A screenshot will be provided after executing that command.
        Each element will be highlighted with a black box and its id, class attributes.
        For other elements, prefer text-based selector like await page.click("text='Button text'") to improve the stability of the script.
        Some website uses custom date or number picker, you should trigger the custom picker then click the buttons instead of filling textual value directly.
        You write one Playwright command at a time.

        Please provide a new Playwright script or comma

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

State updated. Next command: await page.press('#searchInput', 'Enter')
********** Task completed **********
Task completed. Final state: {
  "steps": [
    {
      "command": "await page.goto('https://www.wikipedia.org/', timeout=300000)",
      "status": "success",
      "reason": "Initial navigation to the extracted URL"
    },
    {
      "command": "await page.type('#searchInput', 'WWW')",
      "status": "success",
      "reason": "Type 'WWW' into the search input field.",
      "output": null
    },
    {
      "command": "await page.press('#searchInput', 'Enter')",
      "status": "success",
      "reason": "Press Enter key to submit the search.",
      "output": null
    }
  ]
}
Task completed successfully
{'message': 'Task completed successfully', 'state': {'steps': [{'command': "await page.goto('https://www.wikipedia.org/', timeout=300000)", 'status': 'success', 'reason': 'Initial navigation to the extracted URL'}, {'command': "await page.type('#searchInput', 'WWW')", 'status