Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/agentex/lib/adk/utils/_modules/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@


class EnvAuth(httpx.Auth):
def __init__(self, header_name="x-agent-identity"):
def __init__(self, header_name="x-agent-api-key"):
self.header_name = header_name

def auth_flow(self, request):
# This gets called for every request
env_vars = EnvironmentVariables.refresh()
if env_vars:
agent_id = env_vars.AGENT_ID
if agent_id:
request.headers[self.header_name] = agent_id
logger.info(f"Adding header {self.header_name}:{agent_id}")
agent_api_key = env_vars.AGENT_API_KEY
if agent_api_key:
request.headers[self.header_name] = agent_api_key
masked_key = agent_api_key[-4:] if agent_api_key and len(agent_api_key) > 4 else "****"
logger.info(f"Adding header {self.header_name}:{masked_key}")
yield request


Expand Down
17 changes: 17 additions & 0 deletions src/agentex/lib/core/temporal/workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
)

from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.registration import register_agent
from agentex.lib.environment_variables import EnvironmentVariables

logger = make_logger(__name__)

Expand Down Expand Up @@ -103,6 +105,7 @@ async def run(
workflow: type,
):
await self.start_health_check_server()
await self._register_agent()
temporal_client = await get_temporal_client(
temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
)
Expand Down Expand Up @@ -160,3 +163,17 @@ async def start_health_check_server(self):
f"Failed to start health check server on alternative port {alt_port}: {e}"
)
raise

"""
Register the worker with the Agentex server.

Even though the Temporal server will also register the agent with the server,
doing this on the worker side is required to make sure that both share the API key
which is returned on registration and used to authenticate the worker with the Agentex server.
"""
async def _register_agent(self):
env_vars = EnvironmentVariables.refresh()
if env_vars and env_vars.AGENTEX_BASE_URL:
await register_agent(env_vars)
else:
logger.warning("AGENTEX_BASE_URL not set, skipping worker registration")
2 changes: 2 additions & 0 deletions src/agentex/lib/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class EnvVarKeys(str, Enum):
AGENT_NAME = "AGENT_NAME"
AGENT_DESCRIPTION = "AGENT_DESCRIPTION"
AGENT_ID = "AGENT_ID"
AGENT_API_KEY = "AGENT_API_KEY"
# ACP Configuration
ACP_URL = "ACP_URL"
ACP_PORT = "ACP_PORT"
Expand Down Expand Up @@ -52,6 +53,7 @@ class EnvironmentVariables(BaseModel):
AGENT_NAME: str
AGENT_DESCRIPTION: str | None = None
AGENT_ID: str | None = None
AGENT_API_KEY: str | None = None
ACP_TYPE: str | None = "agentic"
# ACP Configuration
ACP_URL: str
Expand Down
103 changes: 13 additions & 90 deletions src/agentex/lib/sdk/fastacp/base/base_acp_server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import asyncio
import base64
import inspect
import json
import os
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Any

import httpx
import uvicorn
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import TypeAdapter, ValidationError
Expand All @@ -30,6 +25,7 @@
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.model_utils import BaseModel
from agentex.lib.utils.registration import register_agent

logger = make_logger(__name__)

Expand Down Expand Up @@ -74,7 +70,7 @@ def get_lifespan_function(self):
async def lifespan_context(app: FastAPI):
env_vars = EnvironmentVariables.refresh()
if env_vars.AGENTEX_BASE_URL:
await self._register_agent(env_vars)
await register_agent(env_vars)
else:
logger.warning("AGENTEX_BASE_URL not set, skipping agent registration")

Expand All @@ -101,6 +97,16 @@ async def _handle_jsonrpc(self, request: Request):
data = await request.json()
rpc_request = JSONRPCRequest(**data)

# Check if the request is authenticated
if refreshed_environment_variables and getattr(refreshed_environment_variables, "AGENT_API_KEY", None):
authorization_header = request.headers.get("x-agent-api-key")
if authorization_header != refreshed_environment_variables.AGENT_API_KEY:
return JSONRPCResponse(
id=rpc_request.id,
error=JSONRPCError(code=-32601, message="Unauthorized"),
)


# Check if method is valid first
try:
method = RPCMethod(rpc_request.method)
Expand Down Expand Up @@ -345,87 +351,4 @@ def run(self, host: str = "0.0.0.0", port: int = 8000, **kwargs):
"""Start the Uvicorn server for async handlers."""
uvicorn.run(self, host=host, port=port, **kwargs)

def _get_auth_principal(self, env_vars: EnvironmentVariables):
if not env_vars.AUTH_PRINCIPAL_B64:
return None

try:
decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8')
return json.loads(decoded_str)
except Exception:
return None

async def _register_agent(self, env_vars: EnvironmentVariables):
"""Register this agent with the Agentex server"""
# Build the agent's own URL
full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}"

description = (
env_vars.AGENT_DESCRIPTION
or f"Generic description for agent: {env_vars.AGENT_NAME}"
)

# Prepare registration data
registration_data = {
"name": env_vars.AGENT_NAME,
"description": description,
"acp_url": full_acp_url,
"acp_type": env_vars.ACP_TYPE,
"principal_context": self._get_auth_principal(env_vars)
}

if env_vars.AGENT_ID:
registration_data["agent_id"] = env_vars.AGENT_ID

# Make the registration request
registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register"
# Retry logic with configurable attempts and delay
max_retries = 3
base_delay = 5 # seconds
last_exception = None

attempt = 0
while attempt < max_retries:
try:
async with httpx.AsyncClient() as client:
response = await client.post(
registration_url, json=registration_data, timeout=30.0
)
if response.status_code == 200:
agent = response.json()
agent_id, agent_name = agent["id"], agent["name"]

os.environ["AGENT_ID"] = agent_id
os.environ["AGENT_NAME"] = agent_name
env_vars.AGENT_ID = agent_id
env_vars.AGENT_NAME = agent_name
global refreshed_environment_variables
refreshed_environment_variables = env_vars
logger.info(
f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}"
)
return # Success, exit the retry loop
else:
error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}"
logger.error(error_msg)
last_exception = Exception(
f"Failed to startup agent: {response.text}"
)

except Exception as e:
logger.error(
f"Exception during agent registration attempt {attempt + 1}: {e}"
)
last_exception = e
attempt += 1
if attempt < max_retries:
delay = (attempt) * base_delay # 5, 10, 15 seconds
logger.info(
f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})"
)
await asyncio.sleep(delay)

# If we get here, all retries failed
raise last_exception or Exception(
f"Failed to register agent after {max_retries} attempts"
)

101 changes: 101 additions & 0 deletions src/agentex/lib/utils/registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import base64
import json
import os
import httpx
import asyncio

from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables
from agentex.lib.utils.logging import make_logger

logger = make_logger(__name__)

def get_auth_principal(env_vars: EnvironmentVariables):
if not env_vars.AUTH_PRINCIPAL_B64:
return None

try:
decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8')
return json.loads(decoded_str)
except Exception:
return None

async def register_agent(env_vars: EnvironmentVariables):
"""Register this agent with the Agentex server"""
if not env_vars.AGENTEX_BASE_URL:
logger.warning("AGENTEX_BASE_URL is not set, skipping registration")
return
# Build the agent's own URL
full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}"

description = (
env_vars.AGENT_DESCRIPTION
or f"Generic description for agent: {env_vars.AGENT_NAME}"
)

# Prepare registration data
registration_data = {
"name": env_vars.AGENT_NAME,
"description": description,
"acp_url": full_acp_url,
"acp_type": env_vars.ACP_TYPE,
"principal_context": get_auth_principal(env_vars)
}

if env_vars.AGENT_ID:
registration_data["agent_id"] = env_vars.AGENT_ID

# Make the registration request
registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register"
# Retry logic with configurable attempts and delay
max_retries = 3
base_delay = 5 # seconds
last_exception = None

attempt = 0
while attempt < max_retries:
try:
async with httpx.AsyncClient() as client:
response = await client.post(
registration_url, json=registration_data, timeout=30.0
)
if response.status_code == 200:
agent = response.json()
agent_id, agent_name = agent["id"], agent["name"]
agent_api_key = agent["agent_api_key"]

os.environ["AGENT_ID"] = agent_id
os.environ["AGENT_NAME"] = agent_name
os.environ["AGENT_API_KEY"] = agent_api_key
env_vars.AGENT_ID = agent_id
env_vars.AGENT_NAME = agent_name
env_vars.AGENT_API_KEY = agent_api_key
global refreshed_environment_variables
refreshed_environment_variables = env_vars
logger.info(
f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}"
)
return # Success, exit the retry loop
else:
error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}"
logger.error(error_msg)
last_exception = Exception(
f"Failed to startup agent: {response.text}"
)

except Exception as e:
logger.error(
f"Exception during agent registration attempt {attempt + 1}: {e}"
)
last_exception = e
attempt += 1
if attempt < max_retries:
delay = (attempt) * base_delay # 5, 10, 15 seconds
logger.info(
f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})"
)
await asyncio.sleep(delay)

# If we get here, all retries failed
raise last_exception or Exception(
f"Failed to register agent after {max_retries} attempts"
)
Loading