From 7091f454680bb95b63ece7dc1799104da3f7f32c Mon Sep 17 00:00:00 2001 From: Jason Yang Date: Tue, 29 Jul 2025 01:04:17 -0700 Subject: [PATCH] Adding changes --- .../080_batch_events/test_batch_events.py | 5 +- .../010_agent_chat/project/run_worker.py | 1 + .../020_state_machine/project/run_worker.py | 1 - .../020_state_machine/project/workflow.py | 2 +- .../deep_research/clarify_user_query.py | 1 + .../lib/cli/handlers/cleanup_handlers.py | 24 ++- .../lib/cli/handlers/deploy_handlers.py | 23 ++- src/agentex/lib/cli/handlers/run_handlers.py | 112 +++----------- src/agentex/lib/cli/utils/path_utils.py | 143 ++++++++++++++++++ 9 files changed, 199 insertions(+), 113 deletions(-) create mode 100644 src/agentex/lib/cli/utils/path_utils.py diff --git a/examples/tutorials/10_agentic/00_base/080_batch_events/test_batch_events.py b/examples/tutorials/10_agentic/00_base/080_batch_events/test_batch_events.py index 1763fa2c..216d7c16 100644 --- a/examples/tutorials/10_agentic/00_base/080_batch_events/test_batch_events.py +++ b/examples/tutorials/10_agentic/00_base/080_batch_events/test_batch_events.py @@ -10,7 +10,8 @@ # Configuration BASE_URL = "http://localhost:5003" -AGENT_ID = "b4f32d71-ff69-4ac9-84d1-eb2937fea0c7" +# AGENT_ID = "b4f32d71-ff69-4ac9-84d1-eb2937fea0c7" +AGENT_ID = "58e78cd0-c898-4009-b5d9-eada8ebcad83" RPC_ENDPOINT = f"{BASE_URL}/agents/{AGENT_ID}/rpc" async def send_rpc_request(method: str, params: dict): @@ -107,4 +108,4 @@ async def main(): print(f"📋 Task ID: {task_id}") if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/run_worker.py b/examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/run_worker.py index c92f1541..834f7287 100644 --- a/examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/run_worker.py +++ b/examples/tutorials/10_agentic/10_temporal/010_agent_chat/project/run_worker.py @@ -8,6 +8,7 @@ from workflow import At010AgentChatWorkflow + environment_variables = EnvironmentVariables.refresh() logger = make_logger(__name__) diff --git a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/run_worker.py b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/run_worker.py index a5e6049a..8db0a50a 100644 --- a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/run_worker.py +++ b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/run_worker.py @@ -1,5 +1,4 @@ import asyncio -import os from agentex.lib.core.temporal.activities import get_all_activities from agentex.lib.core.temporal.workers.worker import AgentexWorker diff --git a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflow.py b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflow.py index 6e374bde..209c13cf 100644 --- a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflow.py +++ b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflow.py @@ -1,5 +1,4 @@ import asyncio -import json from typing import override from temporalio import workflow @@ -26,6 +25,7 @@ if environment_variables.AGENT_NAME is None: raise ValueError("Environment variable AGENT_NAME is not set") + logger = make_logger(__name__) @workflow.defn(name=environment_variables.WORKFLOW_NAME) diff --git a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflows/deep_research/clarify_user_query.py b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflows/deep_research/clarify_user_query.py index 4ef6834a..c00ef770 100644 --- a/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflows/deep_research/clarify_user_query.py +++ b/examples/tutorials/10_agentic/10_temporal/020_state_machine/project/workflows/deep_research/clarify_user_query.py @@ -10,6 +10,7 @@ logger = make_logger(__name__) + FOLLOW_UP_QUESTION_TEMPLATE = """ Given the following research query from the user, ask a follow up question to clarify the research direction. diff --git a/src/agentex/lib/cli/handlers/cleanup_handlers.py b/src/agentex/lib/cli/handlers/cleanup_handlers.py index 59705dfd..0bf66dfc 100644 --- a/src/agentex/lib/cli/handlers/cleanup_handlers.py +++ b/src/agentex/lib/cli/handlers/cleanup_handlers.py @@ -168,19 +168,13 @@ def cleanup_single_task(client: Agentex, agent_name: str, task_id: str) -> None: """ try: # Use the agent RPC method to cancel the task - try: - client.agents.rpc_by_name( - agent_name=agent_name, - method="task/cancel", - params={"task_id": task_id} - ) - logger.debug(f"Successfully cancelled task {task_id} via agent '{agent_name}'") - except Exception as e: - # If RPC cancel fails, try direct task deletion as fallback - logger.warning(f"RPC task/cancel failed for task {task_id}, trying direct deletion: {e}") - client.tasks.delete(task_id=task_id) - logger.debug(f"Successfully deleted task {task_id} directly") - + client.agents.rpc_by_name( + agent_name=agent_name, + method="task/cancel", + params={"task_id": task_id} + ) + logger.debug(f"Successfully cancelled task {task_id} via agent '{agent_name}'") + except Exception as e: - logger.warning(f"Failed to cleanup task {task_id}: {e}") - raise \ No newline at end of file + logger.warning(f"RPC task/cancel failed for task {task_id}: {e}") + raise \ No newline at end of file diff --git a/src/agentex/lib/cli/handlers/deploy_handlers.py b/src/agentex/lib/cli/handlers/deploy_handlers.py index 792f5351..e26928b9 100644 --- a/src/agentex/lib/cli/handlers/deploy_handlers.py +++ b/src/agentex/lib/cli/handlers/deploy_handlers.py @@ -11,6 +11,7 @@ from agentex.lib.cli.utils.auth_utils import _encode_principal_context from agentex.lib.cli.utils.exceptions import DeploymentError, HelmError from agentex.lib.cli.utils.kubectl_utils import check_and_switch_cluster_context +from agentex.lib.cli.utils.path_utils import calculate_docker_acp_module, PathResolutionError from agentex.lib.environment_variables import EnvVarKeys from agentex.lib.sdk.config.agent_config import AgentConfig from agentex.lib.sdk.config.agent_manifest import AgentManifest @@ -100,10 +101,24 @@ def convert_env_vars_dict_to_list(env_vars: dict[str, str]) -> list[dict[str, st return [{"name": key, "value": value} for key, value in env_vars.items()] +def add_acp_command_to_helm_values(helm_values: dict[str, Any], manifest: AgentManifest, manifest_path: str) -> None: + """Add dynamic ACP command to helm values based on manifest configuration""" + try: + docker_acp_module = calculate_docker_acp_module(manifest, manifest_path) + # Create the uvicorn command with the correct module path + helm_values["command"] = ["uvicorn", f"{docker_acp_module}:acp", "--host", "0.0.0.0", "--port", "8000"] + logger.info(f"Using dynamic ACP command: uvicorn {docker_acp_module}:acp") + except (PathResolutionError, Exception) as e: + # Fallback to default command structure + logger.warning(f"Could not calculate dynamic ACP module ({e}), using default: project.acp") + helm_values["command"] = ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + + def merge_deployment_configs( manifest: AgentManifest, cluster_config: ClusterConfig | None, deploy_overrides: InputDeployOverrides, + manifest_path: str, ) -> dict[str, Any]: agent_config: AgentConfig = manifest.agent @@ -231,10 +246,16 @@ def merge_deployment_configs( # Convert the env vars to a list of dictionaries if "env" in helm_values: helm_values["env"] = convert_env_vars_dict_to_list(helm_values["env"]) + + # Convert the temporal worker env vars to a list of dictionaries if TEMPORAL_WORKER_KEY in helm_values and "env" in helm_values[TEMPORAL_WORKER_KEY]: helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list( helm_values[TEMPORAL_WORKER_KEY]["env"] ) + + # Add dynamic ACP command based on manifest configuration + add_acp_command_to_helm_values(helm_values, manifest, manifest_path) + print("Deploying with the following helm values: ", helm_values) return helm_values @@ -290,7 +311,7 @@ def deploy_agent( add_helm_repo() # Merge configurations - helm_values = merge_deployment_configs(manifest, override_config, deploy_overrides) + helm_values = merge_deployment_configs(manifest, override_config, deploy_overrides, manifest_path) # Create values file values_file = create_helm_values_file(helm_values) diff --git a/src/agentex/lib/cli/handlers/run_handlers.py b/src/agentex/lib/cli/handlers/run_handlers.py index f49f43cf..76ac4e0b 100644 --- a/src/agentex/lib/cli/handlers/run_handlers.py +++ b/src/agentex/lib/cli/handlers/run_handlers.py @@ -11,6 +11,11 @@ cleanup_agent_workflows, should_cleanup_on_restart ) +from agentex.lib.cli.utils.path_utils import ( + get_file_paths, + calculate_uvicorn_target_for_local, +) + from agentex.lib.environment_variables import EnvVarKeys from agentex.lib.sdk.config.agent_manifest import AgentManifest from agentex.lib.utils.logging import make_logger @@ -104,7 +109,10 @@ async def start_worker() -> asyncio.subprocess.Process: # PRE-RESTART CLEANUP - NEW! if current_process is not None: # Extract agent name from worker path for cleanup - agent_name = worker_path.parent.parent.name + + agent_name = env.get("AGENT_NAME") + if agent_name is None: + agent_name = worker_path.parent.parent.name # Perform cleanup if configured if should_cleanup_on_restart(): @@ -180,15 +188,17 @@ async def start_worker() -> asyncio.subprocess.Process: async def start_acp_server( - acp_path: Path, port: int, env: dict[str, str] + acp_path: Path, port: int, env: dict[str, str], manifest_dir: Path ) -> asyncio.subprocess.Process: """Start the ACP server process""" - # Use the actual file path instead of module path for better reload detection + # Use file path relative to manifest directory if possible + uvicorn_target = calculate_uvicorn_target_for_local(acp_path, manifest_dir) + cmd = [ sys.executable, "-m", "uvicorn", - f"{acp_path.parent.name}.acp:acp", + f"{uvicorn_target}:acp", "--reload", "--reload-dir", str(acp_path.parent), # Watch the project directory specifically @@ -201,7 +211,7 @@ async def start_acp_server( console.print(f"[blue]Starting ACP server from {acp_path} on port {port}...[/blue]") return await asyncio.create_subprocess_exec( *cmd, - cwd=acp_path.parent.parent, + cwd=manifest_dir, # Always use manifest directory as CWD for consistency env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, @@ -218,7 +228,7 @@ async def start_temporal_worker( return await asyncio.create_subprocess_exec( *cmd, - cwd=worker_path.parent, + cwd=worker_path.parent, # Use worker directory as CWD for imports to work env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, @@ -280,8 +290,9 @@ async def run_agent(manifest_path: str): ) # Start ACP server + manifest_dir = Path(manifest_path).parent acp_process = await start_acp_server( - file_paths["acp"], manifest.local_development.agent.port, agent_env + file_paths["acp"], manifest.local_development.agent.port, agent_env, manifest_dir ) process_manager.add_process(acp_process) @@ -291,7 +302,7 @@ async def run_agent(manifest_path: str): tasks = [acp_output_task] # Start temporal worker if needed - if is_temporal_agent(manifest): + if is_temporal_agent(manifest) and file_paths["worker"]: worker_task = await start_temporal_worker_with_reload(file_paths["worker"], agent_env, process_manager) tasks.append(worker_task) @@ -323,92 +334,7 @@ async def run_agent(manifest_path: str): await process_manager.cleanup_processes() -def resolve_and_validate_path(base_path: Path, configured_path: str, file_type: str) -> Path: - """Resolve and validate a configured path""" - path_obj = Path(configured_path) - - if path_obj.is_absolute(): - # Absolute path - use as-is - resolved_path = path_obj - else: - # Relative path - resolve relative to manifest directory - resolved_path = (base_path / configured_path).resolve() - - # Validate the file exists - if not resolved_path.exists(): - raise RunError( - f"{file_type} file not found: {resolved_path}\n" - f" Configured path: {configured_path}\n" - f" Resolved from manifest: {base_path}" - ) - - # Validate it's actually a file - if not resolved_path.is_file(): - raise RunError(f"{file_type} path is not a file: {resolved_path}") - - return resolved_path - - -def validate_path_security(resolved_path: Path, manifest_dir: Path) -> None: - """Basic security validation for resolved paths""" - try: - # Ensure the resolved path is accessible - resolved_path.resolve() - - # Optional: Add warnings for paths that go too far up - try: - # Check if path goes more than 3 levels up from manifest - relative_to_manifest = resolved_path.relative_to(manifest_dir.parent.parent.parent) - if str(relative_to_manifest).startswith(".."): - logger.warning( - f"Path goes significantly outside project structure: {resolved_path}" - ) - except ValueError: - # Path is outside the tree - that's okay, just log it - logger.info(f"Using path outside manifest directory tree: {resolved_path}") - - except Exception as e: - raise RunError(f"Path resolution failed: {resolved_path} - {str(e)}") from e - -def get_file_paths(manifest: AgentManifest, manifest_path: str) -> dict[str, Path]: - """Get resolved file paths from manifest configuration""" - manifest_dir = Path(manifest_path).parent.resolve() - - # Use configured paths or fall back to defaults for backward compatibility - if manifest.local_development and manifest.local_development.paths: - paths_config = manifest.local_development.paths - - # Resolve ACP path - acp_path = resolve_and_validate_path(manifest_dir, paths_config.acp, "ACP server") - validate_path_security(acp_path, manifest_dir) - - # Resolve worker path if specified - worker_path = None - if paths_config.worker: - worker_path = resolve_and_validate_path( - manifest_dir, paths_config.worker, "Temporal worker" - ) - validate_path_security(worker_path, manifest_dir) - else: - # Backward compatibility: use old hardcoded structure - project_dir = manifest_dir / "project" - acp_path = project_dir / "acp.py" - worker_path = project_dir / "run_worker.py" if is_temporal_agent(manifest) else None - - # Validate backward compatibility paths - if not acp_path.exists(): - raise RunError(f"ACP file not found: {acp_path}") - - if worker_path and not worker_path.exists(): - raise RunError(f"Worker file not found: {worker_path}") - - return { - "acp": acp_path, - "worker": worker_path, - "acp_dir": acp_path.parent, - "worker_dir": worker_path.parent if worker_path else None, - } def create_agent_environment(manifest: AgentManifest) -> dict[str, str]: diff --git a/src/agentex/lib/cli/utils/path_utils.py b/src/agentex/lib/cli/utils/path_utils.py new file mode 100644 index 00000000..cdbb0a71 --- /dev/null +++ b/src/agentex/lib/cli/utils/path_utils.py @@ -0,0 +1,143 @@ +from pathlib import Path +from typing import Dict + +from agentex.lib.sdk.config.agent_manifest import AgentManifest +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + + +class PathResolutionError(Exception): + """An error occurred during path resolution""" + + +def resolve_and_validate_path(base_path: Path, configured_path: str, file_type: str) -> Path: + """Resolve and validate a configured path""" + path_obj = Path(configured_path) + + if path_obj.is_absolute(): + # Absolute path - resolve to canonical form + resolved_path = path_obj.resolve() + else: + # Relative path - resolve relative to manifest directory + resolved_path = (base_path / configured_path).resolve() + + # Validate the file exists + if not resolved_path.exists(): + raise PathResolutionError( + f"{file_type} file not found: {resolved_path}\n" + f" Configured path: {configured_path}\n" + f" Resolved from manifest: {base_path}" + ) + + # Validate it's actually a file + if not resolved_path.is_file(): + raise PathResolutionError(f"{file_type} path is not a file: {resolved_path}") + + return resolved_path + + +def validate_path_security(resolved_path: Path, manifest_dir: Path) -> None: + """Basic security validation for resolved paths""" + try: + # Ensure the resolved path is accessible + resolved_path.resolve() + + # Optional: Add warnings for paths that go too far up + try: + # Check if path goes more than 3 levels up from manifest + relative_to_manifest = resolved_path.relative_to(manifest_dir.parent.parent.parent) + if str(relative_to_manifest).startswith(".."): + logger.warning( + f"Path goes significantly outside project structure: {resolved_path}" + ) + except ValueError: + # Path is outside the tree - that's okay, just log it + logger.info(f"Using path outside manifest directory tree: {resolved_path}") + + except Exception as e: + raise PathResolutionError(f"Path resolution failed: {resolved_path} - {str(e)}") from e + + +def get_file_paths(manifest: AgentManifest, manifest_path: str) -> Dict[str, Path | None]: + """Get resolved file paths from manifest configuration""" + manifest_dir = Path(manifest_path).parent.resolve() + + # Use configured paths or fall back to defaults for backward compatibility + if manifest.local_development and manifest.local_development.paths: + paths_config = manifest.local_development.paths + + # Resolve ACP path + acp_path = resolve_and_validate_path(manifest_dir, paths_config.acp, "ACP server") + validate_path_security(acp_path, manifest_dir) + + # Resolve worker path if specified + worker_path = None + if paths_config.worker: + worker_path = resolve_and_validate_path( + manifest_dir, paths_config.worker, "Temporal worker" + ) + validate_path_security(worker_path, manifest_dir) + else: + # Backward compatibility: use old hardcoded structure + project_dir = manifest_dir / "project" + acp_path = (project_dir / "acp.py").resolve() + worker_path = (project_dir / "run_worker.py").resolve() if manifest.agent.is_temporal_agent() else None + + # Validate backward compatibility paths + if not acp_path.exists(): + raise PathResolutionError(f"ACP file not found: {acp_path}") + + if worker_path and not worker_path.exists(): + raise PathResolutionError(f"Worker file not found: {worker_path}") + + return { + "acp": acp_path, + "worker": worker_path, + "acp_dir": acp_path.parent, + "worker_dir": worker_path.parent if worker_path else None, + } + + +def calculate_uvicorn_target_for_local(acp_path: Path, manifest_dir: Path) -> str: + """Calculate the uvicorn target path for local development""" + # Ensure both paths are resolved to canonical form for accurate comparison + acp_resolved = acp_path.resolve() + manifest_resolved = manifest_dir.resolve() + + try: + # Try to use path relative to manifest directory + acp_relative = acp_resolved.relative_to(manifest_resolved) + # Convert to module notation: project/acp.py -> project.acp + module_path = str(acp_relative.with_suffix('')) # Remove .py extension + module_path = module_path.replace('/', '.') # Convert slashes to dots + module_path = module_path.replace('\\', '.') # Handle Windows paths + return module_path + except ValueError: + # Path cannot be made relative - use absolute file path + logger.warning(f"ACP file {acp_resolved} cannot be made relative to manifest directory {manifest_resolved}, using absolute file path") + return str(acp_resolved) + + +def calculate_docker_acp_module(manifest: AgentManifest, manifest_path: str) -> str: + """Calculate the Python module path for the ACP file in the Docker container + + This should return the same module notation as local development for consistency. + """ + # Use the same logic as local development + manifest_dir = Path(manifest_path).parent + + # Get the configured ACP path (could be relative or absolute) + if manifest.local_development and manifest.local_development.paths: + acp_config_path = manifest.local_development.paths.acp + else: + acp_config_path = "project/acp.py" # Default + + # Resolve to actual file path + acp_path = resolve_and_validate_path(manifest_dir, acp_config_path, "ACP") + + # Use the same module calculation as local development + return calculate_uvicorn_target_for_local(acp_path, manifest_dir) + + + \ No newline at end of file