diff --git a/server/server.py b/server/server.py index a3aed2b..a89aa6c 100644 --- a/server/server.py +++ b/server/server.py @@ -1,10 +1,11 @@ import anyio import click -import httpx import asyncio +import uuid +from datetime import datetime from langchain_openai import ChatOpenAI from browser_use import Agent -from browser_use.browser.browser import Browser, BrowserConfig +from browser_use.browser.browser import Browser import mcp.types as types from mcp.server.lowlevel import Server from dotenv import load_dotenv @@ -16,6 +17,7 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + # Browser context configuration config = BrowserContextConfig( wait_for_network_idle_page_load_time=0.6, @@ -38,6 +40,9 @@ # Flag to track browser context health browser_context_healthy = True +# Task storage for async operations +task_store = {} + async def reset_browser_context(): """Reset the browser context to a clean state.""" @@ -89,43 +94,74 @@ async def check_browser_health(): return browser_context_healthy -async def browser_use( - url: str, - action: str, -) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: - """Perform a browser action and return the results.""" - global browser_context_healthy +async def run_browser_task_async(task_id, url, action): + """Run a browser task asynchronously and store the result.""" + try: + # Update task status to running + task_store[task_id]["status"] = "running" + task_store[task_id]["start_time"] = datetime.now().isoformat() + task_store[task_id]["progress"] = { + "current_step": 0, + "total_steps": 0, + "steps": [], + } - headers = { - "User-Agent": "browser-use (github.com/co-browser/browser-use-mcp-server)", - } + # Check browser health + if not await check_browser_health(): + task_store[task_id]["status"] = "failed" + task_store[task_id]["end_time"] = datetime.now().isoformat() + task_store[task_id][ + "error" + ] = "Browser context is unhealthy and could not be reset" + return + + # Define step callback function with the correct signature + async def step_callback(browser_state, agent_output, step_number): + # Update progress in task store + task_store[task_id]["progress"]["current_step"] = step_number + task_store[task_id]["progress"]["total_steps"] = max( + task_store[task_id]["progress"]["total_steps"], step_number + ) - # Check browser health before proceeding - if not await check_browser_health(): - return [ - types.TextContent( - type="text", - text=json.dumps( - { - "final_result": "Browser context is unhealthy and could not be reset", - "success": False, - "has_errors": True, - "errors": [ - "Browser context is unhealthy and could not be reset" - ], - "urls_visited": [], - "actions_performed": [], - "extracted_content": [], - "steps_taken": 0, - }, - indent=2, - ), + # Add step info with minimal details + step_info = {"step": step_number, "time": datetime.now().isoformat()} + + # Add goal if available + if agent_output and hasattr(agent_output, "current_state"): + if hasattr(agent_output.current_state, "next_goal"): + step_info["goal"] = agent_output.current_state.next_goal + + # Add to progress steps + task_store[task_id]["progress"]["steps"].append(step_info) + + # Log progress + logger.info(f"Task {task_id}: Step {step_number} completed") + + # Define done callback function with the correct signature + async def done_callback(history): + # Log completion + logger.info(f"Task {task_id}: Completed with {len(history.history)} steps") + + # Add final step + current_step = task_store[task_id]["progress"]["current_step"] + 1 + task_store[task_id]["progress"]["steps"].append( + { + "step": current_step, + "time": datetime.now().isoformat(), + "status": "completed", + } ) - ] - try: - # Use the existing browser context - agent = Agent(task=action, llm=llm, browser_context=context) + # Use the existing browser context with callbacks + agent = Agent( + task=action, + llm=llm, + browser_context=context, + register_new_step_callback=step_callback, + register_done_callback=done_callback, + ) + + # Run the agent ret = await agent.run(max_steps=10) # Get the final result @@ -161,47 +197,34 @@ async def browser_use( "steps_taken": steps_taken, } - # Convert to JSON string - response_json = json.dumps(response_data, indent=2) - - return [types.TextContent(type="text", text=response_json)] + # Store the result + task_store[task_id]["status"] = "completed" + task_store[task_id]["end_time"] = datetime.now().isoformat() + task_store[task_id]["result"] = response_data except Exception as e: - logger.error(f"Error in browser_use: {str(e)}") + logger.error(f"Error in async browser task: {str(e)}") import traceback tb = traceback.format_exc() # Mark the browser context as unhealthy + global browser_context_healthy browser_context_healthy = False - # Return error information - error_message = { - "final_result": f"Error: {str(e)}", - "success": False, - "has_errors": True, - "errors": [str(e), tb], - "urls_visited": [], - "actions_performed": [], - "extracted_content": [], - "steps_taken": 0, - } - - return [ - types.TextContent(type="text", text=json.dumps(error_message, indent=2)) - ] + # Store the error + task_store[task_id]["status"] = "failed" + task_store[task_id]["end_time"] = datetime.now().isoformat() + task_store[task_id]["error"] = str(e) + task_store[task_id]["traceback"] = tb finally: # Always try to reset the browser context to a clean state after use - # This helps prevent issues with subsequent requests try: - # For now, we'll just navigate to about:blank to reset the page state - # This is less resource-intensive than creating a new context each time current_page = await context.get_current_page() await current_page.goto("about:blank") except Exception as e: logger.warning(f"Error resetting page state: {str(e)}") - # If we can't reset the page state, mark the context as unhealthy browser_context_healthy = False @@ -213,12 +236,7 @@ async def browser_use( default="stdio", help="Transport type", ) -@click.option( - "--timeout", - default=120, - help="Timeout in seconds for tool execution", -) -def main(port: int, transport: str, timeout: int) -> int: +def main(port: int, transport: str) -> int: load_dotenv() app = Server("browser_use") @@ -228,91 +246,111 @@ async def call_tool( ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: global browser_context_healthy - if name != "browser_use": - raise ValueError(f"Unknown tool: {name}") - if "url" not in arguments: - raise ValueError("Missing required argument 'url'") - if "action" not in arguments: - raise ValueError("Missing required argument 'action'") - - # Create a task for the browser_use function - browser_task = None + # Handle different tool types + if name == "browser_use": + # Check required arguments + if "url" not in arguments: + raise ValueError("Missing required argument 'url'") + if "action" not in arguments: + raise ValueError("Missing required argument 'action'") + + # Generate a task ID + task_id = str(uuid.uuid4()) + + # Initialize task in store + task_store[task_id] = { + "id": task_id, + "status": "pending", + "url": arguments["url"], + "action": arguments["action"], + "created_at": datetime.now().isoformat(), + } - try: - # Create a task for the browser_use function - browser_task = asyncio.create_task( - browser_use(arguments["url"], arguments["action"]) + # Start task in background + asyncio.create_task( + run_browser_task_async(task_id, arguments["url"], arguments["action"]) ) - # Wait for the task to complete with a timeout - try: - result = await asyncio.wait_for(browser_task, timeout=timeout) - return result - except asyncio.TimeoutError: - # Cancel the task if it times out - if browser_task and not browser_task.done(): - browser_task.cancel() + # Estimate task duration + estimated_seconds = 60 # Default estimate of 60 seconds - # Mark the browser context as unhealthy - browser_context_healthy = False + # Return task ID immediately with explicit sleep instruction + return [ + types.TextContent( + type="text", + text=json.dumps( + { + "task_id": task_id, + "status": "pending", + "message": f"Browser task started. Please wait for {estimated_seconds} seconds, then check the result using browser_get_result or the resource URI. Always wait exactly 5 seconds between status checks.", + "estimated_time": f"{estimated_seconds} seconds", + "resource_uri": f"resource://browser_task/{task_id}", + "sleep_command": f"sleep 5", + "instruction": f"Use the terminal command 'sleep 5' to wait 5 seconds between status checks. IMPORTANT: Always use exactly 5 seconds, no more and no less.", + }, + indent=2, + ), + ) + ] - # Schedule a reset of the browser context - asyncio.create_task(reset_browser_context()) + elif name == "browser_get_result": + # Get result of async task + if "task_id" not in arguments: + raise ValueError("Missing required argument 'task_id'") - # Return a meaningful error message if the operation times out - error_message = { - "final_result": "Operation timed out", - "success": False, - "has_errors": True, - "errors": [ - f"The operation exceeded the {timeout} second timeout limit" - ], - "urls_visited": [], - "actions_performed": [], - "extracted_content": [], - "steps_taken": 0, - } + task_id = arguments["task_id"] + + if task_id not in task_store: return [ types.TextContent( - type="text", text=json.dumps(error_message, indent=2) + type="text", + text=json.dumps( + {"error": "Task not found", "task_id": task_id}, indent=2 + ), ) ] - except Exception as e: - # Cancel the task if it's still running - if browser_task and not browser_task.done(): - browser_task.cancel() - # Handle other exceptions gracefully - import traceback + # Get the current task data + task_data = task_store[task_id].copy() - tb = traceback.format_exc() + # If task is still running, add simple guidance + if task_data["status"] == "running": + # Add a simple next check suggestion + progress = task_data.get("progress", {}) + current_step = progress.get("current_step", 0) - # Mark the browser context as unhealthy - browser_context_healthy = False + if current_step > 0: + # Simple message based on current step + task_data["message"] = ( + f"Task is running (step {current_step}). Wait 5 seconds before checking again." + ) + task_data["sleep_command"] = "sleep 5" + task_data["instruction"] = ( + "Use the terminal command 'sleep 5' to wait 5 seconds before checking again. IMPORTANT: Always use exactly 5 seconds, no more and no less." + ) + else: + task_data["message"] = ( + "Task is starting. Wait 5 seconds before checking again." + ) + task_data["sleep_command"] = "sleep 5" + task_data["instruction"] = ( + "Use the terminal command 'sleep 5' to wait 5 seconds before checking again. IMPORTANT: Always use exactly 5 seconds, no more and no less." + ) - # Schedule a reset of the browser context - asyncio.create_task(reset_browser_context()) - - error_message = { - "final_result": f"Error: {str(e)}", - "success": False, - "has_errors": True, - "errors": [str(e), tb], - "urls_visited": [], - "actions_performed": [], - "extracted_content": [], - "steps_taken": 0, - } + # Return current task status and result if available return [ - types.TextContent(type="text", text=json.dumps(error_message, indent=2)) + types.TextContent(type="text", text=json.dumps(task_data, indent=2)) ] + else: + raise ValueError(f"Unknown tool: {name}") + @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="browser_use", - description="takes a prompt representing an action to perform in the browser and returns detailed information about the execution", + description="Performs a browser action and returns a task ID for async execution", inputSchema={ "type": "object", "required": ["url", "action"], @@ -327,47 +365,64 @@ async def list_tools() -> list[types.Tool]: }, }, }, - outputSchema={ + ), + types.Tool( + name="browser_get_result", + description="Gets the result of an asynchronous browser task", + inputSchema={ "type": "object", + "required": ["task_id"], "properties": { - "final_result": { + "task_id": { "type": "string", - "description": "The final result of the browser action", - }, - "success": { - "type": "boolean", - "description": "Whether the action was successful", - }, - "has_errors": { - "type": "boolean", - "description": "Whether any errors occurred during execution", - }, - "errors": { - "type": "array", - "items": {"type": "string"}, - "description": "List of errors that occurred during execution", - }, - "urls_visited": { - "type": "array", - "items": {"type": "string"}, - "description": "List of URLs visited during execution", - }, - "actions_performed": { - "type": "array", - "items": {"type": "string"}, - "description": "List of actions performed during execution", - }, - "extracted_content": { - "type": "array", - "items": {"type": "string"}, - "description": "Content extracted during execution", - }, - "steps_taken": { - "type": "integer", - "description": "Number of steps taken during execution", - }, + "description": "ID of the task to get results for", + } }, }, + ), + ] + + @app.list_resources() + async def list_resources() -> list[types.Resource]: + # List all completed tasks as resources + resources = [] + for task_id, task_data in task_store.items(): + if task_data["status"] in ["completed", "failed"]: + resources.append( + types.Resource( + uri=f"resource://browser_task/{task_id}", + title=f"Browser Task Result: {task_id[:8]}", + description=f"Result of browser task for URL: {task_data.get('url', 'unknown')}", + ) + ) + return resources + + @app.read_resource() + async def read_resource(uri: str) -> list[types.ResourceContents]: + # Extract task ID from URI + if not uri.startswith("resource://browser_task/"): + return [ + types.ResourceContents( + type="text", + text=json.dumps( + {"error": f"Invalid resource URI: {uri}"}, indent=2 + ), + ) + ] + + task_id = uri.replace("resource://browser_task/", "") + if task_id not in task_store: + return [ + types.ResourceContents( + type="text", + text=json.dumps({"error": f"Task not found: {task_id}"}, indent=2), + ) + ] + + # Return task data + return [ + types.ResourceContents( + type="text", text=json.dumps(task_store[task_id], indent=2) ) ] @@ -405,24 +460,52 @@ async def handle_sse(request): # Add a startup event to initialize the browser @starlette_app.on_event("startup") async def startup_event(): - global browser, context, browser_context_healthy - try: - # Ensure browser and context are initialized - if not browser_context_healthy: - await reset_browser_context() - except Exception as e: - logger.error(f"Error during startup: {str(e)}") + logger.info("Starting browser context...") + await reset_browser_context() + logger.info("Browser context started") + + # Start background task cleanup + asyncio.create_task(cleanup_old_tasks()) - # Add a shutdown event to clean up resources @starlette_app.on_event("shutdown") async def shutdown_event(): - global browser, context - try: - # Close the browser and context - await context.close() - await browser.close() - except Exception as e: - logger.error(f"Error during shutdown: {str(e)}") + logger.info("Shutting down browser context...") + await browser.close() + logger.info("Browser context closed") + + async def cleanup_old_tasks(): + """Periodically clean up old completed tasks to prevent memory leaks.""" + while True: + try: + # Sleep first to avoid cleaning up tasks too early + await asyncio.sleep(3600) # Run cleanup every hour + + current_time = datetime.now() + tasks_to_remove = [] + + # Find completed tasks older than 1 hour + for task_id, task_data in task_store.items(): + if ( + task_data["status"] in ["completed", "failed"] + and "end_time" in task_data + ): + end_time = datetime.fromisoformat(task_data["end_time"]) + hours_elapsed = ( + current_time - end_time + ).total_seconds() / 3600 + + if hours_elapsed > 1: # Remove tasks older than 1 hour + tasks_to_remove.append(task_id) + + # Remove old tasks + for task_id in tasks_to_remove: + del task_store[task_id] + + if tasks_to_remove: + logger.info(f"Cleaned up {len(tasks_to_remove)} old tasks") + + except Exception as e: + logger.error(f"Error in task cleanup: {str(e)}") uvicorn.run(starlette_app, host="0.0.0.0", port=port) else: