In [7]:
"""
Agentic sampling loop that calls the Anthropic API and local implementation of computer use tools.
"""
import time
import json
import asyncio
import platform
from collections.abc import Callable
from datetime import datetime
from enum import StrEnum
from typing import Any, cast, Dict

from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
from anthropic.types import (
    ToolResultBlockParam,
    TextBlock,
)
from anthropic.types.beta import (
    BetaContentBlock,
    BetaContentBlockParam,
    BetaImageBlockParam,
    BetaMessage,
    BetaMessageParam,
    BetaTextBlockParam,
    BetaToolResultBlockParam,
)
from computer_use_ootb_internal.computer_use_demo.tools import BashTool, ComputerTool, EditTool, ToolCollection, ToolResult

import torch

from computer_use_ootb_internal.computer_use_demo.gui_agent.anthropic_agent import AnthropicActor
from computer_use_ootb_internal.computer_use_demo.executor.anthropic_executor import AnthropicExecutor
from computer_use_ootb_internal.computer_use_demo.gui_agent.planner.api_vlm_planner import APIVLMPlanner
from computer_use_ootb_internal.computer_use_demo.gui_agent.planner.local_vlm_planner import LocalVLMPlanner
from computer_use_ootb_internal.computer_use_demo.gui_agent.planner.teach_mode_vlm_planner import TeachModeVLMPlanner, split_button_reference

from computer_use_ootb_internal.computer_use_demo.gui_agent.showui_agent import ShowUIActor
from computer_use_ootb_internal.computer_use_demo.gui_agent.actor.llm_actor import LLMActor
from computer_use_ootb_internal.computer_use_demo.gui_agent.gui_parser.simple_parser.gui_parser import parse_gui

from computer_use_ootb_internal.computer_use_demo.executor.showui_executor import ShowUIExecutor
from computer_use_ootb_internal.computer_use_demo.tools.colorful_text import colorful_text_showui, colorful_text_vlm
from computer_use_ootb_internal.computer_use_demo.tools.screen_capture import get_screenshot
from computer_use_ootb_internal.computer_use_demo.gui_agent.llm_utils.oai import encode_image


BETA_FLAG = "computer-use-2024-10-22"


class APIProvider(StrEnum):
    ANTHROPIC = "anthropic"
    BEDROCK = "bedrock"
    VERTEX = "vertex"
    OPENAI = "openai"
    QWEN = "qwen"


PROVIDER_TO_DEFAULT_MODEL_NAME: dict[APIProvider, str] = {
    APIProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
    APIProvider.BEDROCK: "anthropic.claude-3-5-sonnet-20241022-v2:0",
    APIProvider.VERTEX: "claude-3-5-sonnet-v2@20241022",
    # APIProvider.OPENAI: "gpt-4o",
    # APIProvider.QWEN: "qwen2vl",
}

def sampling_loop_sync(
    model: str,
    provider: APIProvider | None,
    system_prompt_suffix: str,
    messages: list[BetaMessageParam],
    output_callback: Callable[[BetaContentBlock], None],
    tool_output_callback: Callable[[ToolResult, str], None],
    api_response_callback: Callable[[APIResponse[BetaMessage]], None],
    api_key: str,
    only_n_most_recent_images: int | None = None,
    max_tokens: int = 4096,
    selected_screen: int = 0,
    user_id: str = None,
    trace_id: str = None
):
    """
    Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
    """
    
    if torch.cuda.is_available(): device = torch.device("cuda")
    elif torch.backends.mps.is_available(): device = torch.device("mps")
    else: device = torch.device("cpu") # support: 'cpu', 'mps', 'cuda'
    print(f"Model inited on device: {device}.")
    

    # TODO: More advanced way for building the model
    if model == "claude-3-5-sonnet-20241022":
        # Register Actor and Executor
        actor = AnthropicActor(
            model=model, 
            provider=provider, 
            system_prompt_suffix=system_prompt_suffix, 
            api_key=api_key, 
            api_response_callback=api_response_callback,
            max_tokens=max_tokens,
            only_n_most_recent_images=only_n_most_recent_images,
            selected_screen=selected_screen
        )

        # from IPython.core.debugger import Pdb; Pdb().set_trace()
        executor = AnthropicExecutor(
            output_callback=output_callback,
            tool_output_callback=tool_output_callback,
            selected_screen=selected_screen
        )

    elif model in ["teach-mode-gpt-4o-mini", "teach-mode-gpt-4o","teach-mode-qwen-vl-7b-instruct"]: # Teach mode loop
        executor = ShowUIExecutor(
        output_callback=output_callback,
        tool_output_callback=tool_output_callback,
        selected_screen=0
        )
        model_mapping = {
            "teach-mode-gpt-4o-mini": "gpt-4o-mini",
            "teach-mode-gpt-4o": "gpt-4o",
            "teach-mode-qwen-vl-7b-instruct": "qwen-vl-7b-instruct"
        }

        llm_mode = model_mapping.get(model)
        actor = LLMActor(llm_model=llm_mode, output_callback=output_callback, device=device, selected_screen=selected_screen)
        planner = TeachModeVLMPlanner(model=llm_mode, output_callback=output_callback, api_response_callback=api_response_callback, 
                                      user_id=user_id, trace_id=trace_id, max_tokens=max_tokens, only_n_most_recent_images=only_n_most_recent_images, 
                                      selected_screen=selected_screen, print_usage=True, device=device, system_prompt_suffix=system_prompt_suffix)
        
        while True:
            # TODO: Only support Windows Platform for now, check the platform
            print("platform.system():", platform.system())
            if platform.system() != "Windows":
                raise ValueError("Teach mode only supports Windows Platform for now.")
            print("messages:", messages)
            vlm_response = planner(messages=messages)

            next_action = json.loads(vlm_response)['Next Action']
            next_action = split_button_reference(next_action)

            yield next_action

            # Screenshot for final step
            if next_action == None or next_action == "" or next_action == "None":
                final_sc, final_sc_path = get_screenshot(selected_screen=selected_screen)
                output_callback(f'No more actions from {colorful_text_vlm}. End of task. Final State:\n<img src="data:image/png;base64,{encode_image(str(final_sc_path))}">',
                                sender="bot")
                yield None

            output_callback(f"{colorful_text_vlm} sending action to ootb-small:\n{next_action}", sender="bot")

            # TODO: optimize the screenshot capture, too many times
            uia_data, screenshot_path = get_screenshot(selected_screen=selected_screen)

            # TODO: only gui as output
            gui = parse_gui(user_id=user_id, trace_id=trace_id, screenshot_path=screenshot_path, user_scaleFactor="auto", uia_data=uia_data, query=next_action, mode="teach", ocr_mode="googleocr")
            actor_response = actor(next_action, gui, screenshot_path)
            
            yield actor_response

            for message, tool_result_content in executor(actor_response, messages):
                time.sleep(0.5)
                yield message

            messages.append({"role": "user",
                             "content": ["History plan:" + str(json.loads(vlm_response)) + 
                                        "History actions:" + str(actor_response["content"])]
                            })
            print(f"End of loop. Messages: {str(messages)[:100000]}. Total cost: $USD{planner.total_cost:.5f}")
            
def output_callback(response, sender):
    # print(response)
    print("aa")
def api_response_callback(response, sender):
    print("bb")

def tool_output_callback(tool_result, sender):
    print(tool_result)
            

In [9]:
import os
def get_trace_information(user_id, trace_id):
    ootb_path = os.getenv("OOTB_PATH")
    trace_path = os.path.join(ootb_path, "ootbdatabase", user_id, trace_id,"trace_information.json")
    trace_information = json.load(open(trace_path, "r"))
    return trace_information


def post_process_trace_information(trace_information, trace_description):
    result = trace_description
    for idx, step in enumerate(trace_information["trajectory"]):
        result += f"\n{idx}: {step["action_discription"]}"
    return result

trace_description = "You need to complete the 'Echo of War - Inner Beast's Battlefield' dungeon in the game 'Honkai: Star Rail' to receive the weekly rewards."
sys_prompt = post_process_trace_information(get_trace_information("user_id", "trace_id1"), trace_description)
print(sys_prompt)


SyntaxError: f-string: unmatched '[' (4238654522.py, line 12)

NameError: name 'trace_description' is not defined

In [6]:
response = '```json\n{"action": "CLICK", "text": "Dungeon Menu", "coordinate": [945, 12]}\n```'
# 从response中提取json内容
json_str = response.strip('`json\n').strip()
json_data = json.loads(json_str)
print(json_data)


{'action': 'CLICK', 'text': 'Dungeon Menu', 'coordinate': [945, 12]}


In [None]:
'```json\n{"action": "CLICK", "text": "Dungeon Menu", "coordinate": [945, 12]}\n```'

In [5]:

import os
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:10809'
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:10809'
os.environ["OPENAI_API_KEY"] = "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"
os.environ["GEMINI_API_KEY"] = "AIzaSyDgjkdmiKIzbDk3QjXoviLMQDMtpYps7gM"
os.environ["OOTB_PATH"] = r"D:\develop\computer_use_ootb_internal-main\computer_use_demo"
os.environ['GOOGLE_API_KEY'] = "AIzaSyA5v66c7lIhpAjpxiflKY4VU41AGHcENHM"

import time
time.sleep(5)
trace_description = "You need to complete the 'Echo of War - Inner Beast's Battlefield' dungeon in the game 'Honkai: Star Rail' to receive the weekly rewards."
messages = [
    {
        "role": "user",
        "content": trace_description
    }
]

# trace_information = get_trace_information("user_id", "trace_id1")
a = sampling_loop_sync(model="teach-mode-gpt-4o-mini", provider=None, system_prompt_suffix="", messages=messages, output_callback=output_callback, tool_output_callback=tool_output_callback, api_response_callback=api_response_callback, api_key="", only_n_most_recent_images=None, max_tokens=4096, selected_screen=0, user_id="user_id", trace_id="trace_id1")

for i in a:
    print(i)


Model inited on device: cuda.
Screen BBox: (0, 0, 3840, 2160)
platform.system(): Windows
messages: [{'role': 'user', 'content': "You need to complete the 'Echo of War - Inner Beast's Battlefield' dungeon in the game 'Honkai: Star Rail' to receive the weekly rewards."}]
filtered_messages: ["You need to complete the 'Echo of War - Inner Beast's Battlefield' dungeon in the game 'Honkai: Star Rail' to receive the weekly rewards."]
aa
Sending messages to VLMPlanner: ["You need to complete the 'Echo of War - Inner Beast's Battlefield' dungeon in the game 'Honkai: Star Rail' to receive the weekly rewards."]
[litellm]-[gpt-4o-mini] sending messages: [{'role': 'system', 'content': '\nYou are using an Windows device.\nYou are able to use a mouse and keyboard to interact with the computer based on the given task and screenshot.\nYou can only interact with the desktop GUI (no terminal or application menu access).\n\nYou may be given some history plan and actions, this is the response from the prev

KeyboardInterrupt: 

In [11]:
import ast

def parse_showui_output( output_text: str):
    try:
        # Remove markdown code block indicators
        output_text = output_text.strip().strip('```').strip('json').strip()

        # Process single dictionary
        if output_text.startswith("{") and output_text.endswith("}"):
            output_text = f"[{output_text}]"

        # Validate if the output resembles a list of dictionaries
        if not (output_text.startswith("[") and output_text.endswith("]")):
            raise ValueError("Output does not look like a valid list or dictionary.")

        print("Output Text:", output_text)

        # Safely evaluate the string to a Python object
        parsed_output = ast.literal_eval(output_text)

        print("Parsed Output:", parsed_output)

        if isinstance(parsed_output, dict):
            parsed_output = [parsed_output]
        elif not isinstance(parsed_output, list):
            raise ValueError("Parsed output is neither a dictionary nor a list.")

        if not all(isinstance(item, dict) for item in parsed_output):
            raise ValueError("Not all items in the parsed output are dictionaries.")

        # Refine key: value pairs, mapping to the Anthropic's format
        refined_output = []

        for action_item in parsed_output:
            print("Action Item:", action_item)
            # Sometimes showui returns lowercase action names
            action_item["action"] = action_item["action"].upper()

            if action_item["action"] not in self.supported_action_type:
                raise ValueError(f"Action {action_item['action']} not supported. Check the output from ShowUI: {output_text}")

            elif action_item["action"] == "CLICK":  # 1. click -> mouse_move + left_click
                x, y = action_item["coordinate"]
                action_item["coordinate"] = (int(x * (self.screen_bbox[2] - self.screen_bbox[0])),
                                            int(y * (self.screen_bbox[3] - self.screen_bbox[1])))
                refined_output.append({"action": "mouse_move", "text": None, "coordinate": tuple(action_item["coordinate"])})
                refined_output.append({"action": "left_click", "text": None, "coordinate": None})

            elif action_item["action"] == "INPUT":  # 2. input -> type
                refined_output.append({"action": "type", "text": action_item["text"], "coordinate": None})

            elif action_item["action"] == "ENTER":  # 3. enter -> key, enter
                refined_output.append({"action": "key", "text": "Enter", "coordinate": None})

            elif action_item["action"] == "ESC" or action_item["action"] == "ESCAPE":  # 4. esc -> key, escape
                refined_output.append({"action": "key", "text": "Escape", "coordinate": None})

            elif action_item["action"] == "HOVER":  # 5. hover -> mouse_move
                x, y = action_item["coordinate"]
                action_item["coordinate"] = (int(x * (self.screen_bbox[2] - self.screen_bbox[0])),
                                            int(y * (self.screen_bbox[3] - self.screen_bbox[1])))
                refined_output.append({"action": "mouse_move", "text": None, "coordinate": tuple(action_item["coordinate"])})

            elif action_item["action"] == "SCROLL":  # 6. scroll -> key: pagedown or pageup
                if action_item["text"] == "up":
                    refined_output.append({"action": "key", "text": "pageup", "coordinate": None})
                elif action_item["text"] == "down":
                    refined_output.append({"action": "key", "text": "pagedown", "coordinate": None})
                else:
                    raise ValueError(f"Scroll direction {action_item['text']} not supported.")

            elif action_item["action"] == "PRESS":  # 7. press
                x, y = action_item["coordinate"]
                action_item["coordinate"] = (int(x * (self.screen_bbox[2] - self.screen_bbox[0])),
                                            int(y * (self.screen_bbox[3] - self.screen_bbox[1])))
                refined_output.append({"action": "mouse_move", "text": None, "coordinate": tuple(action_item["coordinate"])})
                refined_output.append({"action": "left_press", "text": None, "coordinate": None})

        return refined_output

    except Exception as e:
        print(f"Error parsing output: {e}")
        return None
parse_showui_output('```json\n{"action": "CLICK", "text": "Echo of War - Inner Beast\'s Battlefield", "coordinate": [946, 12]}\n```')

Output Text: [{"action": "CLICK", "text": "Echo of War - Inner Beast's Battlefield", "coordinate": [946, 12]}]
Parsed Output: [{'action': 'CLICK', 'text': "Echo of War - Inner Beast's Battlefield", 'coordinate': [946, 12]}]
Action Item: {'action': 'CLICK', 'text': "Echo of War - Inner Beast's Battlefield", 'coordinate': [946, 12]}
Error parsing output: name 'self' is not defined


In [14]:
a =  {"action": "CLICK", "text": "Echo of War - Inner Beast\'s Battlefield", "coordinate": [946, 12]}
print(json.dumps(a))

{"action": "CLICK", "text": "Echo of War - Inner Beast's Battlefield", "coordinate": [946, 12]}
