Send a request to the model
Send a request to create a Response with the computer-use-preview model equipped with the computer_use_preview tool. This request should include details about your environment, along with an initial input prompt.

If you want to show a summary of the reasoning performed by the model, you can include the summary parameter in the request. This can be helpful if you want to debug or show what's happening behind the scenes in your interface. The summary can either be concise or detailed.

Optionally, you can include a screenshot of the initial state of the environment.

In [ ]:
# install deps
%pip install -q -U anthropic python-dotenv nest_asyncio PyPDF2
# API key removed for security

In [ ]:
# env setup
from anthropic import Anthropic
from dotenv import load_dotenv
import os

# Load API Keys from environment variables
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
# API keys should be stored in .env file, not in code
os.environ['OPENAI_API_KEY'] = os.getenv("OPENAI_API_KEY", "")
os.environ['ANTHROPIC_API_KEY'] = os.getenv("ANTHROPIC_API_KEY", "")

# api key must be in .env file in project
load_dotenv()
if os.getenv("ANTHROPIC_API_KEY") is None:
    raise ValueError("ANTHROPIC_API_KEY not found in .env file")

client = Anthropic()

In [1]:
import sys 
import os

# Check if the repo already exists
if not os.path.exists('/tmp/anthropic-quickstarts'):
    # Clone the agents quickstart implementation
    !git clone https://github.com/anthropics/anthropic-quickstarts.git /tmp/anthropic-quickstarts
else:
    print("Repository already exists at /tmp/anthropic-quickstarts")

# IMPORTANT: Insert at the beginning of sys.path to override any existing 'agents' modules
if '/tmp/anthropic-quickstarts' not in sys.path:
    sys.path.insert(0, '/tmp/anthropic-quickstarts')

# Clear any cached imports of 'agents' module
if 'agents' in sys.modules:
    del sys.modules['agents']
if 'agents.agent' in sys.modules:
    del sys.modules['agents.agent']

Cloning into '/tmp/anthropic-quickstarts'...
remote: Enumerating objects: 542, done.[K
remote: Counting objects: 100% (307/307), done.[K
remote: Compressing objects: 100% (171/171), done.[K
remote: Total 542 (delta 220), reused 136 (delta 136), pack-reused 235 (from 3)[K
Receiving objects: 100% (542/542), 3.14 MiB | 8.28 MiB/s, done.
Resolving deltas: 100% (258/258), done.


In [2]:
import nest_asyncio
nest_asyncio.apply()

from agents.agent import Agent

agent = Agent(
    name="MyAgent",
    system="You are an extremely cynical, snarky, and quick-witted customer support agent. Provide short responses to user queries.",
)

response = agent.run("I'm having issues with my laptop. Can you help me?")
print(response.content[0].text)

TypeError: unsupported operand type(s) for |: 'type' and 'types.GenericAlias'

In [None]:
# SIMPLE MEMORY TOOL
from agents.tools.base import Tool

class SimpleMemory(Tool):
    """String-based memory tool for storing and modifying persistent text.

    This tool maintains a single in-memory string that can be read,
    replaced, or selectively edited using string replacement. It provides safety
    warnings when overwriting content or when edit operations would affect
    multiple occurrences.
    """

    name = "simple_memory"

    #TODO: Provide additional domain context to guide Claude on the types of items that should be stored
    description = """Tool for managing persistent text memory with read, write and edit operations.
        Read: Retrieves full memory contents as a string
        Write: Replaces entire memory (warns when overwriting existing data)
        Edit: Performs targeted string replacement (warns on multiple matches)"""

    # single tool that exposes 3 distinct abilities
    input_schema = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["read", "write", "edit"],
                "description": "The memory operation to perform: read retrieves current content, write replaces everything, edit performs string replacement",
            },
            "content": {
                "type": "string",
                "description": "Full text content to store when using write action (ignored for read/edit)",
            },
            "old_string": {
                "type": "string",
                "description": "Exact text to find and replace when using edit action (must be unique in memory)",
            },
            "new_string": {
                "type": "string",
                "description": "Replacement text to insert when using edit action",
            },
        },
        "required": ["action"],
    }

    def __init__(self):
        self.full_memory = ""
        self.compressed_memory = "" # not doing anything with this for now
        
    async def execute(self, **kwargs) -> str:
        """Execute the memory tool with provided parameters."""
        action = kwargs.get("action")
        content = kwargs.get("content", "")
        old_string = kwargs.get("old_string", "")
        new_string = kwargs.get("new_string", "")

        if action == "read":
            return self._read_memory()
        elif action == "write":
            print("Writing to memory...")
            return self._write_memory(content)
        elif action == "edit":
            return self._edit_memory(old_string, new_string)
        else:
            return f"Error: Unknown action '{action}'. Valid actions are read, write, edit."

    def _read_memory(self) -> str:
        """Read the current memory contents."""
        return self.full_memory

    def _write_memory(self, content: str) -> str:
        """Replace the entire memory with new content."""
        if self.full_memory:
            previous = self.full_memory
            self.full_memory = content
            return f"Warning: Overwriting existing content. Previous content was:\n{previous}\n\nMemory has been updated successfully."
        self.full_memory = content
        return "Memory updated successfully."

    def _edit_memory(self, old_string: str, new_string: str) -> str:
        """Replace occurrences of old string with new string."""
        if old_string not in self.full_memory:
            return f"Error: '{old_string}' not found in memory."

        old_memory = self.full_memory
        count = old_memory.count(old_string)

        if count > 1:
            return f"Warning: Found {count} occurrences of '{old_string}'. Please confirm which occurrence to replace or use more specific context."

        self.full_memory = self.full_memory.replace(old_string, new_string)
        return f"Edited memory: 1 occurrence replaced."

    def __str__(self) -> str:
        return self.full_memory

In [None]:
# COMPACTIFY MEMORY TOOL
from agents.utils.history_util import MessageHistory

class CompactifyMemory(Tool):
    """Memory summarization tool.
    
    Summarizes and replaces the existing message history.
    Expects to have access to a message_history object that is shared with the request handler.
    Descriptions should be modified to introduce use-case specific guidance.
    """
    
    name = "compactify_memory"
    description = """The memory compactifier tool will compress the current conversation history (replaces message history entirely). 
    Should be used when there is sufficient information that requires summarization.
    The summary should keep relevant information from any previous summaries.
    """

    input_schema = {
        "type": "object",
        "properties": {},
        "required": []
    }
        
    def __init__(self, client: Anthropic):
        self.client = client
        self.full_memory = ''
        self.compressed_memory = '' # not doing anything with this for now

    def run_compactify (self, message_history: MessageHistory):
        summary = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens = 10000, # modify as needed
            messages=[*message_history.messages, {
                "role": "user",
                "content": """Your task is to summarize the conversation using the previous summary as well as the messages since the last summary. Note that this will replace the previous summary entirely, so be sure to include the most relevant information that should be persisted."""
            }]
        )

        # modify the message history object in place
        message_history.messages = [
            {
                "role": "assistant",
                "content": "Conversation Summary: " +  summary.content[0].text
            }
        ]
        
    async def execute(self, **kwargs) -> str:
        # ATTN: note that we're breaking tool encapsulation here and will be executing the function outside the agent loop (see agents.agent.py)
        # we do this because we don't have an elegant way to share message state between the agent and tool just yet (...stay tuned)
        return "pending_compactify"
        
    def __str__(self):
        return self.full_memory
        

In [None]:
import json
import re

# HELPER FUNCTION: Parse markdown string for JSON
def parse_markdown_json(markdown_string):
    """
    Parses a JSON string from a Markdown string.

    Args:
        markdown_string (str): The Markdown string containing JSON.

    Returns:
        dict or list or None: A Python object representing the parsed JSON, or None if parsing fails.
    """
    match = re.search(r"```(?:json)?\n(.*?)\n```", markdown_string, re.DOTALL)
    if match:
        json_string = match.group(1).strip()
    else:
        json_string = markdown_string.strip()
    try:
        parsed_json = json.loads(json_string)
        return parsed_json
    except json.JSONDecodeError:
        return None

# HELPER CLASS: Memory Node
class MemoryNode:
    def __init__(self, name, is_directory=False, parent=None, content=None):
        self.name = name
        self.is_directory = is_directory
        self.parent = parent
        self.content = content if not is_directory else None
        self.children = {} if is_directory else None
    
    def add_child(self, name, is_directory=False, content=None):
        """Add a child node to the current node."""
        if not self.is_directory:
            raise ValueError(f"Cannot add child to file '{self.name}'")
        
        if name in self.children:
            raise ValueError(f"Child '{name}' already exists")
        
        child = MemoryNode(name, is_directory, parent=self, content=content)
        self.children[name] = child
        return child
    
    def remove_child(self, name):
        """Remove a child node from the current node."""
        if not self.is_directory:
            raise ValueError(f"Cannot remove child from file '{self.name}'")
            
        if name not in self.children:
            raise ValueError(f"Child '{name}' not found")
        
        del self.children[name]
    
    def find(self, path):
        """Find a node by path (ex: 'folder1/folder2/file.txt')."""
        if not path:
            return self
        
        parts = path.strip('/').split('/', 1)
        child_name = parts[0]
        
        if not self.is_directory or child_name not in self.children:
            return None
            
        child = self.children[child_name]
        
        if len(parts) == 1:
            return child
        else:
            return child.find(parts[1])
    
    def __repr__(self):
        return f"MemoryNode(name='{self.name}', is_directory={self.is_directory})"

# HELPER CLASS: Memory Tree
class MemoryTree:
    def __init__(self):
        self.root = MemoryNode("memory", is_directory=True)

    def add(self, path, content):
        """Add content to a node at the given path (ex: 'folder1/folder2/file.txt')."""
        node = self.root.find(path)
        if node:
            node.content = content
        else:
            raise ValueError(f"Path '{path}' not found")

    def get(self, path):
        """Get content from a node at the given path."""
        node = self.root.find(path)
        if node:
            return node.content
        else:
            raise ValueError(f"Path '{path}' not found")

    def edit(self, path, content):
        node = self.root.find(path)
        if node:
            node.content = content
        else:
            raise ValueError(f"Path '{path}' not found")

    def _build_from_json_recursive(self, json_obj, parent_node):
        """Recursively build the tree from a JSON object."""

        # handle root memory (already initialized)
        if len(json_obj) == 1 and 'memory' in json_obj:
            json_obj = json_obj['memory']

        for name, value in json_obj.items():
            if isinstance(value, dict):
                # Create a directory node
                child_node = parent_node.add_child(name, is_directory=True)
                self._build_from_json_recursive(value, child_node)
            else:
                # Create a file node with content
                parent_node.add_child(name, content=value)

    def build_from_json_string(self, str_json_obj):
        json_obj = parse_markdown_json(str_json_obj)
        self._build_from_json_recursive(json_obj, self.root)

    def print_tree(self, node=None, prefix=''):
        """Print a directory tree structure."""
        if node is None:
            node = self.root
        
        # Build list of children for proper indexing
        children = list(node.children.items()) if node.is_directory else []
        
        for index, (name, child) in enumerate(children):
            is_last = index == len(children) - 1
            
            # Create the appropriate connector
            if prefix == '' and node == self.root:
                # For root level items (direct children of root)
                connector = '└── ' if is_last else '├── '
                self.lines.append(f"{connector}{name}")
                
                # Recurse if this is a directory
                if child.is_directory:
                    extension = '    ' if is_last else '│   '
                    self.print_tree(child, extension)
            else:
                # For non-root level items
                connector = '└── ' if is_last else '├── '
                self.lines.append(f"{prefix}{connector}{name}")
                
                # Recurse if this is a directory
                if child.is_directory:
                    extension = '    ' if is_last else '│   '
                    self.print_tree(child, prefix + extension)

    def get_tree(self):
        """Return the tree as a string."""
        self.lines = []
        
        # Start with the root directory name
        self.lines.append(self.root.name)

        # Print the rest of the tree
        self.print_tree()
        return '\n'.join(self.lines)

    def __str__(self):
        return self.get_tree()

    def __repr__(self):
        return str(self)

In [None]:
import requests
import mimetypes

# HELPER CLASS FOR FILE STORAGE using the new files API!
class StorageManager:
    def __init__(self, api_key):
        if api_key is None:
            raise ValueError("ANTHROPIC_API_KEY not available.")
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/files"
        self.headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "anthropic-beta": "files-api-2025-04-14"
        }

    def _execute_request(self, method, endpoint, data=None, files=None):
        """Execute a request to the API."""
        url = f"{self.base_url}/{endpoint}"

        res = requests.request(method, url, headers=self.headers, data=data, files=files)
        if res.status_code == 200:
            return res.json()
        else:
            raise ValueError(f"Request failed: {res.status_code} - {res.text}")

    def list_files(self):
        """List all files. Direct curl request to the API."""
        res = requests.get(
            self.base_url,
            headers=self.headers
        )
        if res.status_code != 200:
            raise ValueError(f"Failed to retrieve files: {res.status_code} - {res.text}")
        res = res.json()
        return res['data']
        
        
    def get_file_metadata(self, file_id):
        """Get a file by ID. Direct curl request to the API."""
        res = requests.get(
            f"{self.base_url}/{file_id}",
            headers=self.headers
        )
        if res.status_code != 200:
            raise ValueError(f"Failed to retrieve file: {res.status_code} - {res.text}")
        res = res.json()
        return res 
        
    def upload_file(self, file_path):
        """Upload a file to the API."""        
        # Determine the file's MIME type
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type is None:
            mime_type = "application/octet-stream"  # Fallback to binary if type unknown
        
        with open(file_path, "rb") as file_obj:
            files = {
                "file": (os.path.basename(file_path), file_obj, mime_type)
            }
            
            res = requests.post(
                self.base_url,
                headers=self.headers,
                files=files
            )
            
        if res.status_code == 200:
            return res.json()
        else:
            raise ValueError(f"Failed to upload file: {res.status_code} - {res.text}")
        
# example usage
#file_path = "/Users/user/Downloads/SB1029-ProjectUpdate-FINAL_020317-A11Y.pdf" # REPLACE
storage_manager = StorageManager(os.getenv("ANTHROPIC_API_KEY"))
#uploaded = storage_manager.upload_file(file_path)
#storage_manager.get_file_metadata(uploaded['id'])
storage_manager.list_files()[:2]

In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import datetime
import textwrap
from typing import List
from anthropic import Anthropic

memory_tools = [
    SimpleMemory(),
    CompactifyMemory(client),
    FileBasedMemoryTool()
]

def process_memory_function(agent, tool):
    """Because some memory tools work with the agents message history object"""
    mem_tool_names = [tool.name for tool in memory_tools]
    for tool in agent.tools:
        if tool.name in mem_tool_names:
            # ATTN: bit of a hack, but we need to inject some additional functionality
            if tool.name == 'compactify_memory':
                tool.run_compactify(self.agent.message_history)
        

class ChatInterface:
    def __init__(self, agent: Agent, max_line_length=80):
        self.max_line_length = max_line_length
        self.agent = agent
        self.messages = [] # managing the window's messages separately from the Agent's messages
        self.memory = ''

        # Chat history container
        self.chat_output = widgets.Output(layout=widgets.Layout(
            height='400px', 
            overflow='auto',
            border='1px solid #ccc',
            padding='10px',
            display='flex',
            flex_flow='wrap-reverse'
        ))
        
        # Text input for new messages
        self.text_input = widgets.Text(
            placeholder='Type your message here...',
            layout=widgets.Layout(width='100%')
        )
        
        # Send button
        self.send_button = widgets.Button(
            description='Send',
            button_style='primary'
        )
        
        # Memory settings display
        self.memory_display = widgets.Output(layout=widgets.Layout(
            width='100%', 
            height='400px',
            border='1px solid #ccc',
            padding ='10px',

        ))  
        
        # Input container (text input + send button)
        input_box = widgets.HBox([
            self.text_input,
            self.send_button
        ], layout=widgets.Layout(width='100%'))
        
        # Left panel (chat)
        left_panel = widgets.VBox([
            widgets.Label('Chat'),
            self.chat_output,
            input_box
        ], layout=widgets.Layout(
            width='50%',
            padding='10px',
        ))
        
        # Right panel (memory settings)
        right_panel = widgets.VBox([
            widgets.Label('Memory'),
            self.memory_display
        ], layout=widgets.Layout(
            width='50%',
            padding='10px'
        ))
        
        # Main layout
        self.interface = widgets.HBox([
            left_panel,
            right_panel
        ], layout=widgets.Layout(
            width='100%',
            display='flex'
        ))
        
        # Event handlers
        self.send_button.on_click(self.on_send)
        self.text_input.on_submit(self.on_send)
        
        # Message history
        self.messages = []
    
    def on_send(self, _):
        """Handle sending a message"""
        message = self.text_input.value.strip()
        if message:
            self.add_message("user", message)
            self.text_input.value = ""

            # call the agent with the message
            response = self.agent.run(message)
            self.add_message("assistant", response.content[0].text)

            ## PROCESS

            self.update_memory_display()
    
    def wrap_text(self, text):
        """Wrap text to fit within max_line_length"""
        # Use textwrap to wrap long lines
        wrapped_lines = []
        for line in text.split('\n'):
            if len(line) > self.max_line_length:
                # Wrap this line
                wrapped = textwrap.fill(line, width=self.max_line_length)
                wrapped_lines.append(wrapped)
            else:
                wrapped_lines.append(line)
        return '\n'.join(wrapped_lines)
    
    def add_message(self, role, message):
        """Add a message to the chat history with text wrapping"""
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        # Wrap the message text
        wrapped_message = self.wrap_text(message)
        
        self.messages.append({
            "role": role,
            "content": message,  # Store original message
            "wrapped_message": wrapped_message,  # Store wrapped version
            "timestamp": timestamp
        })
        
        with self.chat_output:
            clear_output()
            # Display all messages with HTML formatting
            for msg in self.messages:
                if msg['role'] == 'user':
                    color = '#0066cc'
                else:
                    color = '#000000'
                    
                display(HTML(
                    f"<div style='margin-bottom: 10px; color: {color};'>"
                    f"<strong>{msg['role']} [{msg['timestamp']}]:</strong> "
                    f"{msg['wrapped_message']}"
                    f"</div>"
                ))
            
    def update_memory_display(self):
        """Update the memory display with current memory content"""
        with self.memory_display:
            clear_output()
            display(HTML(f"<pre style='margin: 10px; padding: 0; white-space: pre-wrap;'>{self.memory}</pre>"))
    
    def display(self):
        """Display the interface"""
        return self.interface

In [None]:
	
memory_tool = FileBasedMemoryTool() # or SimpleMemory() or CompactifyMemory(client) or FileBasedMemoryTool(storage_manager)
model_config = {
    "model": "claude-sonnet-4-20250514",
}
agent = Agent(
    name="Assistant",
    system="You are a helpful assistant designed to work with a user.", # additional memory instructions can be added here
    tools=[memory_tool],
    config=model_config,
)

chat = ChatInterface(
    agent=agent,
)

chat.display()

In [None]:
# FILE BASED MEMORY TOOL

class FileBasedMemoryTool(Tool):
    """
    Manage memory as a nested file system. This is specifically designed around the new files API.

    This tool provides a simple interace for interacting with this memory system.
    We have only defined three actions: GET, EDIT, and BUILD. In practice, you likely would opt for a more opinionated file structure 
    and more fine-grained control over access to the memory. We will rely on the default message truncation mechanism of the request handler.
    """

    name = 'hierarchical_memory'
    description = 'Interact with file system for storing memories, retrieving memories, and rebuilding the memory state.'
    input_schema = {
        'type': 'object',
        'properties': {
            'action': {
                'type': 'string',
                'enum': ['get', 'edit', 'build']
            },
            'paths': {
                'type': 'array',
                'items': {
                    'type': 'string',
                    'description': 'Path to the memory item'
                },
                'description': 'List of paths for the associated action. Available with GET and EDIT actions. (GET can have multiple paths, EDIT should have one path)'
            },
            'content': {
                'type': 'string',
                'description': 'Content that will be written to the specified path. Only available with the EDIT action.'
            },
            'new_memory_object': {
                'type': 'object',
                'description': 'Full memory output object to rebuild the memory scaffold. Only available with the BUILD action. This should be a JSON object representing the desired tree structure for memories. The values should be None (as a placeholder for future content).'
            }
        },
        'required': ['action']
    }
    
    def __init__(self, storage_manager: StorageManager):
        self.full_memory = MemoryTree()
        self.compressed_memory = self.full_memory # including the compressed memory for standardizing the interface
        self.storage_manager = storage_manager

    async def execute(self, **kwargs) -> str:
        action = kwargs.get('action')
        paths = kwargs.get('paths')
        content = kwargs.get('content')
        new_memory_object = kwargs.get('new_memory_object')

        if action == 'get':
            # we need to build the file messages from the file metadata (https://docs.anthropic.com/en/docs/docs/build-with-claude/files)
            message_refs = [{"type": "document", "source": { "type": "file", "file_id": self.full_memory.get(path)}} for path in paths]
            return message_refs

        elif action == 'edit':
            path = paths[0]

            #create txt file in tmp dir with content
            with open(f'/tmp/{path}.txt', 'w') as f:
                f.write(content)

            # upload the file to the API
            uploaded = self.storage_manager.upload_file(f'/tmp/{path}.txt')

            # add the file to the memory tree (using the id)
            self.full_memory.edit(path, uploaded['id'])
            return 'Updated'
        
        elif action == 'build':
            self.full_memory.build_from_json_string(new_memory_object)
            return 'Updated'
        
        else:
            raise ValueError(f"Invalid action: {action}")
        
    def __str__(self):
        return str(self.memory)

In [ ]:
from openai import OpenAI
client = OpenAI()  # API key should be in environment variable

response = client.responses.create(
    model="computer-use-preview",
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,
        "display_height": 768,
        "environment": "browser" # other possible values: "mac", "windows", "ubuntu"
    }],    
    input=[
        {
          "role": "user",
          "content": [
            {
              "type": "text",
              "input_text": "Check the latest OpenAI news on bing.com."
            }
            # Optional: include a screenshot of the initial state of the environment
            # {
            #     type: "input_image",
            #     image_url: f"data:image/png;base64,{screenshot_base64}"
            # }
          ]
        }
    ],
    reasoning={
        "summary": "concise",
    },
    truncation="auto"
)

print(response.output)

2. Receive a suggested action
The model returns an output that contains either a computer_call item, just text, or other tool calls, depending on the state of the conversation.

Examples of computer_call items are a click, a scroll, a key press, or any other event defined in the API reference. In our example, the item is a click action:

3. Execute the action in your environment
Execute the corresponding actions on your computer or browser. How you map a computer call to actions through code depends on your environment. This code shows example implementations for the most common computer actions.


Playwright

Docker

In [6]:
def handle_model_action(page, action):
    """
    Given a computer action (e.g., click, double_click, scroll, etc.),
    execute the corresponding operation on the Playwright page.
    """
    action_type = action.type
    
    try:
        match action_type:

            case "click":
                x, y = action.x, action.y
                button = action.button
                print(f"Action: click at ({x}, {y}) with button '{button}'")
                # Not handling things like middle click, etc.
                if button != "left" and button != "right":
                    button = "left"
                page.mouse.click(x, y, button=button)

            case "scroll":
                x, y = action.x, action.y
                scroll_x, scroll_y = action.scroll_x, action.scroll_y
                print(f"Action: scroll at ({x}, {y}) with offsets (scroll_x={scroll_x}, scroll_y={scroll_y})")
                page.mouse.move(x, y)
                page.evaluate(f"window.scrollBy({scroll_x}, {scroll_y})")

            case "keypress":
                keys = action.keys
                for k in keys:
                    print(f"Action: keypress '{k}'")
                    # A simple mapping for common keys; expand as needed.
                    if k.lower() == "enter":
                        page.keyboard.press("Enter")
                    elif k.lower() == "space":
                        page.keyboard.press(" ")
                    else:
                        page.keyboard.press(k)
            
            case "type":
                text = action.text
                print(f"Action: type text: {text}")
                page.keyboard.type(text)
            
            case "wait":
                print(f"Action: wait")
                time.sleep(2)

            case "screenshot":
                # Nothing to do as screenshot is taken at each turn
                print(f"Action: screenshot")

            # Handle other actions here

            case _:
                print(f"Unrecognized action: {action}")

    except Exception as e:
        print(f"Error handling action {action}: {e}")

In [7]:
def get_screenshot(page):
    """
    Take a full-page screenshot using Playwright and return the image bytes.
    """
    return page.screenshot()

In [8]:
import time
import base64
from openai import OpenAI
client = OpenAI()

def computer_use_loop(instance, response):
    """
    Run the loop that executes computer actions until no 'computer_call' is found.
    """
    while True:
        computer_calls = [item for item in response.output if item.type == "computer_call"]
        if not computer_calls:
            print("No computer call found. Output from model:")
            for item in response.output:
                print(item)
            break  # Exit when no computer calls are issued.

        # We expect at most one computer call per response.
        computer_call = computer_calls[0]
        last_call_id = computer_call.call_id
        action = computer_call.action

        # Execute the action (function defined in step 3)
        handle_model_action(instance, action)
        time.sleep(1)  # Allow time for changes to take effect.

        # Take a screenshot after the action (function defined in step 4)
        screenshot_bytes = get_screenshot(instance)
        screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8")

        # Send the screenshot back as a computer_call_output
        response = client.responses.create(
            model="computer-use-preview",
            previous_response_id=response.id,
            tools=[
                {
                    "type": "computer_use_preview",
                    "display_width": 1024,
                    "display_height": 768,
                    "environment": "browser"
                }
            ],
            input=[
                {
                    "call_id": last_call_id,
                    "type": "computer_call_output",
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}"
                    }
                }
            ],
            truncation="auto"
        )

    return response

In [9]:
from openai import OpenAI
client = OpenAI()

response = client.responses.create(
    model="computer-use-preview",
    previous_response_id="<previous_response_id>",
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,
        "display_height": 768,
        "environment": "browser"
    }],
    input=[
        {
            "type": "computer_call_output",
            "call_id": "<call_id>",
            "acknowledged_safety_checks": [
                {
                    "id": "<safety_check_id>",
                    "code": "malicious_instructions",
                    "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
                }
            ],
            "output": {
                "type": "computer_screenshot",
                "image_url": "<image_url>"
            }
        }
    ],
    truncation="auto"
)

BadRequestError: Error code: 400 - {'error': {'message': "Invalid 'input[0].output.image_url'. Expected a valid URL, but got a value with an invalid format.", 'type': 'invalid_request_error', 'param': 'input[0].output.image_url', 'code': 'invalid_value'}}

In [None]:
%pip install scrapybara

In [11]:
import os
import requests
from dotenv import load_dotenv
import json
import base64
from PIL import Image
from io import BytesIO
import io
from urllib.parse import urlparse

load_dotenv(override=True)

BLOCKED_DOMAINS = [
    "maliciousbook.com",
    "evilvideos.com",
    "darkwebforum.com",
    "shadytok.com",
    "suspiciouspins.com",
    "ilanbigio.com",
]


def pp(obj):
    print(json.dumps(obj, indent=4))


def show_image(base_64_image):
    image_data = base64.b64decode(base_64_image)
    image = Image.open(BytesIO(image_data))
    image.show()


def calculate_image_dimensions(base_64_image):
    image_data = base64.b64decode(base_64_image)
    image = Image.open(io.BytesIO(image_data))
    return image.size


def sanitize_message(msg: dict) -> dict:
    """Return a copy of the message with image_url omitted for computer_call_output messages."""
    if msg.get("type") == "computer_call_output":
        output = msg.get("output", {})
        if isinstance(output, dict):
            sanitized = msg.copy()
            sanitized["output"] = {**output, "image_url": "[omitted]"}
            return sanitized
    return msg


def create_response(**kwargs):
    url = "https://api.openai.com/v1/responses"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
        "Content-Type": "application/json"
    }

    openai_org = os.getenv("OPENAI_ORG")
    if openai_org:
        headers["Openai-Organization"] = openai_org

    response = requests.post(url, headers=headers, json=kwargs)

    if response.status_code != 200:
        print(f"Error: {response.status_code} {response.text}")

    return response.json()


def check_blocklisted_url(url: str) -> None:
    """Raise ValueError if the given URL (including subdomains) is in the blocklist."""
    hostname = urlparse(url).hostname or ""
    if any(
        hostname == blocked or hostname.endswith(f".{blocked}")
        for blocked in BLOCKED_DOMAINS
    ):
        raise ValueError(f"Blocked URL: {url}")

In [12]:
import time
import base64
from typing import List, Dict, Literal
from playwright.sync_api import sync_playwright, Browser, Page


# Optional: key mapping if your model uses "CUA" style keys
CUA_KEY_TO_PLAYWRIGHT_KEY = {
    "/": "Divide",
    "\\": "Backslash",
    "alt": "Alt",
    "arrowdown": "ArrowDown",
    "arrowleft": "ArrowLeft",
    "arrowright": "ArrowRight",
    "arrowup": "ArrowUp",
    "backspace": "Backspace",
    "capslock": "CapsLock",
    "cmd": "Meta",
    "ctrl": "Control",
    "delete": "Delete",
    "end": "End",
    "enter": "Enter",
    "esc": "Escape",
    "home": "Home",
    "insert": "Insert",
    "option": "Alt",
    "pagedown": "PageDown",
    "pageup": "PageUp",
    "shift": "Shift",
    "space": " ",
    "super": "Meta",
    "tab": "Tab",
    "win": "Meta",
}


class BasePlaywrightComputer:
    """
    Abstract base for Playwright-based computers:

      - Subclasses override `_get_browser_and_page()` to do local or remote connection,
        returning (Browser, Page).
      - This base class handles context creation (`__enter__`/`__exit__`),
        plus standard "Computer" actions like click, scroll, etc.
      - We also have extra browser actions: `goto(url)` and `back()`.
    """

    def get_environment(self):
        return "browser"

    def get_dimensions(self):
        return (1024, 768)

    def __init__(self):
        self._playwright = None
        self._browser: Browser | None = None
        self._page: Page | None = None

    def __enter__(self):
        # Start Playwright and call the subclass hook for getting browser/page
        self._playwright = sync_playwright().start()
        self._browser, self._page = self._get_browser_and_page()

        # Set up network interception to flag URLs matching domains in BLOCKED_DOMAINS
        def handle_route(route, request):

            url = request.url
            if check_blocklisted_url(url):
                print(f"Flagging blocked domain: {url}")
                route.abort()
            else:
                route.continue_()

        self._page.route("**/*", handle_route)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._browser:
            self._browser.close()
        if self._playwright:
            self._playwright.stop()

    def get_current_url(self) -> str:
        return self._page.url

    # --- Common "Computer" actions ---
    def screenshot(self) -> str:
        """Capture only the viewport (not full_page)."""
        png_bytes = self._page.screenshot(full_page=False)
        return base64.b64encode(png_bytes).decode("utf-8")

    def click(self, x: int, y: int, button: str = "left") -> None:
        match button:
            case "back":
                self.back()
            case "forward":
                self.forward()
            case "wheel":
                self._page.mouse.wheel(x, y)
            case _:
                button_mapping = {"left": "left", "right": "right"}
                button_type = button_mapping.get(button, "left")
                self._page.mouse.click(x, y, button=button_type)

    def double_click(self, x: int, y: int) -> None:
        self._page.mouse.dblclick(x, y)

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        self._page.mouse.move(x, y)
        self._page.evaluate(f"window.scrollBy({scroll_x}, {scroll_y})")

    def type(self, text: str) -> None:
        self._page.keyboard.type(text)

    def wait(self, ms: int = 1000) -> None:
        time.sleep(ms / 1000)

    def move(self, x: int, y: int) -> None:
        self._page.mouse.move(x, y)

    def keypress(self, keys: List[str]) -> None:
        mapped_keys = [CUA_KEY_TO_PLAYWRIGHT_KEY.get(key.lower(), key) for key in keys]
        for key in mapped_keys:
            self._page.keyboard.down(key)
        for key in reversed(mapped_keys):
            self._page.keyboard.up(key)

    def drag(self, path: List[Dict[str, int]]) -> None:
        if not path:
            return
        self._page.mouse.move(path[0]["x"], path[0]["y"])
        self._page.mouse.down()
        for point in path[1:]:
            self._page.mouse.move(point["x"], point["y"])
        self._page.mouse.up()

    # --- Extra browser-oriented actions ---
    def goto(self, url: str) -> None:
        try:
            return self._page.goto(url)
        except Exception as e:
            print(f"Error navigating to {url}: {e}")

    def back(self) -> None:
        return self._page.go_back()

    def forward(self) -> None:
        return self._page.go_forward()

    # --- Subclass hook ---
    def _get_browser_and_page(self) -> tuple[Browser, Page]:
        """Subclasses must implement, returning (Browser, Page)."""
        raise NotImplementedError

In [None]:
%pip install browserbase

In [15]:
import os
from typing import Tuple, Dict, List, Union, Optional
from playwright.sync_api import Browser, Page, BrowserContext, Error as PlaywrightError

from browserbase import Browserbase
from dotenv import load_dotenv
import base64

load_dotenv()


class BrowserbaseBrowser(BasePlaywrightComputer):
    """
    Browserbase is a headless browser platform that offers a remote browser API. You can use it to control thousands of browsers from anywhere.
    You can find more information about Browserbase at https://www.browserbase.com/computer-use or view our OpenAI CUA Quickstart at https://docs.browserbase.com/integrations/openai-cua/introduction.

    IMPORTANT: This Browserbase computer requires the use of the `goto` tool defined in playwright_with_custom_functions.py.
    Make sure to include this tool in your configuration when using the Browserbase computer.
    """

    def get_dimensions(self):
        return self.dimensions

    def __init__(
        self,
        width: int = 1024,
        height: int = 768,
        region: str = "us-west-2",
        proxy: bool = False,
        virtual_mouse: bool = True,
        ad_blocker: bool = False,
    ):
        """
        Initialize the Browserbase instance. Additional configuration options for features such as persistent cookies, ad blockers, file downloads and more can be found in the Browserbase API documentation: https://docs.browserbase.com/reference/api/create-a-session

        Args:
            width (int): The width of the browser viewport. Default is 1024.
            height (int): The height of the browser viewport. Default is 768.
            region (str): The region for the Browserbase session. Default is "us-west-2". Pick a region close to you for better performance. https://docs.browserbase.com/guides/multi-region
            proxy (bool): Whether to use a proxy for the session. Default is False. Turn on proxies if you're browsing is frequently interrupted. https://docs.browserbase.com/features/proxies
            virtual_mouse (bool): Whether to enable the virtual mouse cursor. Default is True.
            ad_blocker (bool): Whether to enable the built-in ad blocker. Default is False.
        """
        super().__init__()
        self.bb = Browserbase(api_key=os.getenv("BROWSERBASE_API_KEY"))
        self.project_id = os.getenv("BROWSERBASE_PROJECT_ID")
        self.session = None
        self.dimensions = (width, height)
        self.region = region
        self.proxy = proxy
        self.virtual_mouse = virtual_mouse
        self.ad_blocker = ad_blocker

    def _get_browser_and_page(self) -> Tuple[Browser, Page]:
        """
        Create a Browserbase session and connect to it.

        Returns:
            Tuple[Browser, Page]: A tuple containing the connected browser and page objects.
        """
        # Create a session on Browserbase with specified parameters
        width, height = self.dimensions
        session_params = {
            "project_id": self.project_id,
            "browser_settings": {
                "viewport": {"width": width, "height": height},
                "blockAds": self.ad_blocker,
            },
            "region": self.region,
            "proxies": self.proxy,
        }
        self.session = self.bb.sessions.create(**session_params)

        # Print the live session URL
        print(
            f"Watch and control this browser live at https://www.browserbase.com/sessions/{self.session.id}"
        )

        # Connect to the remote session
        browser = self._playwright.chromium.connect_over_cdp(
            self.session.connect_url, timeout=60000
        )
        context = browser.contexts[0]

        # Add event listeners for page creation and closure
        context.on("page", self._handle_new_page)

        # Only add the init script if virtual_mouse is True
        if self.virtual_mouse:
            context.add_init_script(
                """
            // Only run in the top frame
            if (window.self === window.top) {
                function initCursor() {
                    const CURSOR_ID = '__cursor__';

                    // Check if cursor element already exists
                    if (document.getElementById(CURSOR_ID)) return;

                    const cursor = document.createElement('div');
                    cursor.id = CURSOR_ID;
                    Object.assign(cursor.style, {
                        position: 'fixed',
                        top: '0px',
                        left: '0px',
                        width: '20px',
                        height: '20px',
                        backgroundImage: 'url("data:image/svg+xml;utf8,<svg xmlns=\\'http://www.w3.org/2000/svg\\' viewBox=\\'0 0 24 24\\' fill=\\'black\\' stroke=\\'white\\' stroke-width=\\'1\\' stroke-linejoin=\\'round\\' stroke-linecap=\\'round\\'><polygon points=\\'2,2 2,22 8,16 14,22 17,19 11,13 20,13\\'/></svg>")',
                        backgroundSize: 'cover',
                        pointerEvents: 'none',
                        zIndex: '99999',
                        transform: 'translate(-2px, -2px)',
                    });

                    document.body.appendChild(cursor);

                    document.addEventListener("mousemove", (e) => {
                        cursor.style.top = e.clientY + "px";
                        cursor.style.left = e.clientX + "px";
                    });
                }

                // Use requestAnimationFrame for early execution
                requestAnimationFrame(function checkBody() {
                    if (document.body) {
                        initCursor();
                    } else {
                        requestAnimationFrame(checkBody);
                    }
                });
            }
            """
            )

        page = context.pages[0]
        page.on("close", self._handle_page_close)

        page.goto("https://bing.com")

        return browser, page

    def _handle_new_page(self, page: Page):
        """Handle the creation of a new page."""
        print("New page created")
        self._page = page
        page.on("close", self._handle_page_close)

    def _handle_page_close(self, page: Page):
        """Handle the closure of a page."""
        print("Page closed")
        if self._page == page:
            if self._browser.contexts[0].pages:
                self._page = self._browser.contexts[0].pages[-1]
            else:
                print("Warning: All pages have been closed.")
                self._page = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Clean up resources when exiting the context manager.

        Args:
            exc_type: The type of the exception that caused the context to be exited.
            exc_val: The exception instance that caused the context to be exited.
            exc_tb: A traceback object encapsulating the call stack at the point where the exception occurred.
        """
        if self._page:
            self._page.close()
        if self._browser:
            self._browser.close()
        if self._playwright:
            self._playwright.stop()

        if self.session:
            print(
                f"Session completed. View replay at https://browserbase.com/sessions/{self.session.id}"
            )

    def screenshot(self) -> str:
        """
        Capture a screenshot of the current viewport using CDP.

        Returns:
            str: A base64 encoded string of the screenshot.
        """
        try:
            # Get CDP session from the page
            cdp_session = self._page.context.new_cdp_session(self._page)

            # Capture screenshot using CDP
            result = cdp_session.send(
                "Page.captureScreenshot", {"format": "png", "fromSurface": True}
            )

            return result["data"]
        except PlaywrightError as error:
            print(
                f"CDP screenshot failed, falling back to standard screenshot: {error}"
            )
            return super().screenshot()

In [16]:
import subprocess
import time
import shlex


class DockerComputer:
    def get_environment(self):
        return "linux"

    def get_dimensions(self):
        return (1280, 720)  # Default fallback; will be updated in __enter__.

    def __init__(
        self,
        container_name="cua-sample-app",
        image="ghcr.io/openai/openai-cua-sample-app:latest",
        display=":99",
        port_mapping="5900:5900",
    ):
        self.container_name = container_name
        self.image = image
        self.display = display
        self.port_mapping = port_mapping

    def __enter__(self):
        # Check if the container is running
        result = subprocess.run(
            ["docker", "ps", "-q", "-f", f"name={self.container_name}"],
            capture_output=True,
            text=True,
        )

        if not result.stdout.strip():
            raise RuntimeError(
                f"Container {self.container_name} is not running. Build and run with:\n"
                f"docker build -t {self.container_name} .\n"
                f"docker run --rm -it --name {self.container_name} "
                f"-p {self.port_mapping} -e DISPLAY={self.display} {self.container_name}"
            )

        # Fetch display geometry
        geometry = self._exec(
            f"DISPLAY={self.display} xdotool getdisplaygeometry"
        ).strip()
        if geometry:
            w, h = geometry.split()
            self.dimensions = (int(w), int(h))
        # print("Starting Docker container...")
        # # Run the container detached, removing it automatically when it stops
        # subprocess.check_call(
        #     [
        #         "docker",
        #         "run",
        #         "-d",
        #         "--rm",
        #         "--name",
        #         self.container_name,
        #         "-p",
        #         self.port_mapping,
        #         self.image,
        #     ]
        # )
        # # Give the container a moment to start
        # time.sleep(3)
        # print("Entering DockerComputer context")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # print("Stopping Docker container...")
        # subprocess.check_call(["docker", "stop", self.container_name])
        # print("Exiting DockerComputer context")
        pass

    def _exec(self, cmd: str) -> str:
        """
        Run 'cmd' in the container.
        We wrap cmd in double quotes and escape any double quotes inside it,
        so spaces or quotes don't break the shell call.
        """
        # Escape any existing double quotes in cmd
        safe_cmd = cmd.replace('"', '\\"')

        # Then wrap the entire cmd in double quotes for `sh -c`
        docker_cmd = f'docker exec {self.container_name} sh -c "{safe_cmd}"'

        return subprocess.check_output(docker_cmd, shell=True).decode(
            "utf-8", errors="ignore"
        )

    def screenshot(self) -> str:
        """
        Takes a screenshot with ImageMagick (import), returning base64-encoded PNG.
        Requires 'import'.
        """
        # cmd = (
        #     f"export DISPLAY={self.display} && "
        #     "import -window root /tmp/screenshot.png && "
        #     "base64 /tmp/screenshot.png"
        # )
        cmd = (
            f"export DISPLAY={self.display} && "
            "import -window root png:- | base64 -w 0"
        )

        return self._exec(cmd)

    def click(self, x: int, y: int, button: str = "left") -> None:
        button_map = {"left": 1, "middle": 2, "right": 3}
        b = button_map.get(button, 1)
        self._exec(f"DISPLAY={self.display} xdotool mousemove {x} {y} click {b}")

    def double_click(self, x: int, y: int) -> None:
        self._exec(
            f"DISPLAY={self.display} xdotool mousemove {x} {y} click --repeat 2 1"
        )

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        """
        For simple vertical scrolling: xdotool click 4 (scroll up) or 5 (scroll down).
        """
        self._exec(f"DISPLAY={self.display} xdotool mousemove {x} {y}")
        clicks = abs(scroll_y)
        button = 4 if scroll_y < 0 else 5
        for _ in range(clicks):
            self._exec(f"DISPLAY={self.display} xdotool click {button}")

    def type(self, text: str) -> None:
        """
        Type the given text via xdotool, preserving spaces and quotes.
        """
        # Escape single quotes in the user text: ' -> '\'\''
        safe_text = text.replace("'", "'\\''")
        # Then wrap everything in single quotes for xdotool
        cmd = f"DISPLAY={self.display} xdotool type -- '{safe_text}'"
        self._exec(cmd)

    def wait(self, ms: int = 1000) -> None:
        time.sleep(ms / 1000)

    def move(self, x: int, y: int) -> None:
        self._exec(f"DISPLAY={self.display} xdotool mousemove {x} {y}")

    def keypress(self, keys: list[str]) -> None:
        mapping = {
            "ENTER": "Return",
            "LEFT": "Left",
            "RIGHT": "Right",
            "UP": "Up",
            "DOWN": "Down",
            "ESC": "Escape",
            "SPACE": "space",
            "BACKSPACE": "BackSpace",
            "TAB": "Tab",
        }
        mapped_keys = [mapping.get(key, key) for key in keys]
        combo = "+".join(mapped_keys)
        self._exec(f"DISPLAY={self.display} xdotool key {combo}")

    def drag(self, path: list[dict[str, int]]) -> None:
        if not path:
            return
        start_x = path[0]["x"]
        start_y = path[0]["y"]
        self._exec(
            f"DISPLAY={self.display} xdotool mousemove {start_x} {start_y} mousedown 1"
        )
        for point in path[1:]:
            self._exec(
                f"DISPLAY={self.display} xdotool mousemove {point['x']} {point['y']}"
            )
        self._exec(f"DISPLAY={self.display} xdotool mouseup 1")

    def get_current_url(self):
        return None

In [17]:
from playwright.sync_api import Browser, Page



class LocalPlaywrightBrowser(BasePlaywrightComputer):
    """Launches a local Chromium instance using Playwright."""

    def __init__(self, headless: bool = False):
        super().__init__()
        self.headless = headless

    def _get_browser_and_page(self) -> tuple[Browser, Page]:
        width, height = self.get_dimensions()
        launch_args = [
            f"--window-size={width},{height}",
            "--disable-extensions",
            "--disable-file-system",
        ]
        browser = self._playwright.chromium.launch(
            chromium_sandbox=True,
            headless=self.headless,
            args=launch_args,
            env={"DISPLAY": ":0"},
        )

        context = browser.new_context()

        # Add event listeners for page creation and closure
        context.on("page", self._handle_new_page)

        page = context.new_page()
        page.set_viewport_size({"width": width, "height": height})
        page.on("close", self._handle_page_close)

        page.goto("https://bing.com")

        return browser, page

    def _handle_new_page(self, page: Page):
        """Handle the creation of a new page."""
        print("New page created")
        self._page = page
        page.on("close", self._handle_page_close)

    def _handle_page_close(self, page: Page):
        """Handle the closure of a page."""
        print("Page closed")
        if self._page == page:
            if self._browser.contexts[0].pages:
                self._page = self._browser.contexts[0].pages[-1]
            else:
                print("Warning: All pages have been closed.")
                self._page = None

In [25]:
import os
import time
from dotenv import load_dotenv
from scrapybara import Scrapybara
from playwright.sync_api import sync_playwright, Browser, Page


load_dotenv()

CUA_KEY_TO_SCRAPYBARA_KEY = {
    "/": "slash",
    "\\": "backslash",
    "arrowdown": "Down",
    "arrowleft": "Left",
    "arrowright": "Right",
    "arrowup": "Up",
    "backspace": "BackSpace",
    "capslock": "Caps_Lock",
    "cmd": "Meta_L",
    "delete": "Delete",
    "end": "End",
    "enter": "Return",
    "esc": "Escape",
    "home": "Home",
    "insert": "Insert",
    "option": "Alt_L",
    "pagedown": "Page_Down",
    "pageup": "Page_Up",
    "tab": "Tab",
    "win": "Meta_L",
}


class ScrapybaraBrowser:
    """
    Scrapybara provides virtual desktops and browsers in the cloud. https://scrapybara.com
    You can try OpenAI CUA for free at https://computer.new or read our CUA Quickstart at https://computer.new/cua.
    """

    def get_environment(self):
        return "browser"

    def get_dimensions(self):
        return (1024, 768)

    def __init__(self):
        self.client = Scrapybara(api_key="SCRAPYBARA_API_KEY=scrapy-a5472ff4-07ea-4288-b508-6784274a1afd")
        self._playwright = None
        self._browser: Browser | None = None
        self._page: Page | None = None

    def __enter__(self):
        print("Starting scrapybara browser")
        blocked_domains = [
            domain.replace("https://", "").replace("www.", "")
            for domain in BLOCKED_DOMAINS
        ]
        self.instance = self.client.start_browser(blocked_domains=blocked_domains)
        print("Scrapybara browser started ₍ᐢ•(ܫ)•ᐢ₎")
        print(
            f"You can view and interact with the stream at {self.instance.get_stream_url().stream_url}"
        )
        self._playwright = sync_playwright().start()
        self._browser = self._playwright.chromium.connect_over_cdp(
            self.instance.get_cdp_url().cdp_url
        )
        self._page = self._browser.contexts[0].pages[0]
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Stopping scrapybara browser")
        self.instance.stop()
        print("Scrapybara browser stopped ₍ᐢ-(ｪ)-ᐢ₎")

    def goto(self, url: str) -> None:
        self._page.goto(url)

    def get_current_url(self) -> str:
        return self.instance.get_current_url().current_url

    def screenshot(self) -> str:
        return self.instance.screenshot().base_64_image

    def click(self, x: int, y: int, button: str = "left") -> None:
        button = "middle" if button == "wheel" else button
        self.instance.computer(
            action="click_mouse",
            click_type="click",
            button=button,
            coordinates=[x, y],
            num_clicks=1,
        )

    def double_click(self, x: int, y: int) -> None:
        self.instance.computer(
            action="click_mouse",
            click_type="click",
            button="left",
            coordinates=[x, y],
            num_clicks=2,
        )

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        self.instance.computer(
            action="scroll",
            coordinates=[x, y],
            delta_x=scroll_x // 20,
            delta_y=scroll_y // 20,
        )

    def type(self, text: str) -> None:
        self.instance.computer(action="type_text", text=text)

    def wait(self, ms: int = 1000) -> None:
        time.sleep(ms / 1000)
        # Scrapybara also has `self.instance.computer(action="wait", duration=ms / 1000)`

    def move(self, x: int, y: int) -> None:
        self.instance.computer(action="move_mouse", coordinates=[x, y])

    def keypress(self, keys: list[str]) -> None:
        mapped_keys = [
            CUA_KEY_TO_SCRAPYBARA_KEY.get(key.lower(), key.lower()) for key in keys
        ]
        self.instance.computer(action="press_key", keys=mapped_keys)

    def drag(self, path: list[dict[str, int]]) -> None:
        if not path:
            return
        path = [[point["x"], point["y"]] for point in path]
        self.instance.computer(action="drag_mouse", path=path)


class ScrapybaraUbuntu:
    """
    Scrapybara provides virtual desktops and browsers in the cloud.
    You can try OpenAI CUA for free at https://computer.new or read our CUA Quickstart at https://computer.new/cua.
    """

    def get_environment(self):
        return "linux"

    def get_dimensions(self):
        return (1024, 768)

    def __init__(self):
        self.client = Scrapybara(api_key="SCRAPYBARA_API_KEY=scrapy-a5472ff4-07ea-4288-b508-6784274a1afd")

    def __enter__(self):
        print("Starting Scrapybara Ubuntu instance")
        blocked_domains = [
            domain.replace("https://", "").replace("www.", "")
            for domain in BLOCKED_DOMAINS
        ]
        self.instance = self.client.start_ubuntu(blocked_domains=blocked_domains)
        print("Scrapybara Ubuntu instance started ₍ᐢ•(ܫ)•ᐢ₎")
        print(
            f"You can view and interact with the stream at {self.instance.get_stream_url().stream_url}"
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Stopping Scrapybara Ubuntu instance")
        self.instance.stop()
        print("Scrapybara Ubuntu instance stopped ₍ᐢ-(ｪ)-ᐢ₎")

    def screenshot(self) -> str:
        return self.instance.screenshot().base_64_image

    def click(self, x: int, y: int, button: str = "left") -> None:
        button = "middle" if button == "wheel" else button
        self.instance.computer(
            action="click_mouse",
            click_type="click",
            button=button,
            coordinates=[x, y],
            num_clicks=1,
        )

    def double_click(self, x: int, y: int) -> None:
        self.instance.computer(
            action="click_mouse",
            click_type="click",
            button="left",
            coordinates=[x, y],
            num_clicks=2,
        )

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        self.instance.computer(
            action="scroll",
            coordinates=[x, y],
            delta_x=scroll_x // 20,
            delta_y=scroll_y // 20,
        )

    def type(self, text: str) -> None:
        self.instance.computer(action="type_text", text=text)

    def wait(self, ms: int = 1000) -> None:
        time.sleep(ms / 1000)
        # Scrapybara also has `self.instance.computer(action="wait", duration=ms / 1000)`

    def move(self, x: int, y: int) -> None:
        self.instance.computer(action="move_mouse", coordinates=[x, y])

    def keypress(self, keys: list[str]) -> None:
        mapped_keys = [
            CUA_KEY_TO_SCRAPYBARA_KEY.get(key.lower(), key.lower()) for key in keys
        ]
        self.instance.computer(action="press_key", keys=mapped_keys)

    def drag(self, path: list[dict[str, int]]) -> None:
        if not path:
            return
        path = [[point["x"], point["y"]] for point in path]
        self.instance.computer(action="drag_mouse", path=path)

    def get_current_url(self):
        return None

In [19]:
from typing import Protocol, List, Literal, Dict


class Computer(Protocol):
    """Defines the 'shape' (methods/properties) our loop expects."""

    def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]: ...

    def get_dimensions(self) -> tuple[int, int]: ...

    def screenshot(self) -> str: ...

    def click(self, x: int, y: int, button: str = "left") -> None: ...

    def double_click(self, x: int, y: int) -> None: ...

    def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None: ...

    def type(self, text: str) -> None: ...

    def wait(self, ms: int = 1000) -> None: ...

    def move(self, x: int, y: int) -> None: ...

    def keypress(self, keys: List[str]) -> None: ...

    def drag(self, path: List[Dict[str, int]]) -> None: ...

    def get_current_url() -> str: ...


computers_config = {
    "local-playwright": LocalPlaywrightBrowser,
    "docker": DockerComputer,
    "browserbase": BrowserbaseBrowser,
    "scrapybara-browser": ScrapybaraBrowser,
    "scrapybara-ubuntu": ScrapybaraUbuntu,
}

## Agent

In [20]:

import json
from typing import Callable


class Agent:
    """
    A sample agent class that can be used to interact with a computer.

    (See simple_cua_loop.py for a simple example without an agent.)
    """

    def __init__(
        self,
        model="computer-use-preview",
        computer: Computer = None,
        tools: list[dict] = [],
        acknowledge_safety_check_callback: Callable = lambda: False,
    ):
        self.model = model
        self.computer = computer
        self.tools = tools
        self.print_steps = True
        self.debug = False
        self.show_images = False
        self.acknowledge_safety_check_callback = acknowledge_safety_check_callback

        if computer:
            dimensions = computer.get_dimensions()
            self.tools += [
                {
                    "type": "computer-preview",
                    "display_width": dimensions[0],
                    "display_height": dimensions[1],
                    "environment": computer.get_environment(),
                },
            ]

    def debug_print(self, *args):
        if self.debug:
            pp(*args)

    def handle_item(self, item):
        """Handle each item; may cause a computer action + screenshot."""
        if item["type"] == "message":
            if self.print_steps:
                print(item["content"][0]["text"])

        if item["type"] == "function_call":
            name, args = item["name"], json.loads(item["arguments"])
            if self.print_steps:
                print(f"{name}({args})")

            if hasattr(self.computer, name):  # if function exists on computer, call it
                method = getattr(self.computer, name)
                method(**args)
            return [
                {
                    "type": "function_call_output",
                    "call_id": item["call_id"],
                    "output": "success",  # hard-coded output for demo
                }
            ]

        if item["type"] == "computer_call":
            action = item["action"]
            action_type = action["type"]
            action_args = {k: v for k, v in action.items() if k != "type"}
            if self.print_steps:
                print(f"{action_type}({action_args})")

            method = getattr(self.computer, action_type)
            method(**action_args)

            screenshot_base64 = self.computer.screenshot()
            if self.show_images:
                show_image(screenshot_base64)

            # if user doesn't ack all safety checks exit with error
            pending_checks = item.get("pending_safety_checks", [])
            for check in pending_checks:
                message = check["message"]
                if not self.acknowledge_safety_check_callback(message):
                    raise ValueError(
                        f"Safety check failed: {message}. Cannot continue with unacknowledged safety checks."
                    )

            call_output = {
                "type": "computer_call_output",
                "call_id": item["call_id"],
                "acknowledged_safety_checks": pending_checks,
                "output": {
                    "type": "input_image",
                    "image_url": f"data:image/png;base64,{screenshot_base64}",
                },
            }

            # additional URL safety checks for browser environments
            if self.computer.get_environment() == "browser":
                current_url = self.computer.get_current_url()
                check_blocklisted_url(current_url)
                call_output["output"]["current_url"] = current_url

            return [call_output]
        return []

    def run_full_turn(
        self, input_items, print_steps=True, debug=False, show_images=False
    ):
        self.print_steps = print_steps
        self.debug = debug
        self.show_images = show_images
        new_items = []

        # keep looping until we get a final response
        while new_items[-1].get("role") != "assistant" if new_items else True:
            self.debug_print([sanitize_message(msg) for msg in input_items + new_items])

            response = create_response(
                model=self.model,
                input=input_items + new_items,
                tools=self.tools,
                truncation="auto",
            )
            self.debug_print(response)

            if "output" not in response and self.debug:
                print(response)
                raise ValueError("No output from model")
            else:
                new_items += response["output"]
                for item in response["output"]:
                    new_items += self.handle_item(item)

        return new_items

In [21]:

tools = [
    {
        "type": "function",
        "name": "get_weather",
        "description": "Determine weather in my location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and state e.g. San Francisco, CA",
                },
                "unit": {"type": "string", "enum": ["c", "f"]},
            },
            "additionalProperties": False,
            "required": ["location", "unit"],
        },
    }
]


def main():
    with ScrapybaraBrowser() as computer:
        agent = Agent(tools=tools, computer=computer)
        items = []
        while True:
            user_input = input("> ")
            items.append({"role": "user", "content": user_input})
            output_items = agent.run_full_turn(items)
            items += output_items



In [None]:
if __name__ == "__main__":
    main()

https://computer.new 
https://scrapybara.com/playground



In [None]:


tools = [
    {
        "type": "function",
        "name": "back",
        "description": "Go back to the previous page.",
        "parameters": {},
    },
    {
        "type": "function",
        "name": "goto",
        "description": "Go to a specific URL.",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "Fully qualified URL to navigate to.",
                },
            },
            "additionalProperties": False,
            "required": ["url"],
        },
    },
]


def main():
    with LocalPlaywrightComputer() as computer:
        agent = Agent(computer=computer, tools=tools)
        items = [
            {
                "role": "developer",
                "content": "Use the additional back() and goto() functions to navigate the browser. If you see nothing, try going to bing.com.",
            }
        ]
        while True:
            user_input = input("> ")
            items.append({"role": "user", "content": user_input})
            output_items = agent.run_full_turn(items, show_images=False)
            items += output_items


if __name__ == "__main__":
    main()

In [26]:


with ScrapybaraBrowser() as computer:
    agent = Agent(computer=computer)
    input_items = [{"role": "user", "content": "what is the weather in sf"}]
    response_items = agent.run_full_turn(input_items, debug=True, show_images=True)
    print(response_items[-1]["content"][0]["text"])

Starting scrapybara browser


ApiError: status_code: 401, body: {'detail': 'Authentication failed: 401: Invalid API key'}

In [None]:
from openai import OpenAI
client = OpenAI()

response = client.responses.create(
  model="gpt-4.1",
  input=[
    {
      "role": "system",
      "content": [
        {
          "type": "input_text",
          "text": "you are a world class travel agent able to find the best deals for your clients."
        }
      ]
    }
  ],
  text={
    "format": {
      "type": "text"
    }
  },
  reasoning={},
  tools=[
    {
      "type": "web_search_preview",
      "user_location": {
        "type": "approximate",
        "country": "US",
        "region": "Cal",
        "city": "Orange"
      },
      "search_context_size": "medium"
    },
    {
      "type": "mcp",
      "server_label": "cloudflare",
      "server_url": "https://browser.mcp.cloudflare.com/sse",
      "headers": {
        "Authorization": "Bearer 742bda4e1f6aba015fecbb0e0b7a76b38c449"
      },
      "allowed_tools": [],
      "require_approval": "always"
    }
  ],
  temperature=1,
  max_output_tokens=2048,
  top_p=1,
  store=True
)

In [None]:
%pip install mcp 
%pip install openai 
%pip install gemini 
%pip install cohere 
%pip install anthropic 
%pip install mongodb 
%pip install vectordb 
%pip install langgraph 
%pip install guardrails 
%pip install ui

In [None]:
%pip install crewai 
%pip install crewai[tools]
%pip install html2text 
%pip install playwright 
%pip install python-dotenv

In [ ]:
# .env file should contain:
# OPENAI_API_KEY=your_openai_api_key_here
# HYPERBROWSER_API_KEY=your_hyperbrowser_key_here
# BROWSERBASE_API_KEY=your_browserbase_key_here
# BROWSERBASE_PROJECT_ID=your_project_id_here
# OPENAI_MODEL_NAME=gpt-4-turbo

In [None]:
%pip install pysqlite3-wheels

In [31]:
import os
from crewai_tools import BrowserbaseLoadTool
from playwright.sync_api import sync_playwright
from html2text import html2text
from time import sleep



def BrowserbaseLoadToolTest(url: str):
    """
    Loads a URL using a headless webbrowser

    :param url: The URL to load
    :return: The text content of the page
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.connect_over_cdp(
            "wss://connect.browserbase.com?apiKey="
            + os.environ["BROWSERBASE_API_KEY"]
        )
        context = browser.contexts[0]
        page = context.pages[0]
        page.goto(url)

        # Wait for the flight search to finish
        sleep(25)

        content = html2text(page.content())
        browser.close()
        return content
    
# Initialize the tool with the Browserbase API key and Project ID
tool = BrowserbaseLoadTool()


In [None]:
#%pip install chromadb-client
import chromadb
# Example setup of the client to connect to your chroma server
client = chromadb.HttpClient(host='localhost', port=8000)

# Or for async usage:
async def main():
    client = await chromadb.AsyncHttpClient(host='localhost', port=8000)


In [None]:
from crewai import Agent
from crewai import Task
from crewai import Crew

researcher = Agent(
  role='Senior Researcher',
  goal='Uncover groundbreaking technologies in {topic}',
  backstory=(
    "Driven by curiosity, you're at the forefront of"
    "innovation, eager to explore and share knowledge that could change"
    "the world."
  ),
  tools=[search_tool],
)

writer = Agent(
  role='Writer',
  goal='Narrate compelling tech stories about {topic}',
  backstory=(
    "With a flair for simplifying complex topics, you craft"
    "engaging narratives that captivate and educate, bringing new"
    "discoveries to light in an accessible manner."
  ),
  tools=[search_tool]
)

In [None]:
research_task = Task(
  description=(
    "Identify the next big trend in {topic}."
    "Focus on identifying pros and cons and the overall narrative."
    "Your final report should clearly articulate the key points,"
    "its market opportunities, and potential risks."
  ),
  expected_output='A comprehensive 3 paragraphs long report on the latest AI trends.',
  agent=researcher,
)

write_task = Task(
  description=(
    "Compose an insightful article on {topic}."
    "Focus on the latest trends and how it's impacting the industry."
    "This article should be easy to understand, engaging, and positive."
  ),
  expected_output='A 4 paragraph article on {topic} advancements formatted as markdown.',
  agent=writer,
  output_file='new-blog-post.md'  # Example of output customization
)

In [None]:
crew = Crew(
  agents=[researcher, writer],
  tasks=[research_task, write_task],
  memory=True,
  cache=True,
  max_rpm=100,
)

result = crew.kickoff(inputs={'topic': 'AI in healthcare'})
print(result)

In [None]:
from crewai_tools import t
from typing import Optional

@tool("Kayak tool")
def kayak(
    departure: str, destination: str, date: str, return_date: Optional[str] = None
) -> str:
    """
    Generates a Kayak URL for flights between departure and destination on the specified date.

    :param departure: The IATA code for the departure airport (e.g., 'SOF' for Sofia)
    :param destination: The IATA code for the destination airport (e.g., 'BER' for Berlin)
    :param date: The date of the flight in the format 'YYYY-MM-DD'
    :return_date: Only for two-way tickets. The date of return flight in the format 'YYYY-MM-DD'
    :return: The Kayak URL for the flight search
    """
    print(f"Generating Kayak URL for {departure} to {destination} on {date}")
    URL = f"https://www.kayak.com/flights/{departure}-{destination}/{date}"
    if return_date:
        URL += f"/{return_date}"
    URL += "?currency=USD"
    return URL

In [None]:
from crewai import Agent
# import our tools
from browserbase import browserbase
from kayak import kayak


flights_agent = Agent(
    role="Flights",
    goal="Search flights",
    backstory="I am an agent that can search for flights.",
    tools=[kayak, browserbase],
    allow_delegation=False,
)

summarize_agent = Agent(
    role="Summarize",
    goal="Summarize content",
    backstory="I am an agent that can summarize text.",
    allow_delegation=False,
)

In [None]:
from crewai import Task

# Agents definitions...

output_search_example = """
Here are our top 5 flights from San Francisco to New York on 21st September 2024:
1. Delta Airlines: Departure: 21:35, Arrival: 03:50, Duration: 6 hours 15 minutes, Price: $125, Details: https://www.kayak.com/flights/sfo/jfk/2024-09-21/12:45/13:55/2:10/delta/airlines/economy/1
"""

search_task = Task(
    description=(
        "Search flights according to criteria {request}. Current year: {current_year}"
    ),
    expected_output=output_search_example,
    agent=flights_agent,
)

In [None]:
from crewai import Task

# Agents definitions...

output_providers_example = """
Here are our top 5 picks from San Francisco to New York on 21st September 2024:
1. Delta Airlines:
    - Departure: 21:35
    - Arrival: 03:50
    - Duration: 6 hours 15 minutes
    - Price: $125
    - Booking: [Delta Airlines](https://www.kayak.com/flights/sfo/jfk/2024-09-21/12:45/13:55/2:10/delta/airlines/economy/1)
    ...
"""

search_booking_providers_task = Task(
    description="Load every flight individually and find available booking providers",
    expected_output=output_providers_example,
    agent=flights_agent,
)

In [None]:
import sys
import datetime
from crewai import Crew, Process, Task, Agent
from browserbase import browserbase
from kayak import kayak
from dotenv import load_dotenv

load_dotenv()  # take environment variables from .env.

# Tasks and Agents definitions...

crew = Crew(
    agents=[flights_agent, summarize_agent],
    tasks=[search_task, search_booking_providers_task],
    # let's cap the number of OpenAI requests as the Agents
    #   may have to do multiple costly calls with large context
    max_rpm=100,
    # let's also set verbose=True and planning=True
    #   to see the progress of the Agents
    #   and the Task execution. Remove these lines
    #   if you want to run the script without
    #   seeing the progress (like in production).
    verbose=True,
    planning=True,
)

result = crew.kickoff(
    inputs={
        "request": sys.argv[1],
        "current_year": datetime.date.today().year,
    }
)

print(result)

python3 main.py "San Francisco to New York one-way on 21st September"

In [39]:
from crewai_tools import HyperbrowserLoadTool
from crewai import Agent

# Initialize the tool with your API key
tool = HyperbrowserLoadTool(api_key="742bda4e1f6aba015fecbb0e0b7a76b38c449")  # Or use environment variable

# Define an agent that uses the tool
@Agent
def web_researcher(self) -> Agent:
    '''
    This agent uses the HyperbrowserLoadTool to scrape websites
    and extract information.
    '''
    return Agent(
        config=self.agents_config["web_researcher"],
        tools=[tool]
    )

TypeError: BaseModel.__init__() takes 1 positional argument but 2 were given

In [40]:
%pip install -U langchain_community langchain-openai langchain-anthropic langchain langgraph bs4 langchain_core

Collecting langchain-openai
  Downloading langchain_openai-0.3.19-py3-none-any.whl.metadata (2.3 kB)
Collecting langchain-anthropic
  Downloading langchain_anthropic-0.3.15-py3-none-any.whl.metadata (1.9 kB)
Collecting bs4
  Downloading bs4-0.0.2-py2.py3-none-any.whl.metadata (411 bytes)
Downloading langchain_openai-0.3.19-py3-none-any.whl (64 kB)
Downloading langchain_anthropic-0.3.15-py3-none-any.whl (28 kB)
Downloading bs4-0.0.2-py2.py3-none-any.whl (1.2 kB)
Installing collected packages: bs4, langchain-openai, langchain-anthropic

  Attempting uninstall: langchain-openai

    Found existing installation: langchain-openai 0.2.14

    Uninstalling langchain-openai-0.2.14:

      Successfully uninstalled langchain-openai-0.2.14

   ------------- -------------------------- 1/3 [langchain-openai]
   ---------------------------------------- 3/3 [langchain-anthropic]

Successfully installed bs4-0.0.2 langchain-anthropic-0.3.15 langchain-openai-0.3.19
Note: you may need to restart the kern

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
embedchain 0.1.128 requires langchain-openai<0.3.0,>=0.2.1, but you have langchain-openai 0.3.19 which is incompatible.


In [None]:
from bs4 import BeautifulSoup as Soup
from langchain_community.document_loaders.recursive_url_loader import RecursiveUrlLoader

# LCEL docs
url = "https://python.langchain.com/docs/how_to/sequence/#related"
loader = RecursiveUrlLoader(
    url=url, max_depth=20, extractor=lambda x: Soup(x, "html.parser").text
)
docs = loader.load()

# Sort the list based on the URLs and get the text
d_sorted = sorted(docs, key=lambda x: x.metadata["source"])
d_reversed = list(reversed(d_sorted))
concatenated_content = "\n\n\n --- \n\n\n".join(
    [doc.page_content for doc in d_reversed]
)

In [9]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

### OpenAI

# Grader prompt
code_gen_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are a coding assistant with expertise in LCEL, LangChain expression language. \n
    Here is a full set of LCEL documentation:  \n ------- \n  {context} \n ------- \n Answer the user
    question based on the above provided documentation. Ensure any code you provide can be executed \n
    with all required imports and variables defined. Structure your answer with a description of the code solution. \n
    Then list the imports. And finally list the functioning code block. Here is the user question:""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# Data model
class code(BaseModel):
    """Schema for code solutions to questions about LCEL."""

    prefix: str = Field(description="Description of the problem and approach")
    imports: str = Field(description="Code block import statements")
    code: str = Field(description="Code block not including import statements")


expt_llm = "gpt-4o"
llm = ChatOpenAI(temperature=0, model=expt_llm)
code_gen_chain_oai = code_gen_prompt | llm.with_structured_output(code)
question = "How do I build a RAG chain in LCEL?"
solution = code_gen_chain_oai.invoke(
    {"context": concatenated_content, "messages": [("user", question)]}
)
solution

code(prefix='To build a Retrieval Augmented Generation (RAG) chain in LangChain Expression Language (LCEL), you need to chain together components that handle retrieval and generation. Typically, this involves using a retriever to fetch relevant documents based on a query, and then using a language model to generate a response based on the retrieved documents. The process can be implemented by chaining runnables using the pipe operator `|` or the `.pipe()` method. Below is an example of how you might set up such a chain using a prompt template, a retriever, a chat model, and an output parser.', imports='from langchain_core.prompts import ChatPromptTemplate\nfrom langchain_core.retrievers import SimpleRetriever\nfrom langchain.chat_models import init_chat_model\nfrom langchain_core.output_parsers import StrOutputParser', code='# Initialize the chat model\nmodel = init_chat_model("gemini-2.0-flash", model_provider="google_genai")\n\n# Define a prompt template for the retrieval augmented g

In [10]:
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate

### Anthropic

# Prompt to enforce tool use
code_gen_prompt_claude = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """<instructions> You are a coding assistant with expertise in LCEL, LangChain expression language. \n
    Here is the LCEL documentation:  \n ------- \n  {context} \n ------- \n Answer the user  question based on the \n
    above provided documentation. Ensure any code you provide can be executed with all required imports and variables \n
    defined. Structure your answer: 1) a prefix describing the code solution, 2) the imports, 3) the functioning code block. \n
    Invoke the code tool to structure the output correctly. </instructions> \n Here is the user question:""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# LLM
expt_llm = "claude-3-opus-20240229"
llm = ChatAnthropic(
    model=expt_llm,
    default_headers={"anthropic-beta": "tools-2024-04-04"},
)

structured_llm_claude = llm.with_structured_output(code, include_raw=True)


# Optional: Check for errors in case tool use is flaky
def check_claude_output(tool_output):
    """Check for parse error or failure to call the tool"""

    # Error with parsing
    if tool_output["parsing_error"]:
        # Report back output and parsing errors
        print("Parsing error!")
        raw_output = str(tool_output["raw"].content)
        error = tool_output["parsing_error"]
        raise ValueError(
            f"Error parsing your output! Be sure to invoke the tool. Output: {raw_output}. \n Parse error: {error}"
        )

    # Tool was not invoked
    elif not tool_output["parsed"]:
        print("Failed to invoke tool!")
        raise ValueError(
            "You did not use the provided tool! Be sure to invoke the tool to structure the output."
        )
    return tool_output


# Chain with output check
code_chain_claude_raw = (
    code_gen_prompt_claude | structured_llm_claude | check_claude_output
)


def insert_errors(inputs):
    """Insert errors for tool parsing in the messages"""

    # Get errors
    error = inputs["error"]
    messages = inputs["messages"]
    messages += [
        (
            "assistant",
            f"Retry. You are required to fix the parsing errors: {error} \n\n You must invoke the provided tool.",
        )
    ]
    return {
        "messages": messages,
        "context": inputs["context"],
    }


# This will be run as a fallback chain
fallback_chain = insert_errors | code_chain_claude_raw
N = 3  # Max re-tries
code_gen_chain_re_try = code_chain_claude_raw.with_fallbacks(
    fallbacks=[fallback_chain] * N, exception_key="error"
)


def parse_output(solution):
    """When we add 'include_raw=True' to structured output,
    it will return a dict w 'raw', 'parsed', 'parsing_error'."""

    return solution["parsed"]


# Optional: With re-try to correct for failure to invoke tool
code_gen_chain = code_gen_chain_re_try | parse_output

# No re-try
code_gen_chain = code_gen_prompt_claude | structured_llm_claude | parse_output

code_gen_chain = code_gen_chain_oai

### CrewAI Tool Flow

In [None]:
# Importing Crew related components
from crewai import Agent, Task, Crew

# Importing CrewAI Tools
from crewai_tools import WebsiteSearchTool

# Importing Pydantic
from pydantic import BaseModel, Field

class CodeSolution(BaseModel):
  prefix: str = Field(description="Description of the problem and approach")
  imports: str = Field(description="Code block import statements")
  code: str = Field(description="Code block not including import statements")

# Create the coding assistant agent
coding_assistant = Agent(
    role='Coding Assistant',
    goal='Provide accurate and executable code solutions using LCEL',
    backstory="""You are a coding assistant with expertise in LCEL, LangChain expression language. \n
    Here is the LCEL documentation:  \n ------- \n  {context} \n ------- \n
    Answer the user  question based on the \n
    above provided documentation. Ensure any code you provide can be executed with all required imports and variables \n
    defined.""",
    verbose=False,
    llm='gpt-4o'
)

# Create task for code generation
code_generation_task = Task(
    description="""Answer the user question based on the above provided documentation. Ensure any code you provide can be executed
    with all required imports and variables defined. Structure your answer:
    1) a prefix describing the code solution
    2) the imports
    3) the functioning code block

    Your coding task:
    {question}
    """,
    expected_output="Code solution with prefix description, imports, and executable code block",
    agent=coding_assistant,
    output_pydantic=CodeSolution
)

# Create the crew
code_crew = Crew(
    agents=[coding_assistant],
    tasks=[code_generation_task],
    verbose=False
)

In [None]:
from typing import List

class CodeGenState(BaseModel):
    """
    State for the code generation flow
    """
    error: str = ""
    question: str = ""
    messages: List = []
    generation: str = ""
    iterations: int = 0
    max_iterations: int = 3

In [None]:
# Importing CrewAI Flow related components
from crewai.flow.flow import Flow, listen, start, router

class CodeGenFlow(Flow[CodeGenState]):
  def check_code(self):
    print("---CHECKING CODE---")

    code_solution = self.state.generation
    imports = code_solution.imports
    code = code_solution.code

    try:
      exec(imports)
    except Exception as e:
      print("---CODE IMPORT CHECK: FAILED---")
      self.state.error = str(e)
      return "code_failed"

    try:
      exec(imports + "\n" + code)
    except Exception as e:
      print("---CODE BLOCK CHECK: FAILED---")
      self.state.error = str(e)
      return "code_failed"

    print("---NO CODE TEST FAILURES---")
    return "success"

  def fix_code(self):
    if self.state.error != "":
      print("---FIXING CODE---")
      # Create task for fixing code
      code_fix_task = Task(
          description="""You are a coding assistant with expertise in LCEL, LangChain expression language.
          Here is a full set of LCEL documentation:
          -------
          {context}
          -------

          The previous code attempt failed with the following error:
          {error}

          Your coding task:
          {question}

          Previous code attempt:
          {explanation}
          {imports}
          {code}

          Answer with a description of the code solution, followed by the imports, and finally the functioning code block.
          Ensure all imports are correct and the code is executable.""",
          expected_output= "A working code solution to the problem",
          agent=coding_assistant,
          output_pydantic=CodeSolution
      )

      # Create crew for fixing code
      fix_crew = Crew(
          agents=[coding_assistant],
          tasks=[code_fix_task]
      )

      # Execute fix
      result = fix_crew.kickoff(
          inputs={
              "error": self.state.error,
              "question": self.state.question,
              "explanation": self.state.generation.prefix,
              "imports": self.state.generation.imports,
              "code": self.state.generation.code,
              "context": concatenated_content
          }
      )
      self.state.generation = result.pydantic
      self.state.error = ""

  @start()
  def generate_code(self):
    print("---GENERATING CODE SOLUTION---")
    result = code_crew.kickoff(
      inputs={
        "question": self.state.question,
        "context": concatenated_content
      }
    )
    self.state.generation = result.pydantic
    self.state.error = ""

  @router(generate_code)
  def run_check(self):
    result = self.check_code()
    if result != "success":
      return "fix_code"

  @listen('fix_code')
  def run_fix(self):
    self.fix_code()

  @router(run_fix)
  def re_run_check(self):
    result = self.check_code()
    if result != "success":
      return "refix_code"

  @listen('refix_code')
  def re_run_fix(self):
    self.fix_code()

  @listen(re_run_fix)
  def re_re_run_check(self):
    self.check_code()

In [47]:
%pip install nest_asyncio

Note: you may need to restart the kernel to use updated packages.


In [6]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
%pip install jupyter ipywidgets

In [None]:

code_flow = CodeGenFlow()
code_flow.kickoff(inputs={"question": 'How do I build a RAG chain in LCEL?'})
code_flow.state.generation

Output()

CodeSolution(prefix='The code solution demonstrates how to build a Retrieval Augmented Generation (RAG) chain using LangChain Expression Language (LCEL). This involves defining a retriever that fetches relevant documents and a chat model to generate a response. The retrieved information and query are processed to provide an insightful answer. Ensure you replace `MockRetriever` with an actual retriever implementation suitable for your context.', imports='from langchain_core.prompts import ChatPromptTemplate\nfrom langchain_core.output_parsers import StrOutputParser\nfrom langchain_core.runnables import RunnableParallel\nfrom langchain.chat_models import init_chat_model\n', code='class MockRetriever:\n    def __call__(self, input):\n        return [{\'document\': \'AI is beneficial in healthcare for predictive analytics, personalized treatment, and administrative assistance.\'}]\n\ndef get_retriever():\n    return MockRetriever()\n\n# Initialize the retriever\nretriever = get_retriever()

In [None]:
%pip install jupyterthemes
%pip install --upgrade jupyterthemes

In [None]:
%pip install autots

In [None]:
%

In [None]:
def check_import(solution) -> dict:
    imports = solution.imports
    try:
        exec(imports)
        return {"key": "import_check", "score": 1}
    except Exception:
        return {"key": "import_check", "score": 0}


def check_execution(solution) -> dict:
    imports = solution.imports
    code = solution.code
    try:
        exec(imports + "\n" + code)
        return {"key": "code_execution_check", "score": 1}
    except Exception:
        return {"key": "code_execution_check", "score": 0}

In [None]:
import pandas as pd

# Load the evaluation data
df = pd.read_csv("eval.csv")

# Store evaluation results
results = []

for _, row in df.iterrows():
    question = row["question"]
    # Run the workflow for each question
    solution = app.invoke({"messages": [("user", question)], "iterations": 0, "error": ""})

    # Run evaluations
    import_check = check_import(solution["generation"])
    execution_check = check_execution(solution["generation"])

    # Store results
    result = {
        "question": question,
        "import_check": import_check["score"],
        "execution_check": execution_check["score"]
    }
    results.append(result)

# Convert results to dataframe
lg_df = pd.DataFrame(results)
print("\nEvaluation Results:")
print(lg_df)