diff --git a/examples/tutorials/00_sync/010_multiturn/manifest.yaml b/examples/tutorials/00_sync/010_multiturn/manifest.yaml index c0c7a08f..6b10e48b 100644 --- a/examples/tutorials/00_sync/010_multiturn/manifest.yaml +++ b/examples/tutorials/00_sync/010_multiturn/manifest.yaml @@ -114,4 +114,4 @@ deployment: memory: "1Gi" limits: cpu: "1000m" - memory: "2Gi" \ No newline at end of file + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/adk/_modules/acp.py b/src/agentex/lib/adk/_modules/acp.py index 657f8e14..650fe486 100644 --- a/src/agentex/lib/adk/_modules/acp.py +++ b/src/agentex/lib/adk/_modules/acp.py @@ -4,6 +4,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.acp.acp import ACPService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.acp.acp_activities import ( @@ -40,7 +41,7 @@ def __init__(self, acp_service: ACPService | None = None): acp_activities (Optional[ACPActivities]): Optional pre-configured ACP activities. If None, will be auto-initialized. """ if acp_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._acp_service = ACPService(agentex_client=agentex_client, tracer=tracer) else: diff --git a/src/agentex/lib/adk/_modules/agent_task_tracker.py b/src/agentex/lib/adk/_modules/agent_task_tracker.py index 44ffd518..b7385ee1 100644 --- a/src/agentex/lib/adk/_modules/agent_task_tracker.py +++ b/src/agentex/lib/adk/_modules/agent_task_tracker.py @@ -3,6 +3,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.agent_task_tracker import AgentTaskTrackerService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.agent_task_tracker_activities import ( @@ -33,7 +34,7 @@ def __init__( agent_task_tracker_service: AgentTaskTrackerService | None = None, ): if agent_task_tracker_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._agent_task_tracker_service = AgentTaskTrackerService( agentex_client=agentex_client, tracer=tracer diff --git a/src/agentex/lib/adk/_modules/agents.py b/src/agentex/lib/adk/_modules/agents.py index 5e1bbe0e..b97633c6 100644 --- a/src/agentex/lib/adk/_modules/agents.py +++ b/src/agentex/lib/adk/_modules/agents.py @@ -1,6 +1,7 @@ from datetime import timedelta from typing import Optional +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.temporal.activities.adk.agents_activities import AgentsActivityName, GetAgentParams from temporalio.common import RetryPolicy @@ -28,7 +29,7 @@ def __init__( agents_service: Optional[AgentsService] = None, ): if agents_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._agents_service = AgentsService(agentex_client=agentex_client, tracer=tracer) else: diff --git a/src/agentex/lib/adk/_modules/events.py b/src/agentex/lib/adk/_modules/events.py index 529db4c3..20817a0a 100644 --- a/src/agentex/lib/adk/_modules/events.py +++ b/src/agentex/lib/adk/_modules/events.py @@ -3,6 +3,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.events import EventsService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.events_activities import ( @@ -32,7 +33,7 @@ def __init__( events_service: EventsService | None = None, ): if events_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._events_service = EventsService( agentex_client=agentex_client, tracer=tracer diff --git a/src/agentex/lib/adk/_modules/messages.py b/src/agentex/lib/adk/_modules/messages.py index 6d2e2477..51cd0629 100644 --- a/src/agentex/lib/adk/_modules/messages.py +++ b/src/agentex/lib/adk/_modules/messages.py @@ -3,6 +3,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.adapters.streams.adapter_redis import RedisStreamRepository from agentex.lib.core.services.adk.messages import MessagesService from agentex.lib.core.services.adk.streaming import StreamingService @@ -37,7 +38,7 @@ def __init__( messages_service: MessagesService | None = None, ): if messages_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() stream_repository = RedisStreamRepository() streaming_service = StreamingService( agentex_client=agentex_client, diff --git a/src/agentex/lib/adk/_modules/state.py b/src/agentex/lib/adk/_modules/state.py index c13a6b59..7759e981 100644 --- a/src/agentex/lib/adk/_modules/state.py +++ b/src/agentex/lib/adk/_modules/state.py @@ -5,6 +5,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.state import StateService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.state_activities import ( @@ -36,7 +37,7 @@ def __init__( state_service: StateService | None = None, ): if state_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._state_service = StateService( agentex_client=agentex_client, tracer=tracer diff --git a/src/agentex/lib/adk/_modules/streaming.py b/src/agentex/lib/adk/_modules/streaming.py index f604db16..d75fa0f4 100644 --- a/src/agentex/lib/adk/_modules/streaming.py +++ b/src/agentex/lib/adk/_modules/streaming.py @@ -1,6 +1,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.adapters.streams.adapter_redis import RedisStreamRepository from agentex.lib.core.services.adk.streaming import ( StreamingService, @@ -34,7 +35,7 @@ def __init__(self, streaming_service: StreamingService | None = None): """ if streaming_service is None: stream_repository = RedisStreamRepository() - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() self._streaming_service = StreamingService( agentex_client=agentex_client, stream_repository=stream_repository, diff --git a/src/agentex/lib/adk/_modules/tasks.py b/src/agentex/lib/adk/_modules/tasks.py index 8b8f949a..2b95dd52 100644 --- a/src/agentex/lib/adk/_modules/tasks.py +++ b/src/agentex/lib/adk/_modules/tasks.py @@ -3,6 +3,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.tasks import TasksService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.tasks_activities import ( @@ -31,7 +32,7 @@ def __init__( tasks_service: TasksService | None = None, ): if tasks_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._tasks_service = TasksService( agentex_client=agentex_client, tracer=tracer diff --git a/src/agentex/lib/adk/_modules/tracing.py b/src/agentex/lib/adk/_modules/tracing.py index 138f25c9..f4073d0e 100644 --- a/src/agentex/lib/adk/_modules/tracing.py +++ b/src/agentex/lib/adk/_modules/tracing.py @@ -6,6 +6,7 @@ from temporalio.common import RetryPolicy from agentex import AsyncAgentex +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from agentex.lib.core.services.adk.tracing import TracingService from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers from agentex.lib.core.temporal.activities.adk.tracing_activities import ( @@ -38,7 +39,7 @@ def __init__(self, tracing_service: TracingService | None = None): tracing_activities (Optional[TracingActivities]): Optional pre-configured tracing activities. If None, will be auto-initialized. """ if tracing_service is None: - agentex_client = AsyncAgentex() + agentex_client = get_async_agentex_client() tracer = AsyncTracer(agentex_client) self._tracing_service = TracingService(tracer=tracer) else: diff --git a/src/agentex/lib/adk/utils/_modules/client.py b/src/agentex/lib/adk/utils/_modules/client.py new file mode 100644 index 00000000..bfe4ad93 --- /dev/null +++ b/src/agentex/lib/adk/utils/_modules/client.py @@ -0,0 +1,43 @@ +import threading +from typing import Dict, Optional, Any + +from agentex import AsyncAgentex +from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables + +_client: Optional["AsyncAgentex"] = None +_cached_headers: Dict[str, str] = {} +_init_kwargs: Dict[str, Any] = {} +_lock = threading.RLock() + + +def _build_headers() -> Dict[str, str]: + EnvironmentVariables.refresh() + if refreshed_environment_variables and getattr(refreshed_environment_variables, "AGENT_ID", None): + return {"x-agent-identity": refreshed_environment_variables.AGENT_ID} + return {} + + +def get_async_agentex_client(**kwargs) -> "AsyncAgentex": + """ + Return a cached AsyncAgentex instance (created synchronously). + Each call re-checks env vars and updates client.default_headers if needed. + """ + global _client, _cached_headers, _init_kwargs + + new_headers = _build_headers() + + with _lock: + # First time (or kwargs changed) -> build a new client + if _client is None or kwargs != _init_kwargs: + _client = AsyncAgentex(default_headers=new_headers.copy(), **kwargs) + _cached_headers = new_headers + _init_kwargs = dict(kwargs) + return _client + + # Same client; maybe headers changed + if new_headers != _cached_headers: + _cached_headers = new_headers + _client.default_headers.clear() + _client.default_headers.update(new_headers) + + return _client \ No newline at end of file diff --git a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py index 8b8cc9ae..c503d5ce 100644 --- a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py @@ -2,18 +2,20 @@ import base64 import inspect import json +import os from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager from typing import Any import httpx import uvicorn +from agentex.lib.adk.utils._modules.client import get_async_agentex_client from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from pydantic import TypeAdapter, ValidationError # from agentex.lib.sdk.fastacp.types import BaseACPConfig -from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables from agentex.lib.types.acp import ( PARAMS_MODEL_BY_METHOD, RPC_SYNC_METHODS, @@ -390,6 +392,14 @@ async def _register_agent(self, env_vars: EnvironmentVariables): registration_url, json=registration_data, timeout=30.0 ) if response.status_code == 200: + agent = response.json() + agent_id, agent_name = agent["id"], agent["name"] + + os.environ["AGENT_ID"] = agent_id + os.environ["AGENT_NAME"] = agent_name + refreshed_environment_variables.AGENT_ID = agent_id + refreshed_environment_variables.AGENT_NAME = agent_name + get_async_agentex_client() # refresh cache logger.info( f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}" )