diff --git a/src/agentex/lib/adk/utils/_modules/client.py b/src/agentex/lib/adk/utils/_modules/client.py index 2999574e..49e303fc 100644 --- a/src/agentex/lib/adk/utils/_modules/client.py +++ b/src/agentex/lib/adk/utils/_modules/client.py @@ -8,17 +8,18 @@ class EnvAuth(httpx.Auth): - def __init__(self, header_name="x-agent-identity"): + def __init__(self, header_name="x-agent-api-key"): self.header_name = header_name def auth_flow(self, request): # This gets called for every request env_vars = EnvironmentVariables.refresh() if env_vars: - agent_id = env_vars.AGENT_ID - if agent_id: - request.headers[self.header_name] = agent_id - logger.info(f"Adding header {self.header_name}:{agent_id}") + agent_api_key = env_vars.AGENT_API_KEY + if agent_api_key: + request.headers[self.header_name] = agent_api_key + masked_key = agent_api_key[-4:] if agent_api_key and len(agent_api_key) > 4 else "****" + logger.info(f"Adding header {self.header_name}:{masked_key}") yield request diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index b3411d9e..6d99aa2b 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -24,6 +24,8 @@ ) from agentex.lib.utils.logging import make_logger +from agentex.lib.utils.registration import register_agent +from agentex.lib.environment_variables import EnvironmentVariables logger = make_logger(__name__) @@ -103,6 +105,7 @@ async def run( workflow: type, ): await self.start_health_check_server() + await self._register_agent() temporal_client = await get_temporal_client( temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), ) @@ -160,3 +163,17 @@ async def start_health_check_server(self): f"Failed to start health check server on alternative port {alt_port}: {e}" ) raise + + """ + Register the worker with the Agentex server. + + Even though the Temporal server will also register the agent with the server, + doing this on the worker side is required to make sure that both share the API key + which is returned on registration and used to authenticate the worker with the Agentex server. + """ + async def _register_agent(self): + env_vars = EnvironmentVariables.refresh() + if env_vars and env_vars.AGENTEX_BASE_URL: + await register_agent(env_vars) + else: + logger.warning("AGENTEX_BASE_URL not set, skipping worker registration") \ No newline at end of file diff --git a/src/agentex/lib/environment_variables.py b/src/agentex/lib/environment_variables.py index d0109e64..1f70c48d 100644 --- a/src/agentex/lib/environment_variables.py +++ b/src/agentex/lib/environment_variables.py @@ -23,6 +23,7 @@ class EnvVarKeys(str, Enum): AGENT_NAME = "AGENT_NAME" AGENT_DESCRIPTION = "AGENT_DESCRIPTION" AGENT_ID = "AGENT_ID" + AGENT_API_KEY = "AGENT_API_KEY" # ACP Configuration ACP_URL = "ACP_URL" ACP_PORT = "ACP_PORT" @@ -52,6 +53,7 @@ class EnvironmentVariables(BaseModel): AGENT_NAME: str AGENT_DESCRIPTION: str | None = None AGENT_ID: str | None = None + AGENT_API_KEY: str | None = None ACP_TYPE: str | None = "agentic" # ACP Configuration ACP_URL: str diff --git a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py index 4b3e5876..0e68f440 100644 --- a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py @@ -1,15 +1,10 @@ import asyncio -import base64 import inspect -import json -import os from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager from typing import Any -import httpx import uvicorn -from agentex.lib.adk.utils._modules.client import create_async_agentex_client from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from pydantic import TypeAdapter, ValidationError @@ -30,6 +25,7 @@ from agentex.types.task_message_content import TaskMessageContent from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import BaseModel +from agentex.lib.utils.registration import register_agent logger = make_logger(__name__) @@ -74,7 +70,7 @@ def get_lifespan_function(self): async def lifespan_context(app: FastAPI): env_vars = EnvironmentVariables.refresh() if env_vars.AGENTEX_BASE_URL: - await self._register_agent(env_vars) + await register_agent(env_vars) else: logger.warning("AGENTEX_BASE_URL not set, skipping agent registration") @@ -101,6 +97,16 @@ async def _handle_jsonrpc(self, request: Request): data = await request.json() rpc_request = JSONRPCRequest(**data) + # Check if the request is authenticated + if refreshed_environment_variables and getattr(refreshed_environment_variables, "AGENT_API_KEY", None): + authorization_header = request.headers.get("x-agent-api-key") + if authorization_header != refreshed_environment_variables.AGENT_API_KEY: + return JSONRPCResponse( + id=rpc_request.id, + error=JSONRPCError(code=-32601, message="Unauthorized"), + ) + + # Check if method is valid first try: method = RPCMethod(rpc_request.method) @@ -345,87 +351,4 @@ def run(self, host: str = "0.0.0.0", port: int = 8000, **kwargs): """Start the Uvicorn server for async handlers.""" uvicorn.run(self, host=host, port=port, **kwargs) - def _get_auth_principal(self, env_vars: EnvironmentVariables): - if not env_vars.AUTH_PRINCIPAL_B64: - return None - - try: - decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8') - return json.loads(decoded_str) - except Exception: - return None - - async def _register_agent(self, env_vars: EnvironmentVariables): - """Register this agent with the Agentex server""" - # Build the agent's own URL - full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}" - - description = ( - env_vars.AGENT_DESCRIPTION - or f"Generic description for agent: {env_vars.AGENT_NAME}" - ) - - # Prepare registration data - registration_data = { - "name": env_vars.AGENT_NAME, - "description": description, - "acp_url": full_acp_url, - "acp_type": env_vars.ACP_TYPE, - "principal_context": self._get_auth_principal(env_vars) - } - - if env_vars.AGENT_ID: - registration_data["agent_id"] = env_vars.AGENT_ID - - # Make the registration request - registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register" - # Retry logic with configurable attempts and delay - max_retries = 3 - base_delay = 5 # seconds - last_exception = None - - attempt = 0 - while attempt < max_retries: - try: - async with httpx.AsyncClient() as client: - response = await client.post( - registration_url, json=registration_data, timeout=30.0 - ) - if response.status_code == 200: - agent = response.json() - agent_id, agent_name = agent["id"], agent["name"] - - os.environ["AGENT_ID"] = agent_id - os.environ["AGENT_NAME"] = agent_name - env_vars.AGENT_ID = agent_id - env_vars.AGENT_NAME = agent_name - global refreshed_environment_variables - refreshed_environment_variables = env_vars - logger.info( - f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}" - ) - return # Success, exit the retry loop - else: - error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}" - logger.error(error_msg) - last_exception = Exception( - f"Failed to startup agent: {response.text}" - ) - - except Exception as e: - logger.error( - f"Exception during agent registration attempt {attempt + 1}: {e}" - ) - last_exception = e - attempt += 1 - if attempt < max_retries: - delay = (attempt) * base_delay # 5, 10, 15 seconds - logger.info( - f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})" - ) - await asyncio.sleep(delay) - - # If we get here, all retries failed - raise last_exception or Exception( - f"Failed to register agent after {max_retries} attempts" - ) + \ No newline at end of file diff --git a/src/agentex/lib/utils/registration.py b/src/agentex/lib/utils/registration.py new file mode 100644 index 00000000..8b149d46 --- /dev/null +++ b/src/agentex/lib/utils/registration.py @@ -0,0 +1,101 @@ +import base64 +import json +import os +import httpx +import asyncio + +from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + +def get_auth_principal(env_vars: EnvironmentVariables): + if not env_vars.AUTH_PRINCIPAL_B64: + return None + + try: + decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8') + return json.loads(decoded_str) + except Exception: + return None + +async def register_agent(env_vars: EnvironmentVariables): + """Register this agent with the Agentex server""" + if not env_vars.AGENTEX_BASE_URL: + logger.warning("AGENTEX_BASE_URL is not set, skipping registration") + return + # Build the agent's own URL + full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}" + + description = ( + env_vars.AGENT_DESCRIPTION + or f"Generic description for agent: {env_vars.AGENT_NAME}" + ) + + # Prepare registration data + registration_data = { + "name": env_vars.AGENT_NAME, + "description": description, + "acp_url": full_acp_url, + "acp_type": env_vars.ACP_TYPE, + "principal_context": get_auth_principal(env_vars) + } + + if env_vars.AGENT_ID: + registration_data["agent_id"] = env_vars.AGENT_ID + + # Make the registration request + registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register" + # Retry logic with configurable attempts and delay + max_retries = 3 + base_delay = 5 # seconds + last_exception = None + + attempt = 0 + while attempt < max_retries: + try: + async with httpx.AsyncClient() as client: + response = await client.post( + registration_url, json=registration_data, timeout=30.0 + ) + if response.status_code == 200: + agent = response.json() + agent_id, agent_name = agent["id"], agent["name"] + agent_api_key = agent["agent_api_key"] + + os.environ["AGENT_ID"] = agent_id + os.environ["AGENT_NAME"] = agent_name + os.environ["AGENT_API_KEY"] = agent_api_key + env_vars.AGENT_ID = agent_id + env_vars.AGENT_NAME = agent_name + env_vars.AGENT_API_KEY = agent_api_key + global refreshed_environment_variables + refreshed_environment_variables = env_vars + logger.info( + f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}" + ) + return # Success, exit the retry loop + else: + error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}" + logger.error(error_msg) + last_exception = Exception( + f"Failed to startup agent: {response.text}" + ) + + except Exception as e: + logger.error( + f"Exception during agent registration attempt {attempt + 1}: {e}" + ) + last_exception = e + attempt += 1 + if attempt < max_retries: + delay = (attempt) * base_delay # 5, 10, 15 seconds + logger.info( + f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})" + ) + await asyncio.sleep(delay) + + # If we get here, all retries failed + raise last_exception or Exception( + f"Failed to register agent after {max_retries} attempts" + )