diff --git a/runagent/cli/commands.py b/runagent/cli/commands.py index 61886ac..6e8fdeb 100644 --- a/runagent/cli/commands.py +++ b/runagent/cli/commands.py @@ -727,28 +727,59 @@ def run(agent_id, input_file, message, local, direct, timeout): @click.option("--cleanup-days", type=int, help="Clean up records older than N days") @click.option("--agent-id", help="Show detailed info for specific agent") @click.option("--capacity", is_flag=True, help="Show detailed capacity information") -def db_status(cleanup_days, agent_id, capacity): +@click.option("--servers", is_flag=True, help="Show running server status") +def db_status(cleanup_days, agent_id, capacity, servers): """Show local database status and statistics""" try: sdk = RunAgent() + if servers: + # Show server status + capacity_info = sdk.get_local_capacity() + + console.print(f"\nšŸ–„ļø [bold]Server Status[/bold]") + + running_servers = capacity_info.get("running_servers", []) + stopped_servers = capacity_info.get("stopped_servers", []) + + if running_servers: + console.print(f"\n🟢 [bold]Running Servers ({len(running_servers)}):[/bold]") + for server in running_servers: + console.print(f" • [green]{server['agent_id']}[/green] - {server['framework']} on {server['host']}:{server['port']}") + console.print(f" Runs: {server['run_count']}, Last: {server['last_run'] or 'Never'}") + + if stopped_servers: + console.print(f"\nšŸ”“ [bold]Stopped Servers ({len(stopped_servers)}):[/bold]") + for server in stopped_servers: + console.print(f" • [red]{server['agent_id']}[/red] - {server['framework']} on {server['host']}:{server['port']}") + console.print(f" Runs: {server['run_count']}, Last: {server['last_run'] or 'Never'}") + + summary = capacity_info.get("server_summary", {}) + console.print(f"\nšŸ“Š [bold]Summary:[/bold]") + console.print(f" Total Agents: [cyan]{summary.get('total_agents', 0)}[/cyan]") + console.print(f" Running Servers: [green]{summary.get('running_servers', 0)}[/green]") + console.print(f" Stopped Servers: [red]{summary.get('stopped_servers', 0)}[/red]") + console.print(f" Unique Ports: [yellow]{summary.get('unique_ports', 0)}[/yellow]") + + return + if capacity: # Show detailed capacity info capacity_info = sdk.get_local_capacity() console.print(f"\nšŸ“Š [bold]Database Capacity Information[/bold]") console.print( - f"Current: [cyan]{capacity_info.get('current_count', 0)}/5[/cyan] agents" + f"Current: [cyan]{capacity_info.get('current_count', 0)}/{capacity_info.get('max_capacity', 5)}[/cyan] agents" ) console.print( f"Remaining slots: [green]{capacity_info.get('remaining_slots', 0)}[/green]" ) - status = "šŸ”“ FULL" if capacity_info.get("is_full") else "🟢 Available" + status = "šŸ”“ FULL" if capacity_info.get('is_full') else "🟢 Available" console.print(f"Status: {status}") - agents = capacity_info.get("agents", []) + agents = capacity_info.get('agents', []) if agents: console.print(f"\nšŸ“‹ [bold]Deployed Agents (by age):[/bold]") for i, agent in enumerate(agents): @@ -763,11 +794,11 @@ def db_status(cleanup_days, agent_id, capacity): else " (newest)" if i == len(agents) - 1 else "" ) console.print( - f" {i+1}. {status_icon} [magenta]{agent['agent_id']}[/magenta] ({agent['framework']}) - {agent['deployed_at']}{age_label}" + f" {i+1}. {status_icon} [magenta]{agent['agent_id']}[/magenta] ({agent['framework']}) - {agent['host']}:{agent['port']}{age_label}" ) - if capacity_info.get("is_full"): - oldest = capacity_info.get("oldest_agent", {}) + if capacity_info.get('is_full'): + oldest = capacity_info.get('oldest_agent', {}) console.print( f"\nšŸ’” [yellow]To deploy new agent, replace oldest:[/yellow]" ) @@ -776,6 +807,9 @@ def db_status(cleanup_days, agent_id, capacity): ) return + except Exception as e: + console.print(f"āŒ [red]Database status error:[/red] {e}") + raise click.ClickException("Failed to get database status") if agent_id: # Show specific agent info diff --git a/runagent/client/client.py b/runagent/client/client.py index 25000a9..a8f72df 100644 --- a/runagent/client/client.py +++ b/runagent/client/client.py @@ -2,10 +2,11 @@ from runagent.sdk.rest_client import RestClient from runagent.utils.agent import detect_framework, validate_agent from runagent.sdk.socket_client import SocketClient +import requests class RunAgentClient: - def __init__(self, agent_id: str, local: bool = True): + def __init__(self, agent_id: str, local: bool = True, port: int = None, host: str = None): self.sdk = RunAgentSDK() self.agent_id = agent_id self.local = local @@ -16,8 +17,23 @@ def __init__(self, agent_id: str, local: bool = True): raise ValueError(f"Agent {agent_id} not found in local DB") self.agent_info = agent_info - agent_host = self.agent_info["host"] - agent_port = self.agent_info["port"] + # Use provided port/host or fall back to database values + agent_host = host or self.agent_info["host"] + agent_port = port or self.agent_info["port"] + + # Try to connect to the specified port first + if port is not None: + test_url = f"http://{agent_host}:{port}/api/v1/health" + try: + response = requests.get(test_url, timeout=2) + if response.status_code == 200: + agent_port = port + print(f"āœ… Connected to server on port {port}") + else: + print(f"āš ļø Server on port {port} returned status {response.status_code}, using database port {agent_port}") + except requests.exceptions.RequestException: + print(f"āš ļø Could not connect to port {port}, using database port {agent_port}") + agent_base_url = f"http://{agent_host}:{agent_port}" agent_socket_url = f"ws://{agent_host}:{agent_port}" @@ -25,12 +41,30 @@ def __init__(self, agent_id: str, local: bool = True): self.socket_client = SocketClient( base_socket_url=agent_socket_url, api_prefix="/api/v1" - ) + ) else: self.rest_client = RestClient() self.socket_client = SocketClient() def run_generic(self, *input_args, **input_kwargs): + """ + Run agent with generic interface - simplified for easy testing + + Args: + *input_args: Can be a simple string message or complex arguments + **input_kwargs: Keyword arguments for the agent + + Returns: + Agent response + """ + # Handle simple string input + if len(input_args) == 1 and isinstance(input_args[0], str) and not input_kwargs: + # Convert simple string to messages format + input_kwargs = { + "messages": [{"role": "user", "content": input_args[0]}] + } + input_args = () + return self.rest_client.run_agent_generic( self.agent_id, input_args=input_args, input_kwargs=input_kwargs ) @@ -52,4 +86,4 @@ async def run_generic_stream(self, *input_args, **input_kwargs): async for item in self.socket_client.run_agent_generic_stream_async( self.agent_id, *input_args, **input_kwargs ): - yield item + yield item \ No newline at end of file diff --git a/runagent/sdk/sdk.py b/runagent/sdk/sdk.py index c8c0b08..12fccaa 100644 --- a/runagent/sdk/sdk.py +++ b/runagent/sdk/sdk.py @@ -46,6 +46,7 @@ def __init__( self.templates = TemplateManager() self.db_service = DBService() # self.local = LocalDeployment(self.config) + self.local = LocalDeploymentManager(self.db_service) self.remote = RemoteDeployment(self.config) # Validate configuration on initialization @@ -507,3 +508,114 @@ def __exit__(self, exc_type, exc_val, exc_tb): # String representation def __repr__(self): return f"RunAgentSDK(configured={self.is_configured()})" + + + + +class LocalDeploymentManager: + """Manager for local agent deployments""" + + def __init__(self, db_service): + self.db_service = db_service + + def list_agents(self) -> t.List[t.Dict[str, t.Any]]: + """List all locally deployed agents""" + agents = self.db_service.list_agents() + + # Add additional info like file existence + for agent in agents: + agent_path = Path(agent["agent_path"]) + agent["exists"] = agent_path.exists() + agent["deployment_exists"] = agent_path.exists() + agent["source_exists"] = agent_path.exists() + + return agents + + def get_capacity_info(self) -> t.Dict[str, t.Any]: + """Get local database capacity information""" + return self.db_service.get_database_capacity_info() + + def get_agent_info(self, agent_id: str) -> t.Dict[str, t.Any]: + """Get comprehensive information about a local agent""" + agent_data = self.db_service.get_agent(agent_id) + if not agent_data: + return {"success": False, "error": f"Agent {agent_id} not found"} + + # Add file existence checks + agent_path = Path(agent_data["agent_path"]) + + # Get agent statistics + stats = self.db_service.get_agent_stats(agent_id) + + return { + "success": True, + "agent_info": { + **agent_data, + "deployment_exists": agent_path.exists(), + "source_exists": agent_path.exists(), + "deployment_path": str(agent_path), + "folder_path": str(agent_path), + "stats": stats, + } + } + + def delete_agent(self, agent_id: str) -> t.Dict[str, t.Any]: + """Delete agent (disabled for safety)""" + return { + "success": False, + "error": "Agent deletion is disabled for safety. Use database cleanup instead.", + } + + def cleanup_old_records(self, days_old: int = 30) -> t.Dict[str, t.Any]: + """Clean up old database records""" + try: + deleted_count = self.db_service.cleanup_old_runs(days_old) + return { + "success": True, + "message": f"Cleaned up {deleted_count} old run records", + "deleted_count": deleted_count, + } + except Exception as e: + return {"success": False, "error": str(e)} + + def get_database_stats(self) -> t.Dict[str, t.Any]: + """Get database statistics""" + return self.db_service.get_database_stats() + + def run_agent(self, agent_id: str, input_data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + """Run agent locally via HTTP""" + try: + agent_info = self.db_service.get_agent(agent_id) + if not agent_info: + return {"success": False, "error": f"Agent {agent_id} not found"} + + # Try to connect to the agent's server + import requests + import time + + start_time = time.time() + url = f"http://{agent_info['host']}:{agent_info['port']}/api/v1/agents/{agent_id}/execute/generic" + + try: + response = requests.post(url, json={"input_data": input_data}, timeout=30) + execution_time = time.time() - start_time + + if response.status_code == 200: + result = response.json() + result["execution_time"] = execution_time + return result + else: + return { + "success": False, + "error": f"Server returned status {response.status_code}: {response.text}", + "execution_time": execution_time, + } + except requests.exceptions.RequestException as e: + execution_time = time.time() - start_time + return { + "success": False, + "error": f"Server not reachable: {str(e)}", + "execution_time": execution_time, + } + except Exception as e: + return {"success": False, "error": str(e)} \ No newline at end of file diff --git a/runagent/sdk/server/framework/__init__.py b/runagent/sdk/server/framework/__init__.py index 1a662a4..963b9ce 100644 --- a/runagent/sdk/server/framework/__init__.py +++ b/runagent/sdk/server/framework/__init__.py @@ -2,6 +2,8 @@ from typing import Dict from runagent.sdk.server.framework.langgraph import LangGraphExecutor +from runagent.sdk.server.framework.langchain import LangChainExecutor +from runagent.sdk.server.framework.generic import GenericExecutor from runagent.utils.schema import EntryPoint, EntryPointType @@ -15,9 +17,14 @@ def get_executor( valid_entrypoint_found = True break if not valid_entrypoint_found: - raise ValueError(f"No valid entrypoint type found in agent configuration. Valid types are: {[t.value for t in EntrypointType]}") + raise ValueError(f"No valid entrypoint type found in agent configuration. Valid types are: {[t.value for t in EntryPointType]}") if framework == "langgraph": return LangGraphExecutor(agent_dir, agent_entrypoints) + elif framework == "langchain": + return LangChainExecutor(agent_dir, agent_entrypoints) + elif framework in ["llamaindex", "crewai", "autogen", "letta"]: + # Use generic executor for other frameworks + return GenericExecutor(agent_dir, agent_entrypoints) else: - raise ValueError(f"Framework {framework} not supported yet.") + raise ValueError(f"Framework {framework} not supported yet.") \ No newline at end of file diff --git a/runagent/sdk/server/framework/generic.py b/runagent/sdk/server/framework/generic.py index 3ca4d44..e5fbf75 100644 --- a/runagent/sdk/server/framework/generic.py +++ b/runagent/sdk/server/framework/generic.py @@ -29,44 +29,125 @@ def __init__(self, agent_dir: Path, agent_entrypoints: Dict[str, EntryPoint]): ) if generic_stream_ep else None def entrypoint_resolver(self, entrypoint_filepath: Path, entrypoint_module: str): + """ + Enhanced entrypoint resolver that handles: + - Functions: "solve_problem" + - Object methods: "app.invoke" + - Class methods: "MyClass.method" + - Nested attributes: "obj.attr.method" + """ print(f"DEBUG: Resolving entrypoint - filepath: {entrypoint_filepath}, module: {entrypoint_module}") - primary_module, secondary_attr = ( - entrypoint_module.split(".", 1) + [""] - )[:2] - print(f"DEBUG: Split module - primary: {primary_module}, secondary: {secondary_attr}") - resolved_module = self.importer.resolve_import( + # Split the module path + module_parts = entrypoint_module.split(".") + primary_module = module_parts[0] + attribute_chain = module_parts[1:] if len(module_parts) > 1 else [] + + print(f"DEBUG: Primary module: {primary_module}, Attribute chain: {attribute_chain}") + + # Import the primary module/object + try: + resolved_module = self.importer.resolve_import( entrypoint_filepath, primary_module ) - print(f"DEBUG: Resolved primary module: {resolved_module}") + print(f"DEBUG: Resolved primary module: {resolved_module}") + except Exception as e: + print(f"ERROR: Failed to resolve primary module '{primary_module}': {e}") + raise - if secondary_attr: - attrs = secondary_attr.split('.') - print(f"DEBUG: Secondary attributes to resolve: {attrs}") - for attr in attrs: - resolved_module = getattr(resolved_module, attr) - print(f"DEBUG: Resolved attribute {attr}: {resolved_module}") - - print(f"DEBUG: Final resolved module: {resolved_module}") - return resolved_module + # Navigate through the attribute chain + current_object = resolved_module + for i, attr in enumerate(attribute_chain): + print(f"DEBUG: Resolving attribute '{attr}' on {type(current_object)}") + try: + if hasattr(current_object, attr): + current_object = getattr(current_object, attr) + print(f"DEBUG: Successfully resolved '{attr}' -> {type(current_object)}") + else: + # Try to handle special cases + available_attrs = [a for a in dir(current_object) if not a.startswith('_')] + print(f"ERROR: Attribute '{attr}' not found. Available attributes: {available_attrs}") + raise AttributeError(f"'{type(current_object).__name__}' object has no attribute '{attr}'") + except Exception as e: + print(f"ERROR: Failed to resolve attribute '{attr}': {e}") + raise + + print(f"DEBUG: Final resolved entrypoint: {current_object}") + + # Validate that the final object is callable + if not callable(current_object): + raise ValueError(f"Entrypoint '{entrypoint_module}' resolved to non-callable object: {type(current_object)}") + + return current_object def generic(self, *input_args, **input_kwargs): if self._generic_entrypoint is None: raise ValueError("No `generic` entrypoint found in agent config") - result_obj = self._generic_entrypoint(*input_args, **input_kwargs) - result_json = self.serializer.serialize_object(result_obj) - return result_json + + print(f"DEBUG: Calling entrypoint with args: {input_args}, kwargs: {input_kwargs}") + + # Smart argument handling based on the type of entrypoint + try: + # Check if this looks like a LangGraph app.invoke method + if hasattr(self._generic_entrypoint, '__self__') and hasattr(self._generic_entrypoint.__self__, 'invoke'): + # This is likely a method on a LangGraph app + print("DEBUG: Detected LangGraph-style app.invoke method") + + # Convert arguments to LangGraph state format + if input_kwargs: + # Use kwargs as the state + langgraph_input = input_kwargs + elif input_args and len(input_args) == 1 and isinstance(input_args[0], dict): + # Single dict argument + langgraph_input = input_args[0] + elif input_args: + # Convert positional args to state + langgraph_input = {"input": input_args[0] if len(input_args) == 1 else input_args} + else: + langgraph_input = {} + + print(f"DEBUG: LangGraph input: {langgraph_input}") + result_obj = self._generic_entrypoint(langgraph_input) + else: + # Regular function call + print("DEBUG: Regular function call") + result_obj = self._generic_entrypoint(*input_args, **input_kwargs) + + result_json = self.serializer.serialize_object(result_obj) + return result_json + + except Exception as e: + print(f"ERROR: Entrypoint execution failed: {e}") + print(f"ERROR: Entrypoint type: {type(self._generic_entrypoint)}") + if hasattr(self._generic_entrypoint, '__self__'): + print(f"ERROR: Entrypoint self: {type(self._generic_entrypoint.__self__)}") + raise async def generic_stream(self, *input_args, **input_kwargs): if self._generic_stream_entrypoint is None: raise ValueError("No `generic_stream` entrypoint found in agent config") - for chunk in self._generic_stream_entrypoint( - *input_args, - **input_kwargs - ): - yield chunk - await asyncio.sleep(0) - + # Similar logic for streaming + try: + if hasattr(self._generic_stream_entrypoint, '__self__') and hasattr(self._generic_stream_entrypoint.__self__, 'stream'): + # LangGraph app.stream method + if input_kwargs: + langgraph_input = input_kwargs + elif input_args and len(input_args) == 1 and isinstance(input_args[0], dict): + langgraph_input = input_args[0] + else: + langgraph_input = {"input": input_args[0] if len(input_args) == 1 else input_args} + + for chunk in self._generic_stream_entrypoint(langgraph_input): + yield chunk + await asyncio.sleep(0) + else: + # Regular generator function + for chunk in self._generic_stream_entrypoint(*input_args, **input_kwargs): + yield chunk + await asyncio.sleep(0) + except Exception as e: + print(f"ERROR: Stream entrypoint execution failed: {e}") + raise \ No newline at end of file diff --git a/runagent/sdk/server/framework/langchain.py b/runagent/sdk/server/framework/langchain.py new file mode 100644 index 0000000..d57e484 --- /dev/null +++ b/runagent/sdk/server/framework/langchain.py @@ -0,0 +1,113 @@ +from pathlib import Path +from typing import Dict, Any + +from runagent.sdk.server.framework.generic import GenericExecutor +from runagent.utils.schema import EntryPoint + + +class LangChainExecutor(GenericExecutor): + """Executor for LangChain agents with framework-specific input handling""" + + def __init__(self, agent_dir: Path, agent_entrypoints: Dict[str, EntryPoint]): + super().__init__(agent_dir, agent_entrypoints) + + def generic(self, *input_args, **input_kwargs): + """ + LangChain-specific generic execution that formats inputs correctly + """ + if self._generic_entrypoint is None: + raise ValueError("No `generic` entrypoint found in agent config") + + print(f"DEBUG: LangChain generic execution with args: {input_args}, kwargs: {input_kwargs}") + + # Convert inputs to LangChain's expected format + input_data = self._format_langchain_input(*input_args, **input_kwargs) + + print(f"DEBUG: LangChain formatted input: {input_data}") + + try: + # Call the LangChain run function with properly formatted input_data + result_obj = self._generic_entrypoint(input_data) + result_json = self.serializer.serialize_object(result_obj) + return result_json + + except Exception as e: + print(f"ERROR: LangChain execution failed: {e}") + print(f"ERROR: Entrypoint type: {type(self._generic_entrypoint)}") + raise + + def _format_langchain_input(self, *input_args, **input_kwargs) -> Dict[str, Any]: + """ + Convert generic inputs to LangChain's expected input_data format + + LangChain expects: {"messages": [...], "config": {...}} + """ + input_data = {"config": {}} + + # Handle different input patterns + if len(input_args) == 1 and isinstance(input_args[0], dict): + # Single dict argument - use as base + base_data = input_args[0] + if "messages" in base_data: + input_data.update(base_data) + else: + # Convert dict to messages format + input_data["messages"] = [{"role": "user", "content": str(base_data)}] + + elif len(input_args) == 1 and isinstance(input_args[0], str): + # Single string argument - convert to message + input_data["messages"] = [{"role": "user", "content": input_args[0]}] + + elif input_args: + # Multiple args - convert to messages + content = " ".join(str(arg) for arg in input_args) + input_data["messages"] = [{"role": "user", "content": content}] + + # Handle keyword arguments + if "messages" in input_kwargs: + # Direct messages provided + input_data["messages"] = input_kwargs["messages"] + + elif "query" in input_kwargs or "message" in input_kwargs: + # Query/message format - convert to messages + content = input_kwargs.get("query") or input_kwargs.get("message") + input_data["messages"] = [{"role": "user", "content": str(content)}] + + elif input_kwargs and "messages" not in input_data: + # Other kwargs - convert to message content + content = str(input_kwargs) + input_data["messages"] = [{"role": "user", "content": content}] + + # Add config from kwargs + if "config" in input_kwargs: + input_data["config"].update(input_kwargs["config"]) + + # Add other relevant kwargs to config + config_keys = ["temperature", "model", "verbose", "max_iterations"] + for key in config_keys: + if key in input_kwargs: + input_data["config"][key] = input_kwargs[key] + + # Ensure we have messages + if "messages" not in input_data or not input_data["messages"]: + input_data["messages"] = [{"role": "user", "content": "Hello"}] + + return input_data + + async def generic_stream(self, *input_args, **input_kwargs): + """ + LangChain-specific streaming execution + """ + if self._generic_stream_entrypoint is None: + raise ValueError("No `generic_stream` entrypoint found in agent config") + + # Format input the same way + input_data = self._format_langchain_input(*input_args, **input_kwargs) + + try: + # For LangChain streaming (if implemented) + for chunk in self._generic_stream_entrypoint(input_data): + yield chunk + except Exception as e: + print(f"ERROR: LangChain stream execution failed: {e}") + raise \ No newline at end of file diff --git a/runagent/sdk/server/local_server.py b/runagent/sdk/server/local_server.py index 5d0ff80..43aedf2 100644 --- a/runagent/sdk/server/local_server.py +++ b/runagent/sdk/server/local_server.py @@ -165,19 +165,19 @@ def from_id(agent_id: str) -> "LocalServer": host=agent["host"], ) - # Modified from_path method for LocalServer class @staticmethod def from_path( path: str, port: int = 8450, host: str = "127.0.0.1" ) -> "LocalServer": """ Create LocalServer instance from an agent path. - If an agent from the same path already exists, use that agent's configuration. + If an agent from the same path already exists, use that agent's configuration + but allow port/host override. Args: path: Path to agent directory - port: Port to run server on (ignored if existing agent found) - host: Host to bind to (ignored if existing agent found) + port: Port to run server on + host: Host to bind to Returns: LocalServer instance @@ -189,7 +189,7 @@ def from_path( existing_agent = db_service.get_agent_by_path(str(agent_path)) if existing_agent: - # Agent already exists - use existing configuration + # Agent already exists - use existing configuration but allow port/host override console.print(f"šŸ” [yellow]Found existing agent for path: {agent_path}[/yellow]") console.print(f"šŸ“‹ [cyan]Agent Details:[/cyan]") console.print(f" • Agent ID: [bold magenta]{existing_agent['agent_id']}[/bold magenta]") @@ -204,13 +204,33 @@ def from_path( if existing_agent['last_run']: console.print(f" • Last Run: [dim]{existing_agent['last_run']}[/dim]") - console.print(f"\nšŸ”„ [green]Reusing existing agent configuration[/green]") + # Check if we're using different port/host than stored + if port != existing_agent['port'] or host != existing_agent['host']: + console.print(f"šŸ”„ [yellow]Overriding server config:[/yellow]") + console.print(f" • New Host: [blue]{host}[/blue] (was {existing_agent['host']})") + console.print(f" • New Port: [blue]{port}[/blue] (was {existing_agent['port']})") + + # Update the database with new host/port using proper SQLAlchemy syntax + try: + from sqlalchemy import text + with db_service.db_manager.get_session() as session: + session.execute( + text("UPDATE agents SET host = :host, port = :port WHERE agent_id = :agent_id"), + {"host": host, "port": port, "agent_id": existing_agent['agent_id']} + ) + session.commit() + console.print(f"āœ… [green]Database updated with new server config[/green]") + except Exception as e: + console.print(f"āš ļø [yellow]Warning: Could not update database: {e}[/yellow]") + console.print(f" Server will still run on requested port {port}") + else: + console.print(f"šŸ”„ [green]Reusing existing agent configuration[/green]") return LocalServer( agent_path=agent_path, agent_id=existing_agent['agent_id'], - port=existing_agent['port'], # Use existing port - host=existing_agent['host'], # Use existing host + port=port, # Use the requested port + host=host, # Use the requested host db_service=db_service, ) @@ -248,7 +268,6 @@ def from_path( host=host, db_service=db_service, ) - def _setup_routes(self): """Setup FastAPI routes""" diff --git a/templates/langchain/chatbot/__init__.py b/templates/langchain/chatbot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/templates/langchain/chatbot/agent.py b/templates/langchain/chatbot/agent.py new file mode 100644 index 0000000..bbc26ac --- /dev/null +++ b/templates/langchain/chatbot/agent.py @@ -0,0 +1,55 @@ +import os +from typing import Any, Dict + +from dotenv import load_dotenv +from langchain.chains import ConversationChain +from langchain.memory import ConversationBufferMemory +from langchain_core.messages import AIMessage, HumanMessage +from langchain_openai import ChatOpenAI + +# Load environment variables +load_dotenv() + + +class LangChainBasicAgent: + """Basic LangChain agent with conversation memory""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + self.llm = ChatOpenAI( + temperature=self.config.get("temperature", 0.7), + model=self.config.get("model", "gpt-4o-mini"), + api_key=os.getenv("OPENAI_API_KEY"), + ) + self.memory = ConversationBufferMemory() + self.conversation = ConversationChain( + llm=self.llm, memory=self.memory, verbose=self.config.get("verbose", False) + ) + + def process_message(self, message: str) -> str: + """Process a single message and return response""" + try: + response = self.conversation.predict(input=message) + return response + except Exception as e: + raise Exception(f"Error processing message: {str(e)}") + + def process_messages(self, messages: list) -> str: + """Process a list of messages and return the final response""" + if not messages: + return "No messages provided" + + # Add previous messages to memory (except the last one) + for msg in messages[:-1]: + if msg.get("role") == "user": + self.memory.chat_memory.add_user_message(msg["content"]) + elif msg.get("role") == "assistant": + self.memory.chat_memory.add_ai_message(msg["content"]) + + # Process the last message + last_message = messages[-1]["content"] + return self.process_message(last_message) + + def get_conversation_history(self) -> list: + """Get the conversation history""" + return self.memory.chat_memory.messages \ No newline at end of file diff --git a/templates/langchain/chatbot/main.py b/templates/langchain/chatbot/main.py new file mode 100644 index 0000000..6c476f6 --- /dev/null +++ b/templates/langchain/chatbot/main.py @@ -0,0 +1,68 @@ +import time +from typing import Any, Dict + +from .agent import LangChainBasicAgent + + +def run(input_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Main entry point for the LangChain Basic agent + + Args: + input_data: Dictionary containing: + - messages: List of message objects with 'role' and 'content' + - config: Optional configuration parameters + + Returns: + Dictionary with result, errors, and success status + """ + start_time = time.time() + + try: + # Extract configuration + config = input_data.get("config", {}) + messages = input_data.get("messages", []) + + if not messages: + return { + "result": { + "type": "string", + "content": "No messages provided", + "metadata": {"execution_time": time.time() - start_time}, + }, + "errors": ["No messages provided"], + "success": False, + } + + # Initialize agent + agent = LangChainBasicAgent(config) + + # Process messages + response = agent.process_messages(messages) + + # Calculate execution time + execution_time = time.time() - start_time + + return { + "result": { + "type": "string", + "content": response, + "metadata": { + "model_used": config.get("model", "gpt-4o-mini"), + "framework": "langchain", + "template": "basic", + "execution_time": execution_time + }, + }, + "errors": [], + "success": True, + } + + except Exception as e: + execution_time = time.time() - start_time + return { + "result": None, + "errors": [str(e)], + "success": False, + "metadata": {"execution_time": execution_time}, + } \ No newline at end of file diff --git a/templates/langchain/chatbot/runagent.config.json b/templates/langchain/chatbot/runagent.config.json new file mode 100644 index 0000000..1740dfa --- /dev/null +++ b/templates/langchain/chatbot/runagent.config.json @@ -0,0 +1,25 @@ +{ + "agent_name": "langchain_basic_test", + "description": "Basic LangChain agent for testing", + "framework": "langchain", + "template": "basic", + "version": "1.0.0", + "created_at": "2025-06-29 10:00:00", + "template_source": { + "repo_url": "https://github.com/runagent-dev/runagent.git", + "path": "/home/riamdriad5/runagent/runagent/templates/langchain/chatbot", + "author": "test" + }, + "agent_architecture": { + "entrypoints": [ + { + "file": "main.py", + "module": "run", + "type": "generic" + } + ] + }, + "env_vars": { + "OPENAI_API_KEY": "Your OPENAI_API_KEY" + } +} \ No newline at end of file diff --git a/templates/langgraph/problem_solver/runagent.config.json b/templates/langgraph/problem_solver/runagent.config.json index c49dfaf..8848068 100644 --- a/templates/langgraph/problem_solver/runagent.config.json +++ b/templates/langgraph/problem_solver/runagent.config.json @@ -7,7 +7,7 @@ "created_at": "2025-06-25 13:42:03", "template_source": { "repo_url": "https://github.com/runagent-dev/runagent.git", - "path": "templates/langgraph/problem_solver", + "path": "/home/riamdriad5/runagent/runagent/templates/langgraph/problem_solver", "author": "sawradip" }, "agent_architecture": { @@ -16,15 +16,10 @@ "file": "agents.py", "module": "app.invoke", "type": "generic" - }, - { - "file": "agents.py", - "module": "app.stream", - "type": "generic_stream" } ] }, "env_vars": { - "OPENAI_API_KEY": "sk-proj-1234567890" + "OPENAI_API_KEY": "Your OPENAI_API_KEY" } } \ No newline at end of file diff --git a/test/test_sdk.py b/test/test_sdk.py new file mode 100644 index 0000000..94616f3 --- /dev/null +++ b/test/test_sdk.py @@ -0,0 +1,12 @@ +from runagent import RunAgentClient + +# Connect to an already deployed agent +client = RunAgentClient(agent_id="d606beb5-a391-409d-9b5d-2adf86842292", local=True) + +# Execute with generic interface +result = client.run_generic( + query="My mobile has green screen", + num_solutions=3 +) + +print(result) \ No newline at end of file diff --git a/test/test_sdk_langchain.py b/test/test_sdk_langchain.py new file mode 100644 index 0000000..8ade9ba --- /dev/null +++ b/test/test_sdk_langchain.py @@ -0,0 +1,5 @@ +from runagent import RunAgentClient + +client = RunAgentClient(agent_id="694624f8-5e66-4f23-b0b7-28d00d75931c", local=True, port=8460) +result = client.run_generic("My mobile has green screen") +print(result) \ No newline at end of file