In [None]:
%%capture
!pip install langchain_openai

In [None]:
from langchain_openai import AzureChatOpenAI
from google.colab import userdata
import os

# Publish the Colab secret as an environment variable; no key is passed to the
# client below, so it is expected to pick the credential up from the environment.
os.environ.update({"AZURE_OPENAI_API_KEY": userdata.get('AZURE_OPENAI_API_KEY')})

# Chat model shared by the whole notebook. temperature=0 requests the least
# random completions; max_tokens=None leaves the reply length uncapped.
llm = AzureChatOpenAI(
    model_name="gpt-4o",
    deployment_name="gpt-4o-tfm",
    azure_endpoint="https://obelmoal98-openai.openai.azure.com/",
    api_version="2024-12-01-preview",
    max_tokens=None,
    temperature=0,
)

In [None]:
%%capture
pip install langgraph

In [None]:
%%capture
# Playwright is the browser-automation library the agent's tools are built on.
%pip install playwright

In [None]:
%%capture
# Download the browser binaries that Playwright launches (this notebook uses Chromium).
!playwright install

In [None]:
from playwright.async_api import async_playwright
import asyncio

In [None]:
import requests
import base64
from PIL import Image
import io
import matplotlib.pyplot as plt

# Endpoint of the external annotation service: `annotate` posts a screenshot to it
# and reads back the labelled image ("img") and the bounding boxes ("bboxes").
# The address below is an ngrok tunnel, so it must be replaced on every execution.
url = "https://5678-34-126-86-99.ngrok-free.app/process_image"

In [None]:
import base64
from langchain.prompts import (
    PromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    ChatPromptTemplate,
)
from langchain_core.messages import SystemMessage, HumanMessage
from typing import List, Optional


def create_multimodal_agent() -> ChatPromptTemplate:
    """Build the chat prompt used by the multimodal web-navigation agent.

    Returns:
        A ChatPromptTemplate composed, in order, of the fixed system
        instructions, an optional ``scratchpad`` placeholder, and a human
        message carrying the labelled screenshot (``img``), the bounding-box
        listing (``bbox_descriptions``) and the task text (``input``).
    """
    # System instructions: available actions and the strict reply format.
    # The doubled braces are escapes, so they reach the model as literal braces.
    system_instructions = PromptTemplate(
        input_variables=[],
        template="""Imagine you are a robot browsing the web, just like humans. Now you need to complete a task.
                In each iteration, you will receive an Observation that includes a screenshot of a webpage and some texts.
                This screenshot will feature Numerical Labels placed in the TOP LEFT corner of each Web Element.
                Carefully analyze the visual information to identify the Numerical Label corresponding to the Web Element that requires interaction,
                then follow the guidelines and choose one of the following actions:

                1. Click a Web Element.
                2. Delete existing content in a textbox and then type content.
                3. Scroll up or down.
                4. Wait.
                5. Go back.
                6. Return to google to start over.
                7. Respond with the final answer if the task is accomplished.

                Correspondingly, Action should STRICTLY follow the format:

                1. Click [Numerical_Label]
                2. Type [Numerical_Label]; [Content]
                3. Scroll [Numerical_Label or WINDOW]; [up or down]
                4. Wait
                5. GoBack
                6. Google
                7. ANSWER; [content]

                Key Guidelines You MUST follow:

                * Action guidelines *
                1) Execute only one action per iteration.
                2) When clicking or typing, ensure to select the correct bounding box.
                3) Numeric labels lie in the top-left corner of their corresponding bounding boxes and are colored the same.

                * Web Browsing Guidelines *
                1) Don't interact with useless web elements like Login, Sign-in, donation that appear in Webpages
                2) Select strategically to minimize time wasted.

                Your reply should strictly follow the format:

                Thought: {{Your brief thoughts (briefly summarize the info that will help ANSWER)}}
                Action: {{One Action format you choose}}
                Then the User will provide:
                Observation: {{A labeled screenshot Given by User}}
                """,
    )

    return ChatPromptTemplate.from_messages(
        messages=[
            SystemMessagePromptTemplate(prompt=[system_instructions]),
            # Slot for the running record of previous action observations.
            MessagesPlaceholder(variable_name="scratchpad", optional=True),
            # Multimodal user turn: screenshot first, then the two text parts.
            HumanMessagePromptTemplate.from_template(
                [
                    {"type": "image_url", "image_url": "data:image/png;base64,{img}"},
                    {"type": "text", "text": "{bbox_descriptions}"},
                    {"type": "text", "text": "{input}"},
                ]
            ),
        ]
    )


# Module-level prompt consumed by the `agent` pipeline.
prompt = create_multimodal_agent()

In [None]:
from typing import List, Optional, Any, Tuple
from typing_extensions import TypedDict

from langchain_core.messages import BaseMessage, SystemMessage
from playwright.async_api import async_playwright, Page

class Prediction(TypedDict):
    """Action selected by the model, in the shape returned by `parse`."""

    # Action name taken from the "Action: ..." line, or "retry" when the reply
    # could not be parsed.
    action: str
    # Arguments extracted from the action text; None when the action has none.
    # NOTE(review): for "retry", `parse` stores a plain error string here, not a list.
    args: Optional[List[Any]]

class BBox(TypedDict):
    """One labelled element as delivered by the annotation service."""

    # Position of the element. `click` and `type_text` multiply these by the
    # viewport width/height, so they are presumably fractions of the viewport —
    # TODO confirm against the annotation service.
    x: float
    y: float
    # Shown to the model as the element "type" in the bounding-box listing.
    interactivity: bool
    # Text associated with the element; shown to the model and echoed in observations.
    content: str

class AgentState(TypedDict):
    """State passed between the nodes of the agent graph."""

    # Playwright page the tools act on.
    page: Page
    # Task description supplied by the user (the `question` given to `call_agent`).
    input: str
    # Base64-encoded labelled screenshot returned by the annotation service.
    img: str
    # Bounding boxes matching the numeric labels in `img`; a label is an index into this list.
    bboxes: List[BBox]
    # Most recent parsed model output.
    prediction: Prediction
    # Running record of past observations; `update_scratchpad` keeps it as a
    # single SystemMessage.
    scratchpad: List[BaseMessage]
    # Text result of the tool that ran last.
    observation: str

In [None]:
import asyncio
import platform


async def click(state: AgentState):
    """Click the element whose numeric label the model selected.

    Args:
        state: Agent state. Reads ``page``, ``bboxes`` and
            ``prediction["args"]``, which must hold exactly one item: the
            label (index into ``bboxes``) of the element to click.

    Returns:
        An observation string: a confirmation on success, or a failure message
        when the arguments are malformed or the label matches no bounding box.
    """
    page = state["page"]
    click_args = state["prediction"]["args"]
    if click_args is None or len(click_args) != 1:
        return f"Failed to click bounding box labeled as number {click_args}"
    bbox_id = click_args[0]
    try:
        # The label is free text produced by the model: a non-numeric or
        # out-of-range value must become an observation, not an exception that
        # aborts the graph. Negative values are rejected so they cannot wrap
        # around to the end of the list.
        bbox_id = int(bbox_id)
        if bbox_id < 0:
            raise IndexError(bbox_id)
        bbox = state["bboxes"][bbox_id]
        content = bbox["content"]
    except (ValueError, TypeError, IndexError, KeyError):
        return f"Error: no bbox for : {bbox_id}"
    viewport_size = await page.evaluate('''() => ({
        width: window.innerWidth,
        height: window.innerHeight
    })''')

    # Bounding-box coordinates are scaled by the viewport size to obtain pixels.
    x = viewport_size['width'] * bbox["x"]
    y = viewport_size['height'] * bbox["y"]

    await page.mouse.click(x, y)
    return f"Clicked {bbox_id} with content: {content}"


async def type_text(state: AgentState):
    """Replace the content of a labelled text box and submit it with Enter.

    Args:
        state: Agent state. Reads ``page``, ``bboxes`` and
            ``prediction["args"]``, which must hold two items: the label
            (index into ``bboxes``) of the text box and the text to type.

    Returns:
        An observation string: a confirmation on success, or a failure message
        when the arguments are malformed or the label matches no bounding box.
    """
    page = state["page"]
    type_args = state["prediction"]["args"]
    if type_args is None or len(type_args) != 2:
        return (
            f"Failed to type in element from bounding box labeled as number {type_args}"
        )
    bbox_id, text_content = type_args
    try:
        # Same validation as `click`: a bad label becomes an observation
        # instead of an exception that aborts the graph.
        bbox_id = int(bbox_id)
        if bbox_id < 0:
            raise IndexError(bbox_id)
        bbox = state["bboxes"][bbox_id]
    except (ValueError, TypeError, IndexError, KeyError):
        return f"Error: no bbox for : {bbox_id}"
    viewport_size = await page.evaluate('''() => ({
        width: window.innerWidth,
        height: window.innerHeight
    })''')

    # Bounding-box coordinates are scaled by the viewport size to obtain pixels.
    x = viewport_size['width'] * bbox["x"]
    y = viewport_size['height'] * bbox["y"]
    await page.mouse.click(x, y)
    # The system prompt defines this action as "Delete existing content in a
    # textbox and then type content", so clear the field before typing.
    select_all = "Meta+A" if platform.system() == "Darwin" else "Control+A"
    await page.keyboard.press(select_all)
    await page.keyboard.press("Backspace")
    await page.keyboard.type(text_content)
    await page.keyboard.press("Enter")
    return f"Typed {text_content} and submitted"


async def scroll(state: AgentState):
    """Scroll the whole window or a labelled element up or down.

    Args:
        state: Agent state. Reads ``page``, ``bboxes`` and
            ``prediction["args"]``, which must hold two items: the target
            ("WINDOW" or a label indexing ``bboxes``) and the direction
            ("up"; anything else scrolls down).

    Returns:
        An observation string: a confirmation on success, or a failure message
        when the arguments are malformed or the label matches no bounding box.
    """
    page = state["page"]
    scroll_args = state["prediction"]["args"]
    if scroll_args is None or len(scroll_args) != 2:
        return "Failed to scroll due to incorrect arguments."

    target, direction = scroll_args
    scroll_up = direction.lower() == "up"

    if target.upper() == "WINDOW":
        scroll_amount = 500
        scroll_direction = -scroll_amount if scroll_up else scroll_amount
        await page.evaluate(f"window.scrollBy(0, {scroll_direction})")
        return f"Scrolled {direction} in window"

    scroll_amount = 200
    try:
        # A bad label becomes an observation instead of an exception that
        # aborts the graph.
        target_id = int(target)
        if target_id < 0:
            raise IndexError(target_id)
        bbox = state["bboxes"][target_id]
    except (ValueError, TypeError, IndexError, KeyError):
        return f"Error: no bbox for : {target}"

    # As in `click` and `type_text`, bounding-box coordinates must be scaled by
    # the viewport size; using them unscaled would park the pointer in the
    # top-left corner and scroll the wrong element.
    viewport_size = await page.evaluate('''() => ({
        width: window.innerWidth,
        height: window.innerHeight
    })''')
    x = viewport_size['width'] * bbox["x"]
    y = viewport_size['height'] * bbox["y"]
    scroll_direction = -scroll_amount if scroll_up else scroll_amount
    await page.mouse.move(x, y)
    await page.mouse.wheel(0, scroll_direction)

    return f"Scrolled {direction} in element"


async def wait(state: AgentState):
    """Pause for a fixed five seconds and report the pause as an observation.

    The state is accepted only to match the signature of the other tools.
    """
    pause_seconds = 5
    await asyncio.sleep(pause_seconds)
    return f"Waited for {pause_seconds}s."


async def go_back(state: AgentState):
    """Go one step back in the page's history and report the resulting URL."""
    current_page = state["page"]
    await current_page.go_back()
    # Read the URL only after navigating so the observation names the new location.
    landed_on = current_page.url
    return f"Navigated back a page to {landed_on}."


async def to_google(state: AgentState):
    """Load the Google home page so the agent can start over."""
    await state["page"].goto("https://www.google.com/")
    return "Navigated to google.com."


from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

async def annotate(state):
    """Screenshot the current page and have the annotation service label it.

    Args:
        state: Agent state; only ``page`` is read.

    Returns:
        A copy of ``state`` extended with ``img`` and ``bboxes`` taken from the
        service's JSON reply.

    Raises:
        requests.RequestException: if the service cannot be reached, does not
            answer within the timeout, or replies with an HTTP error status.
    """
    screenshot = await state["page"].screenshot(type="jpeg")
    files = {
        "file": ("screenshot.jpg", io.BytesIO(screenshot), "image/jpeg")
    }

    # requests is synchronous: run it in a worker thread so the event loop is
    # not blocked, and bound the wait so a dead tunnel cannot hang the notebook.
    response = await asyncio.to_thread(requests.post, url, files=files, timeout=120)
    # Fail with the HTTP status instead of an opaque JSON/KeyError further down
    # (e.g. when the tunnel URL has expired).
    response.raise_for_status()
    data = response.json()

    return {
        **state,
        "img": data["img"],
        "bboxes": data["bboxes"]
    }

def format_descriptions(state):
    """Render the bounding boxes as the text listing shown to the model.

    Each box becomes one line of the form ``<index> (<interactivity/>): "<content>"``.

    Returns:
        A copy of ``state`` with the listing stored under ``bbox_descriptions``.
    """
    listing = [
        f'{index} (<{box.get("interactivity")}/>): "{box["content"]}"'
        for index, box in enumerate(state["bboxes"])
    ]
    header = "\nValid Bounding Boxes:\n"
    return {**state, "bbox_descriptions": header + "\n".join(listing)}

def parse(text: str) -> dict:
    """Extract the action and its arguments from the model's reply.

    The last non-blank line must start with ``"Action: "``. The first word on
    that line is the action name; the remainder is split on ``;`` into
    arguments, each stripped of whitespace and square brackets.

    Args:
        text: Raw reply of the model.

    Returns:
        ``{"action": name, "args": list_or_None}`` on success, or
        ``{"action": "retry", "args": <error message>}`` when no action line
        is found.
    """
    action_prefix = "Action: "
    action_block = text.strip().split("\n")[-1]
    if not action_block.startswith(action_prefix):
        return {"action": "retry", "args": f"Could not parse LLM Output: {text}"}

    action_str = action_block[len(action_prefix):]
    head, _, rest = action_str.partition(" ")
    # The prompt's own format is "ANSWER; [content]", so the separator can be
    # glued to the action name; without removing it the action would be
    # "ANSWER;" and would never match "ANSWER".
    action, _, inline_args = head.partition(";")
    action = action.strip()
    raw_args = f"{inline_args} {rest}".strip()
    if not raw_args:
        return {"action": action, "args": None}

    # Free-text arguments may legitimately contain ';': keep the whole answer
    # in one piece and split "Type" only between the label and the text.
    max_splits = {"ANSWER": 0, "Type": 1}.get(action, -1)
    action_input = [
        inp.strip().strip("[]") for inp in raw_args.split(";", max_splits)
    ]
    return {"action": action, "args": action_input}


# One agent step: `annotate` screenshots and labels the current page, then the
# nested chain lists the bounding boxes, fills the prompt, queries the LLM and
# parses the reply; the parsed result is added to the state as "prediction".
agent = annotate | RunnablePassthrough.assign(
    prediction=format_descriptions | prompt | llm | StrOutputParser() | parse
)

import re


def update_scratchpad(state: AgentState):
    """After a tool is invoked, we want to update
    the scratchpad so the agent is aware of its previous steps.

    The scratchpad is kept as a single SystemMessage containing a header
    followed by one numbered line per observation.

    Args:
        state: Agent state; reads ``scratchpad`` (may be missing or empty) and
            ``observation``.

    Returns:
        A copy of ``state`` whose ``scratchpad`` holds the extended message.
    """
    old = state.get("scratchpad")
    if old:
        txt = old[0].content
        # Take the number of the last numbered entry rather than assuming the
        # very last line starts with one (a multi-line observation would make
        # re.match return None and raise AttributeError).
        numbered = re.findall(r"^(\d+)\. ", txt, flags=re.MULTILINE)
        step = int(numbered[-1]) + 1 if numbered else 1
    else:
        txt = "Previous action observations:\n"
        step = 1
    # Observations echo page content and typed text, which may contain line
    # breaks; keep every entry on one line so the numbering stays parseable.
    observation = " ".join(str(state["observation"]).splitlines())
    txt += f"\n{step}. {observation}"
    return {**state, "scratchpad": [SystemMessage(content=txt)]}


from langchain_core.runnables import RunnableLambda

from langgraph.graph import END, START, StateGraph

# Graph layout: START -> agent -> <tool> -> update_scratchpad -> agent -> ...
graph_builder = StateGraph(AgentState)

# Core nodes first, then the fixed edges between them.
graph_builder.add_node("agent", agent)
graph_builder.add_node("update_scratchpad", update_scratchpad)
graph_builder.add_edge(START, "agent")
graph_builder.add_edge("update_scratchpad", "agent")

# Action name (as written by the model) -> coroutine that performs it.
tools = dict(
    Click=click,
    Type=type_text,
    Scroll=scroll,
    Wait=wait,
    GoBack=go_back,
    Google=to_google,
)

# Each tool becomes a node whose text result is stored under "observation"...
for node_name, tool in tools.items():
    graph_builder.add_node(
        node_name,
        RunnableLambda(tool) | (lambda observation: {"observation": observation}),
    )

# ...and every tool node hands over to the scratchpad update.
for node_name in tools:
    graph_builder.add_edge(node_name, "update_scratchpad")


def select_tool(state: AgentState):
    """Choose the node that follows the agent, based on the predicted action.

    Args:
        state: Agent state; reads ``prediction["action"]``.

    Returns:
        ``END`` for the final answer, the tool's node name for a registered
        tool, and ``"agent"`` for anything else so the model is asked again.
    """
    # The prompt's answer format is "ANSWER; [content]", so the action name may
    # arrive with the separator attached ("ANSWER;").
    action = (state["prediction"]["action"] or "").rstrip(";")
    if action == "ANSWER":
        return END
    if action in tools:
        return action
    # "retry" (unparsable reply) and any name that is not a registered node go
    # back to the agent instead of routing to a node that does not exist; the
    # graph's recursion limit bounds the number of such retries.
    return "agent"


# After every agent step, `select_tool` names the node that runs next.
graph_builder.add_conditional_edges("agent", select_tool)

# Compile the builder into the runnable graph streamed by `call_agent`.
graph = graph_builder.compile()

from IPython import display
from playwright.async_api import async_playwright

browser = await async_playwright().start()
browser = await browser.chromium.launch(headless=True, args=None)
page = await browser.new_page()
_ = await page.goto("https://docs.google.com/forms/d/e/1FAIpQLScMx25lNn7XhLWwCK6mSTs6Q8qJwNrNRl5JhPI6j07uiMhWjg/viewform?usp=header")


async def call_agent(question: str, page, max_steps: int = 150):
    """Run the agent graph on a task and show its progress step by step.

    After every agent step the cell output is replaced by the list of actions
    taken so far followed by the current labelled screenshot.

    Args:
        question: Task description given to the agent.
        page: Playwright page the agent operates on.
        max_steps: Passed to the graph as its ``recursion_limit``.

    Returns:
        The content of the agent's ANSWER action, or None if the stream ends
        without an answer or the answer carries no content.
    """
    event_stream = graph.astream(
        {
            "page": page,
            "input": question,
            "scratchpad": [],
        },
        {
            "recursion_limit": max_steps,
        },
    )
    final_answer = None
    steps = []
    async for event in event_stream:
        # Only agent steps carry a prediction and a screenshot.
        if "agent" not in event:
            continue
        agent_update = event["agent"]
        pred = agent_update.get("prediction") or {}
        action = pred.get("action")
        action_input = pred.get("args")
        display.clear_output(wait=False)
        steps.append(f"{len(steps) + 1}. {action}: {action_input}")
        print("\n".join(steps))
        img = agent_update.get("img")
        if img:
            display.display(display.Image(base64.b64decode(img)))
        # `action` can be None when the prediction is missing, and an ANSWER can
        # arrive without content; neither case may raise here.
        if action and "ANSWER" in action:
            if isinstance(action_input, (list, tuple)) and action_input:
                # The parser splits on ';', so an answer containing semicolons
                # arrives in several pieces; put them back together.
                final_answer = "; ".join(str(part) for part in action_input)
            break
    return final_answer

res = await call_agent("Fill only one form with: Name = Example, Surname = final, Country = Spain, Verification = I'm not human, Able to change page = Yes , then Submit the form. Note: USE SCROLL if you cannot see all the values or to find the next page or submit button. Once submitted, send confirm as ANSWER", page)
print(f"Final response: {res}")