Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from agentex.types.span import Span
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.environment_variables import EnvironmentVariables
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
SyncTracingProcessor,
AsyncTracingProcessor,
Expand All @@ -24,9 +25,24 @@ def __init__(self, config: SGPTracingProcessorConfig):
disabled=disabled,
)
self._spans: dict[str, SGPSpan] = {}
self.env_vars = EnvironmentVariables.refresh()

def _add_source_to_span(self, span: Span) -> None:
if span.data is None:
span.data = {}
if isinstance(span.data, dict):
span.data["__source__"] = "agentex"
if self.env_vars.ACP_TYPE is not None:
span.data["__acp_type__"] = self.env_vars.ACP_TYPE
if self.env_vars.AGENT_NAME is not None:
span.data["__agent_name__"] = self.env_vars.AGENT_NAME
if self.env_vars.AGENT_ID is not None:
span.data["__agent_id__"] = self.env_vars.AGENT_ID

@override
def on_span_start(self, span: Span) -> None:
self._add_source_to_span(span)

sgp_span = create_span(
name=span.name,
span_id=span.id,
Expand All @@ -50,6 +66,7 @@ def on_span_end(self, span: Span) -> None:
)
return

self._add_source_to_span(span)
sgp_span.output = span.output # type: ignore[assignment]
sgp_span.metadata = span.data # type: ignore[assignment]
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
Expand All @@ -70,9 +87,23 @@ def __init__(self, config: SGPTracingProcessorConfig):
if not self.disabled
else None
)
self.env_vars = EnvironmentVariables.refresh()

def _add_source_to_span(self, span: Span) -> None:
if span.data is None:
span.data = {}
if isinstance(span.data, dict):
span.data["__source__"] = "agentex"
if self.env_vars.ACP_TYPE is not None:
span.data["__acp_type__"] = self.env_vars.ACP_TYPE
if self.env_vars.AGENT_NAME is not None:
span.data["__agent_name__"] = self.env_vars.AGENT_NAME
if self.env_vars.AGENT_ID is not None:
span.data["__agent_id__"] = self.env_vars.AGENT_ID

@override
async def on_span_start(self, span: Span) -> None:
self._add_source_to_span(span)
sgp_span = create_span(
name=span.name,
span_id=span.id,
Expand All @@ -85,6 +116,7 @@ async def on_span_start(self, span: Span) -> None:
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]

if self.disabled:
logger.warning("SGP is disabled, skipping span upsert")
return
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
items=[sgp_span.to_request_params()]
Expand All @@ -101,6 +133,7 @@ async def on_span_end(self, span: Span) -> None:
)
return

self._add_source_to_span(span)
sgp_span.output = span.output # type: ignore[assignment]
sgp_span.metadata = span.data # type: ignore[assignment]
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
Expand Down