Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

# Configuration
BASE_URL = "http://localhost:5003"
AGENT_ID = "b4f32d71-ff69-4ac9-84d1-eb2937fea0c7"
# AGENT_ID = "b4f32d71-ff69-4ac9-84d1-eb2937fea0c7"
AGENT_ID = "58e78cd0-c898-4009-b5d9-eada8ebcad83"
RPC_ENDPOINT = f"{BASE_URL}/agents/{AGENT_ID}/rpc"

async def send_rpc_request(method: str, params: dict):
Expand Down Expand Up @@ -107,4 +108,4 @@ async def main():
print(f"📋 Task ID: {task_id}")

if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from workflow import At010AgentChatWorkflow



environment_variables = EnvironmentVariables.refresh()

logger = make_logger(__name__)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import os

from agentex.lib.core.temporal.activities import get_all_activities
from agentex.lib.core.temporal.workers.worker import AgentexWorker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import json
from typing import override

from temporalio import workflow
Expand All @@ -26,6 +25,7 @@
if environment_variables.AGENT_NAME is None:
raise ValueError("Environment variable AGENT_NAME is not set")


logger = make_logger(__name__)

@workflow.defn(name=environment_variables.WORKFLOW_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

logger = make_logger(__name__)


FOLLOW_UP_QUESTION_TEMPLATE = """
Given the following research query from the user, ask a follow up question to clarify the research direction.
<query>
Expand Down
24 changes: 9 additions & 15 deletions src/agentex/lib/cli/handlers/cleanup_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,13 @@ def cleanup_single_task(client: Agentex, agent_name: str, task_id: str) -> None:
"""
try:
# Use the agent RPC method to cancel the task
try:
client.agents.rpc_by_name(
agent_name=agent_name,
method="task/cancel",
params={"task_id": task_id}
)
logger.debug(f"Successfully cancelled task {task_id} via agent '{agent_name}'")
except Exception as e:
# If RPC cancel fails, try direct task deletion as fallback
logger.warning(f"RPC task/cancel failed for task {task_id}, trying direct deletion: {e}")
client.tasks.delete(task_id=task_id)
logger.debug(f"Successfully deleted task {task_id} directly")

client.agents.rpc_by_name(
agent_name=agent_name,
method="task/cancel",
params={"task_id": task_id}
)
logger.debug(f"Successfully cancelled task {task_id} via agent '{agent_name}'")

except Exception as e:
logger.warning(f"Failed to cleanup task {task_id}: {e}")
raise
logger.warning(f"RPC task/cancel failed for task {task_id}: {e}")
raise
23 changes: 22 additions & 1 deletion src/agentex/lib/cli/handlers/deploy_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from agentex.lib.cli.utils.auth_utils import _encode_principal_context
from agentex.lib.cli.utils.exceptions import DeploymentError, HelmError
from agentex.lib.cli.utils.kubectl_utils import check_and_switch_cluster_context
from agentex.lib.cli.utils.path_utils import calculate_docker_acp_module, PathResolutionError
from agentex.lib.environment_variables import EnvVarKeys
from agentex.lib.sdk.config.agent_config import AgentConfig
from agentex.lib.sdk.config.agent_manifest import AgentManifest
Expand Down Expand Up @@ -100,10 +101,24 @@ def convert_env_vars_dict_to_list(env_vars: dict[str, str]) -> list[dict[str, st
return [{"name": key, "value": value} for key, value in env_vars.items()]


def add_acp_command_to_helm_values(helm_values: dict[str, Any], manifest: AgentManifest, manifest_path: str) -> None:
"""Add dynamic ACP command to helm values based on manifest configuration"""
try:
docker_acp_module = calculate_docker_acp_module(manifest, manifest_path)
# Create the uvicorn command with the correct module path
helm_values["command"] = ["uvicorn", f"{docker_acp_module}:acp", "--host", "0.0.0.0", "--port", "8000"]
logger.info(f"Using dynamic ACP command: uvicorn {docker_acp_module}:acp")
except (PathResolutionError, Exception) as e:
# Fallback to default command structure
logger.warning(f"Could not calculate dynamic ACP module ({e}), using default: project.acp")
helm_values["command"] = ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]


def merge_deployment_configs(
manifest: AgentManifest,
cluster_config: ClusterConfig | None,
deploy_overrides: InputDeployOverrides,
manifest_path: str,
) -> dict[str, Any]:
agent_config: AgentConfig = manifest.agent

Expand Down Expand Up @@ -231,10 +246,16 @@ def merge_deployment_configs(
# Convert the env vars to a list of dictionaries
if "env" in helm_values:
helm_values["env"] = convert_env_vars_dict_to_list(helm_values["env"])

# Convert the temporal worker env vars to a list of dictionaries
if TEMPORAL_WORKER_KEY in helm_values and "env" in helm_values[TEMPORAL_WORKER_KEY]:
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(
helm_values[TEMPORAL_WORKER_KEY]["env"]
)

# Add dynamic ACP command based on manifest configuration
add_acp_command_to_helm_values(helm_values, manifest, manifest_path)

print("Deploying with the following helm values: ", helm_values)
return helm_values

Expand Down Expand Up @@ -290,7 +311,7 @@ def deploy_agent(
add_helm_repo()

# Merge configurations
helm_values = merge_deployment_configs(manifest, override_config, deploy_overrides)
helm_values = merge_deployment_configs(manifest, override_config, deploy_overrides, manifest_path)

# Create values file
values_file = create_helm_values_file(helm_values)
Expand Down
112 changes: 19 additions & 93 deletions src/agentex/lib/cli/handlers/run_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
cleanup_agent_workflows,
should_cleanup_on_restart
)
from agentex.lib.cli.utils.path_utils import (
get_file_paths,
calculate_uvicorn_target_for_local,
)

from agentex.lib.environment_variables import EnvVarKeys
from agentex.lib.sdk.config.agent_manifest import AgentManifest
from agentex.lib.utils.logging import make_logger
Expand Down Expand Up @@ -104,7 +109,10 @@ async def start_worker() -> asyncio.subprocess.Process:
# PRE-RESTART CLEANUP - NEW!
if current_process is not None:
# Extract agent name from worker path for cleanup
agent_name = worker_path.parent.parent.name

agent_name = env.get("AGENT_NAME")
if agent_name is None:
agent_name = worker_path.parent.parent.name

# Perform cleanup if configured
if should_cleanup_on_restart():
Expand Down Expand Up @@ -180,15 +188,17 @@ async def start_worker() -> asyncio.subprocess.Process:


async def start_acp_server(
acp_path: Path, port: int, env: dict[str, str]
acp_path: Path, port: int, env: dict[str, str], manifest_dir: Path
) -> asyncio.subprocess.Process:
"""Start the ACP server process"""
# Use the actual file path instead of module path for better reload detection
# Use file path relative to manifest directory if possible
uvicorn_target = calculate_uvicorn_target_for_local(acp_path, manifest_dir)

cmd = [
sys.executable,
"-m",
"uvicorn",
f"{acp_path.parent.name}.acp:acp",
f"{uvicorn_target}:acp",
"--reload",
"--reload-dir",
str(acp_path.parent), # Watch the project directory specifically
Expand All @@ -201,7 +211,7 @@ async def start_acp_server(
console.print(f"[blue]Starting ACP server from {acp_path} on port {port}...[/blue]")
return await asyncio.create_subprocess_exec(
*cmd,
cwd=acp_path.parent.parent,
cwd=manifest_dir, # Always use manifest directory as CWD for consistency
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
Expand All @@ -218,7 +228,7 @@ async def start_temporal_worker(

return await asyncio.create_subprocess_exec(
*cmd,
cwd=worker_path.parent,
cwd=worker_path.parent, # Use worker directory as CWD for imports to work
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
Expand Down Expand Up @@ -280,8 +290,9 @@ async def run_agent(manifest_path: str):
)

# Start ACP server
manifest_dir = Path(manifest_path).parent
acp_process = await start_acp_server(
file_paths["acp"], manifest.local_development.agent.port, agent_env
file_paths["acp"], manifest.local_development.agent.port, agent_env, manifest_dir
)
process_manager.add_process(acp_process)

Expand All @@ -291,7 +302,7 @@ async def run_agent(manifest_path: str):
tasks = [acp_output_task]

# Start temporal worker if needed
if is_temporal_agent(manifest):
if is_temporal_agent(manifest) and file_paths["worker"]:
worker_task = await start_temporal_worker_with_reload(file_paths["worker"], agent_env, process_manager)
tasks.append(worker_task)

Expand Down Expand Up @@ -323,92 +334,7 @@ async def run_agent(manifest_path: str):
await process_manager.cleanup_processes()


def resolve_and_validate_path(base_path: Path, configured_path: str, file_type: str) -> Path:
"""Resolve and validate a configured path"""
path_obj = Path(configured_path)

if path_obj.is_absolute():
# Absolute path - use as-is
resolved_path = path_obj
else:
# Relative path - resolve relative to manifest directory
resolved_path = (base_path / configured_path).resolve()

# Validate the file exists
if not resolved_path.exists():
raise RunError(
f"{file_type} file not found: {resolved_path}\n"
f" Configured path: {configured_path}\n"
f" Resolved from manifest: {base_path}"
)

# Validate it's actually a file
if not resolved_path.is_file():
raise RunError(f"{file_type} path is not a file: {resolved_path}")

return resolved_path


def validate_path_security(resolved_path: Path, manifest_dir: Path) -> None:
"""Basic security validation for resolved paths"""
try:
# Ensure the resolved path is accessible
resolved_path.resolve()

# Optional: Add warnings for paths that go too far up
try:
# Check if path goes more than 3 levels up from manifest
relative_to_manifest = resolved_path.relative_to(manifest_dir.parent.parent.parent)
if str(relative_to_manifest).startswith(".."):
logger.warning(
f"Path goes significantly outside project structure: {resolved_path}"
)
except ValueError:
# Path is outside the tree - that's okay, just log it
logger.info(f"Using path outside manifest directory tree: {resolved_path}")

except Exception as e:
raise RunError(f"Path resolution failed: {resolved_path} - {str(e)}") from e


def get_file_paths(manifest: AgentManifest, manifest_path: str) -> dict[str, Path]:
"""Get resolved file paths from manifest configuration"""
manifest_dir = Path(manifest_path).parent.resolve()

# Use configured paths or fall back to defaults for backward compatibility
if manifest.local_development and manifest.local_development.paths:
paths_config = manifest.local_development.paths

# Resolve ACP path
acp_path = resolve_and_validate_path(manifest_dir, paths_config.acp, "ACP server")
validate_path_security(acp_path, manifest_dir)

# Resolve worker path if specified
worker_path = None
if paths_config.worker:
worker_path = resolve_and_validate_path(
manifest_dir, paths_config.worker, "Temporal worker"
)
validate_path_security(worker_path, manifest_dir)
else:
# Backward compatibility: use old hardcoded structure
project_dir = manifest_dir / "project"
acp_path = project_dir / "acp.py"
worker_path = project_dir / "run_worker.py" if is_temporal_agent(manifest) else None

# Validate backward compatibility paths
if not acp_path.exists():
raise RunError(f"ACP file not found: {acp_path}")

if worker_path and not worker_path.exists():
raise RunError(f"Worker file not found: {worker_path}")

return {
"acp": acp_path,
"worker": worker_path,
"acp_dir": acp_path.parent,
"worker_dir": worker_path.parent if worker_path else None,
}


def create_agent_environment(manifest: AgentManifest) -> dict[str, str]:
Expand Down
Loading
Loading