In [1]:
import asyncio
import base64
import sys
import time

from openai import OpenAI
from playwright.async_api import async_playwright

In [2]:
def _action_as_dict(action):
    """Return ``action`` as a plain dict.

    Accepts a dict (returned unchanged), ``None`` (treated as an empty
    action), an object exposing ``model_dump()`` (e.g. a pydantic model), or
    any object with a ``__dict__``.
    """
    if action is None:
        return {}
    if isinstance(action, dict):
        return action
    dump = getattr(action, "model_dump", None)
    if callable(dump):
        return dump()
    return dict(vars(action))


async def handle_model_action(page, action):
    """Execute one model-suggested action on ``page`` via the async Playwright API.

    Args:
        page: Object exposing the async ``mouse``, ``keyboard`` and
            ``evaluate`` members used below (a Playwright ``Page``).
        action: The action description, either a dict or an attribute-style
            object. Its ``type`` field selects the branch: ``click``,
            ``double_click``, ``move``, ``drag``, ``scroll``, ``keypress``,
            ``type``, ``wait`` or ``screenshot``.

    Returns:
        None. Any exception raised while executing the action is caught and
        printed so that a single failing action does not abort the caller.
    """
    try:
        # Normalise inside the try so a malformed action is reported like any
        # other failure instead of raising before the handler starts.
        fields = _action_as_dict(action)
        action_type = fields.get("type")

        if action_type == "click":
            x, y = fields.get("x"), fields.get("y")
            button = fields.get("button", "left")
            print(f"Acción: click en ({x}, {y}) con botón '{button}'")
            # Any button other than left/right falls back to a left click.
            if button not in ["left", "right"]:
                button = "left"
            await page.mouse.click(x, y, button=button)

        elif action_type == "double_click":
            x, y = fields.get("x"), fields.get("y")
            print(f"Acción: doble click en ({x}, {y})")
            await page.mouse.dblclick(x, y)

        elif action_type == "move":
            x, y = fields.get("x"), fields.get("y")
            print(f"Acción: mover el ratón a ({x}, {y})")
            await page.mouse.move(x, y)

        elif action_type == "drag":
            path = fields.get("path") or []
            print(f"Acción: arrastrar a lo largo de {len(path)} puntos")
            if path:
                start, *rest = path
                await page.mouse.move(start["x"], start["y"])
                await page.mouse.down()
                try:
                    for point in rest:
                        await page.mouse.move(point["x"], point["y"])
                finally:
                    # Always release the button, even if a move fails midway.
                    await page.mouse.up()

        elif action_type == "scroll":
            x, y = fields.get("x"), fields.get("y")
            scroll_x = fields.get("scroll_x") or 0
            scroll_y = fields.get("scroll_y") or 0
            print(f"Acción: scroll en ({x}, {y}) con offset (x={scroll_x}, y={scroll_y})")
            await page.mouse.move(x, y)
            # Pass the offsets as an argument rather than formatting them into
            # the script text: the values come from the model and must never
            # be interpreted as JavaScript source.
            await page.evaluate(
                "([dx, dy]) => window.scrollBy(dx, dy)",
                [scroll_x, scroll_y],
            )

        elif action_type == "keypress":
            keys = fields.get("keys", [])
            for k in keys:
                print(f"Acción: pulsación de tecla '{k}'")
                if k.lower() == "enter":
                    await page.keyboard.press("Enter")
                elif k.lower() == "space":
                    await page.keyboard.press(" ")
                else:
                    await page.keyboard.press(k)

        elif action_type == "type":
            text = fields.get("text", "")
            print(f"Acción: escribir texto: {text}")
            await page.keyboard.type(text)

        elif action_type == "wait":
            print("Acción: espera de 2 segundos")
            await asyncio.sleep(2)

        elif action_type == "screenshot":
            # Nothing to do here; only the log line is emitted.
            print("Acción: capturar screenshot (se hace automáticamente)")

        else:
            print(f"Acción no reconocida: {action}")
    except Exception as e:
        print(f"Error al ejecutar la acción {action}: {e}")


In [3]:
async def get_screenshot(page):
    """Capture ``page`` and return the resulting image bytes.

    ``page.screenshot()`` is called with no arguments, so whatever defaults
    the page object applies (format, captured area) are used as-is.
    """
    image_bytes = await page.screenshot()
    return image_bytes

In [4]:
def _read_field(obj, name, default=None):
    """Read ``name`` from a dict (by key) or from any other object (by attribute)."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


async def computer_use_loop(page, client, response):
    """Run the model's suggested actions in a loop until it stops requesting them.

    On each iteration the first ``computer_call`` item in ``response`` is
    executed on ``page``, a screenshot is taken, and the screenshot is sent
    back through ``client.responses.create`` as a ``computer_call_output``
    chained to the previous response id. The loop ends when a response
    contains no ``computer_call`` item.

    Args:
        page: Page object handed to ``handle_model_action`` and
            ``get_screenshot``.
        client: Object exposing ``responses.create(...)``. It is called
            without ``await``, so a synchronous client blocks the event loop
            for the duration of each request.
        response: The initial model response; may be a dict or an
            attribute-style object (the SDK returns model objects, which have
            no ``.get`` method).

    Returns:
        The last response received, i.e. the one without any ``computer_call``.
    """
    while True:
        output_items = _read_field(response, "output") or []
        computer_calls = [
            item for item in output_items
            if _read_field(item, "type") == "computer_call"
        ]
        if not computer_calls:
            print("No se encontró ninguna llamada de computadora. Salida del modelo:")
            for item in output_items:
                print(item)
            break

        # Only the first computer call of each response is executed.
        computer_call = computer_calls[0]
        last_call_id = _read_field(computer_call, "call_id")
        action = _read_field(computer_call, "action") or {}
        if not isinstance(action, dict):
            # Hand the action over as a plain dict so dict-style access works
            # regardless of the object type the SDK returned.
            dump = getattr(action, "model_dump", None)
            action = dump() if callable(dump) else dict(vars(action))

        await handle_model_action(page, action)
        await asyncio.sleep(1)  # Give the action time to take effect

        screenshot_bytes = await get_screenshot(page)
        screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8")

        response = client.responses.create(
            model="computer-use-preview",
            previous_response_id=_read_field(response, "id"),
            tools=[{
                "type": "computer_use_preview",
                "display_width": 1024,
                "display_height": 768,
                "environment": "browser"
            }],
            input=[{
                "call_id": last_call_id,
                "type": "computer_call_output",
                "output": {
                    "type": "input_image",
                    "image_url": f"data:image/png;base64,{screenshot_base64}"
                }
            }],
            truncation="auto"
        )
    return response

In [None]:
async def main():
    """Drive a visible Chromium window with the computer-use model.

    Launches Chromium (non-headless), opens https://bing.com in a 1024x768
    viewport, sends the task prompt to the ``computer-use-preview`` model,
    hands the first response to ``computer_use_loop`` and prints the final
    response. The browser is closed even if a step after launch fails.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,
            chromium_sandbox=True,
            env={},
            args=[
                "--disable-extensions",
                "--disable-file-system"
            ]
        )
        try:
            page = await browser.new_page()
            # Viewport matches the display size declared to the model below.
            await page.set_viewport_size({"width": 1024, "height": 768})
            await page.goto("https://bing.com")
            await asyncio.sleep(2)

            client = OpenAI()

            response = client.responses.create(
                model="computer-use-preview",
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": 1024,
                    "display_height": 768,
                    "environment": "browser"
                }],
                input=[{
                    "role": "user",
                    "content": "Check the latest OpenAI news on bing.com."
                }],
                reasoning={
                    "generate_summary": "concise"
                },
                truncation="auto"
            )
            print("Respuesta inicial del modelo:")
            # The SDK returns a model object without .get(); support both
            # attribute-style and dict-style responses.
            if isinstance(response, dict):
                initial_output = response.get("output")
            else:
                initial_output = getattr(response, "output", None)
            print(initial_output)

            final_response = await computer_use_loop(page, client, response)
            print("Respuesta final del modelo:")
            print(final_response)
        finally:
            await browser.close()




In [7]:
await main()

NotImplementedError: 