Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,24 @@ def __init__(self, next, payload_converter):
def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
"""Add task_id, trace_id, and parent_span_id to headers when starting model activities."""

# Only add headers for model activity calls (OpenAI and Claude)
activity_name = str(input.activity) if hasattr(input, 'activity') else ""

if ("invoke_model_activity" in activity_name or
"invoke-model-activity" in activity_name or
"run_claude_agent_activity" in activity_name):
# Get task_id, trace_id, and parent_span_id from workflow instance instead of inbound interceptor
try:
workflow_instance = workflow.instance()
task_id = getattr(workflow_instance, '_task_id', None)
trace_id = getattr(workflow_instance, '_trace_id', None)
parent_span_id = getattr(workflow_instance, '_parent_span_id', None)

if task_id and trace_id and parent_span_id:
# Initialize headers if needed
if not input.headers:
input.headers = {}

# Add task_id to headers
input.headers[TASK_ID_HEADER] = self._payload_converter.to_payload(task_id) # type: ignore[index]
input.headers[TRACE_ID_HEADER] = self._payload_converter.to_payload(trace_id) # type: ignore[index]
input.headers[PARENT_SPAN_ID_HEADER] = self._payload_converter.to_payload(parent_span_id) # type: ignore[index]
logger.debug(f"[OutboundInterceptor] Added task_id, trace_id, and parent_span_id to activity headers: {task_id}, {trace_id}, {parent_span_id}")
else:
logger.warning("[OutboundInterceptor] No _task_id, _trace_id, or _parent_span_id found in workflow instance")
except Exception as e:
logger.error(f"[OutboundInterceptor] Failed to get task_id, trace_id, or parent_span_id from workflow instance: {e}")
try:
workflow_instance = workflow.instance()
task_id = getattr(workflow_instance, '_task_id', None)
trace_id = getattr(workflow_instance, '_trace_id', None)
parent_span_id = getattr(workflow_instance, '_parent_span_id', None)

if task_id and trace_id and parent_span_id:
if not input.headers:
input.headers = {}

input.headers[TASK_ID_HEADER] = self._payload_converter.to_payload(task_id) # type: ignore[index]
input.headers[TRACE_ID_HEADER] = self._payload_converter.to_payload(trace_id) # type: ignore[index]
input.headers[PARENT_SPAN_ID_HEADER] = self._payload_converter.to_payload(parent_span_id) # type: ignore[index]
logger.debug(f"[OutboundInterceptor] Added task_id, trace_id, and parent_span_id to activity headers: {task_id}, {trace_id}, {parent_span_id}")
else:
logger.warning("[OutboundInterceptor] No _task_id, _trace_id, or _parent_span_id found in workflow instance")
except Exception as e:
logger.error(f"[OutboundInterceptor] Failed to get task_id, trace_id, or parent_span_id from workflow instance: {e}")

return self.next.start_activity(input)

Expand Down