In [1]:
# changing the working directory and loading the env variables
import os
from dotenv import load_dotenv

os.chdir(os.path.join(os.getcwd(),os.pardir))
load_dotenv()

True

In [2]:
# setting up langsmith tracing
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_PROJECT'] = 'flipkart-scraping'

In [3]:
from typing import Sequence, Optional, TypedDict, Annotated, Union, Literal
from langchain_core.messages import BaseMessage, SystemMessage, ToolMessage, HumanMessage
from playwright.async_api import Page
import operator
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate, PromptTemplate, MessagesPlaceholder
from langchain_core.prompts.image import ImagePromptTemplate
from langgraph.graph import StateGraph, END

class BBox(TypedDict):
    x: float
    y: float
    text: str
    type: str
    ariaLabel: str

class Prediction(TypedDict):
    action: str
    args: Optional[Sequence[str]]

class AgentState(TypedDict):
    page: Page
    input: str
    img: str
    bboxes: Sequence[BBox]
    prediction: Prediction
    scratchpad: Annotated[Sequence[BaseMessage],operator.add]
    bbox_descriptions: Sequence[str]
    tool_output: str

In [4]:
import asyncio
import platform
from langchain_core.pydantic_v1 import BaseModel, Field

async def click(page: Page, bbox_id:int, bboxes:Sequence[BBox])->str:
    """Used to click on a bounding box on a web page.

    Args:
        page (Page): The page to click on.
        bbox_id (int): The ID of the bounding box to click on.
        bboxes (Sequence[BBox]): The list of bounding boxes on the page.

    Returns:
        str: The message that the click was successful.
    """
    try:
        bbox = bboxes[bbox_id]
    except Exception as e:
        return f"Error occured. {e}"
    x, y = bbox["x"], bbox["y"]
    _ = await page.mouse.click(x, y)
    return {"tool_output": f"Clicked {bbox_id}"}


async def type_text(page: Page, bbox_id:int, type_content:str, bboxes:Sequence[BBox])->str:
    """Used to type text in a web page.

    Args:
        page (Page): The page to type on.
        bbox_id (int): The ID of the bounding box to type on.
        type_content (str): The content to type.
        bboxes (Sequence[BBox]): The list of bounding boxes on the page.

    Returns:
        str: The message that the text was typed successfully.
    """
    bbox = bboxes[bbox_id]
    x, y = bbox["x"], bbox["y"]
    await page.mouse.click(x, y)
    select_all = "Meta+A" if platform.system() == "Darwin" else "Control+A"
    await page.keyboard.press(select_all)
    await page.keyboard.press("Backspace")
    await page.keyboard.type(type_content)
    await page.keyboard.press("Enter")
    return {"tool_output": f"Typed {type_content} and submitted"}


async def scroll(page: Page, target: Union[Literal['WINDOW'],int], direction:Literal['up','down'], bboxes: Sequence[BBox])->str:
    """Used to scroll up or down in a web page.
    If the complete window needs to be scrolled, then the target is 'WINDOW'. Else the target is the ID of the bounding box to scroll in.

    Args:
        page (Page): The page to scroll in.
        target (Union[Literal['WINDOW'],int]): The target to scroll in. 'WINDOW' if the complete window needs to be scrolled, else the ID of the bounding box to scroll in.
        direction (Literal['up','down']): The direction to scroll in.
        bboxes (Sequence[BBox]): The list of bounding boxes on the page.

    Returns:
        str: The message that the scroll was successful.
    """
    if target == "WINDOW":
        scroll_amount = 500
        scroll_direction = (
            -scroll_amount if direction == "up" else scroll_amount
        )
        await page.evaluate(f"window.scrollBy(0, {scroll_direction})")
    else:
        scroll_amount = 200
        target_id = int(target)
        bbox = bboxes[target_id]
        x, y = bbox["x"], bbox["y"]
        scroll_direction = (
            -scroll_amount if direction == "up" else scroll_amount
        )
        await page.mouse.move(x, y)
        await page.mouse.wheel(0, scroll_direction)

    return {"tool_output": f"Scrolled {direction} in {'WINDOW' if target == 'WINDOW' else 'element'} with ID {target}."}


async def wait()->str:
    """Used to wait for 5 seconds. This is generally used to wait for the page to load.

    Returns:
        str: The message that the wait is complete.
    """
    sleep_time = 5
    await asyncio.sleep(sleep_time)
    return {"tool_output": f"Waited for {sleep_time}s."}


async def go_back(page: Page)->str:
    """Used to navigate back a page.

    Args:
        page (Page): The page to navigate back in.

    Returns:
        str: The message that the navigation is complete.
    """
    await page.go_back()
    return {"tool_output": f"Navigated back a page to {page.url}."}


async def to_google(page: Page)->str:
    """Used to navigate to google.com.

    Args:
        page (Page): The page to navigate to google.com in.

    Returns:
        str: The message that the navigation is complete.
    """
    await page.goto("https://www.google.com/")
    return {"tool_output": "Navigated to google.com."}

functions_list = [click, type_text, scroll, wait, go_back, to_google]

class CompleteTask(BaseModel):
    """Call this when the given task is completed."""
    answer: str = Field("The answer when the task is complete. 'COMPLETED' when the task is a navigation task, else it will be the content of the answer which the user requested for.")

In [5]:
# helper functions
from langchain_core.runnables import RunnableLambda, chain, RunnablePassthrough
import base64
import asyncio

with open("mark_page.js") as f:
    mark_page_script = f.read()

@chain
async def mark_page(page):
    await page.evaluate(mark_page_script)
    for _ in range(10):
        try:
            bboxes = await page.evaluate("markPage()")
            break
        except:
            asyncio.sleep(3)
    screenshot = await page.screenshot()
    await page.evaluate("unmarkPage()")
    print(bboxes)
    return {
        "img": base64.b64encode(screenshot).decode(),
        "bboxes": bboxes,
    }

In [6]:
from langchain_openai import ChatOpenAI

@chain
async def annotate(state):
    marked_page = await mark_page.with_retry().ainvoke(state["page"])
    return {**state, **marked_page}


@chain
def format_descriptions(state):
    labels = []
    for i, bbox in enumerate(state["bboxes"]):
        text = bbox.get("ariaLabel","")
        if not text.strip():
            text = bbox["text"]
        el_type = bbox.get("type","")
        labels.append(f'{i} (<{el_type}/>): "{text}"')
    bbox_descriptions = "Valid Bounding Boxes:\n" + "\n".join(labels)
    return {**state, "bbox_descriptions": bbox_descriptions}


with open('system_prompt.txt','r') as f:
    system_prompt = f.read()


prompt = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(system_prompt),
    HumanMessagePromptTemplate.from_template('{input}'),
    MessagesPlaceholder(variable_name='scratchpad', optional=True),
    HumanMessagePromptTemplate(prompt=[
        ImagePromptTemplate(input_variables=['img'], template={'url': 'data:image/png;base64,{img}'}),
        PromptTemplate(input_variables=['bbox_descriptions'], template='{bbox_descriptions}'),
    ])
])

llm = ChatOpenAI(model="gpt-4o", max_tokens=4096)
# agent = annotate | RunnablePassthrough.assign(
#     prediction = format_descriptions | prompt | llm.bind_functions(functions_list+[CompleteTask])
# )
agent = annotate | format_descriptions | prompt | llm.bind_functions(functions_list+[CompleteTask])

In [7]:
CompleteTask.__name__

'CompleteTask'

In [8]:
@chain
def update_scratchpad(state: AgentState):
    """Update the scratchpad with the latest message."""
    message = state.get('scratchpad')[-1]
    tool_output = state.get('tool_output')
    if message.additional_kwargs:
        tool_call_id = message['additional_kwargs']['tool_calls'][0]['id']
        return_message = ToolMessage(content=tool_output, id=tool_call_id)
    else:
        return_message = HumanMessage(content="No tool was called in your last action. Please recheck your last action.")
    return {**state, "scratchpad": [return_message]}


def agent_router(state: AgentState):
    """Routing function for the agent"""
    message = state.get('scratchpad')[-1]
    if message.additional_kwargs:
        function_name = message.additional_kwargs['function_call']['name']
        if function_name == CompleteTask.__name__:
            return '__end__'
        else:
            return function_name
    else:
        return '__end__'

In [9]:
from langchain_core.runnables import Runnable, RunnableConfig
import json

class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable
    
    async def __call__(self, state: AgentState, config: RunnableConfig):
        message = await self.runnable.ainvoke(state)
        print(message)
        if not message.tool_calls and (
            not message.content
            or isinstance(message.content,list)
            and not message.content[0].get('text')
        ):
            messages = [('user','Respond with a real output or correct tool call')]
            return {**state, 'scratchpad': messages}
        
        else:
            return {**state, 'scratchpad': [message]}

In [10]:
@chain
async def type_text_node(state: AgentState):
    message = state['scratchpad'][-1]
    ai_kwargs = eval(message.additional_kwargs['function_call']['arguments'])
    state_kwargs = {
        'page': state['page'],
        'bboxes': state['bboxes']
    }
    print(state['bboxes'])
    result = await type_text(**ai_kwargs, **state_kwargs)
    return result

@chain
async def click_node(state: AgentState):
    message = state['scratchpad'][-1]
    ai_kwargs = eval(message.additional_kwargs['function_call']['arguments'])
    state_kwargs = {
        'page': state['page'],
        'bboxes': state['bboxes']
    }
    print(state['bboxes'])
    result = await click(**ai_kwargs, **state_kwargs)
    return result

In [11]:
# building the graph

builder = StateGraph(AgentState)


builder.add_node('agent', Assistant(agent))
builder.set_entry_point('agent')


builder.add_node('update_scratchpad', update_scratchpad)
builder.add_edge('update_scratchpad', 'agent')


tools_dict = {
    'click':click_node,
    'type_text':type_text_node,
    'scroll':scroll,
    'wait':wait,
    'go_back':go_back,
    'to_google':to_google
}


for node_name, tool in tools_dict.items():
    builder.add_node(node_name, tool)
    builder.add_edge(node_name, 'update_scratchpad')


builder.add_conditional_edges(
    'agent',
    agent_router,
    {
        '__end__': END,
        'CompleteTask': END,
        'click':'click',
        'type_text':'type_text',
        'scroll':'scroll',
        'wait':'wait',
        'go_back':'go_back',
        'to_google':'to_google',
    }
)

graph = builder.compile()

In [12]:
from IPython import display
from playwright.async_api import async_playwright

browser = await async_playwright().start()
browser = await browser.chromium.launch(headless=False, args=None)
page = await browser.new_page()
_ = await page.goto("https://www.google.com")

async def call_agent(question:str, page:Page, max_steps:int=7):
    event_stream = graph.astream(
        input = {
            'page':page,
            'input':question,
            'scratchpad':[],
        },
        config={
            'recursion_limit':max_steps
        }
    )

    steps = []
    async for event in event_stream:
        print("="*30)
        print(event)
        if "agent" not in event:
            continue
        message = event['agent']['scratchpad'][-1]
        content = message.content
        addition_kwargs = message.additional_kwargs
        
        print(f"AGENT: {content}")
        print(addition_kwargs)

In [13]:
user_input = "What is the price of iphone 15 on flipkart?"
res = await call_agent(user_input, page)

[{'x': 1020.390625, 'y': 30, 'type': 'a', 'text': 'Gmail', 'ariaLabel': 'Gmail '}, {'x': 1073.68359375, 'y': 30, 'type': 'a', 'text': 'Images', 'ariaLabel': 'Search for Images '}, {'x': 1134, 'y': 30, 'type': 'a', 'text': '', 'ariaLabel': 'Google apps'}, {'x': 1214, 'y': 30, 'type': 'span', 'text': 'Sign in', 'ariaLabel': ''}, {'x': 1064, 'y': 166, 'type': 'iframe', 'text': '', 'ariaLabel': ''}, {'x': 640, 'y': 140, 'type': 'img', 'text': '', 'ariaLabel': ''}, {'x': 863, 'y': 196, 'type': 'svg', 'text': '', 'ariaLabel': ''}, {'x': 617.5, 'y': 271.5, 'type': 'textarea', 'text': '', 'ariaLabel': 'Search'}, {'x': 841, 'y': 269, 'type': 'div', 'text': '', 'ariaLabel': ''}, {'x': 863, 'y': 265.5, 'type': 'path', 'text': '', 'ariaLabel': ''}, {'x': 903.0087280273438, 'y': 270, 'type': 'circle', 'text': '', 'ariaLabel': ''}, {'x': 562.80078125, 'y': 341, 'type': 'input', 'text': '', 'ariaLabel': 'Google Search'}, {'x': 709.640625, 'y': 341, 'type': 'input', 'text': '', 'ariaLabel': "I'm Feeli

TypeError: 'NoneType' object is not subscriptable

In [14]:
await page.close()
await browser.close()

# Experiments

In [None]:
# from langchain_openai import ChatOpenAI
# from langchain_core.prompts import ChatPromptTemplate

# def add_two_numbers(a:int, b:int)->int:
#     """Use this to add two numbers"""
#     return a+b

# all_functions = [add_two_numbers]

# prompt = ChatPromptTemplate.from_messages([
#     ('system','You are a helpful assistant. Please help the user with their query. You are not very good with maths. So if you encounter a match problem, remember to call a tool'),
#     ('user','{input}')
# ])

# llm = ChatOpenAI(model='gpt-3.5-turbo')

# agent = prompt | llm.bind_functions(all_functions)

In [None]:
# result = agent.invoke({'input':'What is 2+5? Use a tool to calculate this.'})

In [None]:
# result

In [None]:
# result.additional_kwargs

In [None]:
# if {1:1}:
#     print('hello')

In [None]:
# type(result)

In [None]:
# from langchain_core.messages import AIMessage
# message = AIMessage(content="", additional_kwargs={
#         "tool_calls": [
#             {
#                 "id": "call_oRSmfiNKrJBfCCBP7y0uglry",
#                 "function": {
#                     "arguments": {"objective":"Create a SQL agent who is an expert in writing SQL queries. The agent should be able to capture the context of the problem, ask clarifying questions, and generate SQL queries.","variables":[],"constraints":["Output should not be plain text"],"requirements":["Output must be a SQL query"]},
#                     "name": "PromptInstructions"
#                 },
#                 "type": "function"
#             }
#         ]
#     }, response_metadata={
#         "token_usage": {
#             "completion_tokens": 70,
#             "prompt_tokens": 436,
#             "total_tokens": 506
#         },
#         "model_name": "gpt-3.5-turbo",
#         "system_fingerprint": None,
#         "finish_reason": "tool_calls",
#         "logprobs": None
#     }, id="run-4c15f1fe-6142-4fbc-810f-079e61314ebd-0", tool_calls=[
#         {
#             "name": "PromptInstructions",
#             "args": {
#                 "objective": "Create a SQL agent who is an expert in writing SQL queries. The agent should be able to capture the context of the problem, ask clarifying questions, and generate SQL queries.",
#                 "variables": [],
#                 "constraints": [
#                     "Output should not be plain text"
#                 ],
#                 "requirements": [
#                     "Output must be a SQL query"
#                 ]
#             },
#             "id": "call_oRSmfiNKrJBfCCBP7y0uglry"
#         }
#     ])

In [None]:
# message.tool_calls

[{'name': 'PromptInstructions',
  'args': {'objective': 'Create a SQL agent who is an expert in writing SQL queries. The agent should be able to capture the context of the problem, ask clarifying questions, and generate SQL queries.',
   'variables': [],
   'constraints': ['Output should not be plain text'],
   'requirements': ['Output must be a SQL query']},
  'id': 'call_oRSmfiNKrJBfCCBP7y0uglry'}]

In [None]:
# message.additional_kwargs['tool_calls']

[{'id': 'call_oRSmfiNKrJBfCCBP7y0uglry',
  'function': {'arguments': {'objective': 'Create a SQL agent who is an expert in writing SQL queries. The agent should be able to capture the context of the problem, ask clarifying questions, and generate SQL queries.',
    'variables': [],
    'constraints': ['Output should not be plain text'],
    'requirements': ['Output must be a SQL query']},
   'name': 'PromptInstructions'},
  'type': 'function'}]