From 13b7cff87e55d633003220494b705e950044190f Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 04:41:12 +0600 Subject: [PATCH 01/13] feat: removed deploy-local command --- runagent/cli/commands.py | 122 ++++++--------------------------------- 1 file changed, 18 insertions(+), 104 deletions(-) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 4c21898..c2ec6fc 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -511,105 +511,19 @@ def template(action_list, action_info, framework, template, filter_framework, fo @click.command() -@click.option("--folder", required=True, help="Folder containing agent files") -@click.option("--framework", help="Framework type (auto-detected if not specified)") -@click.option("--replace", help="Agent ID to replace (for capacity management)") -@click.option("--port", type=int, help="Preferred port (auto-allocated if unavailable)") -@click.option("--host", default="127.0.0.1", help="Preferred host") -def deploy_local(folder, framework, replace, port, host): - """Deploy agent locally for testing with automatic port allocation""" - - try: - sdk = RunAgent() - - # Validate folder - if not Path(folder).exists(): - raise click.ClickException(f"Folder not found: {folder}") - - console.print(f"šŸš€ [bold]Deploying agent locally with auto port allocation...[/bold]") - console.print(f"šŸ“ Source: [cyan]{folder}[/cyan]") - - if replace: - # Replace existing agent - result = sdk.db_service.replace_agent( - old_agent_id=replace, - new_agent_id=str(uuid.uuid4()), # Generate new ID - agent_path=folder, - host=host, - port=port, - framework=framework or detect_framework(folder), - ) - else: - # Add new agent with auto port allocation - import uuid - agent_id = str(uuid.uuid4()) - result = sdk.db_service.add_agent_with_auto_port( - agent_id=agent_id, - agent_path=folder, - framework=framework or detect_framework(folder), - status="deployed", - preferred_host=host, - preferred_port=port, - ) - - if result.get("success"): - agent_id = result.get("new_agent_id") if replace else result.get("agent_id") - allocated_host = result.get("allocated_host", host) - allocated_port = result.get("allocated_port", port) - - console.print(f"\nāœ… [green]Local deployment successful![/green]") - console.print(f"šŸ†” Agent ID: [bold magenta]{agent_id}[/bold magenta]") - console.print(f"šŸ”Œ Allocated Address: [bold blue]{allocated_host}:{allocated_port}[/bold blue]") - console.print(f"🌐 Endpoint: [link]http://{allocated_host}:{allocated_port}[/link]") - - if replace: - console.print(f"šŸ”„ Replaced agent: [yellow]{replace}[/yellow]") - - # Show capacity info - # capacity = sdk.get_local_capacity() - capacity_info = sdk.db_service.get_database_capacity_info() - - console.print( - f"šŸ“Š Capacity: [cyan]{capacity.get('current_count', 1)}/5[/cyan] slots used" - ) - - console.print(f"\nšŸ’” [bold]Next steps:[/bold]") - console.print(f" • Start server: [cyan]runagent serve {folder}[/cyan]") - console.print(f" • Test agent: [cyan]runagent run --id {agent_id} --local[/cyan]") - console.print(f" • Or use Python SDK:") - console.print(f" [dim]from runagent import RunAgentClient[/dim]") - console.print(f" [dim]client = RunAgentClient(agent_id='{agent_id}', local=True)[/dim]") - else: - error_code = result.get("error_code") - if error_code == "DATABASE_FULL": - capacity_info = result.get("capacity_info", {}) - console.print(f"\nāŒ [red]Database at full capacity![/red]") - console.print( - f"šŸ“Š Current: {capacity_info.get('current_count', 0)}/5 agents" - ) - - oldest_agent = capacity_info.get("oldest_agent", {}).get("agent_id") - if oldest_agent: - console.print(f"\nšŸ’” [yellow]Suggested command:[/yellow]") - console.print( - f"[cyan]runagent deploy-local --folder {folder} --replace {oldest_agent}[/cyan]" - ) - - raise click.ClickException(result.get("error")) - else: - raise click.ClickException(result.get("error")) - - except ValidationError as e: - console.print(f"āŒ [red]Validation error:[/red] {e}") - raise click.ClickException("Deployment failed") - except Exception as e: - console.print(f"āŒ [red]Deployment error:[/red] {e}") - raise click.ClickException("Deployment failed") - -@click.command() -@click.option("--folder", required=True, help="Folder containing agent files") -@click.option("--framework", help="Framework type (auto-detected if not specified)") -def upload(folder, framework): +@click.argument( + "path", + type=click.Path( + exists=True, + file_okay=False, + dir_okay=True, + readable=True, + resolve_path=True, + path_type=Path, + ), + default=".", +) +def upload(path: Path): """Upload agent to remote server""" try: @@ -623,14 +537,14 @@ def upload(folder, framework): raise click.ClickException("Authentication required") # Validate folder - if not Path(folder).exists(): - raise click.ClickException(f"Folder not found: {folder}") + if not Path(path).exists(): + raise click.ClickException(f"Folder not found: {path}") console.print(f"šŸ“¤ [bold]Uploading agent...[/bold]") - console.print(f"šŸ“ Source: [cyan]{folder}[/cyan]") + console.print(f"šŸ“ Source: [cyan]{path}[/cyan]") - # Upload agent - result = sdk.upload_agent(folder=folder, framework=framework) + # Upload agent (framework auto-detected) + result = sdk.upload_agent(folder=path) if result.get("success"): agent_id = result["agent_id"] From 9e893a98f798643f87068a5bd73607e11741ff65 Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 04:41:36 +0600 Subject: [PATCH 02/13] feat: deploy-local fully removed --- runagent/cli/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runagent/cli/main.py b/runagent/cli/main.py index fbbce6f..8cf5907 100644 --- a/runagent/cli/main.py +++ b/runagent/cli/main.py @@ -14,7 +14,6 @@ def runagent(): runagent.add_command(commands.teardown) runagent.add_command(commands.init) runagent.add_command(commands.template) -runagent.add_command(commands.deploy_local) runagent.add_command(commands.upload) runagent.add_command(commands.start) runagent.add_command(commands.deploy) From c72be32d6eb891ad2df3c2fb7cc633b0cd280569 Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 04:42:21 +0600 Subject: [PATCH 03/13] feat: updated same upload_agent function names --- runagent/sdk/deployment/remote.py | 11 +++++------ runagent/sdk/rest_client.py | 3 +-- runagent/sdk/sdk.py | 4 ++-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/runagent/sdk/deployment/remote.py b/runagent/sdk/deployment/remote.py index 0b69906..20a9a11 100644 --- a/runagent/sdk/deployment/remote.py +++ b/runagent/sdk/deployment/remote.py @@ -51,8 +51,8 @@ def deploy_agent( return self.client.deploy_agent(folder_path=str(folder_path), metadata=metadata) - def upload_agent( - self, folder_path: str, framework: t.Optional[str] = None + def upload_agent_to_server( + self, folder_path: Path, framework: t.Optional[str] = None ) -> t.Dict[str, t.Any]: """ Upload an agent without starting it. @@ -63,16 +63,15 @@ def upload_agent( Returns: Upload result """ - folder_path = Path(folder_path) if not folder_path.exists(): raise ValidationError(f"Folder not found: {folder_path}") # Auto-detect framework if not provided - if not framework: - framework = self._detect_framework(folder_path) + # if not framework: + # framework = self._detect_framework(folder_path) # metadata = {"framework": framework} - return self.client.upload_agent(folder_path=str(folder_path)) + return self.client.upload_agent_metadata_and_zip(folder_path=folder_path) def start_agent( self, agent_id: str, config: t.Optional[t.Dict[str, t.Any]] = None diff --git a/runagent/sdk/rest_client.py b/runagent/sdk/rest_client.py index 1264f28..60864b5 100644 --- a/runagent/sdk/rest_client.py +++ b/runagent/sdk/rest_client.py @@ -532,10 +532,9 @@ def _process_upload_result(self, result: Dict, upload_metadata: Dict) -> Dict: } return result - def upload_agent(self, folder_path: str) -> Dict: + def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: """Upload agent folder to middleware server with validation""" try: - folder_path = Path(folder_path) if not folder_path.exists(): return {"success": False, "error": f"Folder not found: {folder_path}"} diff --git a/runagent/sdk/sdk.py b/runagent/sdk/sdk.py index 3ba3fe3..013055f 100644 --- a/runagent/sdk/sdk.py +++ b/runagent/sdk/sdk.py @@ -232,7 +232,7 @@ def deploy_remote( ) def upload_agent( - self, folder: str, framework: t.Optional[str] = None + self, folder: Path ) -> t.Dict[str, t.Any]: """ Upload an agent to the remote server (without starting). @@ -248,7 +248,7 @@ def upload_agent( AuthenticationError: If not properly authenticated """ self._require_authentication() - return self.remote.upload_agent(folder_path=folder, framework=framework) + return self.remote.upload_agent_to_server(folder_path=folder) def start_remote_agent( self, agent_id: str, config: t.Optional[t.Dict[str, t.Any]] = None From 154381dcef71964244a2166b4e5e16850ee8d6be Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 04:56:06 +0600 Subject: [PATCH 04/13] feat: fingerprint feature added back to db --- runagent/sdk/db.py | 155 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/runagent/sdk/db.py b/runagent/sdk/db.py index 2337310..be291b9 100644 --- a/runagent/sdk/db.py +++ b/runagent/sdk/db.py @@ -45,6 +45,8 @@ class Agent(Base): port = Column(Integer, nullable=False, default=8000) framework = Column(String) status = Column(String, default="deployed") + is_local = Column(Boolean, default=True) # True for local agents, False for remote uploads + fingerprint = Column(String, nullable=True) # Agent folder fingerprint for duplicate detection deployed_at = Column(DateTime, default=func.current_timestamp()) last_run = Column(DateTime) run_count = Column(Integer, default=0) @@ -1259,6 +1261,8 @@ def get_agent_by_path(self, agent_path: str) -> Optional[Dict]: "port": agent.port, "framework": agent.framework, "status": agent.status, + "is_local": agent.is_local, + "fingerprint": agent.fingerprint, "deployed_at": ( agent.deployed_at.isoformat() if agent.deployed_at else None ), @@ -1277,6 +1281,157 @@ def get_agent_by_path(self, agent_path: str) -> Optional[Dict]: console.print(f"Error getting agent by path from database: {e}") return None + + def get_agent_by_fingerprint(self, fingerprint: str) -> Optional[Dict]: + """Get agent information by fingerprint""" + with self.db_manager.get_session() as session: + try: + agent = session.query(Agent).filter( + Agent.fingerprint == fingerprint + ).first() + + if not agent: + return None + + return { + "agent_id": agent.agent_id, + "agent_path": agent.agent_path, + "host": agent.host, + "port": agent.port, + "framework": agent.framework, + "status": agent.status, + "is_local": agent.is_local, + "fingerprint": agent.fingerprint, + "deployed_at": ( + agent.deployed_at.isoformat() if agent.deployed_at else None + ), + "last_run": agent.last_run.isoformat() if agent.last_run else None, + "run_count": agent.run_count, + "success_count": agent.success_count, + "error_count": agent.error_count, + "created_at": ( + agent.created_at.isoformat() if agent.created_at else None + ), + "updated_at": ( + agent.updated_at.isoformat() if agent.updated_at else None + ), + } + except Exception as e: + console.print(f"Error getting agent by fingerprint from database: {e}") + return None + + def add_remote_agent( + self, + agent_id: str, + agent_path: str, + framework: str = None, + fingerprint: str = None, + status: str = "uploaded" + ) -> Dict: + """ + Add a remote uploaded agent to the database + + Args: + agent_id: Unique agent identifier + agent_path: Path to agent directory (local path for reference) + framework: Framework type + fingerprint: Agent fingerprint for duplicate detection + status: Agent status (uploaded, uploading, deployed) + + Returns: + Dictionary with success status and details + """ + if not agent_id or not agent_path: + return { + "success": False, + "error": "Missing required fields", + "code": "INVALID_INPUT", + } + + with self.db_manager.get_session() as session: + try: + # Check if agent already exists by ID + existing_agent = ( + session.query(Agent).filter(Agent.agent_id == agent_id).first() + ) + if existing_agent: + return { + "success": False, + "error": f"Agent {agent_id} already exists", + "code": "AGENT_EXISTS", + "existing_agent": { + "agent_id": existing_agent.agent_id, + "status": existing_agent.status, + "is_local": existing_agent.is_local, + } + } + + # Check if agent with same fingerprint exists (for duplicate detection) + if fingerprint: + duplicate_agent = ( + session.query(Agent).filter(Agent.fingerprint == fingerprint).first() + ) + if duplicate_agent: + return { + "success": False, + "error": f"Agent with same fingerprint already exists", + "code": "DUPLICATE_FINGERPRINT", + "existing_agent": { + "agent_id": duplicate_agent.agent_id, + "status": duplicate_agent.status, + "is_local": duplicate_agent.is_local, + "fingerprint": duplicate_agent.fingerprint, + } + } + + # Create new remote agent record + new_agent = Agent( + agent_id=agent_id, + agent_path=str(agent_path), + host="remote", # Remote agents don't have local host/port + port=0, # Remote agents don't have local port + framework=framework, + status=status, + is_local=False, # Mark as remote + fingerprint=fingerprint, + ) + + session.add(new_agent) + session.commit() + + return { + "success": True, + "message": f"Remote agent {agent_id} added successfully", + "agent_id": agent_id, + "status": status, + "is_local": False, + } + + except Exception as e: + session.rollback() + return { + "success": False, + "error": f"Database error: {str(e)}", + "code": "DATABASE_ERROR", + } + + def update_agent_fingerprint(self, agent_id: str, fingerprint: str) -> bool: + """Update agent fingerprint""" + with self.db_manager.get_session() as session: + try: + agent = session.query(Agent).filter(Agent.agent_id == agent_id).first() + if not agent: + return False + + agent.fingerprint = fingerprint + agent.updated_at = func.current_timestamp() + session.commit() + return True + except Exception as e: + session.rollback() + console.print(f"Error updating agent fingerprint: {e}") + return False + def list_agents(self, status: str = None) -> List[Dict]: """List all agents, optionally filtered by status""" with self.db_manager.get_session() as session: From 59f77ad3750bc5491568613440bb7e140a4c86e4 Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 04:56:41 +0600 Subject: [PATCH 05/13] feat: added git validation & efficient connectivity check --- runagent/sdk/template_manager.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/runagent/sdk/template_manager.py b/runagent/sdk/template_manager.py index 82260b5..07d8241 100644 --- a/runagent/sdk/template_manager.py +++ b/runagent/sdk/template_manager.py @@ -2,6 +2,8 @@ Template management for the SDK. """ import os +import shutil +import subprocess import typing as t from pathlib import Path @@ -21,11 +23,24 @@ def __init__(self): ) def check_connectivity(self) -> bool: - """Check if template repository is accessible""" + """Check if template repository is accessible using git ls-remote (lightweight approach)""" + # Check if git is available before shelling out + if not shutil.which("git"): + raise RuntimeError("git is not installed or not found in PATH. Please install git to use template features.") + try: - self.list_available() - return True - except Exception: + # Use git ls-remote to check repository accessibility without cloning + # This is much faster than the previous approach of calling list_available() + result = subprocess.run( + ["git", "ls-remote", "--heads", self.downloader.repo_url, self.downloader.branch], + capture_output=True, + text=True, + timeout=10, # 10 second timeout + check=True + ) + # Check if the branch exists in the remote repository + return bool(result.stdout.strip()) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): if os.getenv('DISABLE_TRY_CATCH'): raise return False From 98e184256fc6c48e1c06816e059dd40739716aa6 Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 13:17:28 +0600 Subject: [PATCH 06/13] feat: agent run cli working for remote+local --- runagent/cli/commands.py | 50 +- runagent/client/client.py | 59 ++- runagent/sdk/rest_client.py | 135 ++++-- runagent/sdk/server/local_server.py | 723 ++++++++++++++++++++-------- runagent/sdk/socket_client.py | 14 +- runagent/utils/schema.py | 74 ++- 6 files changed, 758 insertions(+), 297 deletions(-) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index c2ec6fc..3a14507 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -28,6 +28,18 @@ console = Console() +def format_error_message(error_info): + """Format error information from API responses""" + if isinstance(error_info, dict) and "message" in error_info: + # New format with ErrorDetail object + error_message = error_info.get("message", "Unknown error") + error_code = error_info.get("code", "UNKNOWN_ERROR") + return f"[{error_code}] {error_message}" + else: + # Fallback to old format for backward compatibility + return str(error_info) if error_info else "Unknown error" + + def print_version(ctx, param, value): """Custom version callback with colored output""" if not value or ctx.resilient_parsing: @@ -242,14 +254,16 @@ def delete(agent_id, yes): capacity_info = sdk.db_service.get_database_capacity_info() console.print(f"šŸ“Š Updated capacity: [cyan]{capacity_info.get('current_count', 0)}/5[/cyan] agents") else: - console.print(f"āŒ [red]Failed to delete agent: {result.get('error')}[/red]") - raise click.ClickException("Deletion failed") + console.print(f"āŒ [red]Failed to delete agent:[/red] {format_error_message(result.get('error'))}") + import sys + sys.exit(1) except Exception as e: if os.getenv('DISABLE_TRY_CATCH'): raise console.print(f"āŒ [red]Delete error:[/red] {e}") - raise click.ClickException("Delete failed") + import sys + sys.exit(1) @click.command() @@ -553,14 +567,18 @@ def upload(path: Path): console.print(f"\nšŸ’” [bold]Next step:[/bold]") console.print(f"[cyan]runagent start --id {agent_id}[/cyan]") else: - raise click.ClickException(result.get("error")) + console.print(f"āŒ [red]Upload failed:[/red] {format_error_message(result.get('error'))}") + import sys + sys.exit(1) except AuthenticationError as e: console.print(f"āŒ [red]Authentication error:[/red] {e}") - raise click.ClickException("Upload failed") + import sys + sys.exit(1) except Exception as e: console.print(f"āŒ [red]Upload error:[/red] {e}") - raise click.ClickException("Upload failed") + import sys + sys.exit(1) @click.command() @@ -599,14 +617,18 @@ def start(agent_id, config): console.print(f"\nāœ… [green]Agent started successfully![/green]") console.print(f"🌐 Endpoint: [link]{result.get('endpoint')}[/link]") else: - raise click.ClickException(result.get("error")) + console.print(f"āŒ [red]Start failed:[/red] {format_error_message(result.get('error'))}") + import sys + sys.exit(1) except AuthenticationError as e: console.print(f"āŒ [red]Authentication error:[/red] {e}") - raise click.ClickException("Start failed") + import sys + sys.exit(1) except Exception as e: console.print(f"āŒ [red]Start error:[/red] {e}") - raise click.ClickException("Start failed") + import sys + sys.exit(1) @click.command() @@ -667,7 +689,9 @@ def deploy(folder, agent_id, local, framework, config): ) console.print(f"🌐 Endpoint: [link]{result.get('endpoint')}[/link]") else: - raise click.ClickException(result.get("error")) + console.print(f"āŒ [red]Deployment failed:[/red] {format_error_message(result.get('error'))}") + import sys + sys.exit(1) elif agent_id: # Start existing agent @@ -1040,7 +1064,11 @@ def run(ctx, agent_id, host, port, input_file, local, tag, timeout): except Exception as e: if os.getenv('DISABLE_TRY_CATCH'): raise - raise click.ClickException(f"Execution failed: {e}") + # Display error with red āŒ symbol + console.print(f"āŒ [bold red]Execution failed:[/bold red] {e}") + # Exit with error code 1 instead of raising ClickException to avoid duplicate message + import sys + sys.exit(1) @click.group() diff --git a/runagent/client/client.py b/runagent/client/client.py index e5c88da..6ae29f9 100644 --- a/runagent/client/client.py +++ b/runagent/client/client.py @@ -31,48 +31,59 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host: console.print(f"šŸ” [cyan]Auto-resolved address for agent {agent_id}: {agent_host}:{agent_port}[/cyan]") - agent_base_url = f"http://{agent_host}:{agent_port}" - agent_socket_url = f"ws://{agent_host}:{agent_port}" - - self.rest_client = RestClient(base_url=agent_base_url, api_prefix="/api/v1") - self.socket_client = SocketClient( - base_socket_url=agent_socket_url, - api_prefix="/api/v1" - ) + agent_base_url_local = f"http://{agent_host}:{agent_port}" + agent_socket_url_local = f"ws://{agent_host}:{agent_port}" + + self.rest_client = RestClient(base_url=agent_base_url_local, api_prefix="/api/v1") + self.socket_client = SocketClient(base_socket_url=agent_socket_url_local) else: self.rest_client = RestClient() self.socket_client = SocketClient() - self.agent_architecture = self.rest_client.get_agent_architecture(agent_id) + # self.agent_architecture = self.rest_client.get_agent_architecture(agent_id) - selected_entrypoint = next( - ( - entrypoint for entrypoint in self.agent_architecture['entrypoints'] - if entrypoint['tag'] == entrypoint_tag - ), None) + # selected_entrypoint = next( + # ( + # entrypoint for entrypoint in self.agent_architecture['entrypoints'] + # if entrypoint['tag'] == entrypoint_tag + # ), None) - if not selected_entrypoint: - raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}") + # if not selected_entrypoint: + # raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}") - def _run(self, *input_args, **input_kwargs): + def run(self, *input_args, **input_kwargs): response = self.rest_client.run_agent( self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs ) if response.get("success"): - response_data = response.get("output_data") + # Handle new response format with nested data + if "data" in response and "result_data" in response["data"]: + response_data = response["data"]["result_data"].get("data") + else: + # Fallback to old format for backward compatibility + response_data = response.get("output_data") return self.serializer.deserialize_object(response_data) else: - raise Exception(response.get("error")) + # Handle new error format with ErrorDetail object + error_info = response.get("error") + if isinstance(error_info, dict) and "message" in error_info: + # New format with ErrorDetail object + error_message = error_info.get("message", "Unknown error") + error_code = error_info.get("code", "UNKNOWN_ERROR") + raise Exception(f"[{error_code}] {error_message}") + else: + # Fallback to old format for backward compatibility + raise Exception(response.get("error", "Unknown error")) def _run_stream(self, *input_args, **input_kwargs): return self.socket_client.run_stream( self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs ) - def run(self, *input_args, **input_kwargs): - if self.entrypoint_tag.endswith("_stream"): - return self._run_stream(*input_args, **input_kwargs) - else: - return self._run(*input_args, **input_kwargs) + # def run(self, *input_args, **input_kwargs): + # if self.entrypoint_tag.endswith("_stream"): + # return self._run_stream(*input_args, **input_kwargs) + # else: + # return self._run(*input_args, **input_kwargs) diff --git a/runagent/sdk/rest_client.py b/runagent/sdk/rest_client.py index 60864b5..d124ebc 100644 --- a/runagent/sdk/rest_client.py +++ b/runagent/sdk/rest_client.py @@ -137,9 +137,9 @@ def _request( handle_errors: bool = True, ) -> Union[Dict[str, Any], requests.Response]: """Core request method""" - + url = self._get_url(path) - + # Prepare headers request_headers = {} if headers: @@ -838,6 +838,90 @@ def _get_local_deployment_info(self, agent_id: str) -> Optional[Dict]: return None return None + + def run_agent( + self, + agent_id: str, + entrypoint_tag: str, + input_args: list = None, + input_kwargs: dict = None, + timeout_seconds: int = 60, + async_execution: bool = False, + ) -> Dict: + """Execute an agent with given parameters""" + try: + console.print(f"šŸ¤– Executing agent: [bold magenta]{agent_id}[/bold magenta]") + + # Prepare request data according to API specification + request_data = { + "entrypoint_tag": entrypoint_tag, + "input_args": input_args or [], + "input_kwargs": input_kwargs or {}, + "timeout_seconds": timeout_seconds, + "async_execution": async_execution + } + + # Execute the agent + try: + response = self.http.post( + f"/agents/{agent_id}/run", + data=request_data, + timeout=timeout_seconds + 10, # Add buffer to request timeout + ) + result = response.json() + + if result.get("success", True): # Assume success if not explicitly false + console.print("āœ… [bold green]Agent execution completed![/bold green]") + return result + else: + # Handle new error format with ErrorDetail object + error_info = result.get('error') + if isinstance(error_info, dict) and "message" in error_info: + # New format with ErrorDetail object - don't print here, let client handle it + pass + else: + # Fallback to old format for backward compatibility - don't print, let CLI handle it + pass + return result + + except (ClientError, ServerError, ConnectionError) as e: + return { + "success": False, + "data": None, + "message": None, + "error": { + "code": "CONNECTION_ERROR", + "message": f"Agent execution failed: {e.message}", + "details": None, + "field": None + }, + "timestamp": None, + "request_id": None + } + + except Exception as e: + return { + "success": False, + "data": None, + "message": None, + "error": { + "code": "INTERNAL_ERROR", + "message": f"Execute agent failed: {str(e)}", + "details": None, + "field": None + }, + "timestamp": None, + "request_id": None + } + + def get_agent_architecture(self, agent_id: str) -> Dict: + """Get the architecture information for a specific agent""" + try: + response = self.http.get(f"/agents/{agent_id}/architecture") + return response.json() + except Exception as e: + return {"success": False, "error": f"Failed to get architecture: {str(e)}"} + # runagent/sdk/rest_client.py - FIXED RestClient initialization # class RestClient: @@ -1007,53 +1091,6 @@ def _get_local_deployment_info(self, agent_id: str) -> Optional[Dict]: # except Exception as e: # raise Exception(f"Request error: {e}") -# def run_agent( -# self, -# agent_id: str, -# entrypoint_tag: str, -# input_args: list = None, -# input_kwargs: dict = None, -# execution_type: str = "generic", -# ) -> Dict: -# """Execute an agent with given parameters""" -# try: -# console.print(f"šŸ¤– Executing agent: [bold magenta]{agent_id}[/bold magenta]") - -# # Prepare request data -# request_data = { -# "input_data": {"input_args": input_args, "input_kwargs": input_kwargs} -# } - -# # Execute the agent -# try: -# response = self.http.post( -# f"/agents/{agent_id}/execute/{entrypoint_tag}", -# data=request_data, -# timeout=120, # Longer timeout for agent execution -# ) -# result = response.json() - -# if result.get("success", True): # Assume success if not explicitly false -# console.print("āœ… [bold green]Agent execution completed![/bold green]") -# return result -# else: -# console.print(f"āŒ [bold red]Agent execution failed: {result.get('error', 'Unknown error')}[/bold red]") -# return result - -# except (ClientError, ServerError, ConnectionError) as e: -# return {"success": False, "error": f"Agent execution failed: {e.message}"} - -# except Exception as e: -# return {"success": False, "error": f"Execute agent failed: {str(e)}"} - -# def get_agent_architecture(self, agent_id: str) -> Dict: -# """Get the architecture information for a specific agent""" -# try: -# response = self.http.get(f"/agents/{agent_id}/architecture") -# return response.json() -# except Exception as e: -# return {"success": False, "error": f"Failed to get architecture: {str(e)}"} - # def sync_local_agent(self, agent_data: Dict[str, Any]) -> Dict[str, Any]: # """Sync local agent to middleware""" diff --git a/runagent/sdk/server/local_server.py b/runagent/sdk/server/local_server.py index 05c627d..7525241 100644 --- a/runagent/sdk/server/local_server.py +++ b/runagent/sdk/server/local_server.py @@ -20,7 +20,7 @@ from runagent.sdk.db import DBService from runagent.sdk.server.framework import get_executor from runagent.utils.agent import detect_framework, get_agent_config -from runagent.utils.schema import AgentInfo, AgentRunRequest, AgentRunResponse, WebSocketActionType, WebSocketAgentRequest +from runagent.utils.schema import AgentInfo, AgentRunRequest, AgentRunResponse, AgentRunResponseV2, ExecutionData, ErrorDetail, WebSocketActionType, WebSocketAgentRequest from runagent.utils.imports import PackageImporter from runagent.utils.schema import MessageType from runagent.sdk.server.socket_utils import AgentWebSocketHandler @@ -140,198 +140,199 @@ async def _sync_agent_to_middleware_and_wait(self): return False - def create_endpoint_handler_with_tracking(self, runner, agent_id, entrypoint_tag): - """Create endpoint handler with invocation tracking and middleware sync - FIXED for simplified ID structure""" + # def create_endpoint_handler_with_tracking(self, runner, agent_id, entrypoint_tag): + # """Create endpoint handler with invocation tracking and middleware sync - FIXED for simplified ID structure""" - async def run_agent(request: AgentRunRequest): - """Run a deployed agent with full invocation tracking and middleware sync""" + # async def run_agent(request: AgentRunRequest): + # """Run a deployed agent with full invocation tracking and middleware sync""" - # Start local invocation tracking - invocation_id = self.db_service.start_invocation( - agent_id=agent_id, - input_data={ - "input_args": request.input_data.input_args, - "input_kwargs": request.input_data.input_kwargs - }, - entrypoint_tag=entrypoint_tag, - sdk_type="local_server", - client_info={ - "server_host": self.host, - "server_port": self.port, - "agent_name": self.agent_name, - "agent_framework": self.agent_framework.value - } - ) - - self.log_execution_start(invocation_id, entrypoint_tag) - - # Sync invocation start to middleware - FIXED for simplified structure - middleware_invocation_id = None - if (hasattr(self, 'middleware_sync') and - self.middleware_sync and - self.middleware_sync.is_sync_enabled()): + # # Start local invocation tracking + # invocation_id = self.db_service.start_invocation( + # agent_id=agent_id, + # input_data={ + # "input_args": request.input_args, + # "input_kwargs": request.input_kwargs + # }, + # entrypoint_tag=entrypoint_tag, + # sdk_type="local_server", + # client_info={ + # "server_host": self.host, + # "server_port": self.port, + # "agent_name": self.agent_name, + # "agent_framework": self.agent_framework.value + # } + # ) + + # self.log_execution_start(invocation_id, entrypoint_tag) + + # # Sync invocation start to middleware - FIXED for simplified structure + # middleware_invocation_id = None + # if (hasattr(self, 'middleware_sync') and + # self.middleware_sync and + # self.middleware_sync.is_sync_enabled()): - try: - # FIXED: Use simplified invocation structure - sync_payload = { - "agent_id": agent_id, # Main agent ID - "local_execution_id": invocation_id, # This becomes main execution ID in middleware - "input_data": { - "input_args": request.input_data.input_args, - "input_kwargs": request.input_data.input_kwargs - }, - "entrypoint_tag": entrypoint_tag, - "sdk_type": "local_server", - "client_info": { - "server_host": self.host, - "server_port": self.port, - "agent_name": self.agent_name, - "agent_framework": self.agent_framework.value - } - } + # try: + # # FIXED: Use simplified invocation structure + # sync_payload = { + # "agent_id": agent_id, # Main agent ID + # "local_execution_id": invocation_id, # This becomes main execution ID in middleware + # "input_data": { + # "input_args": request.input_data.input_args, + # "input_kwargs": request.input_data.input_kwargs + # }, + # "entrypoint_tag": entrypoint_tag, + # "sdk_type": "local_server", + # "client_info": { + # "server_host": self.host, + # "server_port": self.port, + # "agent_name": self.agent_name, + # "agent_framework": self.agent_framework.value + # } + # } - middleware_invocation_id = await self.middleware_sync.sync_invocation_start(sync_payload) - except Exception as e: - console.print(f"Middleware sync start failed: {e}") - - start_time = time.time() - execution_success = False - error_detail = None - result_data = None - - try: - console.print(f"Running agent: {agent_id} (invocation: {invocation_id}...)") - - result_data = await runner( - *request.input_data.input_args, **request.input_data.input_kwargs - ) - - result_str = self.serializer.serialize_object(result_data) - execution_time = time.time() - start_time - execution_success = True - self.log_execution_complete(invocation_id, True, execution_time) - - # Complete local invocation tracking with success - try: - serializable_output = self._convert_to_serializable(result_data) + # middleware_invocation_id = await self.middleware_sync.sync_invocation_start(sync_payload) + # except Exception as e: + # console.print(f"Middleware sync start failed: {e}") + + # start_time = time.time() + # execution_success = False + # error_detail = None + # result_data = None + + # try: + # console.print(f"Running agent: {agent_id} (invocation: {invocation_id}...)") + + # result_data = await runner( + # *request.input_data.input_args, **request.input_data.input_kwargs + # ) + + # result_str = self.serializer.serialize_object(result_data) + # execution_time = time.time() - start_time + # execution_success = True + # self.log_execution_complete(invocation_id, True, execution_time) + + # # Complete local invocation tracking with success + # try: + # serializable_output = self._convert_to_serializable(result_data) - self.db_service.complete_invocation( - invocation_id=invocation_id, - output_data=serializable_output, - execution_time_ms=execution_time * 1000 - ) - console.print("Local invocation tracking completed successfully") + # self.db_service.complete_invocation( + # invocation_id=invocation_id, + # output_data=serializable_output, + # execution_time_ms=execution_time * 1000 + # ) + # console.print("Local invocation tracking completed successfully") - except Exception as e: - console.print(f"Failed to complete local invocation tracking: {str(e)}") - try: - self.db_service.complete_invocation( - invocation_id=invocation_id, - output_data={ - "execution_completed": True, - "result_type": str(type(result_data)), - "result_length": len(str(result_data)) if result_data else 0, - "serialization_note": f"Could not serialize result: {str(e)}" - }, - execution_time_ms=execution_time * 1000 - ) - console.print("Local invocation tracking completed with safe fallback") - except Exception as e2: - console.print(f"Critical: Could not complete local invocation tracking: {str(e2)}") - - # Sync invocation completion to middleware - FIXED for simplified structure - if middleware_invocation_id: - try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, # This is now the main execution ID in middleware - { - "output_data": serializable_output, - "execution_time_ms": execution_time * 1000 - } - ) - except Exception as e: - console.print(f"Failed to sync completion to middleware: {e}") - - # Record in original agent_runs table for backward compatibility - try: - serializable_input = { - "input_args": request.input_data.input_args, - "input_kwargs": request.input_data.input_kwargs - } + # except Exception as e: + # console.print(f"Failed to complete local invocation tracking: {str(e)}") + # try: + # self.db_service.complete_invocation( + # invocation_id=invocation_id, + # output_data={ + # "execution_completed": True, + # "result_type": str(type(result_data)), + # "result_length": len(str(result_data)) if result_data else 0, + # "serialization_note": f"Could not serialize result: {str(e)}" + # }, + # execution_time_ms=execution_time * 1000 + # ) + # console.print("Local invocation tracking completed with safe fallback") + # except Exception as e2: + # console.print(f"Critical: Could not complete local invocation tracking: {str(e2)}") + + # # Sync invocation completion to middleware - FIXED for simplified structure + # if middleware_invocation_id: + # try: + # await self.middleware_sync.sync_invocation_complete( + # middleware_invocation_id, # This is now the main execution ID in middleware + # { + # "output_data": serializable_output, + # "execution_time_ms": execution_time * 1000 + # } + # ) + # except Exception as e: + # console.print(f"Failed to sync completion to middleware: {e}") + + # # Record in original agent_runs table for backward compatibility + # try: + # serializable_input = { + # "input_args": request.input_args, + # "input_kwargs": request.input_kwargs + # } - self.db_service.record_agent_run( - agent_id=agent_id, - input_data=serializable_input, - output_data=result_str, - success=True, - execution_time=execution_time, - ) - except Exception as e: - console.print(f"Warning: Could not record to agent_runs table: {e}") - - console.print( - f"Agent {agent_id} execution completed successfully in " - f"{execution_time:.2f}s (invocation: {invocation_id}...)" - ) - - return AgentRunResponse( - success=True, - output_data=result_str, - error=None, - execution_time=execution_time, - agent_id=agent_id, - ) - - except Exception as e: - error_detail = f"Server error running agent {agent_id}: {str(e)}" - self.log_execution_error(invocation_id, e) - execution_time = time.time() - start_time - - # Complete local invocation tracking with error - self.db_service.complete_invocation( - invocation_id=invocation_id, - error_detail=error_detail, - execution_time_ms=execution_time * 1000 - ) - - # Sync invocation error to middleware - FIXED for simplified structure - if middleware_invocation_id: - try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, # This is now the main execution ID in middleware - { - "error_detail": error_detail, - "execution_time_ms": execution_time * 1000 - } - ) - except Exception as sync_error: - console.print(f"Failed to sync error to middleware: {sync_error}") - - # Record in original agent_runs table for backward compatibility - try: - serializable_input = { - "input_args": request.input_data.input_args, - "input_kwargs": request.input_data.input_kwargs - } + # self.db_service.record_agent_run( + # agent_id=agent_id, + # input_data=serializable_input, + # output_data=result_str, + # success=True, + # execution_time=execution_time, + # ) + # except Exception as e: + # console.print(f"Warning: Could not record to agent_runs table: {e}") + + # console.print( + # f"Agent {agent_id} execution completed successfully in " + # f"{execution_time:.2f}s (invocation: {invocation_id}...)" + # ) + + # return AgentRunResponse( + # success=True, + # output_data=result_str, + # error=None, + # execution_time=execution_time, + # agent_id=agent_id, + # ) + + # except Exception as e: + # error_detail = f"Server error running agent {agent_id}: {str(e)}" + # self.log_execution_error(invocation_id, e) + # execution_time = time.time() - start_time + + # # Complete local invocation tracking with error + # self.db_service.complete_invocation( + # invocation_id=invocation_id, + # error_detail=error_detail, + # execution_time_ms=execution_time * 1000 + # ) + + # # Sync invocation error to middleware - FIXED for simplified structure + # if middleware_invocation_id: + # try: + # await self.middleware_sync.sync_invocation_complete( + # middleware_invocation_id, # This is now the main execution ID in middleware + # { + # "error_detail": error_detail, + # "execution_time_ms": execution_time * 1000 + # } + # ) + # except Exception as sync_error: + # console.print(f"Failed to sync error to middleware: {sync_error}") + + # # Record in original agent_runs table for backward compatibility + # try: + # serializable_input = { + # "input_args": request.input_data.input_args, + # "input_kwargs": request.input_data.input_kwargs + # } - self.db_service.record_agent_run( - agent_id=agent_id, - input_data=serializable_input, - output_data=None, - success=False, - error_message=error_detail, - execution_time=execution_time, - ) - except Exception as e: - console.print(f"Warning: Could not record to agent_runs table: {e}") - - console.print(f"{error_detail} (invocation: {invocation_id[:8]}...)") - - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_detail - ) + # self.db_service.record_agent_run( + # agent_id=agent_id, + # input_data=serializable_input, + # output_data=None, + # success=False, + # error_message=error_detail, + # execution_time=execution_time, + # ) + # except Exception as e: + # console.print(f"Warning: Could not record to agent_runs table: {e}") + + # console.print(f"{error_detail} (invocation: {invocation_id[:8]}...)") + + # raise HTTPException( + # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_detail + # ) - return run_agent + # return run_agent + # Add this method to show sync status in server info def get_server_info(self) -> dict: """Get server information including middleware sync status""" @@ -684,18 +685,360 @@ async def get_invocation_details(invocation_id: str): detail=f"Failed to get invocation: {str(e)}", ) + entrypoint_runner_dict = dict() # Setup dynamic entrypoint routes with invocation tracking for entrypoint in self.agent_architecture.entrypoints: + if entrypoint.tag.endswith("_stream"): - continue + runner = self.agentic_executor.get_stream_runner(entrypoint) + else: + runner = self.agentic_executor.get_runner(entrypoint) + + entrypoint_runner_dict[entrypoint.tag] = runner + + + + @self.app.exception_handler(HTTPException) + async def http_exception_handler(request, exc): + """Convert HTTPException to the new error format""" + return JSONResponse( + status_code=exc.status_code, + content=AgentRunResponseV2( + success=False, + data=None, + message=None, + error=ErrorDetail( + code="VALIDATION_ERROR" if exc.status_code == 400 else "NOT_FOUND" if exc.status_code == 404 else "INTERNAL_ERROR", + message=exc.detail, + details=None, + field=None + ), + timestamp=datetime.now().isoformat(), + request_id=str(uuid.uuid4()) + ).model_dump() + ) + + @self.app.exception_handler(Exception) + async def general_exception_handler(request, exc): + """Convert general exceptions to the new error format""" + return JSONResponse( + status_code=500, + content=AgentRunResponseV2( + success=False, + data=None, + message=None, + error=ErrorDetail( + code="INTERNAL_ERROR", + message=f"Agent execution failed: {str(exc)}", + details=None, + field=None + ), + timestamp=datetime.now().isoformat(), + request_id=str(uuid.uuid4()) + ).model_dump() + ) + + @self.app.post( + f"/api/v1/agents/{self.agent_id}/run", + response_model=AgentRunResponseV2 + ) + # (self.create_endpoint_handler_with_tracking(runner, self.agent_id, entrypoint.tag)) + async def run_agent(request: AgentRunRequest): + """Run a deployed agent with full invocation tracking and middleware sync""" + + if request.entrypoint_tag not in entrypoint_runner_dict: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Entrypoint {request.entrypoint_tag} not found" + ) - runner = self.agentic_executor.get_runner(entrypoint) + if request.entrypoint_tag.endswith("_stream"): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Can't use Streaming Entrypoint `{request.entrypoint_tag}` through non-streaming endpoint." + ) - self.app.post( - f"/api/v1/agents/{self.agent_id}/execute/{entrypoint.tag}", - response_model=AgentRunResponse - )(self.create_endpoint_handler_with_tracking(runner, self.agent_id, entrypoint.tag)) + runner = entrypoint_runner_dict[request.entrypoint_tag] + + # Start local invocation tracking + invocation_id = self.db_service.start_invocation( + agent_id=self.agent_id, + input_data={ + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + }, + entrypoint_tag=request.entrypoint_tag, + sdk_type="local_server", + client_info={ + "server_host": self.host, + "server_port": self.port, + "agent_name": self.agent_name, + "agent_framework": self.agent_framework.value + } + ) + + self.log_execution_start(invocation_id, request.entrypoint_tag) + + # Sync invocation start to middleware - FIXED for simplified structure + middleware_invocation_id = None + if (hasattr(self, 'middleware_sync') and + self.middleware_sync and + self.middleware_sync.is_sync_enabled()): + + try: + # FIXED: Use simplified invocation structure + sync_payload = { + "agent_id": self.agent_id, # Main agent ID + "local_execution_id": invocation_id, # This becomes main execution ID in middleware + "input_data": { + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + }, + "entrypoint_tag": request.entrypoint_tag, + "sdk_type": "local_server", + "client_info": { + "server_host": self.host, + "server_port": self.port, + "agent_name": self.agent_name, + "agent_framework": self.agent_framework.value + } + } + + middleware_invocation_id = await self.middleware_sync.sync_invocation_start(sync_payload) + except Exception as e: + console.print(f"Middleware sync start failed: {e}") + start_time = time.time() + execution_success = False + error_detail = None + result_data = None + + try: + console.print(f"Running agent: {self.agent_id} (invocation: {invocation_id}...)") + + result_data = await runner( + *request.input_args, **request.input_kwargs + ) + + result_str = self.serializer.serialize_object(result_data) + execution_time = time.time() - start_time + execution_success = True + self.log_execution_complete(invocation_id, True, execution_time) + + # Complete local invocation tracking with success + try: + serializable_output = self._convert_to_serializable(result_data) + + self.db_service.complete_invocation( + invocation_id=invocation_id, + output_data=serializable_output, + execution_time_ms=execution_time * 1000 + ) + console.print("Local invocation tracking completed successfully") + + except Exception as e: + console.print(f"Failed to complete local invocation tracking: {str(e)}") + try: + self.db_service.complete_invocation( + invocation_id=invocation_id, + output_data={ + "execution_completed": True, + "result_type": str(type(result_data)), + "result_length": len(str(result_data)) if result_data else 0, + "serialization_note": f"Could not serialize result: {str(e)}" + }, + execution_time_ms=execution_time * 1000 + ) + console.print("Local invocation tracking completed with safe fallback") + except Exception as e2: + console.print(f"Critical: Could not complete local invocation tracking: {str(e2)}") + + # Sync invocation completion to middleware - FIXED for simplified structure + if middleware_invocation_id: + try: + await self.middleware_sync.sync_invocation_complete( + middleware_invocation_id, # This is now the main execution ID in middleware + { + "output_data": serializable_output, + "execution_time_ms": execution_time * 1000 + } + ) + except Exception as e: + console.print(f"Failed to sync completion to middleware: {e}") + + # Record in original agent_runs table for backward compatibility + try: + serializable_input = { + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + } + + self.db_service.record_agent_run( + agent_id=self.agent_id, + input_data=serializable_input, + output_data=result_str, + success=True, + execution_time=execution_time, + ) + except Exception as e: + console.print(f"Warning: Could not record to agent_runs table: {e}") + + console.print( + f"Agent {self.agent_id} execution completed successfully in " + f"{execution_time:.2f}s (invocation: {invocation_id}...)" + ) + + # Create execution data for the new response format + execution_data = ExecutionData( + execution_id=invocation_id, + agent_id=self.agent_id, + user_id=None, # Not available in local mode + deployment_id=None, # Not available in local mode + entrypoint_id=request.entrypoint_tag, + status="completed", + started_at=datetime.fromtimestamp(start_time).isoformat(), + completed_at=datetime.now().isoformat(), + runtime_seconds=execution_time, + input_data={ + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + }, + result_data={ + "message": "", + "data": result_str, + "vm_id": str(uuid.uuid4()), # Mock VM ID for local execution + "type": "result", + "timestamp": datetime.now().isoformat() + }, + execution_metadata={ + "entrypoint_tag": request.entrypoint_tag, + "project_id": None, + "deployment_id": None, + "timeout_seconds": 60, + "async_execution": False, + "execution_config": {} + }, + error_message=None, + is_local=True, + agent_name=self.agent_name, + project_id=None, + project_name=None, + endpoint=None, + priority="normal", + success=True, + result={ + "message": "", + "data": result_str, + "vm_id": str(uuid.uuid4()), + "type": "result", + "timestamp": datetime.now().isoformat() + }, + error=None + ) + + return AgentRunResponseV2( + success=True, + data=execution_data, + message="Agent execution completed successfully", + error=None, + timestamp=datetime.now().isoformat(), + request_id=str(uuid.uuid4()) + ) + + except Exception as e: + error_detail = f"Server error running agent {self.agent_id}: {str(e)}" + self.log_execution_error(invocation_id, e) + execution_time = time.time() - start_time + + # Complete local invocation tracking with error + self.db_service.complete_invocation( + invocation_id=invocation_id, + error_detail=error_detail, + execution_time_ms=execution_time * 1000 + ) + + # Sync invocation error to middleware - FIXED for simplified structure + if middleware_invocation_id: + try: + await self.middleware_sync.sync_invocation_complete( + middleware_invocation_id, # This is now the main execution ID in middleware + { + "error_detail": error_detail, + "execution_time_ms": execution_time * 1000 + } + ) + except Exception as sync_error: + console.print(f"Failed to sync error to middleware: {sync_error}") + + # Record in original agent_runs table for backward compatibility + try: + serializable_input = { + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + } + + self.db_service.record_agent_run( + agent_id=self.agent_id, + input_data=serializable_input, + output_data=None, + success=False, + error_message=error_detail, + execution_time=execution_time, + ) + except Exception as e: + console.print(f"Warning: Could not record to agent_runs table: {e}") + + console.print(f"{error_detail} (invocation: {invocation_id[:8]}...)") + + # Create execution data for the new response format (error case) + execution_data = ExecutionData( + execution_id=invocation_id, + agent_id=self.agent_id, + user_id=None, # Not available in local mode + deployment_id=None, # Not available in local mode + entrypoint_id=request.entrypoint_tag, + status="failed", + started_at=datetime.fromtimestamp(start_time).isoformat(), + completed_at=datetime.now().isoformat(), + runtime_seconds=execution_time, + input_data={ + "input_args": request.input_args, + "input_kwargs": request.input_kwargs + }, + result_data=None, + execution_metadata={ + "entrypoint_tag": request.entrypoint_tag, + "project_id": None, + "deployment_id": None, + "timeout_seconds": 60, + "async_execution": False, + "execution_config": {} + }, + error_message=error_detail, + is_local=True, + agent_name=self.agent_name, + project_id=None, + project_name=None, + endpoint=None, + priority="normal", + success=False, + result=None, + error=error_detail + ) + + return AgentRunResponseV2( + success=False, + data=None, + message=None, + error=ErrorDetail( + code="INTERNAL_ERROR", + message=f"Agent execution failed: {error_detail}", + details=None, + field=None + ), + timestamp=datetime.now().isoformat(), + request_id=str(uuid.uuid4()) + ) def _convert_to_serializable(self, obj): """Convert objects to JSON-serializable format""" diff --git a/runagent/sdk/socket_client.py b/runagent/sdk/socket_client.py index 232973a..f34dd4d 100644 --- a/runagent/sdk/socket_client.py +++ b/runagent/sdk/socket_client.py @@ -1,7 +1,7 @@ import websockets import asyncio from typing import AsyncIterator, Iterator, Optional -from runagent.utils.schema import WebSocketActionType, WebSocketAgentRequest, AgentInputArgs, MessageType, SafeMessage +from runagent.utils.schema import WebSocketActionType, WebSocketAgentRequest, MessageType, SafeMessage import json import uuid from typing import Any @@ -36,10 +36,8 @@ async def run_stream_async(self, agent_id: str, entrypoint_tag: str, *input_args request = WebSocketAgentRequest( action=WebSocketActionType.START_STREAM, agent_id=agent_id, - input_data=AgentInputArgs( - input_args=input_args, - input_kwargs=input_kwargs - ) + input_args=input_args, + input_kwargs=input_kwargs, ) start_msg = SafeMessage( @@ -85,10 +83,8 @@ def run_stream(self, agent_id: str, entrypoint_tag: str, input_args, input_kwarg request = WebSocketAgentRequest( action=WebSocketActionType.START_STREAM, agent_id=agent_id, - input_data=AgentInputArgs( - input_args=input_args, - input_kwargs=input_kwargs - ) + input_args=input_args, + input_kwargs=input_kwargs, ) start_msg = SafeMessage( diff --git a/runagent/utils/schema.py b/runagent/utils/schema.py index 6ea0efe..c44eaa8 100644 --- a/runagent/utils/schema.py +++ b/runagent/utils/schema.py @@ -99,16 +99,6 @@ def to_dict(self) -> dict: return data -class AgentInputArgs(BaseModel): - """Request model for agent execution""" - - input_args: t.List[t.Any] = Field( - default={}, description="Input data for agent invocation" - ) - input_kwargs: t.Dict[str, t.Any] = Field( - default={}, description="Input data for agent invocation" - ) - class WebSocketActionType(str, Enum): START_STREAM = "start_stream" @@ -120,19 +110,28 @@ class WebSocketAgentRequest(BaseModel): """WebSocket request model for agent streaming""" action: WebSocketActionType agent_id: str - input_data: AgentInputArgs + input_args: t.List[t.Any] = Field( + default_factory=list, description="Input data for positional arguments" + ) + input_kwargs: t.Dict[str, t.Any] = Field( + default_factory=dict, description="Input data for keyword arguments" + ) stream_config: t.Optional[t.Dict[str, t.Any]] = Field(default_factory=dict) # Pydantic Models class AgentRunRequest(BaseModel): """Request model for agent execution""" - - input_data: AgentInputArgs = Field( - default={}, description="Input data for agent invocation" + entrypoint_tag: str = Field(..., description="Entrypoint tag") + input_args: t.List[t.Any] = Field( + default_factory=list, description="Input data for positional arguments" + ) + input_kwargs: t.Dict[str, t.Any] = Field( + default_factory=dict, description="Input data for keyword arguments" ) + class AgentRunResponse(BaseModel): """Response model for agent execution""" @@ -143,6 +142,53 @@ class AgentRunResponse(BaseModel): agent_id: str +class ExecutionData(BaseModel): + """Execution data for the new response format""" + + execution_id: str + agent_id: str + user_id: t.Optional[str] = None + deployment_id: t.Optional[str] = None + entrypoint_id: t.Optional[str] = None + status: str + started_at: str + completed_at: t.Optional[str] = None + runtime_seconds: t.Optional[float] = None + input_data: t.Dict[str, t.Any] + result_data: t.Optional[t.Dict[str, t.Any]] = None + execution_metadata: t.Optional[t.Dict[str, t.Any]] = None + error_message: t.Optional[str] = None + is_local: bool = True + agent_name: t.Optional[str] = None + project_id: t.Optional[str] = None + project_name: t.Optional[str] = None + endpoint: t.Optional[str] = None + priority: str = "normal" + success: bool + result: t.Optional[t.Dict[str, t.Any]] = None + error: t.Optional[str] = None + + +class ErrorDetail(BaseModel): + """Error detail structure for API responses""" + + code: str + message: str + details: t.Optional[t.Any] = None + field: t.Optional[str] = None + + +class AgentRunResponseV2(BaseModel): + """New response model for agent execution with detailed execution data""" + + success: bool + data: t.Optional[ExecutionData] = None + message: t.Optional[str] = None + error: t.Optional[ErrorDetail] = None + timestamp: str + request_id: str + + class CapacityInfo(BaseModel): """Database capacity information""" From 8fd32e7ca8b2896f73a1dd60cdbefb2afc78fb4b Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 13:34:06 +0600 Subject: [PATCH 07/13] fix: fixed examples on run cmd --- runagent/cli/commands.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 3a14507..1ca439c 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -912,8 +912,11 @@ def run(ctx, agent_id, host, port, input_file, local, tag, timeout): # Using host/port with input file runagent run --host localhost --port 8080 --input config.json - # Generic streaming mode with extra params - runagent run --agent-id my-agent --generic-stream --debug=true --retries=3 + # local agent + runagent run --id d33c497d-d3f5-462e-8ff4-c28d819b92d6 --tag minimal --local --message=something + + # remote agent + runagent run --id d33c497d-d3f5-462e-8ff4-c28d819b92d6 --tag minimal --message=something """ # ============================================ @@ -940,17 +943,12 @@ def run(ctx, agent_id, host, port, input_file, local, tag, timeout): ) # ============================================ - # # VALIDATION 2: Generic mode selection + # # VALIDATION 2: tag validation # # ============================================ - # if generic and generic_stream: - # raise click.UsageError( - # "Cannot specify both --generic and --generic-stream. Choose one." - # ) + if tag.endswith("_stream"): + console.print(f"āŒ [bold red]Execution failed:[/bold red] Cannot use streaming Entrypoint tag `{tag}` through non-streaming endpoint.") + return - # # Default to generic mode if neither specified - # if not generic and not generic_stream: - # generic = True - # console.print("šŸ”§ Defaulting to --generic mode") # ============================================ # VALIDATION 3: Input file OR extra params From da54058c5674561a6435850de50dd4d8245808cd Mon Sep 17 00:00:00 2001 From: sawradip Date: Fri, 26 Sep 2025 21:57:23 +0600 Subject: [PATCH 08/13] feat: run-stream working locally, proper json format. --- runagent/cli/commands.py | 156 ++++++++ runagent/cli/main.py | 1 + runagent/client/client.py | 21 +- runagent/sdk/server/local_server.py | 38 +- runagent/sdk/server/socket_utils.py | 540 +++++++++++++++------------- runagent/sdk/socket_client.py | 98 +++-- runagent/utils/schema.py | 2 +- 7 files changed, 517 insertions(+), 339 deletions(-) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 1ca439c..23b227a 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -1069,6 +1069,162 @@ def run(ctx, agent_id, host, port, input_file, local, tag, timeout): sys.exit(1) +@click.command( + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + )) +@click.option("--id", "agent_id", help="Agent ID to run") +@click.option("--host", help="Host to connect to (use with --port)") +@click.option("--port", type=int, help="Port to connect to (use with --host)") +@click.option( + "--input", + "input_file", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path), + help="Path to input JSON file" +) +@click.option("--local", is_flag=True, help="Run agent locally") +@click.option("--tag", required=True, help="Entrypoint tag to be used") +@click.option("--timeout", type=int, help="Timeout in seconds") +@click.pass_context +def run_stream(ctx, agent_id, host, port, input_file, local, tag, timeout): + """ + Stream agent execution results in real-time. + + This command connects to an agent via WebSocket and streams the execution results + as they become available, providing real-time feedback. + + Examples: + # Local streaming agent + runagent run-stream --id d33c497d-d3f5-462e-8ff4-c28d819b92d6 --tag minimal_stream --local --message=something + + # Remote streaming agent + runagent run-stream --id d33c497d-d3f5-462e-8ff4-c28d819b92d6 --tag minimal_stream --message=something + + # With input file + runagent run-stream --id d33c497d-d3f5-462e-8ff4-c28d819b92d6 --tag minimal_stream --local --input config.json + """ + + # ============================================ + # PARAMETER PARSING + # ============================================ + + extra_params = {} + for item in ctx.args: + if '=' in item: + key, value = item.split('=', 1) + # Remove leading dashes + key = key.lstrip('-') + extra_params[key] = value + else: + # Handle boolean flags + key = item.lstrip('-') + extra_params[key] = True + + # ============================================ + # VALIDATION + # ============================================ + + # VALIDATION 1: Agent ID or host/port required + if not agent_id and not (host and port): + console.print(f"āŒ [bold red]Execution failed:[/bold red] Either --id or both --host and --port are required") + import sys + sys.exit(1) + + # VALIDATION 2: tag validation for streaming + if not tag.endswith("_stream"): + console.print(f"āŒ [bold red]Execution failed:[/bold red] Streaming command requires entrypoint tag ending with '_stream'. Got: {tag}") + import sys + sys.exit(1) + + # ============================================ + # DISPLAY CONFIGURATION + # ============================================ + + console.print("šŸš€ RunAgent Streaming Configuration:") + + # Connection info + if agent_id: + console.print(f" Agent ID: [cyan]{agent_id}[/cyan]") + else: + console.print(f" Host: [cyan]{host}[/cyan]") + console.print(f" Port: [cyan]{port}[/cyan]") + + # Tag + console.print(f" Tag: [magenta]{tag}[/magenta]") + + # Local execution + if local: + console.print(" Local: [green]Yes[/green]") + else: + console.print(" Local: [red]No (Deployed to RunAgent Cloud)[/red]") + + # Timeout + if timeout: + console.print(f" Timeout: [yellow]{timeout}s[/yellow]") + + # Input configuration + if input_file: + console.print(f" Input file: [blue]{input_file}[/blue]") + # Load and validate JSON file here + try: + import json + with open(input_file, 'r') as f: + input_params = json.load(f) + console.print(f" Config keys: [dim]{list(input_params.keys())}[/dim]") + except json.JSONDecodeError: + if os.getenv('DISABLE_TRY_CATCH'): + raise + console.print(f"āŒ [bold red]Execution failed:[/bold red] Invalid JSON in input file: {input_file}") + import sys + sys.exit(1) + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + console.print(f"āŒ [bold red]Execution failed:[/bold red] Error reading input file: {e}") + import sys + sys.exit(1) + + elif extra_params: + console.print(" Extra parameters:") + for key, value in extra_params.items(): + console.print(f" --{key} = {value}") + input_params = extra_params + + else: + input_params = {} + + # ============================================ + # EXECUTION LOGIC + # ============================================ + + try: + ra_client = RunAgentClient( + agent_id=agent_id, + local=local, + host=host, + port=port, + entrypoint_tag=tag + ) + + console.print(f"\nšŸ”„ [bold]Starting streaming execution...[/bold]") + console.print(f"šŸ“” [dim]Connected to agent via WebSocket[/dim]") + console.print(f"šŸ“¤ [dim]Streaming results:[/dim]\n") + + # Stream the results + for chunk in ra_client.run_stream(**input_params): + console.print(chunk) + + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + # Display error with red āŒ symbol + console.print(f"āŒ [bold red]Streaming failed:[/bold red] {e}") + # Exit with error code 1 instead of raising ClickException to avoid duplicate message + import sys + sys.exit(1) + + @click.group() def db(): """Database management and monitoring commands""" diff --git a/runagent/cli/main.py b/runagent/cli/main.py index 8cf5907..240e30a 100644 --- a/runagent/cli/main.py +++ b/runagent/cli/main.py @@ -19,6 +19,7 @@ def runagent(): runagent.add_command(commands.deploy) runagent.add_command(commands.serve) runagent.add_command(commands.run) +runagent.add_command(commands.run_stream) runagent.add_command(commands.delete) runagent.add_command(commands.db) runagent.add_command(commands.local_sync) diff --git a/runagent/client/client.py b/runagent/client/client.py index 6ae29f9..6d16708 100644 --- a/runagent/client/client.py +++ b/runagent/client/client.py @@ -77,13 +77,16 @@ def run(self, *input_args, **input_kwargs): # Fallback to old format for backward compatibility raise Exception(response.get("error", "Unknown error")) - def _run_stream(self, *input_args, **input_kwargs): - return self.socket_client.run_stream( - self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs - ) + def run_stream(self, *input_args, **input_kwargs): + """Stream agent execution results in real-time via WebSocket""" + try: + return self.socket_client.run_stream( + self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs + ) + except Exception as e: + # Handle streaming errors with proper formatting + raise Exception(f"Streaming failed: {str(e)}") - # def run(self, *input_args, **input_kwargs): - # if self.entrypoint_tag.endswith("_stream"): - # return self._run_stream(*input_args, **input_kwargs) - # else: - # return self._run(*input_args, **input_kwargs) + def _run_stream(self, *input_args, **input_kwargs): + """Legacy method - use run_stream instead""" + return self.run_stream(*input_args, **input_kwargs) diff --git a/runagent/sdk/server/local_server.py b/runagent/sdk/server/local_server.py index 7525241..81e11a7 100644 --- a/runagent/sdk/server/local_server.py +++ b/runagent/sdk/server/local_server.py @@ -90,7 +90,7 @@ def __init__( self._setup_logging() self.app = self._setup_fastapi_app() - self._setup_websocket_routes() + # self._setup_websocket_routes() self._setup_routes() self.agent_synced_to_middleware = False @@ -697,6 +697,12 @@ async def get_invocation_details(invocation_id: str): entrypoint_runner_dict[entrypoint.tag] = runner + @self.app.websocket(f"/api/v1/agents/{self.agent_id}/run-stream") + async def run_agent_stream(websocket: WebSocket, agent_id: str = self.agent_id): + + await self.websocket_handler.handle_agent_stream_with_tracking( + websocket, agent_id, entrypoint_runner_dict, self.db_service + ) @self.app.exception_handler(HTTPException) async def http_exception_handler(request, exc): @@ -1087,24 +1093,24 @@ def _convert_to_serializable(self, obj): "str": str(obj)[:500] # Limit length } - def _setup_websocket_routes(self): - """Setup WebSocket routes with invocation tracking""" - for entrypoint in self.agent_architecture.entrypoints: - if not entrypoint.tag.endswith("_stream"): - continue + # def _setup_websocket_routes(self): + # """Setup WebSocket routes with invocation tracking""" + # for entrypoint in self.agent_architecture.entrypoints: + # if not entrypoint.tag.endswith("_stream"): + # continue - stream_runner = self.agentic_executor.get_stream_runner(entrypoint) + # stream_runner = self.agentic_executor.get_stream_runner(entrypoint) - # Create a separate function for each entrypoint with invocation tracking - def make_websocket_handler_with_tracking(runner, entrypoint_tag): # Factory function - @self.app.websocket(f"/api/v1/agents/{self.agent_id}/execute/{entrypoint_tag}") - async def websocket_endpoint(websocket: WebSocket, agent_id: str = self.agent_id): - await self.websocket_handler.handle_agent_stream_with_tracking( - websocket, agent_id, runner, entrypoint_tag, self.db_service - ) - return websocket_endpoint + # # Create a separate function for each entrypoint with invocation tracking + # def make_websocket_handler_with_tracking(runner, entrypoint_tag): # Factory function + # @self.app.websocket(f"/api/v1/agents/{self.agent_id}/run-stream") + # async def websocket_endpoint(websocket: WebSocket, agent_id: str = self.agent_id): + # await self.websocket_handler.handle_agent_stream_with_tracking( + # websocket, agent_id, runner, entrypoint_tag, self.db_service + # ) + # return websocket_endpoint - make_websocket_handler_with_tracking(stream_runner, entrypoint.tag) + # make_websocket_handler_with_tracking(stream_runner, entrypoint.tag) def extract_endpoints(self): """Extract all endpoints from the FastAPI app""" diff --git a/runagent/sdk/server/socket_utils.py b/runagent/sdk/server/socket_utils.py index 075ebd3..84501cb 100644 --- a/runagent/sdk/server/socket_utils.py +++ b/runagent/sdk/server/socket_utils.py @@ -63,8 +63,7 @@ async def handle_agent_stream_with_tracking( self, websocket: WebSocket, agent_id: str, - stream_runner, - entrypoint_tag: str, + entrypoint_runner_dict: dict, db_service ): """Handle streaming execution - FIXED for middleware sync""" @@ -74,13 +73,58 @@ async def handle_agent_stream_with_tracking( middleware_invocation_id = None # FIXED: Add middleware invocation ID tracking try: - # Wait for start message + # Wait for start message - expect direct JSON format data = await websocket.receive_text() - start_message = self.serializer.deserialize_message(data) - request_data = start_message.data - input_args = request_data.get("input_data", {}).get("input_args", []) - input_kwargs = request_data.get("input_data", {}).get("input_kwargs", {}) + # Parse the direct JSON request format + try: + request_data = json.loads(data) + except json.JSONDecodeError as e: + await websocket.send_json({ + "type": "error", + "detail": f"Invalid JSON format: {str(e)}" + }) + await websocket.close(code=1003) + return + + # Validate required fields + required_fields = ["entrypoint_tag", "input_args", "input_kwargs"] + for field in required_fields: + if field not in request_data: + await websocket.send_json({ + "type": "error", + "detail": f"Missing required field: {field}" + }) + await websocket.close(code=1003) + return + + entrypoint_tag = request_data["entrypoint_tag"] + + # For WebSocket, we want to check if the entrypoint exists and is a streaming entrypoint. + if entrypoint_tag not in entrypoint_runner_dict: + await websocket.send_json({ + "type": "error", + "detail": f"Entrypoint {entrypoint_tag} not found" + }) + await websocket.close(code=1003) # 1003 = unsupported data + return + + # For WebSocket, we REQUIRE the entrypoint to be a streaming entrypoint + if not entrypoint_tag.endswith("_stream"): + await websocket.send_json({ + "type": "error", + "detail": f"Entrypoint `{entrypoint_tag}` is not a streaming entrypoint. Use a streaming entrypoint (ending with '_stream')." + }) + await websocket.close(code=1003) + return + + stream_runner = entrypoint_runner_dict[entrypoint_tag] + + # Extract input data from the new format + input_args = request_data.get("input_args", []) + input_kwargs = request_data.get("input_kwargs", {}) + timeout_seconds = request_data.get("timeout_seconds", 60) + async_execution = request_data.get("async_execution", False) # Start LOCAL invocation tracking first invocation_id = self.db_service.start_invocation( @@ -137,19 +181,13 @@ async def handle_agent_stream_with_tracking( except Exception as e: console.print(f"āŒ [red]Middleware sync start failed: {e}[/red]") - # Send stream started status to client - await websocket.send_text(self.serializer.serialize_message( - SafeMessage( - id="stream_status", - type=MessageType.STATUS, - timestamp=datetime.now(timezone.utc).isoformat(), - data={ - "status": "stream_started", - "invocation_id": invocation_id, - "middleware_invocation_id": middleware_invocation_id - } - ) - )) + # Send stream started status to client (simple JSON format) + await websocket.send_text(json.dumps({ + "type": "status", + "status": "stream_started", + "invocation_id": invocation_id, + "middleware_invocation_id": middleware_invocation_id + })) start_time = time.time() chunk_count = 0 @@ -174,16 +212,11 @@ async def handle_agent_stream_with_tracking( "chunk_preview": str(chunk) }) - # Send chunk to client - chunk_message = SafeMessage( - id=f"chunk_{uuid.uuid4()}", - type=MessageType.DATA, - timestamp=datetime.now(timezone.utc).isoformat(), - data={"content": chunk} - ) - - serialized_chunk = self.serializer.serialize_message(chunk_message) - await websocket.send_text(serialized_chunk) + # Send chunk to client (simple JSON format) + await websocket.send_text(json.dumps({ + "type": "data", + "content": chunk + })) # Streaming completed successfully execution_time = time.time() - start_time @@ -240,21 +273,15 @@ async def handle_agent_stream_with_tracking( except Exception as e: console.print(f"āŒ [red]Middleware sync completion error: {e}[/red]") - # Send completion status to client - await websocket.send_text(self.serializer.serialize_message( - SafeMessage( - id="stream_complete", - type=MessageType.STATUS, - timestamp=datetime.now(timezone.utc).isoformat(), - data={ - "status": "stream_completed", - "total_chunks": chunk_count, - "execution_time": execution_time, - "invocation_id": invocation_id, - "middleware_synced": middleware_invocation_id is not None - } - ) - )) + # Send completion status to client (simple JSON format) + await websocket.send_text(json.dumps({ + "type": "status", + "status": "stream_completed", + "total_chunks": chunk_count, + "execution_time": execution_time, + "invocation_id": invocation_id, + "middleware_synced": middleware_invocation_id is not None + })) except Exception as stream_error: # Streaming failed @@ -290,16 +317,11 @@ async def handle_agent_stream_with_tracking( except Exception as sync_error: console.print(f"āŒ [red]Middleware sync error failed: {sync_error}[/red]") - # Send error to client - await websocket.send_text(self.serializer.serialize_message( - SafeMessage( - id="stream_error", - type=MessageType.ERROR, - timestamp=datetime.now(timezone.utc).isoformat(), - data={"error": error_detail}, - error=error_detail - ) - )) + # Send error to client (simple JSON format) + await websocket.send_text(json.dumps({ + "type": "error", + "error": error_detail + })) except WebSocketDisconnect: console.print(f"WebSocket disconnected for agent {agent_id}") @@ -443,218 +465,218 @@ async def _handle_stream_start(self, websocket: WebSocket, request: WebSocketAge finally: self._cleanup_stream(connection_id) - async def _handle_stream_start_with_tracking( - self, - websocket: WebSocket, - request: WebSocketAgentRequest, - connection_id: str, - agent_execution_streamer: Callable, - invocation_id: str, - db_service, - middleware_invocation_id: str = None - ): - """Enhanced stream start with invocation tracking and middleware sync""" - start_time = time.time() - chunk_count = 0 - stream_output_data = [] - error_detail = None + # async def _handle_stream_start_with_tracking( + # self, + # websocket: WebSocket, + # request: WebSocketAgentRequest, + # connection_id: str, + # agent_execution_streamer: Callable, + # invocation_id: str, + # db_service, + # middleware_invocation_id: str = None + # ): + # """Enhanced stream start with invocation tracking and middleware sync""" + # start_time = time.time() + # chunk_count = 0 + # stream_output_data = [] + # error_detail = None - try: - # Send stream started status - await self._send_status(websocket, "stream_started", { - "agent_id": request.agent_id, - "invocation_id": invocation_id, - "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID - "input_args": request.input_data.input_args, - "input_kwargs": list(request.input_data.input_kwargs.keys()) - }) - - console.print(f"Starting tracked stream for agent: [cyan]{request.agent_id}[/cyan] (invocation: {invocation_id}...)") - if middleware_invocation_id: - console.print(f"[dim]Middleware invocation: {middleware_invocation_id}...[/dim]") - - console.print(f"Input args: [cyan]{request.input_data.input_args}[/cyan]") - console.print(f"Input kwargs: [cyan]{request.input_data.input_kwargs}[/cyan]") - - # Track active stream - self.active_streams[connection_id] = { - "agent_id": request.agent_id, - "invocation_id": invocation_id, - "middleware_invocation_id": middleware_invocation_id, # NEW: Track middleware ID - "start_time": start_time, - "chunk_count": 0 - } - - # Start the streaming iteration - async for chunk in self._safe_agent_stream( - agent_execution_streamer, - *request.input_data.input_args, - **request.input_data.input_kwargs - ): - # Check if stream is still active - if connection_id not in self.active_streams: - break + # try: + # # Send stream started status + # await self._send_status(websocket, "stream_started", { + # "agent_id": request.agent_id, + # "invocation_id": invocation_id, + # "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID + # "input_args": request.input_data.input_args, + # "input_kwargs": list(request.input_data.input_kwargs.keys()) + # }) + + # console.print(f"Starting tracked stream for agent: [cyan]{request.agent_id}[/cyan] (invocation: {invocation_id}...)") + # if middleware_invocation_id: + # console.print(f"[dim]Middleware invocation: {middleware_invocation_id}...[/dim]") + + # console.print(f"Input args: [cyan]{request.input_data.input_args}[/cyan]") + # console.print(f"Input kwargs: [cyan]{request.input_data.input_kwargs}[/cyan]") + + # # Track active stream + # self.active_streams[connection_id] = { + # "agent_id": request.agent_id, + # "invocation_id": invocation_id, + # "middleware_invocation_id": middleware_invocation_id, # NEW: Track middleware ID + # "start_time": start_time, + # "chunk_count": 0 + # } + + # # Start the streaming iteration + # async for chunk in self._safe_agent_stream( + # agent_execution_streamer, + # *request.input_data.input_args, + # **request.input_data.input_kwargs + # ): + # # Check if stream is still active + # if connection_id not in self.active_streams: + # break - chunk_count += 1 - self.active_streams[connection_id]["chunk_count"] = chunk_count + # chunk_count += 1 + # self.active_streams[connection_id]["chunk_count"] = chunk_count - # Convert chunk to serializable format before storing - try: - serializable_chunk = self._convert_to_serializable(chunk) + # # Convert chunk to serializable format before storing + # try: + # serializable_chunk = self._convert_to_serializable(chunk) - # Store chunk for final output tracking (limit to prevent memory issues) - if chunk_count <= 100: - stream_output_data.append(serializable_chunk) + # # Store chunk for final output tracking (limit to prevent memory issues) + # if chunk_count <= 100: + # stream_output_data.append(serializable_chunk) - except Exception as e: - console.print(f"āš ļø [yellow]Warning: Could not serialize chunk {chunk_count}: {e}[/yellow]") - # Store a safe representation - stream_output_data.append({ - "chunk_number": chunk_count, - "serialization_error": str(e), - "chunk_type": str(type(chunk)), - "chunk_preview": str(chunk)[:100] + "..." if len(str(chunk)) > 100 else str(chunk) - }) + # except Exception as e: + # console.print(f"āš ļø [yellow]Warning: Could not serialize chunk {chunk_count}: {e}[/yellow]") + # # Store a safe representation + # stream_output_data.append({ + # "chunk_number": chunk_count, + # "serialization_error": str(e), + # "chunk_type": str(type(chunk)), + # "chunk_preview": str(chunk)[:100] + "..." if len(str(chunk)) > 100 else str(chunk) + # }) - raw_data_msg = SafeMessage( - id="raw_chunk", - type=MessageType.RAW_DATA, - timestamp="", - data=chunk - ) - # Send chunk with appropriate message type - serialized_chunk = self.serializer.serialize_message(raw_data_msg) - await websocket.send_text(serialized_chunk) + # raw_data_msg = SafeMessage( + # id="raw_chunk", + # type=MessageType.RAW_DATA, + # timestamp="", + # data=chunk + # ) + # # Send chunk with appropriate message type + # serialized_chunk = self.serializer.serialize_message(raw_data_msg) + # await websocket.send_text(serialized_chunk) - # Small delay to prevent overwhelming the client - await asyncio.sleep(0) - - # Calculate execution time - execution_time = time.time() - start_time - - # Send completion status - await self._send_status(websocket, "stream_completed", { - "agent_id": request.agent_id, - "invocation_id": invocation_id, - "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID - "total_chunks": chunk_count, - "execution_time": execution_time - }) - - # Complete local invocation tracking with success - try: - db_service.complete_invocation( - invocation_id=invocation_id, - output_data={ - "stream_completed": True, - "total_chunks": chunk_count, - "sample_chunks": stream_output_data[:10] if stream_output_data else [], - "execution_time_seconds": execution_time, - "chunk_summary": { - "total_chunks": chunk_count, - "stored_samples": min(len(stream_output_data), 10), - "stream_type": "websocket" - } - }, - execution_time_ms=execution_time * 1000 - ) - console.print(f"āœ… [green]Local invocation tracking completed successfully[/green]") - except Exception as e: - console.print(f"āŒ [red]Failed to complete local invocation tracking: {str(e)}[/red]") - # Try to complete with minimal data - try: - db_service.complete_invocation( - invocation_id=invocation_id, - output_data={ - "stream_completed": True, - "total_chunks": chunk_count, - "execution_time_seconds": execution_time, - "serialization_note": "Some chunks could not be serialized" - }, - execution_time_ms=execution_time * 1000 - ) - console.print(f"āœ… [green]Local invocation tracking completed with minimal data[/green]") - except Exception as e2: - console.print(f"āŒ [red]Critical: Could not complete local invocation tracking: {str(e2)}[/red]") - - # NEW: Sync completion to middleware - if middleware_invocation_id and self.middleware_sync: - try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, - { - "output_data": { - "stream_completed": True, - "total_chunks": chunk_count, - "sample_chunks": stream_output_data[:10] if stream_output_data else [], - "execution_time_seconds": execution_time, - "chunk_summary": { - "total_chunks": chunk_count, - "stored_samples": min(len(stream_output_data), 10), - "stream_type": "websocket" - } - }, - "execution_time_ms": execution_time * 1000 - } - ) - console.print(f"šŸ“” [dim]Stream completion synced to middleware[/dim]") - except Exception as e: - console.print(f"āš ļø [yellow]Failed to sync stream completion to middleware: {e}[/yellow]") - - # Record in original agent_runs table for backward compatibility - self.db_service.record_agent_run( - agent_id=request.agent_id, - input_data=request.input_data.dict(), - output_data={"stream_completed": True, "chunk_count": chunk_count}, - success=True, - execution_time=execution_time, - ) - - console.print( - f"āœ… Agent [cyan]{request.agent_id}[/cyan] tracked stream completed successfully in " - f"{execution_time:.2f}s with {chunk_count} chunks (invocation: {invocation_id[:8]}...)" - ) - - except Exception as e: - execution_time = time.time() - start_time - error_detail = f"Error streaming agent {request.agent_id}: {str(e)}" - - await self._send_error(websocket, error_detail) - - # Complete local invocation tracking with error - db_service.complete_invocation( - invocation_id=invocation_id, - error_detail=error_detail, - execution_time_ms=execution_time * 1000 - ) - - # NEW: Sync error to middleware - if middleware_invocation_id and self.middleware_sync: - try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, - { - "error_detail": error_detail, - "execution_time_ms": execution_time * 1000 - } - ) - except Exception as sync_error: - console.print(f"āš ļø [yellow]Failed to sync stream error to middleware: {sync_error}[/yellow]") - - # Record failed run in database (backward compatibility) - self.db_service.record_agent_run( - agent_id=request.agent_id, - input_data=request.input_data.dict(), - output_data=None, - success=False, - error_message=error_detail, - execution_time=execution_time, - ) - - console.print(f"šŸ’„ [red]{error_detail}[/red] (invocation: {invocation_id[:8]}...)") + # # Small delay to prevent overwhelming the client + # await asyncio.sleep(0) + + # # Calculate execution time + # execution_time = time.time() - start_time + + # # Send completion status + # await self._send_status(websocket, "stream_completed", { + # "agent_id": request.agent_id, + # "invocation_id": invocation_id, + # "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID + # "total_chunks": chunk_count, + # "execution_time": execution_time + # }) + + # # Complete local invocation tracking with success + # try: + # db_service.complete_invocation( + # invocation_id=invocation_id, + # output_data={ + # "stream_completed": True, + # "total_chunks": chunk_count, + # "sample_chunks": stream_output_data[:10] if stream_output_data else [], + # "execution_time_seconds": execution_time, + # "chunk_summary": { + # "total_chunks": chunk_count, + # "stored_samples": min(len(stream_output_data), 10), + # "stream_type": "websocket" + # } + # }, + # execution_time_ms=execution_time * 1000 + # ) + # console.print(f"āœ… [green]Local invocation tracking completed successfully[/green]") + # except Exception as e: + # console.print(f"āŒ [red]Failed to complete local invocation tracking: {str(e)}[/red]") + # # Try to complete with minimal data + # try: + # db_service.complete_invocation( + # invocation_id=invocation_id, + # output_data={ + # "stream_completed": True, + # "total_chunks": chunk_count, + # "execution_time_seconds": execution_time, + # "serialization_note": "Some chunks could not be serialized" + # }, + # execution_time_ms=execution_time * 1000 + # ) + # console.print(f"āœ… [green]Local invocation tracking completed with minimal data[/green]") + # except Exception as e2: + # console.print(f"āŒ [red]Critical: Could not complete local invocation tracking: {str(e2)}[/red]") + + # # NEW: Sync completion to middleware + # if middleware_invocation_id and self.middleware_sync: + # try: + # await self.middleware_sync.sync_invocation_complete( + # middleware_invocation_id, + # { + # "output_data": { + # "stream_completed": True, + # "total_chunks": chunk_count, + # "sample_chunks": stream_output_data[:10] if stream_output_data else [], + # "execution_time_seconds": execution_time, + # "chunk_summary": { + # "total_chunks": chunk_count, + # "stored_samples": min(len(stream_output_data), 10), + # "stream_type": "websocket" + # } + # }, + # "execution_time_ms": execution_time * 1000 + # } + # ) + # console.print(f"šŸ“” [dim]Stream completion synced to middleware[/dim]") + # except Exception as e: + # console.print(f"āš ļø [yellow]Failed to sync stream completion to middleware: {e}[/yellow]") + + # # Record in original agent_runs table for backward compatibility + # self.db_service.record_agent_run( + # agent_id=request.agent_id, + # input_data=request.input_data.dict(), + # output_data={"stream_completed": True, "chunk_count": chunk_count}, + # success=True, + # execution_time=execution_time, + # ) + + # console.print( + # f"āœ… Agent [cyan]{request.agent_id}[/cyan] tracked stream completed successfully in " + # f"{execution_time:.2f}s with {chunk_count} chunks (invocation: {invocation_id[:8]}...)" + # ) + + # except Exception as e: + # execution_time = time.time() - start_time + # error_detail = f"Error streaming agent {request.agent_id}: {str(e)}" + + # await self._send_error(websocket, error_detail) + + # # Complete local invocation tracking with error + # db_service.complete_invocation( + # invocation_id=invocation_id, + # error_detail=error_detail, + # execution_time_ms=execution_time * 1000 + # ) + + # # NEW: Sync error to middleware + # if middleware_invocation_id and self.middleware_sync: + # try: + # await self.middleware_sync.sync_invocation_complete( + # middleware_invocation_id, + # { + # "error_detail": error_detail, + # "execution_time_ms": execution_time * 1000 + # } + # ) + # except Exception as sync_error: + # console.print(f"āš ļø [yellow]Failed to sync stream error to middleware: {sync_error}[/yellow]") + + # # Record failed run in database (backward compatibility) + # self.db_service.record_agent_run( + # agent_id=request.agent_id, + # input_data=request.input_data.dict(), + # output_data=None, + # success=False, + # error_message=error_detail, + # execution_time=execution_time, + # ) + + # console.print(f"šŸ’„ [red]{error_detail}[/red] (invocation: {invocation_id[:8]}...)") - finally: - self._cleanup_stream(connection_id) + # finally: + # self._cleanup_stream(connection_id) def _convert_to_serializable(self, obj): """Convert objects to JSON-serializable format""" diff --git a/runagent/sdk/socket_client.py b/runagent/sdk/socket_client.py index f34dd4d..9c776bc 100644 --- a/runagent/sdk/socket_client.py +++ b/runagent/sdk/socket_client.py @@ -29,89 +29,79 @@ def __init__( async def run_stream_async(self, agent_id: str, entrypoint_tag: str, *input_args, **input_kwargs) -> AsyncIterator[Any]: """Stream agent execution results (async version)""" - uri = f"{self.base_socket_url}/agents/{agent_id}/execute/{entrypoint_tag}" + uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream" async with websockets.connect(uri) as websocket: - # Send start stream request - request = WebSocketAgentRequest( - action=WebSocketActionType.START_STREAM, - agent_id=agent_id, - input_args=input_args, - input_kwargs=input_kwargs, - ) + # Send start stream request in the exact format required + request_data = { + "entrypoint_tag": entrypoint_tag, + "input_args": input_args, + "input_kwargs": input_kwargs, + "timeout_seconds": 60, + "async_execution": False + } - start_msg = SafeMessage( - id="stream_start", - type=MessageType.STATUS, - timestamp="", - data=request.dict() - ) - - # Use serialize_message like the sync version - serialized_msg = self.serializer.serialize_message(start_msg) - await websocket.send(serialized_msg) + # Send the request as direct JSON + await websocket.send(json.dumps(request_data)) # Receive and yield chunks async for raw_message in websocket: - # Use deserialize_message like the sync version - safe_msg = self.serializer.deserialize_message(raw_message) + try: + message = json.loads(raw_message) + except json.JSONDecodeError: + continue # Skip invalid messages - if safe_msg.error: - raise Exception(f"Stream error: {safe_msg.error}") + message_type = message.get("type") - if safe_msg.type == MessageType.STATUS: - status = safe_msg.data.get("status") + if message_type == "error": + raise Exception(f"Stream error: {message.get('error')}") + elif message_type == "status": + status = message.get("status") if status == "stream_completed": break elif status == "stream_started": continue # Skip status messages - elif safe_msg.type == MessageType.ERROR: - raise Exception(f"Agent error: {safe_msg.data}") - else: + elif message_type == "data": # Yield the actual chunk data - yield safe_msg.data.get("content", safe_msg.data) + yield message.get("content") def run_stream(self, agent_id: str, entrypoint_tag: str, input_args, input_kwargs) -> Iterator[Any]: """Stream agent execution results (sync version)""" from websockets.sync.client import connect - uri = f"{self.base_socket_url}/agents/{agent_id}/execute/{entrypoint_tag}" + uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream" with connect(uri) as websocket: - # Send start stream request - request = WebSocketAgentRequest( - action=WebSocketActionType.START_STREAM, - agent_id=agent_id, - input_args=input_args, - input_kwargs=input_kwargs, - ) + # Send start stream request in the exact format required + request_data = { + "entrypoint_tag": entrypoint_tag, + "input_args": input_args, + "input_kwargs": input_kwargs, + "timeout_seconds": 60, + "async_execution": False + } - start_msg = SafeMessage( - id="stream_start", - type=MessageType.STATUS, - timestamp="", - data=request.dict() - ) - - serialized_msg = self.serializer.serialize_message(start_msg) - websocket.send(serialized_msg) + # Send the request as direct JSON + websocket.send(json.dumps(request_data)) # Receive and yield chunks for raw_message in websocket: - safe_msg = self.serializer.deserialize_message(raw_message) + try: + message = json.loads(raw_message) + except json.JSONDecodeError: + continue # Skip invalid messages - if safe_msg.error: - raise Exception(f"Stream error: {safe_msg.error}") + message_type = message.get("type") - if safe_msg.type == MessageType.STATUS: - status = safe_msg.data.get("status") + if message_type == "error": + raise Exception(f"Stream error: {message.get('error')}") + elif message_type == "status": + status = message.get("status") if status == "stream_completed": break elif status == "stream_started": continue # Skip status messages - elif safe_msg.type == MessageType.ERROR: - raise Exception(f"Agent error: {safe_msg.data}") - else: + elif message_type == "data": # Yield the actual chunk data - yield safe_msg.data + yield message.get("content") diff --git a/runagent/utils/schema.py b/runagent/utils/schema.py index c44eaa8..526dfc9 100644 --- a/runagent/utils/schema.py +++ b/runagent/utils/schema.py @@ -109,7 +109,7 @@ class WebSocketActionType(str, Enum): class WebSocketAgentRequest(BaseModel): """WebSocket request model for agent streaming""" action: WebSocketActionType - agent_id: str + entrypoint_tag: str = Field(..., description="Entrypoint tag") input_args: t.List[t.Any] = Field( default_factory=list, description="Input data for positional arguments" ) From 67cddddb72067a3e60ab8e6d29ec399a394a718f Mon Sep 17 00:00:00 2001 From: sawradip Date: Sun, 28 Sep 2025 10:47:50 +0600 Subject: [PATCH 09/13] feat: improved langgarph default template --- templates/langgraph/default/agents.py | 36 +++++++++++++++++++ .../langgraph/default/runagent.config.json | 4 +-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/templates/langgraph/default/agents.py b/templates/langgraph/default/agents.py index cc75304..f6f3e0b 100644 --- a/templates/langgraph/default/agents.py +++ b/templates/langgraph/default/agents.py @@ -187,6 +187,42 @@ def get_solutions(query: str, num_solutions: int = 3): # Create workflow app = create_workflow() +def app_invoke(query: str, num_solutions: int = 3) -> dict: + """ + Entrypoint for synchronous agent invocation. + Args: + input_data: Dictionary with keys 'query' and 'num_solutions' + Returns: + Dictionary with solutions and validated results + """ + # Ensure required keys + + initial_state = { + "query": query, + "num_solutions": num_solutions, + "solutions": [], + "validated_results": "", + } + result = app.invoke(initial_state) + return result + +def app_stream(query: str, num_solutions: int = 3): + """ + Entrypoint for streaming agent invocation. + Args: + input_data: Dictionary with keys 'query' and 'num_solutions' + Yields: + Intermediate states as the workflow progresses + """ + + initial_state = { + "query": query, + "num_solutions": num_solutions, + "solutions": [], + "validated_results": "", + } + for state in app.stream(initial_state): + yield state if __name__ == "__main__": for out in app.stream( diff --git a/templates/langgraph/default/runagent.config.json b/templates/langgraph/default/runagent.config.json index 31d0f20..d7939a4 100644 --- a/templates/langgraph/default/runagent.config.json +++ b/templates/langgraph/default/runagent.config.json @@ -14,12 +14,12 @@ "entrypoints": [ { "file": "agents.py", - "module": "app.invoke", + "module": "app_invoke", "tag": "generic" }, { "file": "agents.py", - "module": "app.stream", + "module": "app_stream", "tag": "generic_stream" } ] From 1e2c649ad9fc02930f1890786e8ef1ffd57a9523 Mon Sep 17 00:00:00 2001 From: sawradip Date: Mon, 29 Sep 2025 04:30:20 +0600 Subject: [PATCH 10/13] feat: URL prefix working > rest+socket --- .gitignore | 4 +++- runagent/client/client.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 309e276..66b76c1 100644 --- a/.gitignore +++ b/.gitignore @@ -159,4 +159,6 @@ myenv/ myvenv/ #parlant -parlant-data \ No newline at end of file +parlant-data + +test.md \ No newline at end of file diff --git a/runagent/client/client.py b/runagent/client/client.py index 6d16708..ce890a9 100644 --- a/runagent/client/client.py +++ b/runagent/client/client.py @@ -34,11 +34,11 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host: agent_base_url_local = f"http://{agent_host}:{agent_port}" agent_socket_url_local = f"ws://{agent_host}:{agent_port}" - self.rest_client = RestClient(base_url=agent_base_url_local, api_prefix="/api/v1") + self.rest_client = RestClient(base_url=agent_base_url_local) self.socket_client = SocketClient(base_socket_url=agent_socket_url_local) else: - self.rest_client = RestClient() - self.socket_client = SocketClient() + self.rest_client = RestClient(is_local=False) + self.socket_client = SocketClient(is_local=False) # self.agent_architecture = self.rest_client.get_agent_architecture(agent_id) From d31e068c4733054c2aaf422252a85d17d7efa7cc Mon Sep 17 00:00:00 2001 From: sawradip Date: Mon, 29 Sep 2025 04:30:38 +0600 Subject: [PATCH 11/13] feat: debug option fixed --- runagent/cli/commands.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 23b227a..8b8a7df 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -89,6 +89,8 @@ def setup(api_key, base_url, force): sdk.configure(api_key=api_key, base_url=base_url, save=True) console.print("āœ… [green]Setup completed successfully![/green]") except AuthenticationError as auth_err: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Authentication failed:[/red] {auth_err}") # Provide specific troubleshooting based on error message @@ -520,6 +522,8 @@ def template(action_list, action_info, framework, template, filter_framework, fo ) except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Template error:[/red] {e}") raise click.ClickException("Template operation failed") @@ -572,10 +576,14 @@ def upload(path: Path): sys.exit(1) except AuthenticationError as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Authentication error:[/red] {e}") import sys sys.exit(1) except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Upload error:[/red] {e}") import sys sys.exit(1) @@ -622,10 +630,14 @@ def start(agent_id, config): sys.exit(1) except AuthenticationError as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Authentication error:[/red] {e}") import sys sys.exit(1) except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āŒ [red]Start error:[/red] {e}") import sys sys.exit(1) @@ -854,6 +866,8 @@ def serve(port, host, debug, replace, no_animation, animation_style, path): else: console.print(f" Connection: [red]āŒ Failed to connect: {test_result.get('error', 'Unknown error')}[/red]") except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f" Connection: [red]āŒ Connection test failed: {e}[/red]") else: console.print(f" Status: [yellow]āš ļø DISABLED[/yellow]") From e9b64e165f02fe4bed98a8e116c94655970b0777 Mon Sep 17 00:00:00 2001 From: sawradip Date: Mon, 29 Sep 2025 04:31:20 +0600 Subject: [PATCH 12/13] feat: auth test fixed with updated endpoint --- runagent/sdk/config.py | 60 +++++++++++++++++------------ runagent/sdk/rest_client.py | 41 ++++++++++++++++---- runagent/sdk/server/socket_utils.py | 8 ++-- runagent/sdk/socket_client.py | 12 ++++-- runagent/utils/serializer.py | 13 ++++--- 5 files changed, 88 insertions(+), 46 deletions(-) diff --git a/runagent/sdk/config.py b/runagent/sdk/config.py index 216fe07..ff4e51e 100644 --- a/runagent/sdk/config.py +++ b/runagent/sdk/config.py @@ -144,34 +144,42 @@ def _test_authentication(self) -> t.Dict[str, t.Any]: base_url=self._config.get("base_url"), ) - # Test connection using the profile endpoint - response = client.http.get("/users/profile", timeout=10) + # Test connection using the token validation endpoint + api_key = self._config.get("api_key") + response = client.http.post(f"/tokens/validate?token={api_key}", timeout=10) if response.status_code == 200: - profile_data = response.json() + token_data = response.json() - # Extract user info from the middleware response structure - auth_data = profile_data.get("auth_data", {}) - profile_data_inner = profile_data.get("profile_data", {}) - - user_info = { - "email": auth_data.get("email") or profile_data_inner.get("email"), - "user_id": auth_data.get("id") or profile_data_inner.get("id"), - "tier": profile_data_inner.get("tier", "free") - } - - # Store user info for later display - self._config.update({ - "user_email": user_info["email"], - "user_id": user_info["user_id"], - "user_tier": user_info["tier"], - "auth_validated": True - }) - - return { - "success": True, - "user_info": user_info - } + # Check if token validation was successful + if token_data.get("success") and token_data.get("data", {}).get("valid"): + data = token_data.get("data", {}) + + user_info = { + "email": data.get("user_email"), + "user_id": data.get("user_id"), + "tier": data.get("user_tier", "Free") + } + + # Store user info for later display + self._config.update({ + "user_email": user_info["email"], + "user_id": user_info["user_id"], + "user_tier": user_info["tier"], + "auth_validated": True + }) + + return { + "success": True, + "user_info": user_info + } + else: + # Token validation failed + reason = token_data.get("data", {}).get("reason", "Token validation failed") + return { + "success": False, + "error": reason + } elif response.status_code == 401: try: @@ -192,6 +200,8 @@ def _test_authentication(self) -> t.Dict[str, t.Any]: } except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise # Handle connection errors gracefully error_msg = str(e) if "Connection" in error_msg or "timeout" in error_msg.lower(): diff --git a/runagent/sdk/rest_client.py b/runagent/sdk/rest_client.py index d124ebc..c46b577 100644 --- a/runagent/sdk/rest_client.py +++ b/runagent/sdk/rest_client.py @@ -79,13 +79,13 @@ def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None self.session.headers.update({ "accept": "application/json", "content-type": "application/json", + "User-Agent": "RunAgent-CLI/1.0" }) if self.api_key: # Support both JWT tokens and API keys self.session.headers.update({ - "Authorization": f"Bearer {self.api_key}", - "User-Agent": "RunAgent-CLI/1.0" + "Authorization": f"Bearer {self.api_key}" }) def _get_url(self, path: str) -> str: @@ -155,6 +155,7 @@ def _request( data = None try: + print("[REQUEST] to: ", url) response = self.session.request( method=method.upper(), url=url, @@ -244,6 +245,7 @@ def __init__( base_url: Optional[str] = None, api_key: Optional[str] = None, api_prefix: Optional[str] = "/api/v1", + is_local: Optional[bool] = True ): """Initialize REST client for middleware server""" self.api_key = api_key or Config.get_api_key() @@ -257,10 +259,10 @@ def __init__( # Initialize HTTP handler directly with API key # The middleware auth system will handle JWT conversion automatically - self.http = HttpHandler( - api_key=self.api_key, # Use API key directly - middleware handles conversion - base_url=self.base_url - ) + # if is_local: + # self.http = HttpHandler(base_url=self.base_url) + # else: + self.http = HttpHandler(api_key=self.api_key, base_url=self.base_url) # Cache for limits to avoid repeated API calls self._limits_cache = None @@ -367,6 +369,8 @@ def get_local_db_limits(self) -> Dict: return self._get_error_response("connection") except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return self._get_error_response("generic", str(e)) def clear_limits_cache(self): @@ -382,7 +386,8 @@ def _create_zip_from_folder(self, agent_id: str, folder_path: Path) -> str: with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED, compresslevel=6) as zipf: for file_path in folder_path.rglob("*"): - if file_path.is_file() and not file_path.name.startswith("."): + if file_path.is_file(): + # and not file_path.name.startswith("."): # Skip unnecessary files if file_path.name in ["__pycache__", ".DS_Store", "Thumbs.db"]: continue @@ -428,9 +433,13 @@ def _upload_agent_metadata_to_server(self, config_data: Dict, agent_id: str) -> } except (ClientError, ServerError, ConnectionError) as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Metadata upload failed: {e.message}"} except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Metadata upload error: {str(e)}"} def _upload_agent_zip_file_to_server(self, zip_path: str, agent_id: str, progress: Progress, task_id) -> Dict: @@ -475,9 +484,13 @@ def _upload_agent_zip_file_to_server(self, zip_path: str, agent_id: str, progres } except (ClientError, ServerError, ConnectionError) as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"File upload failed: {e.message}"} except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Upload error: {str(e)}"} def _process_upload_result(self, result: Dict, upload_metadata: Dict) -> Dict: @@ -505,6 +518,8 @@ def _process_upload_result(self, result: Dict, upload_metadata: Dict) -> Dict: console.print(f"āš ļø [yellow]Warning: Could not save to local database: {db_result.get('error')}[/yellow]") except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise console.print(f"āš ļø [yellow]Warning: Database error: {str(e)}[/yellow]") # Save deployment info locally @@ -564,6 +579,8 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: agent_config = get_agent_config(folder_path) console.print(f"šŸ“‹ [green]Agent config loaded successfully[/green]") except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Failed to load agent config: {str(e)}"} # Step 3: Generate agent fingerprint for duplicate detection @@ -696,6 +713,8 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: }) except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Upload failed: {str(e)}"} def start_agent(self, agent_id: str, config: Dict = None) -> Dict: @@ -714,6 +733,8 @@ def start_agent(self, agent_id: str, config: Dict = None) -> Dict: return {"success": False, "error": f"Failed to start agent: {e.message}"} except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Start agent failed: {str(e)}"} def _process_start_result(self, result: Dict, agent_id: str) -> Dict: @@ -823,6 +844,8 @@ def get_agent_status(self, agent_id: str) -> Dict: except (ClientError, ServerError, ConnectionError) as e: return {"success": False, "error": f"Status check failed: {e.message}"} except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Status check failed: {str(e)}"} def _get_local_deployment_info(self, agent_id: str) -> Optional[Dict]: @@ -900,6 +923,8 @@ def run_agent( } except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return { "success": False, "data": None, @@ -920,6 +945,8 @@ def get_agent_architecture(self, agent_id: str) -> Dict: response = self.http.get(f"/agents/{agent_id}/architecture") return response.json() except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise return {"success": False, "error": f"Failed to get architecture: {str(e)}"} # runagent/sdk/rest_client.py - FIXED RestClient initialization diff --git a/runagent/sdk/server/socket_utils.py b/runagent/sdk/server/socket_utils.py index 84501cb..7cd15c0 100644 --- a/runagent/sdk/server/socket_utils.py +++ b/runagent/sdk/server/socket_utils.py @@ -182,12 +182,12 @@ async def handle_agent_stream_with_tracking( console.print(f"āŒ [red]Middleware sync start failed: {e}[/red]") # Send stream started status to client (simple JSON format) - await websocket.send_text(json.dumps({ + await websocket.send_json({ "type": "status", "status": "stream_started", "invocation_id": invocation_id, "middleware_invocation_id": middleware_invocation_id - })) + }) start_time = time.time() chunk_count = 0 @@ -274,14 +274,14 @@ async def handle_agent_stream_with_tracking( console.print(f"āŒ [red]Middleware sync completion error: {e}[/red]") # Send completion status to client (simple JSON format) - await websocket.send_text(json.dumps({ + await websocket.send_json({ "type": "status", "status": "stream_completed", "total_chunks": chunk_count, "execution_time": execution_time, "invocation_id": invocation_id, "middleware_synced": middleware_invocation_id is not None - })) + }) except Exception as stream_error: # Streaming failed diff --git a/runagent/sdk/socket_client.py b/runagent/sdk/socket_client.py index 9c776bc..eadf2b8 100644 --- a/runagent/sdk/socket_client.py +++ b/runagent/sdk/socket_client.py @@ -16,20 +16,25 @@ def __init__( self, base_socket_url: Optional[str] = None, api_key: Optional[str] = None, - api_prefix: Optional[str] = "/api/v1" + api_prefix: Optional[str] = "/api/v1", + is_local: Optional[bool] = True ): if not base_socket_url: base_url = Config.get_base_url() base_url = base_url.lstrip("http://").lstrip("https://") base_socket_url = f"ws://{base_url}" + self.is_local = is_local self.base_socket_url = base_socket_url.rstrip("/") + api_prefix self.api_key = api_key or Config.get_api_key() self.serializer = CoreSerializer() async def run_stream_async(self, agent_id: str, entrypoint_tag: str, *input_args, **input_kwargs) -> AsyncIterator[Any]: """Stream agent execution results (async version)""" - uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream" + + uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream?token={self.api_key}" + # if not self.is_local: + # uri = f"{uri}?token={self.api_key}" async with websockets.connect(uri) as websocket: # Send start stream request in the exact format required @@ -40,7 +45,6 @@ async def run_stream_async(self, agent_id: str, entrypoint_tag: str, *input_args "timeout_seconds": 60, "async_execution": False } - # Send the request as direct JSON await websocket.send(json.dumps(request_data)) @@ -69,7 +73,7 @@ def run_stream(self, agent_id: str, entrypoint_tag: str, input_args, input_kwarg """Stream agent execution results (sync version)""" from websockets.sync.client import connect - uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream" + uri = f"{self.base_socket_url}/agents/{agent_id}/run-stream?token={self.api_key}" with connect(uri) as websocket: diff --git a/runagent/utils/serializer.py b/runagent/utils/serializer.py index bfb9fe0..4271384 100644 --- a/runagent/utils/serializer.py +++ b/runagent/utils/serializer.py @@ -68,12 +68,13 @@ def deserialize_object(self, json_str: str, reconstruct: bool = False) -> Union[ if os.getenv('DISABLE_TRY_CATCH'): raise self.logger.error(f"JSON deserialization failed: {e}") - raise ValueError(f"Invalid JSON string: {e}") - except Exception as e: - if os.getenv('DISABLE_TRY_CATCH'): - raise - self.logger.error(f"Deserialization failed: {e}") - raise + return json_str + # raise ValueError(f"Invalid JSON string: {e}") + # except Exception as e: + # if os.getenv('DISABLE_TRY_CATCH'): + # raise + # self.logger.error(f"Deserialization failed: {e}") + # raise def serialize_message(self, message: SafeMessage) -> str: """ From 3ae79c54bf9b00694e0cc48f6b4eaa8f3d4c80a2 Mon Sep 17 00:00:00 2001 From: sawradip Date: Mon, 29 Sep 2025 05:19:48 +0600 Subject: [PATCH 13/13] feat: runagent deploy working --- runagent/cli/commands.py | 94 ++++------- runagent/sdk/rest_client.py | 327 ++++++++++++++++++++++++++++++++---- 2 files changed, 335 insertions(+), 86 deletions(-) diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 8b8a7df..ff33016 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -644,82 +644,62 @@ def start(agent_id, config): @click.command() -@click.option("--folder", help="Folder containing agent files (for upload + start)") -@click.option("--id", "agent_id", help="Agent ID (for start only)") -@click.option("--local", is_flag=True, help="Deploy locally instead of remote server") -@click.option("--framework", help="Framework type (auto-detected if not specified)") -@click.option("--config", help="JSON configuration for deployment") -def deploy(folder, agent_id, local, framework, config): - """Deploy agent (upload + start) or deploy locally""" +@click.argument( + "path", + type=click.Path( + exists=True, + file_okay=False, + dir_okay=True, + readable=True, + resolve_path=True, + path_type=Path, + ), + default=".", +) +def deploy(path: Path): + """Deploy agent (upload + start) to remote server""" try: sdk = RunAgent() - if local: - # Local deployment - if not folder: - raise click.ClickException("--folder is required for local deployment") - - # Use deploy_local command logic - ctx = click.get_current_context() - ctx.invoke(deploy_local, folder=folder, framework=framework) - return - - # Remote deployment + # Check authentication if not sdk.is_configured(): console.print( "āŒ [red]Not authenticated.[/red] Run [cyan]'runagent setup --api-key '[/cyan] first" ) raise click.ClickException("Authentication required") - # Parse config - config_dict = {} - if config: - try: - config_dict = json.loads(config) - except json.JSONDecodeError: - if os.getenv('DISABLE_TRY_CATCH'): - raise - raise click.ClickException("Invalid JSON in config parameter") - - if folder: - # Full deployment (upload + start) - if not Path(folder).exists(): - raise click.ClickException(f"Folder not found: {folder}") - - console.print(f"šŸŽÆ [bold]Full deployment (upload + start)...[/bold]") - console.print(f"šŸ“ Source: [cyan]{folder}[/cyan]") - - result = sdk.deploy_remote( - folder=folder, framework=framework, config=config_dict - ) + # Validate folder + if not Path(path).exists(): + raise click.ClickException(f"Folder not found: {path}") - if result.get("success"): - console.print(f"\nāœ… [green]Full deployment successful![/green]") - console.print( - f"šŸ†” Agent ID: [bold magenta]{result.get('agent_id')}[/bold magenta]" - ) - console.print(f"🌐 Endpoint: [link]{result.get('endpoint')}[/link]") - else: - console.print(f"āŒ [red]Deployment failed:[/red] {format_error_message(result.get('error'))}") - import sys - sys.exit(1) + console.print(f"šŸŽÆ [bold]Deploying agent (upload + start)...[/bold]") + console.print(f"šŸ“ Source: [cyan]{path}[/cyan]") - elif agent_id: - # Start existing agent - ctx = click.get_current_context() - ctx.invoke(start, agent_id=agent_id, config=config) + # Deploy agent (framework auto-detected) + result = sdk.deploy_remote(folder=str(path)) + if result.get("success"): + console.print(f"\nāœ… [green]Deployment successful![/green]") + console.print(f"šŸ†” Agent ID: [bold magenta]{result.get('agent_id')}[/bold magenta]") + console.print(f"🌐 Endpoint: [link]{result.get('endpoint')}[/link]") else: - raise click.ClickException( - "Either --folder (for upload+start) or --id (for start only) is required" - ) + console.print(f"āŒ [red]Deployment failed:[/red] {format_error_message(result.get('error'))}") + import sys + sys.exit(1) + except AuthenticationError as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + console.print(f"āŒ [red]Authentication error:[/red] {e}") + import sys + sys.exit(1) except Exception as e: if os.getenv('DISABLE_TRY_CATCH'): raise console.print(f"āŒ [red]Deployment error:[/red] {e}") - raise click.ClickException("Deployment failed") + import sys + sys.exit(1) diff --git a/runagent/sdk/rest_client.py b/runagent/sdk/rest_client.py index c46b577..8b9a335 100644 --- a/runagent/sdk/rest_client.py +++ b/runagent/sdk/rest_client.py @@ -346,6 +346,20 @@ def _get_error_response(self, error_type: str, error_msg: str = None) -> Dict: console.print(response["message"]) return response + def _create_progress_bar(self, initial_description: str = "Processing..."): + """Create a standardized progress bar for operations""" + progress = Progress( + SpinnerColumn(), + TextColumn("[bold green]{task.description}[/bold green]"), + BarColumn(bar_width=40), + TextColumn("[bold]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + TimeRemainingColumn(), + console=console, + ) + task_id = progress.add_task(initial_description, total=100) + return progress, task_id + def get_local_db_limits(self) -> Dict: """Fetch local database limits from backend API""" try: @@ -442,6 +456,10 @@ def _upload_agent_metadata_to_server(self, config_data: Dict, agent_id: str) -> raise return {"success": False, "error": f"Metadata upload error: {str(e)}"} + def _upload_agent_metadata_core(self, config_data: Dict, agent_id: str) -> Dict: + """Core logic for uploading agent metadata without progress bar""" + return self._upload_agent_metadata_to_server(config_data, agent_id) + def _upload_agent_zip_file_to_server(self, zip_path: str, agent_id: str, progress: Progress, task_id) -> Dict: """Upload agent zip file (source code) to middleware server""" try: @@ -493,6 +511,50 @@ def _upload_agent_zip_file_to_server(self, zip_path: str, agent_id: str, progres raise return {"success": False, "error": f"Upload error: {str(e)}"} + def _upload_agent_zip_file_core(self, zip_path: str, agent_id: str) -> Dict: + """Core logic for uploading agent zip file without progress bar""" + try: + # Upload zip file + file_size = os.path.getsize(zip_path) + + with open(zip_path, "rb") as f: + files = {"file": (os.path.basename(zip_path), f, "application/zip")} + data = { + "agent_id": agent_id, + } + + try: + response = self.http.post("/agents/upload", files=files, data=data, timeout=300) + result = response.json() + + # Handle new API response format + if result.get("success"): + return { + "success": True, + "agent_id": result.get("data", {}).get("agent_id", agent_id), + "message": result.get("message", "Upload completed"), + "status": result.get("data", {}).get("status", "uploaded"), + "file_size": result.get("data", {}).get("file_size", file_size), + "file_name": result.get("data", {}).get("file_name", os.path.basename(zip_path)) + } + else: + error_info = result.get("error", {}) + return { + "success": False, + "error": f"File upload failed: {error_info.get('message', 'Unknown error')}", + "error_code": error_info.get("code", "UNKNOWN_ERROR") + } + + except (ClientError, ServerError, ConnectionError) as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + return {"success": False, "error": f"File upload failed: {e.message}"} + + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + return {"success": False, "error": f"Upload error: {str(e)}"} + def _process_upload_result(self, result: Dict, upload_metadata: Dict) -> Dict: """Process upload result""" if result.get("success"): @@ -548,9 +610,8 @@ def _process_upload_result(self, result: Dict, upload_metadata: Dict) -> Dict: return result def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: - """Upload agent folder to middleware server with validation""" + """Upload agent folder to middleware server with validation and progress bar""" try: - if not folder_path.exists(): return {"success": False, "error": f"Folder not found: {folder_path}"} @@ -666,7 +727,7 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: agent_id = generate_agent_id() console.print(f"šŸ†” New Agent ID: [magenta]{agent_id}[/magenta]") - # Step 5: Create zip file and upload in parallel + # Step 5: Upload with progress bar console.print(f"🌐 Uploading to: [bold blue]{self.base_url}[/bold blue]") with Progress( @@ -687,7 +748,7 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: "config": agent_config.to_dict() } - metadata_result = self._upload_agent_metadata_to_server(config_data, agent_id) + metadata_result = self._upload_agent_metadata_core(config_data, agent_id) if not metadata_result.get("success"): return {"success": False, "error": f"Metadata upload failed: {metadata_result.get('error')}"} @@ -700,8 +761,13 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: console.print(f"šŸ“¦ Created upload package: [cyan]{Path(zip_path).name}[/cyan]") - # Step 3: Upload zip file - result = self._upload_agent_zip_file_to_server(zip_path, agent_id, progress, upload_task) + # Step 3: Upload zip file with progress updates + progress.update(upload_task, completed=30, description="Uploading agent files...") + result = self._upload_agent_zip_file_core(zip_path, agent_id) + + if result.get("success"): + progress.update(upload_task, completed=100, description="Upload completed!") + time.sleep(0.5) # Brief pause to show completion # Clean up zip file os.unlink(zip_path) @@ -718,10 +784,44 @@ def upload_agent_metadata_and_zip(self, folder_path: Path) -> Dict: return {"success": False, "error": f"Upload failed: {str(e)}"} def start_agent(self, agent_id: str, config: Dict = None) -> Dict: - """Start/deploy an uploaded agent on the middleware server""" + """Start/deploy an uploaded agent on the middleware server with progress bar""" try: console.print(f"Starting agent: [bold magenta]{agent_id}[/bold magenta]") + with Progress( + SpinnerColumn(), + TextColumn("[bold green]{task.description}[/bold green]"), + BarColumn(bar_width=40), + TextColumn("[bold]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + TimeRemainingColumn(), + console=console, + ) as progress: + start_task = progress.add_task("Initializing agent startup...", total=100) + + # Step 1: Prepare startup + progress.update(start_task, completed=20, description="Preparing agent deployment...") + + # Step 2: Start agent + progress.update(start_task, completed=50, description="Starting agent on server...") + result = self._start_agent_core(agent_id, config) + + if result.get("success"): + progress.update(start_task, completed=100, description="Agent started successfully!") + time.sleep(0.5) # Brief pause to show completion + else: + progress.update(start_task, completed=100, description="Agent startup failed!") + + return result + + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + return {"success": False, "error": f"Start agent failed: {str(e)}"} + + def _start_agent_core(self, agent_id: str, config: Dict = None) -> Dict: + """Core logic for starting agent without progress bar""" + try: payload = config or {} try: @@ -767,34 +867,203 @@ def _process_start_result(self, result: Dict, agent_id: str) -> Dict: return result def deploy_agent(self, folder_path: str, metadata: Dict = None) -> Dict: - """Upload and start agent in one operation""" + """Upload and start agent in one operation with single progress bar""" console.print("šŸŽÆ [bold cyan]Starting full deployment (upload + start)...[/bold cyan]") - # First upload - upload_result = self.upload_agent(folder_path, metadata) + try: + folder_path = Path(folder_path) + if not folder_path.exists(): + return {"success": False, "error": f"Folder not found: {folder_path}"} - if not upload_result.get("success"): - return upload_result + # Step 1: Validate agent + console.print(f"šŸ” Validating agent...") + is_valid, validation_details = validate_agent(folder_path) + + if not is_valid: + error_msgs = validation_details.get("error_msgs", []) + console.print(f"āŒ [red]Agent validation failed:[/red]") + for error in error_msgs: + console.print(f" • {error}") + return { + "success": False, + "error": "Agent validation failed", + "validation_details": validation_details + } + + console.print(f"āœ… [green]Agent validation passed[/green]") - agent_id = upload_result.get("agent_id") + # Step 2: Load agent config + try: + agent_config = get_agent_config(folder_path) + console.print(f"šŸ“‹ [green]Agent config loaded successfully[/green]") + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + return {"success": False, "error": f"Failed to load agent config: {str(e)}"} - # Then start - start_result = self.start_agent(agent_id) + # Step 3: Generate agent fingerprint for duplicate detection + fingerprint = generate_agent_fingerprint(folder_path) + console.print(f"šŸ” Agent fingerprint: [dim]{fingerprint[:16]}...[/dim]") - if start_result.get("success"): - return { - "success": True, - "agent_id": agent_id, - "endpoint": start_result.get("endpoint"), - "status": "running", - "message": f'Agent fully deployed and running. Endpoint: {start_result.get("endpoint")}', - } - else: - return { - "success": False, - "error": f"Upload succeeded but start failed: {start_result.get('error')}", - "agent_id": agent_id, - } + # Step 4: Check for existing agents (both by fingerprint and by path) + from runagent.sdk.db import DBService + db_service = DBService() + + # Check for exact fingerprint match (identical content) + existing_agent_by_fingerprint = db_service.get_agent_by_fingerprint(fingerprint) + + # Check for existing agent by path (same folder, potentially modified) + existing_agent_by_path = db_service.get_agent_by_path(str(folder_path)) + + if existing_agent_by_fingerprint: + # Identical content detected + existing_agent = existing_agent_by_fingerprint + console.print(f"āš ļø [yellow]Agent with identical content already exists![/yellow]") + console.print(f"šŸ†” Existing Agent ID: [magenta]{existing_agent['agent_id']}[/magenta]") + console.print(f"šŸ“Š Status: [cyan]{existing_agent['status']}[/cyan]") + console.print(f"šŸ“ Type: [cyan]{'Local' if existing_agent['is_local'] else 'Remote'}[/cyan]") + + # Ask user if they want to overwrite identical content + from rich.prompt import Confirm + overwrite = Confirm.ask("Do you want to overwrite the existing agent?", default=False) + + if not overwrite: + return { + "success": False, + "error": "Deployment cancelled by user", + "code": "USER_CANCELLED", + "existing_agent": existing_agent + } + + # Use existing agent ID for overwrite + agent_id = existing_agent['agent_id'] + console.print(f"šŸ”„ [yellow]Overwriting existing agent: {agent_id}[/yellow]") + + elif existing_agent_by_path: + # Modified content detected (same folder, different fingerprint) + existing_agent = existing_agent_by_path + console.print(f"āš ļø [yellow]Agent content has changed![/yellow]") + console.print(f"šŸ†” Existing Agent ID: [magenta]{existing_agent['agent_id']}[/magenta]") + console.print(f"šŸ“Š Status: [cyan]{existing_agent['status']}[/cyan]") + console.print(f"šŸ“ Type: [cyan]{'Local' if existing_agent['is_local'] else 'Remote'}[/cyan]") + console.print(f"šŸ” Content fingerprint changed (modified files detected)") + + # Show enhanced options for modified content + from rich.prompt import Prompt + choice = Prompt.ask( + "What would you like to do?", + choices=["overwrite", "new", "cancel"], + default="new" + ) + + if choice == "overwrite": + # Feature not available yet - show message and fallback + console.print(f"\n🚧 [yellow]Overwrite functionality is not yet available.[/yellow]") + console.print(f"šŸ’” [cyan]This feature is coming soon! For now, we'll create a new agent.[/cyan]") + console.print(f"šŸ“¢ [blue]Contact us on Discord if you need this feature sooner.[/blue]") + console.print(f"šŸ”— [link]https://discord.gg/Q9P9AdHVHz[/link]") + + # Fallback to new agent creation + agent_id = generate_agent_id() + console.print(f"šŸ†” New Agent ID: [magenta]{agent_id}[/magenta]") + + elif choice == "new": + # Create new agent with new ID + agent_id = generate_agent_id() + console.print(f"šŸ†” New Agent ID: [magenta]{agent_id}[/magenta]") + + else: # cancel + return { + "success": False, + "error": "Deployment cancelled by user", + "code": "USER_CANCELLED", + "existing_agent": existing_agent + } + else: + # No existing agent found - create new one + agent_id = generate_agent_id() + console.print(f"šŸ†” New Agent ID: [magenta]{agent_id}[/magenta]") + + # Step 5: Full deployment with single progress bar + console.print(f"🌐 Deploying to: [bold blue]{self.base_url}[/bold blue]") + + with Progress( + SpinnerColumn(), + TextColumn("[bold green]{task.description}[/bold green]"), + BarColumn(bar_width=40), + TextColumn("[bold]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + TimeRemainingColumn(), + console=console, + ) as progress: + deploy_task = progress.add_task("Initializing deployment...", total=100) + + # Phase 1: Upload metadata (0-20%) + progress.update(deploy_task, completed=5, description="Uploading agent metadata...") + config_data = { + "id": agent_id, + "config": agent_config.to_dict() + } + + metadata_result = self._upload_agent_metadata_core(config_data, agent_id) + + if not metadata_result.get("success"): + return {"success": False, "error": f"Metadata upload failed: {metadata_result.get('error')}"} + + progress.update(deploy_task, completed=20, description="Metadata uploaded successfully") + + # Phase 2: Create and upload zip file (20-70%) + progress.update(deploy_task, completed=25, description="Creating upload package...") + zip_path = self._create_zip_from_folder(agent_id, folder_path) + + console.print(f"šŸ“¦ Created upload package: [cyan]{Path(zip_path).name}[/cyan]") + + progress.update(deploy_task, completed=30, description="Uploading agent files...") + upload_result = self._upload_agent_zip_file_core(zip_path, agent_id) + + if not upload_result.get("success"): + os.unlink(zip_path) # Clean up zip file + return {"success": False, "error": f"File upload failed: {upload_result.get('error')}"} + + progress.update(deploy_task, completed=70, description="Files uploaded successfully") + + # Clean up zip file + os.unlink(zip_path) + + # Phase 3: Start agent (70-100%) + progress.update(deploy_task, completed=75, description="Starting agent deployment...") + start_result = self._start_agent_core(agent_id, metadata) + + if start_result.get("success"): + progress.update(deploy_task, completed=100, description="Deployment completed successfully!") + time.sleep(0.5) # Brief pause to show completion + + # Process upload result for database storage + self._process_upload_result(upload_result, { + "agent_id": agent_id, + "fingerprint": fingerprint, + "source_folder": str(folder_path) + }) + + return { + "success": True, + "agent_id": agent_id, + "endpoint": start_result.get("endpoint"), + "status": "running", + "message": f'Agent fully deployed and running. Endpoint: {start_result.get("endpoint")}', + } + else: + progress.update(deploy_task, completed=100, description="Deployment failed!") + return { + "success": False, + "error": f"Upload succeeded but start failed: {start_result.get('error')}", + "agent_id": agent_id, + } + + except Exception as e: + if os.getenv('DISABLE_TRY_CATCH'): + raise + return {"success": False, "error": f"Deployment failed: {str(e)}"} def _save_deployment_info(self, agent_id: str, metadata: Dict): """Save deployment info for CLI reference"""