diff --git a/tests/models/test_llm_response.py b/tests/models/test_llm_response.py index d57ffa6..24f53ba 100644 --- a/tests/models/test_llm_response.py +++ b/tests/models/test_llm_response.py @@ -296,3 +296,47 @@ def test_create_with_candidate_no_finish_reason(self): assert llm_response.content is not None assert llm_response.content.parts[0].text == "Response" # finish_reason=None should not cause error when content exists + + +class TestLlmResponseHasContent: + """Test suite for :meth:`LlmResponse.has_content`.""" + + def test_none_content_is_false(self): + assert LlmResponse(content=None).has_content() is False + + def test_empty_parts_is_false(self): + assert LlmResponse(content=Content(parts=[], role="model")).has_content() is False + + def test_text_part_is_true(self): + content = Content(parts=[Part.from_text(text="hello")], role="model") + assert LlmResponse(content=content).has_content() is True + + def test_function_call_part_is_true(self): + content = Content( + parts=[Part.from_function_call(name="calc", args={"x": 1})], + role="model", + ) + assert LlmResponse(content=content).has_content() is True + + def test_function_response_only_is_false(self): + """Function responses are not user-visible content per the contract.""" + content = Content( + parts=[Part.from_function_response(name="calc", response={"ok": True})], + role="tool", + ) + assert LlmResponse(content=content).has_content() is False + + def test_empty_text_with_function_call_is_true(self): + content = Content( + parts=[ + Part(text=""), + Part.from_function_call(name="calc", args={}), + ], + role="model", + ) + assert LlmResponse(content=content).has_content() is True + + def test_whitespace_text_only_is_true(self): + """Any non-empty text counts as visible content.""" + content = Content(parts=[Part(text=" ")], role="model") + assert LlmResponse(content=content).has_content() is True diff --git a/tests/telemetry/test_custom_metrics.py b/tests/telemetry/test_custom_metrics.py new file mode 100644 index 0000000..91cd740 --- /dev/null +++ b/tests/telemetry/test_custom_metrics.py @@ -0,0 +1,224 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""Unit tests for :class:`trpc_agent_sdk.telemetry.CustomMetricsReporter`. + +Verifies the event-routing state machine: + * partial events bump TTFT only + * function-call events close an LLM segment and start tool timers + * function-response events close tool timers and reopen an LLM segment + * plain content events close an LLM segment + +All ``report_*`` functions are patched to record the calls instead of emitting +to OTel, so the tests are hermetic. 
+""" + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any +from typing import Dict +from typing import List +from unittest.mock import patch + +import pytest + +from trpc_agent_sdk.events import Event +from trpc_agent_sdk.telemetry import CustomMetricsReporter +from trpc_agent_sdk.types import Content +from trpc_agent_sdk.types import Part + + +def _ctx(): + return SimpleNamespace( + app_name="demo", + user_id="alice", + agent_name="asst", + agent=SimpleNamespace(model=None), + ) + + +def _text_event(text: str, *, partial: bool = False) -> Event: + return Event( + invocation_id="inv-1", + author="asst", + partial=partial, + content=Content(parts=[Part.from_text(text=text)], role="model"), + ) + + +def _function_call_event(call_id: str, name: str) -> Event: + return Event( + invocation_id="inv-1", + author="asst", + content=Content( + parts=[Part(function_call={ + "id": call_id, + "name": name, + "args": { + "x": 1 + }, + })], + role="model", + ), + ) + + +def _function_response_event(call_id: str, name: str, *, error: bool = False) -> Event: + ev = Event( + invocation_id="inv-1", + author="tool", + content=Content( + parts=[Part(function_response={ + "id": call_id, + "name": name, + "response": { + "ok": not error + }, + })], + role="tool", + ), + ) + if error: + ev.error_code = "500" + return ev + + +class _Capture: + """Helper to capture kwargs from patched ``report_*`` functions.""" + + def __init__(self): + self.calls: List[Dict[str, Any]] = [] + + def __call__(self, *args, **kwargs): + self.calls.append({"args": args, "kwargs": kwargs}) + + +@pytest.fixture() +def patched_reporters(): + """Patch the two ``report_*`` functions imported into ``_custom_metrics``.""" + llm = _Capture() + tool = _Capture() + with patch("trpc_agent_sdk.telemetry._custom_metrics.report_call_llm", + new=llm), patch("trpc_agent_sdk.telemetry._custom_metrics.report_execute_tool", new=tool): + yield llm, tool + + +class TestCustomMetricsReporterRouting: + + def test_plain_content_event_emits_call_llm(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst", model_prefix="claude") + + reporter.report_event(_ctx(), _text_event("hello")) + + assert len(llm.calls) == 1 + assert len(tool.calls) == 0 + kw = llm.calls[0]["kwargs"] + req = kw["llm_request"] + assert req.model == "claude:asst" + assert kw["is_stream"] is True # default + assert kw["duration_s"] >= 0.0 + assert kw["ttft_s"] >= 0.0 + + def test_partial_event_does_not_emit(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + + reporter.report_event(_ctx(), _text_event("chunk", partial=True)) + reporter.report_event(_ctx(), _text_event("chunk 2", partial=True)) + + assert llm.calls == [] + assert tool.calls == [] + # TTFT is latched as soon as any content event arrives, partial or not. 
+ assert reporter._llm_ttft is not None + + def test_function_call_closes_llm_and_opens_tool_timers(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + + assert len(llm.calls) == 1, "function-call event must emit the open LLM segment" + assert len(tool.calls) == 0 + assert reporter._pending_tool_starts.keys() == {"c1"} + assert reporter._pending_tool_starts["c1"][0] == "search" + assert reporter._llm_segment_start is None + + def test_function_response_emits_execute_tool_and_reopens_segment(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + reporter.report_event(_ctx(), _function_response_event("c1", "search")) + + assert len(tool.calls) == 1 + kw = tool.calls[0]["kwargs"] + assert kw["duration_s"] >= 0.0 + assert kw["error_type"] is None + assert tool.calls[0]["args"][1].name == "search" + assert reporter._pending_tool_starts == {} + assert reporter._llm_segment_start is not None + + def test_tool_error_type_propagates(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + reporter.report_event(_ctx(), _function_response_event("c1", "search", error=True)) + + assert tool.calls[0]["kwargs"]["error_type"] == "500" + + def test_unmatched_function_response_is_ignored(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + + # No matching function_call beforehand. + reporter.report_event(_ctx(), _function_response_event("unknown", "search")) + + assert tool.calls == [] + # Segment was still reopened. 
+ assert reporter._llm_segment_start is not None + + def test_full_round_trip_chat_tool_chat(self, patched_reporters): + """LLM call -> tool call -> tool result -> final LLM chunk.""" + llm, tool = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst", model_prefix="a2a") + + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + reporter.report_event(_ctx(), _function_response_event("c1", "search")) + reporter.report_event(_ctx(), _text_event("final answer")) + + assert len(llm.calls) == 2, "two LLM segments: before tool + after tool" + assert len(tool.calls) == 1 + for call in llm.calls: + assert call["kwargs"]["llm_request"].model == "a2a:asst" + + def test_extra_attributes_forwarded(self, patched_reporters): + llm, tool = patched_reporters + reporter = CustomMetricsReporter( + agent_name="asst", + extra_attributes={"gen_ai.system": "openai"}, + ) + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + reporter.report_event(_ctx(), _function_response_event("c1", "search")) + + assert llm.calls[0]["kwargs"]["extra_attributes"] == {"gen_ai.system": "openai"} + assert tool.calls[0]["kwargs"]["extra_attributes"] == {"gen_ai.system": "openai"} + + +class TestCustomMetricsReporterReset: + + def test_reset_clears_pending_state(self, patched_reporters): + _, _ = patched_reporters + reporter = CustomMetricsReporter(agent_name="asst") + reporter.report_event(_ctx(), _function_call_event("c1", "search")) + assert reporter._pending_tool_starts + + reporter.reset() + + assert reporter._pending_tool_starts == {} + assert reporter._llm_segment_start is None + assert reporter._llm_ttft is None diff --git a/tests/telemetry/test__custom_trace.py b/tests/telemetry/test_custom_trace.py similarity index 100% rename from tests/telemetry/test__custom_trace.py rename to tests/telemetry/test_custom_trace.py diff --git a/tests/telemetry/test_metrics.py b/tests/telemetry/test_metrics.py new file mode 100644 index 0000000..d02c086 --- /dev/null +++ b/tests/telemetry/test_metrics.py @@ -0,0 +1,512 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""Unit tests for :mod:`trpc_agent_sdk.telemetry._metrics`. + +Uses an ``InMemoryMetricReader`` bound to a private ``MeterProvider`` so the +tests can inspect the OTel data points the ``report_*`` functions emit, without +touching the global meter provider. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any +from typing import Dict +from typing import Mapping +from typing import Optional + +import pytest +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader + +from trpc_agent_sdk.telemetry import _metrics as tmetrics + + +def _make_ctx( + *, + app_name: str = "demo", + user_id: str = "alice", + agent_name: str = "asst", + agent_model: Any = None, +) -> SimpleNamespace: + """Build a duck-typed ``InvocationContext`` stub. + + The ``report_*`` functions only read ``app_name``, ``user_id``, + ``agent_name``, and ``agent`` from the context, so a ``SimpleNamespace`` is + sufficient and avoids having to construct a real session/agent. 
+ """ + agent = SimpleNamespace(model=agent_model) + return SimpleNamespace( + app_name=app_name, + user_id=user_id, + agent_name=agent_name, + agent=agent, + ) + + +class _StubTool: + + def __init__(self, name: str): + self.name = name + + +class _StubLlmRequest: + + def __init__(self, model: str): + self.model = model + + +class _StubUsage: + + def __init__(self, prompt: int, total: int): + self.prompt_token_count = prompt + self.total_token_count = total + + +class _StubLlmResponse: + + def __init__( + self, + *, + model: str = "", + error_code: str = "", + usage: Optional[_StubUsage] = None, + ): + self.model = model + self.error_code = error_code + self.usage_metadata = usage + + +@pytest.fixture() +def reader_provider(monkeypatch): + """Install an ``InMemoryMetricReader`` on a private ``MeterProvider``. + + Rebinds the module-level instruments in :mod:`trpc_agent_sdk.telemetry._metrics` + to the test meter so we can introspect emissions without global state. + """ + reader = InMemoryMetricReader() + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("test") + + originals = { + "_request_cnt": tmetrics._request_cnt, + "_operation_duration": tmetrics._operation_duration, + "_time_to_first_token": tmetrics._time_to_first_token, + "_usage_input_tokens": tmetrics._usage_input_tokens, + "_usage_output_tokens": tmetrics._usage_output_tokens, + } + monkeypatch.setattr( + tmetrics, + "_request_cnt", + meter.create_counter("gen_ai.request_cnt"), + ) + monkeypatch.setattr( + tmetrics, + "_operation_duration", + meter.create_histogram("gen_ai.client.operation.duration"), + ) + monkeypatch.setattr( + tmetrics, + "_time_to_first_token", + meter.create_histogram("gen_ai.server.time_to_first_token"), + ) + monkeypatch.setattr( + tmetrics, + "_usage_input_tokens", + meter.create_histogram("gen_ai.usage.input_tokens"), + ) + monkeypatch.setattr( + tmetrics, + "_usage_output_tokens", + meter.create_histogram("gen_ai.usage.output_tokens"), + ) + + yield reader, provider + + for name, inst in originals.items(): + monkeypatch.setattr(tmetrics, name, inst) + provider.shutdown() + + +def _collect(reader: InMemoryMetricReader) -> Dict[str, list]: + """Collect and index data points by metric name.""" + data = reader.get_metrics_data() + out: Dict[str, list] = {} + if data is None: + return out + for rm in data.resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + for dp in getattr(metric.data, "data_points", []) or []: + out.setdefault(metric.name, []).append(dp) + return out + + +def _attrs(dp) -> Mapping[str, Any]: + return dict(dp.attributes or {}) + + +class TestInferSystem: + """Vendor inference from model name.""" + + @pytest.mark.parametrize( + "model,expected", + [ + ("gpt-4", "openai"), + ("gpt-4o-mini", "openai"), + ("o1-preview", "openai"), + ("text-embedding-3-large", "openai"), + ("claude-3-5-sonnet", "anthropic"), + ("CLAUDE-opus", "anthropic"), + ("gemini-2.0-flash", "gcp.gemini"), + ("hunyuan-pro", "hunyuan"), + ("taiji-v1", "taiji"), + ("", ""), + ("unknown-model-x", ""), + ], + ) + def test_known_and_unknown(self, model: str, expected: str): + assert tmetrics._infer_system(model) == expected + + +class TestAgentModelName: + """Best-effort extraction of the agent's model name.""" + + def test_string_model(self): + agent = SimpleNamespace(model="claude-3-haiku") + assert tmetrics._agent_model_name(agent) == "claude-3-haiku" + + def test_model_with_name_property(self): + agent = SimpleNamespace(model=SimpleNamespace(name="gpt-4")) + assert 
tmetrics._agent_model_name(agent) == "gpt-4" + + def test_missing_model_attribute(self): + agent = SimpleNamespace() + assert tmetrics._agent_model_name(agent) == "" + + def test_model_is_none(self): + agent = SimpleNamespace(model=None) + assert tmetrics._agent_model_name(agent) == "" + + def test_model_is_callable(self): + """Callable factories (lazy agents) are not statically reachable.""" + agent = SimpleNamespace(model=lambda *_: None) + assert tmetrics._agent_model_name(agent) == "" + + +class TestMergeExtras: + + def test_no_extras(self): + base = {"a": 1} + assert tmetrics._merge_extras(base, None) is base + + def test_extras_override_base(self): + base = {"a": 1, "b": 2} + out = tmetrics._merge_extras(base, {"b": 3, "c": 4}) + assert out == {"a": 1, "b": 3, "c": 4} + assert base == {"a": 1, "b": 2}, "base must not be mutated" + + def test_none_values_are_skipped(self): + out = tmetrics._merge_extras({"a": 1}, {"a": None, "b": None, "c": 7}) + assert out == {"a": 1, "c": 7} + + +class TestReportCallLlm: + + def test_emits_request_cnt_duration_and_ttft(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + _StubLlmResponse(model="gpt-4-0613"), + duration_s=1.25, + ttft_s=0.2, + is_stream=True, + ) + metrics = _collect(reader) + + assert "gen_ai.request_cnt" in metrics + assert "gen_ai.client.operation.duration" in metrics + assert "gen_ai.server.time_to_first_token" in metrics + assert "gen_ai.usage.input_tokens" not in metrics + assert "gen_ai.usage.output_tokens" not in metrics + + cnt_dp = metrics["gen_ai.request_cnt"][0] + assert cnt_dp.value == 1 + attrs = _attrs(cnt_dp) + assert attrs["gen_ai.operation.name"] == "chat" + assert attrs["gen_ai.system"] == "openai" + assert attrs["gen_ai.app.name"] == "demo" + assert attrs["gen_ai.user.id"] == "alice" + assert attrs["gen_ai.agent.id"] == "asst" + assert attrs["gen_ai.agent.name"] == "asst" + assert attrs["gen_ai.request.model"] == "gpt-4" + assert attrs["gen_ai.response.model"] == "gpt-4-0613" + assert attrs["gen_ai.is_stream"] is True + assert attrs["error.type"] == "" + assert attrs["gen_ai.response.error_code"] == "" + + dur_dp = metrics["gen_ai.client.operation.duration"][0] + assert dur_dp.sum == pytest.approx(1.25) + ttft_dp = metrics["gen_ai.server.time_to_first_token"][0] + assert ttft_dp.sum == pytest.approx(0.2) + + def test_usage_tokens_emitted_when_usage_metadata_present(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("claude-3-5-sonnet"), + _StubLlmResponse( + model="claude-3-5-sonnet", + usage=_StubUsage(prompt=120, total=170), + ), + duration_s=2.0, + ttft_s=0.3, + is_stream=False, + ) + metrics = _collect(reader) + + inp = metrics["gen_ai.usage.input_tokens"][0] + out = metrics["gen_ai.usage.output_tokens"][0] + assert inp.sum == 120 + assert out.sum == 50 + + def test_usage_tokens_skipped_when_missing(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + None, + duration_s=1.0, + ttft_s=0.1, + is_stream=False, + ) + metrics = _collect(reader) + assert "gen_ai.usage.input_tokens" not in metrics + assert "gen_ai.usage.output_tokens" not in metrics + + def test_usage_zero_tokens_skipped(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + _StubLlmResponse(model="gpt-4", 
usage=_StubUsage(prompt=0, total=0)), + duration_s=1.0, + ttft_s=0.1, + is_stream=False, + ) + metrics = _collect(reader) + assert "gen_ai.usage.input_tokens" not in metrics + assert "gen_ai.usage.output_tokens" not in metrics + + def test_error_type_and_response_code_propagate(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + _StubLlmResponse(model="gpt-4", error_code="429"), + duration_s=0.1, + ttft_s=0.1, + is_stream=False, + error_type="rate_limit", + ) + cnt_dp = _collect(reader)["gen_ai.request_cnt"][0] + attrs = _attrs(cnt_dp) + assert attrs["error.type"] == "rate_limit" + assert attrs["gen_ai.response.error_code"] == "429" + + def test_extra_attributes_override_inferred_system(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx() + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("some-custom-model"), + _StubLlmResponse(model="some-custom-model"), + duration_s=0.1, + ttft_s=0.1, + is_stream=False, + extra_attributes={ + "gen_ai.system": "openai", + "user_ext1": "abc" + }, + ) + attrs = _attrs(_collect(reader)["gen_ai.request_cnt"][0]) + assert attrs["gen_ai.system"] == "openai" + assert attrs["user_ext1"] == "abc" + + +class TestReportExecuteTool: + + def test_emits_request_cnt_and_duration(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="hunyuan-pro") + tmetrics.report_execute_tool(ctx, _StubTool("search"), duration_s=0.5) + metrics = _collect(reader) + + assert "gen_ai.request_cnt" in metrics + assert "gen_ai.client.operation.duration" in metrics + assert "gen_ai.server.time_to_first_token" not in metrics + assert "gen_ai.usage.input_tokens" not in metrics + + attrs = _attrs(metrics["gen_ai.request_cnt"][0]) + assert attrs["gen_ai.operation.name"] == "execute_tool" + assert attrs["gen_ai.tool.name"] == "search" + assert attrs["gen_ai.system"] == "hunyuan" + assert attrs["gen_ai.app.name"] == "demo" + + def test_system_empty_when_agent_has_no_model(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model=None) + tmetrics.report_execute_tool(ctx, _StubTool("search"), duration_s=0.5) + attrs = _attrs(_collect(reader)["gen_ai.request_cnt"][0]) + assert attrs["gen_ai.system"] == "" + + def test_extra_attrs_can_supply_system(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model=None) + tmetrics.report_execute_tool( + ctx, + _StubTool("search"), + duration_s=0.5, + extra_attributes={"gen_ai.system": "openai"}, + ) + attrs = _attrs(_collect(reader)["gen_ai.request_cnt"][0]) + assert attrs["gen_ai.system"] == "openai" + + def test_error_type_propagates(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="claude-3-haiku") + tmetrics.report_execute_tool(ctx, _StubTool("search"), duration_s=0.5, error_type="timeout") + attrs = _attrs(_collect(reader)["gen_ai.request_cnt"][0]) + assert attrs["error.type"] == "timeout" + assert attrs["gen_ai.system"] == "anthropic" + + +class TestReportInvokeAgent: + + def test_emits_all_five_instruments(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + tmetrics.report_invoke_agent( + ctx, + duration_s=3.0, + ttft_s=0.4, + input_tokens=200, + output_tokens=50, + is_stream=True, + ) + metrics = _collect(reader) + + assert metrics["gen_ai.request_cnt"][0].value == 1 + assert metrics["gen_ai.client.operation.duration"][0].sum == pytest.approx(3.0) + assert 
metrics["gen_ai.server.time_to_first_token"][0].sum == pytest.approx(0.4) + assert metrics["gen_ai.usage.input_tokens"][0].sum == 200 + assert metrics["gen_ai.usage.output_tokens"][0].sum == 50 + + attrs = _attrs(metrics["gen_ai.request_cnt"][0]) + assert attrs["gen_ai.operation.name"] == "invoke_agent" + assert attrs["gen_ai.system"] == "openai" + assert attrs["gen_ai.is_stream"] is True + assert attrs["gen_ai.agent.name"] == "asst" + + def test_zero_tokens_are_skipped(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + tmetrics.report_invoke_agent( + ctx, + duration_s=1.0, + ttft_s=0.1, + input_tokens=0, + output_tokens=0, + is_stream=False, + ) + metrics = _collect(reader) + assert "gen_ai.usage.input_tokens" not in metrics + assert "gen_ai.usage.output_tokens" not in metrics + + def test_partial_tokens_are_independently_skipped(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + tmetrics.report_invoke_agent( + ctx, + duration_s=1.0, + ttft_s=0.1, + input_tokens=120, + output_tokens=0, + is_stream=False, + ) + metrics = _collect(reader) + assert metrics["gen_ai.usage.input_tokens"][0].sum == 120 + assert "gen_ai.usage.output_tokens" not in metrics + + def test_error_type_propagates(self, reader_provider): + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + tmetrics.report_invoke_agent( + ctx, + duration_s=0.1, + ttft_s=0.1, + input_tokens=0, + output_tokens=0, + is_stream=True, + error_type="cancelled", + ) + attrs = _attrs(_collect(reader)["gen_ai.request_cnt"][0]) + assert attrs["error.type"] == "cancelled" + + +class TestOperationRouting: + + def test_three_operations_create_three_separate_streams(self, reader_provider): + """Same counter, different attrs -> three independent time series.""" + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + + tmetrics.report_invoke_agent( + ctx, + duration_s=1.0, + ttft_s=0.1, + input_tokens=0, + output_tokens=0, + is_stream=True, + ) + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + _StubLlmResponse(model="gpt-4"), + duration_s=0.5, + ttft_s=0.1, + is_stream=True, + ) + tmetrics.report_execute_tool(ctx, _StubTool("calc"), duration_s=0.2) + + request_cnt_dps = _collect(reader)["gen_ai.request_cnt"] + ops = sorted(_attrs(dp)["gen_ai.operation.name"] for dp in request_cnt_dps) + assert ops == ["chat", "execute_tool", "invoke_agent"] + for dp in request_cnt_dps: + assert dp.value == 1 + + def test_repeated_same_operation_aggregates(self, reader_provider): + """Identical attrs -> single time series with summed value.""" + reader, _ = reader_provider + ctx = _make_ctx(agent_model="gpt-4") + for _ in range(3): + tmetrics.report_call_llm( + ctx, + _StubLlmRequest("gpt-4"), + _StubLlmResponse(model="gpt-4"), + duration_s=0.1, + ttft_s=0.05, + is_stream=True, + ) + dps = _collect(reader)["gen_ai.request_cnt"] + assert len(dps) == 1 + assert dps[0].value == 3 diff --git a/tests/telemetry/test__trace.py b/tests/telemetry/test_trace.py similarity index 100% rename from tests/telemetry/test__trace.py rename to tests/telemetry/test_trace.py diff --git a/trpc_agent_sdk/agents/_base_agent.py b/trpc_agent_sdk/agents/_base_agent.py index 0539dba..930cd89 100644 --- a/trpc_agent_sdk/agents/_base_agent.py +++ b/trpc_agent_sdk/agents/_base_agent.py @@ -21,6 +21,7 @@ from __future__ import annotations +import time from abc import abstractmethod from functools import partial from typing import Any @@ -50,6 +51,34 @@ 
InstructionProvider = Callable[[InvocationContext], Union[str, Awaitable[str]]] +def _aggregate_llm_usage(events: list[Event]) -> tuple[int, int]: + """Sum prompt/completion tokens across LLM events during one agent run. + + Agent-level metrics (``GenAIInvokeAgent``) roll up the token usage of every + LLM call performed during the agent run. Each non-partial ``Event`` produced + by an LLM carries a :class:`GenerateContentResponseUsageMetadata` with the + cumulative counts for that single model call. + + Args: + events: Non-partial events collected during the agent run. + + Returns: + Tuple of ``(input_tokens, output_tokens)``. + """ + input_tokens = 0 + output_tokens = 0 + for event in events: + usage = getattr(event, "usage_metadata", None) + if usage is None: + continue + prompt = getattr(usage, "prompt_token_count", None) or 0 + total = getattr(usage, "total_token_count", None) or 0 + if prompt and total: + input_tokens += prompt + output_tokens += max(total - prompt, 0) + return input_tokens, output_tokens + + def _build_action_string_from_events(events: list[Event], max_length: int = 500) -> str: """Build formatted action string from agent events. @@ -227,6 +256,7 @@ async def run_async( - State changes - Actions """ + from trpc_agent_sdk.telemetry import report_invoke_agent from trpc_agent_sdk.telemetry._trace import tracer from trpc_agent_sdk.telemetry._trace import trace_agent @@ -246,14 +276,23 @@ async def run_async( # Track all non-partial events for building action trace non_partial_events = [] + mono_start = time.monotonic() + t_first_visible: Optional[float] = None + metrics_error_type: Optional[str] = None + try: gen_co = run_stream_filters(ctx.agent_context, None, self.filters, handle) # type: ignore async for event in gen_co: + if t_first_visible is None and event.has_content(): + t_first_visible = time.monotonic() if not event.partial and event.content is not None: # Collect non-partial events with content for tracing # This excludes state update events which have content=None non_partial_events.append(event) yield event # type: ignore + except Exception as ex: + metrics_error_type = type(ex).__name__ + raise finally: # Compute state after agent run state_end = dict(ctx.session.state) @@ -268,6 +307,21 @@ async def run_async( state_begin=state_begin, state_end=state_end, ) + + duration_s = time.monotonic() - mono_start + ttft_s = (t_first_visible - mono_start) if t_first_visible is not None else duration_s + input_tokens, output_tokens = _aggregate_llm_usage(non_partial_events) + is_stream = bool(ctx.run_config and ctx.run_config.streaming) + report_invoke_agent( + ctx, + duration_s=duration_s, + ttft_s=ttft_s, + input_tokens=input_tokens, + output_tokens=output_tokens, + is_stream=is_stream, + error_type=metrics_error_type, + ) + # avoid memory leak reset_invocation_ctx(token) finally: diff --git a/trpc_agent_sdk/agents/core/_llm_processor.py b/trpc_agent_sdk/agents/core/_llm_processor.py index 834766d..6d3ca37 100644 --- a/trpc_agent_sdk/agents/core/_llm_processor.py +++ b/trpc_agent_sdk/agents/core/_llm_processor.py @@ -14,7 +14,9 @@ from __future__ import annotations +import time from typing import AsyncGenerator +from typing import Optional from trpc_agent_sdk.context import InvocationContext from trpc_agent_sdk.events import Event @@ -23,6 +25,7 @@ from trpc_agent_sdk.models import LlmRequest from trpc_agent_sdk.models import LlmResponse from trpc_agent_sdk.planners import default_planning_processor +from trpc_agent_sdk.telemetry import report_call_llm from 
trpc_agent_sdk.telemetry import trace_call_llm
 from trpc_agent_sdk.telemetry import tracer
@@ -94,29 +97,50 @@ def _append_function_calls(target: list[dict], calls: list) -> None:
                 "args": getattr(call, "args", None),
             })
 
-        async for llm_response in self.model.generate_async(request, stream=stream, ctx=context):
-            # Collect raw model-level function calls from every chunk.
-            raw_calls = []
-            if llm_response.content and llm_response.content.parts:
-                for part in llm_response.content.parts:
-                    if part.function_call:
-                        raw_calls.append(part.function_call)
-            _append_function_calls(aggregated_raw_function_calls, raw_calls)
-
-            # Create Event directly from LlmResponse
-            event = self._create_event_from_response(context, event_id, llm_response)
-
-            # Process response with planner if available
-            event = self._process_planning_response(event, context)
-            _append_function_calls(aggregated_event_function_calls, event.get_function_calls())
-
-            # Track the latest non-partial response for tracing
-            # In streaming mode, only the final (non-partial) response
-            # contains complete data suitable for telemetry reporting.
-            if not llm_response.partial:
-                final_llm_response = llm_response
-
-            yield event
+        t_start = time.monotonic()
+        t_first_token: Optional[float] = None
+        metrics_error_type: Optional[str] = None
+        try:
+            async for llm_response in self.model.generate_async(request, stream=stream, ctx=context):
+                if t_first_token is None and llm_response.has_content():
+                    t_first_token = time.monotonic()
+                # Collect raw model-level function calls from every chunk.
+                raw_calls = []
+                if llm_response.content and llm_response.content.parts:
+                    for part in llm_response.content.parts:
+                        if part.function_call:
+                            raw_calls.append(part.function_call)
+                _append_function_calls(aggregated_raw_function_calls, raw_calls)
+
+                # Create Event directly from LlmResponse
+                event = self._create_event_from_response(context, event_id, llm_response)
+
+                # Process response with planner if available
+                event = self._process_planning_response(event, context)
+                _append_function_calls(aggregated_event_function_calls, event.get_function_calls())
+
+                # Track the latest non-partial response for tracing
+                # In streaming mode, only the final (non-partial) response
+                # contains complete data suitable for telemetry reporting.
+ if not llm_response.partial: + final_llm_response = llm_response + + yield event + except Exception as ex: + metrics_error_type = type(ex).__name__ + raise + finally: + duration_s = time.monotonic() - t_start + ttft_s = (t_first_token - t_start) if t_first_token is not None else duration_s + report_call_llm( + context, + request, + final_llm_response, + duration_s=duration_s, + ttft_s=ttft_s, + is_stream=stream, + error_type=metrics_error_type, + ) # Trace the LLM call once after the stream completes, # using the final complete response to avoid attribute diff --git a/trpc_agent_sdk/agents/core/_tools_processor.py b/trpc_agent_sdk/agents/core/_tools_processor.py index 9977d54..29307ef 100644 --- a/trpc_agent_sdk/agents/core/_tools_processor.py +++ b/trpc_agent_sdk/agents/core/_tools_processor.py @@ -27,6 +27,7 @@ from trpc_agent_sdk.events import EventActions from trpc_agent_sdk.log import logger from trpc_agent_sdk.models import LlmRequest +from trpc_agent_sdk.telemetry import report_execute_tool from trpc_agent_sdk.telemetry import trace_merged_tool_calls from trpc_agent_sdk.telemetry import trace_tool_call from trpc_agent_sdk.telemetry import tracer @@ -311,10 +312,18 @@ async def _execute_tool(self, tool_call: FunctionCall, tool: BaseTool, context: # Set function call ID for context context.function_call_id = tool_call.id + start_time = time.monotonic() + try: - start_time = time.time() result = await tool.run_async(tool_context=context, args=arguments) - execution_time = time.time() - start_time + execution_time = time.monotonic() - start_time + + report_execute_tool( + context, + tool, + duration_s=execution_time, + error_type=None, + ) # Build function response if not isinstance(result, dict): @@ -370,6 +379,13 @@ async def _execute_tool(self, tool_call: FunctionCall, tool: BaseTool, context: return event except Exception as ex: # pylint: disable=broad-except + report_execute_tool( + context, + tool, + duration_s=time.monotonic() - start_time, + error_type=type(ex).__name__, + ) + error_event = self._create_error_event(context, "tool_execution_error", str(ex), tool_call.id, tool_call.name) diff --git a/trpc_agent_sdk/models/_llm_response.py b/trpc_agent_sdk/models/_llm_response.py index 28212a9..aa2fbc9 100644 --- a/trpc_agent_sdk/models/_llm_response.py +++ b/trpc_agent_sdk/models/_llm_response.py @@ -49,6 +49,17 @@ class LlmResponse(ResponseABC): custom_metadata: The custom metadata of the LlmResponse. """ + def has_content(self) -> bool: + """Returns whether the response carries user-visible content (text or function call). + + Returns True if any content part contains text or a function call. Parts that + only hold function responses, executable code, or code execution results are + not considered content for this check. 
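+
+        A minimal illustration (mirroring the unit tests; ``Content`` and
+        ``Part`` are the SDK's own types)::
+
+            LlmResponse(content=None).has_content()  # False
+            LlmResponse(
+                content=Content(parts=[Part(text="hi")], role="model"),
+            ).has_content()  # True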
+ """ + if not self.content or not self.content.parts: + return False + return any(p.text or p.function_call for p in self.content.parts) + @override def create( self, diff --git a/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py b/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py index 5304255..e76540c 100644 --- a/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py +++ b/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py @@ -59,6 +59,7 @@ from trpc_agent_sdk.events import Event from trpc_agent_sdk.exceptions import RunCancelledException from trpc_agent_sdk.log import logger +from trpc_agent_sdk.telemetry import CustomMetricsReporter from trpc_agent_sdk.telemetry import CustomTraceReporter from trpc_agent_sdk.types import Content from trpc_agent_sdk.types import Part @@ -246,6 +247,10 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, model_prefix="a2a", tool_description_prefix="Remote A2A tool", ) + metrics_reporter = CustomMetricsReporter( + agent_name=self.name, + model_prefix="a2a", + ) task_id = None out_headers: dict[str, str] = {} @@ -277,6 +282,7 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, for event in self._events_from_response(result, event_count, ctx): trace_reporter.trace_event(ctx, event) + metrics_reporter.report_event(ctx, event) yield event logger.debug("Streaming completed with %s events", event_count) diff --git a/trpc_agent_sdk/telemetry/__init__.py b/trpc_agent_sdk/telemetry/__init__.py index 202996f..03bacbd 100644 --- a/trpc_agent_sdk/telemetry/__init__.py +++ b/trpc_agent_sdk/telemetry/__init__.py @@ -5,7 +5,11 @@ # tRPC-Agent-Python is licensed under Apache-2.0. """Telemetry module for TRPC Agent framework.""" +from ._custom_metrics import CustomMetricsReporter from ._custom_trace import CustomTraceReporter +from ._metrics import report_call_llm +from ._metrics import report_execute_tool +from ._metrics import report_invoke_agent from ._trace import get_trpc_agent_span_name from ._trace import set_trpc_agent_span_name from ._trace import trace_agent @@ -17,7 +21,11 @@ from ._trace import tracer __all__ = [ + "CustomMetricsReporter", "CustomTraceReporter", + "report_call_llm", + "report_execute_tool", + "report_invoke_agent", "trace_agent", "trace_call_llm", "trace_cancellation", diff --git a/trpc_agent_sdk/telemetry/_custom_metrics.py b/trpc_agent_sdk/telemetry/_custom_metrics.py new file mode 100644 index 0000000..e38b7c2 --- /dev/null +++ b/trpc_agent_sdk/telemetry/_custom_metrics.py @@ -0,0 +1,215 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""Custom metrics reporter for ecosystem agents and user-defined custom agents. + +Parallel to :mod:`trpc_agent_sdk.telemetry._custom_trace`. Custom agents that do +not go through the standard ``_llm_processor`` / ``_tools_processor`` paths use +this reporter to emit OTel ``gen_ai.*`` metrics for each LLM response and each +tool invocation by pairing ``function_call`` events with their matching +``function_response`` events. + +``invoke_agent`` metrics are already emitted by ``BaseAgent.run_async`` for +every agent subclass, so this reporter intentionally covers only ``chat`` and +``execute_tool``. 
+ +Example usage: + ```python + from trpc_agent_sdk.telemetry import CustomMetricsReporter, CustomTraceReporter + + class MyCustomAgent(BaseAgent): + async def _run_async_impl(self, ctx): + trace_reporter = CustomTraceReporter( + agent_name=self.name, model_prefix="custom") + metrics_reporter = CustomMetricsReporter( + agent_name=self.name, model_prefix="custom") + + async for event in self._stream(): + trace_reporter.trace_event(ctx, event) + metrics_reporter.report_event(ctx, event) + yield event + ``` +""" + +from __future__ import annotations + +import time +from typing import Any +from typing import Mapping +from typing import Optional + +from trpc_agent_sdk.context import InvocationContext +from trpc_agent_sdk.events import Event +from trpc_agent_sdk.models import LlmRequest +from trpc_agent_sdk.models import LlmResponse + +from ._custom_trace import _SyntheticTool +from ._metrics import report_call_llm +from ._metrics import report_execute_tool + + +class CustomMetricsReporter: + """Reusable metrics reporter for custom agent implementations. + + Mirrors :class:`CustomTraceReporter`: stream every event through + :meth:`report_event` and the reporter will emit ``call_llm`` metrics for + each complete LLM response event and ``execute_tool`` metrics for each + ``function_call`` / ``function_response`` pair. ``invoke_agent`` metrics + are emitted by ``BaseAgent.run_async`` and are not handled here. + + Timing model: + - ``call_llm.duration`` is measured from the end of the previous + segment (agent start, or the last ``function_response``) to the + arrival of the current non-partial LLM event. + - ``call_llm.time_to_first_token`` is measured from the segment start + to the first event carrying user-visible content. + - ``execute_tool.duration`` is measured from a ``function_call`` event + to its matching ``function_response`` event. + + Attributes: + agent_name: The name of the agent using this reporter. + model_prefix: Prefix for the model name in metrics (e.g., "claude"). The + emitted ``gen_ai.request.model`` will be ``{model_prefix}:{agent_name}``. + is_stream: Whether the underlying agent streams its responses. Affects + the ``gen_ai.is_stream`` attribute on emitted records. + extra_attributes: Optional constant attributes merged onto every record. + """ + + def __init__( + self, + agent_name: str, + model_prefix: str = "custom", + *, + is_stream: bool = True, + extra_attributes: Optional[Mapping[str, Any]] = None, + ): + """Initialize the CustomMetricsReporter. + + Args: + agent_name: The name of the agent using this reporter. + model_prefix: Prefix for the model name in emitted metrics + (default: ``"custom"``). The full model name will be + ``"{model_prefix}:{agent_name}"``. + is_stream: Whether the agent streams its output (default: ``True``). + extra_attributes: Optional extra attributes merged onto every + emitted record. ``None`` values are ignored. + """ + self.agent_name = agent_name + self.model_prefix = model_prefix + self.is_stream = is_stream + self.extra_attributes = dict(extra_attributes) if extra_attributes else None + + self._pending_tool_starts: dict[str, tuple[str, float]] = {} + self._llm_segment_start: Optional[float] = None + self._llm_ttft: Optional[float] = None + + def report_event(self, ctx: InvocationContext, event: Event) -> None: + """Process metrics for a single event. + + Routes the event to one of three handlers: + * ``function_call`` events start tool timers and close the current + LLM segment (emitting ``call_llm``). 
+ * ``function_response`` events close tool timers (emitting + ``execute_tool``) and start a new LLM segment. + * Any other non-partial event carrying user-visible content is + treated as a complete LLM response (emitting ``call_llm``). + + Partial events only update the TTFT measurement for the current LLM + segment; they do not trigger any emission. + + Args: + ctx: Invocation context. + event: The event to process. + """ + now = time.monotonic() + if self._llm_segment_start is None: + self._llm_segment_start = now + if self._llm_ttft is None and event.has_content(): + self._llm_ttft = now - self._llm_segment_start + + if event.partial: + return + + if event.get_function_calls(): + self._emit_call_llm(ctx, event, now) + for fc in event.get_function_calls(): + self._pending_tool_starts[fc.id] = (fc.name, now) + self._llm_segment_start = None + self._llm_ttft = None + return + + if event.get_function_responses(): + for fr in event.get_function_responses(): + state = self._pending_tool_starts.pop(fr.id, None) + if state is None: + continue + tool_name, t0 = state + report_execute_tool( + ctx, + _SyntheticTool(name=tool_name), + duration_s=now - t0, + error_type=self._tool_error_type(event), + extra_attributes=self.extra_attributes, + ) + self._llm_segment_start = now + self._llm_ttft = None + return + + if event.has_content(): + self._emit_call_llm(ctx, event, now) + self._llm_segment_start = now + self._llm_ttft = None + + def reset(self) -> None: + """Reset reporter state. + + Clears pending tool-call timers and LLM-segment bookkeeping. Call + between separate runs of the same reporter instance. + """ + self._pending_tool_starts.clear() + self._llm_segment_start = None + self._llm_ttft = None + + def _emit_call_llm( + self, + ctx: InvocationContext, + event: Event, + now: float, + ) -> None: + if self._llm_segment_start is None: + return + duration = now - self._llm_segment_start + ttft = self._llm_ttft if self._llm_ttft is not None else duration + + llm_request = LlmRequest(model=f"{self.model_prefix}:{self.agent_name}") + llm_response = LlmResponse( + content=event.content, + usage_metadata=getattr(event, "usage_metadata", None), + error_code=getattr(event, "error_code", None), + error_message=getattr(event, "error_message", None), + ) + error_type: Optional[str] = None + if getattr(event, "error_code", None): + error_type = str(event.error_code) + report_call_llm( + ctx, + llm_request=llm_request, + llm_response=llm_response, + duration_s=duration, + ttft_s=ttft, + is_stream=self.is_stream, + error_type=error_type, + extra_attributes=self.extra_attributes, + ) + + @staticmethod + def _tool_error_type(event: Event) -> Optional[str]: + """Best-effort error-type extraction from a function-response event.""" + err_code = getattr(event, "error_code", None) + if err_code: + return str(err_code) + if getattr(event, "error_message", None): + return "error" + return None diff --git a/trpc_agent_sdk/telemetry/_metrics.py b/trpc_agent_sdk/telemetry/_metrics.py new file mode 100644 index 0000000..a2d7a80 --- /dev/null +++ b/trpc_agent_sdk/telemetry/_metrics.py @@ -0,0 +1,218 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""OTel-native ``gen_ai.*`` metrics for the TRPC Agent framework. + +Mirrors :mod:`trpc_agent_sdk.telemetry._trace`: module-level instruments plus +``report_*`` free functions. 
Backends fan out via the installed +``MeterProvider`` and route by ``gen_ai.operation.name``. +""" + +from __future__ import annotations + +from typing import Any +from typing import Mapping +from typing import Optional + +from opentelemetry import metrics + +from trpc_agent_sdk.context import InvocationContext +from trpc_agent_sdk.models import LlmRequest +from trpc_agent_sdk.models import LlmResponse +from trpc_agent_sdk.tools import BaseTool + +_meter = metrics.get_meter("trpc.python.agent") + +_request_cnt = _meter.create_counter( + name="gen_ai.request_cnt", + description="Number of gen_ai operations.", + unit="{request}", +) +_operation_duration = _meter.create_histogram( + name="gen_ai.client.operation.duration", + description="End-to-end wall-clock time of a gen_ai operation.", + unit="s", +) +_time_to_first_token = _meter.create_histogram( + name="gen_ai.server.time_to_first_token", + description="Time to first output token / visible chunk.", + unit="s", +) +_usage_input_tokens = _meter.create_histogram( + name="gen_ai.usage.input_tokens", + description="Prompt tokens consumed by a gen_ai operation.", + unit="{token}", +) +_usage_output_tokens = _meter.create_histogram( + name="gen_ai.usage.output_tokens", + description="Completion tokens produced by a gen_ai operation.", + unit="{token}", +) + +# OTel GenAI semconv attribute keys. +_ATTR_OPERATION_NAME = "gen_ai.operation.name" +_ATTR_SYSTEM = "gen_ai.system" +_ATTR_APP_NAME = "gen_ai.app.name" +_ATTR_USER_ID = "gen_ai.user.id" +_ATTR_AGENT_ID = "gen_ai.agent.id" +_ATTR_AGENT_NAME = "gen_ai.agent.name" +_ATTR_REQUEST_MODEL = "gen_ai.request.model" +_ATTR_RESPONSE_MODEL = "gen_ai.response.model" +_ATTR_IS_STREAM = "gen_ai.is_stream" +_ATTR_TOOL_NAME = "gen_ai.tool.name" +_ATTR_ERROR_TYPE = "error.type" +_ATTR_RESPONSE_ERROR_CODE = "gen_ai.response.error_code" + +_OP_CHAT = "chat" +_OP_EXECUTE_TOOL = "execute_tool" +_OP_INVOKE_AGENT = "invoke_agent" + + +def _merge_extras( + base: dict[str, Any], + extras: Optional[Mapping[str, Any]], +) -> dict[str, Any]: + if not extras: + return base + out = dict(base) + for k, v in extras.items(): + if v is None: + continue + out[k] = v + return out + + +def _infer_system(model: str) -> str: + """Map a model name to a ``gen_ai.system`` value; empty string if unknown.""" + if not model: + return "" + m = model.lower() + if m.startswith(("gpt", "o1", "text-embedding")): + return "openai" + if m.startswith("claude"): + return "anthropic" + if m.startswith("gemini"): + return "gcp.gemini" + if m.startswith("hunyuan"): + return "hunyuan" + if m.startswith("taiji"): + return "taiji" + return "" + + +def _agent_model_name(agent: Any) -> str: + """Best-effort model name from an agent; "" if not statically reachable.""" + model = getattr(agent, "model", None) + if isinstance(model, str): + return model + name = getattr(model, "name", None) + if isinstance(name, str): + return name + return "" + + +def report_call_llm( + invocation_context: InvocationContext, + llm_request: LlmRequest, + llm_response: Optional[LlmResponse], + *, + duration_s: float, + ttft_s: float, + is_stream: bool, + error_type: Optional[str] = None, + extra_attributes: Optional[Mapping[str, Any]] = None, +) -> None: + """Record one LLM call. 
Token histograms are skipped without ``usage_metadata``.""" + request_model = getattr(llm_request, "model", "") or "" + response_model = "" + response_error_code = "" + if llm_response is not None: + response_model = getattr(llm_response, "model", "") or request_model + response_error_code = getattr(llm_response, "error_code", "") or "" + + attrs = { + _ATTR_OPERATION_NAME: _OP_CHAT, + _ATTR_SYSTEM: _infer_system(request_model), + _ATTR_APP_NAME: invocation_context.app_name, + _ATTR_USER_ID: invocation_context.user_id, + _ATTR_AGENT_ID: invocation_context.agent_name, + _ATTR_AGENT_NAME: invocation_context.agent_name, + _ATTR_REQUEST_MODEL: request_model, + _ATTR_RESPONSE_MODEL: response_model, + _ATTR_IS_STREAM: is_stream, + _ATTR_ERROR_TYPE: error_type or "", + _ATTR_RESPONSE_ERROR_CODE: response_error_code, + } + attrs = _merge_extras(attrs, extra_attributes) + + _request_cnt.add(1, attrs) + _operation_duration.record(duration_s, attrs) + _time_to_first_token.record(ttft_s, attrs) + + if llm_response is not None and llm_response.usage_metadata is not None: + usage = llm_response.usage_metadata + prompt = getattr(usage, "prompt_token_count", None) or 0 + total = getattr(usage, "total_token_count", None) or 0 + if prompt and total: + _usage_input_tokens.record(prompt, attrs) + _usage_output_tokens.record(max(total - prompt, 0), attrs) + + +def report_execute_tool( + invocation_context: InvocationContext, + tool: BaseTool, + *, + duration_s: float, + error_type: Optional[str] = None, + extra_attributes: Optional[Mapping[str, Any]] = None, +) -> None: + """Record one tool invocation.""" + attrs = { + _ATTR_OPERATION_NAME: _OP_EXECUTE_TOOL, + _ATTR_SYSTEM: _infer_system(_agent_model_name(invocation_context.agent)), + _ATTR_APP_NAME: invocation_context.app_name, + _ATTR_USER_ID: invocation_context.user_id, + _ATTR_AGENT_ID: invocation_context.agent_name, + _ATTR_AGENT_NAME: invocation_context.agent_name, + _ATTR_TOOL_NAME: tool.name, + _ATTR_ERROR_TYPE: error_type or "", + } + attrs = _merge_extras(attrs, extra_attributes) + + _request_cnt.add(1, attrs) + _operation_duration.record(duration_s, attrs) + + +def report_invoke_agent( + invocation_context: InvocationContext, + *, + duration_s: float, + ttft_s: float, + input_tokens: int, + output_tokens: int, + is_stream: bool, + error_type: Optional[str] = None, + extra_attributes: Optional[Mapping[str, Any]] = None, +) -> None: + """Record one agent run; token counts are aggregated from child LLM calls.""" + attrs = { + _ATTR_OPERATION_NAME: _OP_INVOKE_AGENT, + _ATTR_SYSTEM: _infer_system(_agent_model_name(invocation_context.agent)), + _ATTR_APP_NAME: invocation_context.app_name, + _ATTR_USER_ID: invocation_context.user_id, + _ATTR_AGENT_ID: invocation_context.agent_name, + _ATTR_AGENT_NAME: invocation_context.agent_name, + _ATTR_IS_STREAM: is_stream, + _ATTR_ERROR_TYPE: error_type or "", + } + attrs = _merge_extras(attrs, extra_attributes) + + _request_cnt.add(1, attrs) + _operation_duration.record(duration_s, attrs) + _time_to_first_token.record(ttft_s, attrs) + if input_tokens: + _usage_input_tokens.record(input_tokens, attrs) + if output_tokens: + _usage_output_tokens.record(output_tokens, attrs)
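+
+
+# Illustrative call-site sketch (comments only; mirrors the instrumentation
+# added to agents/core/_llm_processor.py in this change, where the caller
+# already tracks t_start, t_first_token, stream, and metrics_error_type):
+#
+#     t_start = time.monotonic()
+#     try:
+#         ...  # drive the model, latching t_first_token on the first visible chunk
+#     finally:
+#         duration_s = time.monotonic() - t_start
+#         report_call_llm(
+#             context,
+#             request,
+#             final_llm_response,
+#             duration_s=duration_s,
+#             ttft_s=(t_first_token - t_start) if t_first_token is not None else duration_s,
+#             is_stream=stream,
+#             error_type=metrics_error_type,
+#         )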