From d52c73f9fdc6990e11719427631e159769a3dc1b Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Thu, 30 Apr 2026 15:30:05 -0400 Subject: [PATCH] fix(adk): Always inject headers on execute activity --- .../interceptors/context_interceptor.py | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py b/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py index 1111249f0..893f75f28 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/interceptors/context_interceptor.py @@ -85,33 +85,24 @@ def __init__(self, next, payload_converter): def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle: """Add task_id, trace_id, and parent_span_id to headers when starting model activities.""" - # Only add headers for model activity calls (OpenAI and Claude) - activity_name = str(input.activity) if hasattr(input, 'activity') else "" - - if ("invoke_model_activity" in activity_name or - "invoke-model-activity" in activity_name or - "run_claude_agent_activity" in activity_name): - # Get task_id, trace_id, and parent_span_id from workflow instance instead of inbound interceptor - try: - workflow_instance = workflow.instance() - task_id = getattr(workflow_instance, '_task_id', None) - trace_id = getattr(workflow_instance, '_trace_id', None) - parent_span_id = getattr(workflow_instance, '_parent_span_id', None) - - if task_id and trace_id and parent_span_id: - # Initialize headers if needed - if not input.headers: - input.headers = {} - - # Add task_id to headers - input.headers[TASK_ID_HEADER] = self._payload_converter.to_payload(task_id) # type: ignore[index] - input.headers[TRACE_ID_HEADER] = self._payload_converter.to_payload(trace_id) # type: ignore[index] - input.headers[PARENT_SPAN_ID_HEADER] = self._payload_converter.to_payload(parent_span_id) # type: ignore[index] - logger.debug(f"[OutboundInterceptor] Added task_id, trace_id, and parent_span_id to activity headers: {task_id}, {trace_id}, {parent_span_id}") - else: - logger.warning("[OutboundInterceptor] No _task_id, _trace_id, or _parent_span_id found in workflow instance") - except Exception as e: - logger.error(f"[OutboundInterceptor] Failed to get task_id, trace_id, or parent_span_id from workflow instance: {e}") + try: + workflow_instance = workflow.instance() + task_id = getattr(workflow_instance, '_task_id', None) + trace_id = getattr(workflow_instance, '_trace_id', None) + parent_span_id = getattr(workflow_instance, '_parent_span_id', None) + + if task_id and trace_id and parent_span_id: + if not input.headers: + input.headers = {} + + input.headers[TASK_ID_HEADER] = self._payload_converter.to_payload(task_id) # type: ignore[index] + input.headers[TRACE_ID_HEADER] = self._payload_converter.to_payload(trace_id) # type: ignore[index] + input.headers[PARENT_SPAN_ID_HEADER] = self._payload_converter.to_payload(parent_span_id) # type: ignore[index] + logger.debug(f"[OutboundInterceptor] Added task_id, trace_id, and parent_span_id to activity headers: {task_id}, {trace_id}, {parent_span_id}") + else: + logger.warning("[OutboundInterceptor] No _task_id, _trace_id, or _parent_span_id found in workflow instance") + except Exception as e: + logger.error(f"[OutboundInterceptor] Failed to get task_id, trace_id, or parent_span_id from workflow instance: {e}") return self.next.start_activity(input)