fix(anthropic): always set streaming token usage from API data (#3949) #3976
carsonjc04 wants to merge 5 commits into traceloop:main from
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings. Use the following commands to manage reviews.
📝 Walkthrough
Centralized token-usage resolution for Anthropic streaming.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (1)
246-279: Extract the stream token-resolution branch into one helper.

This logic now exists twice, and it has already started to drift (`logger.warning(..., e)` vs `logger.warning(..., str(e))`). Pulling it into a shared helper will make the sync and async paths stay aligned the next time this behavior changes.

Proposed refactor sketch:
```diff
+def _resolve_stream_token_usage(complete_response, instance, kwargs):
+    usage = complete_response.get("usage")
+    if usage:
+        return (
+            usage.get("input_tokens", 0) or 0,
+            usage.get("output_tokens", 0) or 0,
+        )
+
+    if not Config.enrich_token_usage:
+        return None, None
+
+    prompt_tokens = count_prompt_tokens_from_request(instance, kwargs)
+    completion_content = "".join(
+        event.get("text", "")
+        for event in complete_response.get("events", [])
+        if event.get("text")
+    )
+    completion_tokens = None
+    if complete_response.get("model") and hasattr(instance, "count_tokens"):
+        completion_tokens = instance.count_tokens(completion_content)
+
+    return prompt_tokens, completion_tokens
+
-    try:
-        usage = self._complete_response.get("usage")
-        prompt_tokens = None
-        completion_tokens = None
-        ...
+    try:
+        prompt_tokens, completion_tokens = _resolve_stream_token_usage(
+            self._complete_response, self._instance, self._kwargs
+        )
         if prompt_tokens is not None:
             _set_token_usage(
                 self._span,
                 self._complete_response,
                 prompt_tokens,
                 completion_tokens or 0,
```

Also applies to: 408-441
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py` around lines 246 - 279, The token-resolution logic duplicated in the streaming path should be extracted into a single helper (e.g., a method like _resolve_stream_token_usage or a private module function) that accepts self (or the minimal pieces: self._complete_response, self._instance, self._kwargs) and returns prompt_tokens and completion_tokens (or None/0) so both sync and async locations can call it; move the current branch that checks self._complete_response.get("usage"), falls back to Config.enrich_token_usage with count_prompt_tokens_from_request and self._instance.count_tokens, and the final _set_token_usage call into the helper, and replace the duplicated blocks (the shown block and the one at lines 408–441) with calls to this helper, ensuring exception handling is unified (use logger.warning(..., str(e)) or include the error consistently) and references to symbols like Config.enrich_token_usage, count_prompt_tokens_from_request, _set_token_usage, self._complete_response, and self._instance remain the same.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@packages/opentelemetry-instrumentation-anthropic/tests/test_semconv_span_attrs.py`:
- Around line 1546-1607: Add async counterparts of the two sync tests to cover
AnthropicAsyncStream: duplicate
test_streaming_sets_token_usage_from_api_without_enrich_flag and
test_streaming_skips_token_usage_without_api_data_and_enrich_disabled but use
the async helper (e.g., _make_anthropic_async_stream or the async stream class
AnthropicAsyncStream), patch Config.enrich_token_usage the same way, await the
async completion handler (await stream._handle_completion() or call the
coroutine appropriately), and assert the same span attribute expectations
(GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS / GEN_AI_USAGE_OUTPUT_TOKENS presence
or absence), stream._instrumentation_completed True, and that span.end was
called. Ensure test names reference Async (or AsyncStream) and mirror the
setup/usage blocks from the sync tests (span, span.is_recording, span.end =
MagicMock) so both sync and async paths are covered.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e1e1713-4299-411f-a360-0b9833856b0a
📒 Files selected for processing (2)
- packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
- packages/opentelemetry-instrumentation-anthropic/tests/test_semconv_span_attrs.py
Force-pushed from cf19fd4 to c999e5f (Compare)
…loop#3949)

The streaming _handle_completion() and _complete_instrumentation() methods gated all token-usage recording behind `if Config.enrich_token_usage`. Since that flag defaults to False, streaming spans never received gen_ai.usage.input_tokens — even when the API provided real usage data in the SSE stream events. Downstream tools (e.g. Langfuse) would then fall back to tokenizing the raw input content, which includes base64 image data, producing massively inflated token counts (e.g. 1,633 instead of 343).

Restructure the logic so that:
- API-provided usage is always read and recorded on the span
- The enrich_token_usage flag only gates the local estimation fallback (count_prompt_tokens_from_request) for when usage is absent from the response

This aligns the streaming path with the non-streaming create() path, which already sets token attributes unconditionally.

Closes traceloop#3949
Made-with: Cursor
♻️ Duplicate comments (1)
packages/opentelemetry-instrumentation-anthropic/tests/test_semconv_span_attrs.py (1)
1546-1607: ⚠️ Potential issue | 🟡 Minor: Add the same regression coverage for AnthropicAsyncStream.

These tests only exercise AnthropicStream, but this PR changed the mirrored async branch in AnthropicAsyncStream._complete_instrumentation too. Please add async equivalents for both cases so the two paths cannot drift again.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opentelemetry-instrumentation-anthropic/tests/test_semconv_span_attrs.py` around lines 1546 - 1607, Add two async test functions mirroring test_streaming_sets_token_usage_from_api_without_enrich_flag and test_streaming_skips_token_usage_without_api_data_and_enrich_disabled but targeting AnthropicAsyncStream and its async completion method: create an AnthropicAsyncStream via _make_anthropic_stream(span) (or the async equivalent), set stream._complete_response the same way, patch opentelemetry.instrumentation.anthropic.streaming.Config to set enrich_token_usage=False, then await stream._complete_instrumentation() (or the async handler used by AnthropicAsyncStream) and assert the same attributes/behavior (GEN_AI_USAGE_INPUT_TOKENS and GEN_AI_USAGE_OUTPUT_TOKENS present for the API usage case, absent for the no-usage case, stream._instrumentation_completed True, and span.end called once). Mark the tests with pytest.mark.asyncio so they run as async tests and use the same unique symbols: AnthropicAsyncStream and _complete_instrumentation.
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (1)
246-279: Extract the token-usage resolution into one helper.

These sync and async branches are now effectively copy-pasted, and they already diverge slightly in the warning call. Pulling the API-usage/fallback logic into a shared helper would make future token-accounting fixes much less likely to land in only one path.
Also applies to: 408-441
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py` around lines 246 - 279, The token-usage calculation and fallback logic (reading self._complete_response["usage"], falling back to Config.enrich_token_usage with count_prompt_tokens_from_request and self._instance.count_tokens, then calling _set_token_usage) is duplicated between the sync and async branches; extract this into a single helper method (e.g., _resolve_and_set_token_usage or similar) that accepts self, metric_attributes and uses self._complete_response, self._instance, _set_token_usage, self._span, self._token_histogram and self._choice_counter to perform the same logic and logging, then replace both inline blocks (the current block and the matching block around lines 408-441) with calls to that helper so both paths share identical behavior and warning handling.
Force-pushed from c999e5f to 037af04 (Compare)
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (1)
275-291: Consider extracting shared token-finalization block to avoid sync/async drift.

Lines 275-291 and 419-435 duplicate the same resolve/set/log flow. A tiny shared helper would reduce maintenance risk.
♻️ Suggested refactor
```diff
+def _apply_stream_token_usage(
+    span,
+    complete_response,
+    instance,
+    kwargs,
+    metric_attributes,
+    token_histogram,
+    choice_counter,
+):
+    try:
+        prompt_tokens, completion_tokens = _resolve_stream_token_usage(
+            complete_response, instance, kwargs
+        )
+        if prompt_tokens is not None:
+            _set_token_usage(
+                span,
+                complete_response,
+                prompt_tokens,
+                completion_tokens or 0,
+                metric_attributes,
+                token_histogram,
+                choice_counter,
+            )
+    except Exception as e:
+        logger.warning("Failed to set token usage, error: %s", str(e))
```

Then call `_apply_stream_token_usage(...)` from both completion methods.

Also applies to: 419-435
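With such a helper in place, both completion paths would collapse to a single delegating call. This is only a hypothetical shape; the helper below is stubbed so the sketch runs standalone, and the class and attribute names mirror the review's sketch rather than the actual module.

```python
import asyncio

calls = []


def _apply_stream_token_usage(span, complete_response, instance, kwargs,
                              metric_attributes, token_histogram, choice_counter):
    # Stub: the real helper would resolve tokens and set span attributes.
    calls.append(complete_response)


class _SyncStreamSketch:
    def __init__(self):
        self._span = self._instance = self._kwargs = None
        self._token_histogram = self._choice_counter = None
        self._complete_response = {"usage": {"input_tokens": 1}}

    def _complete_instrumentation(self, metric_attributes=None):
        # Sync path: one call into the shared helper.
        _apply_stream_token_usage(
            self._span, self._complete_response, self._instance,
            self._kwargs, metric_attributes,
            self._token_histogram, self._choice_counter,
        )


class _AsyncStreamSketch(_SyncStreamSketch):
    async def _complete_instrumentation(self, metric_attributes=None):
        # Async path: identical call, since the helper itself is synchronous.
        super()._complete_instrumentation(metric_attributes)


_SyncStreamSketch()._complete_instrumentation()
asyncio.run(_AsyncStreamSketch()._complete_instrumentation())
print(len(calls))  # 2
```

Keeping the helper synchronous means there is exactly one body of token-accounting logic, which is what prevents the sync/async drift the review flags.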
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py` around lines 275 - 291, Extract the duplicated resolve/set/log logic into a single helper (e.g. _apply_stream_token_usage) that takes the common operands (self._complete_response, self._instance, self._kwargs, self._span, metric_attributes, self._token_histogram, self._choice_counter), performs the _resolve_stream_token_usage call, calls _set_token_usage when prompt_tokens is not None, and wraps everything in the existing try/except that logs failures; then replace the duplicated blocks in both completion paths with a call to this new helper to avoid sync/async drift and reduce maintenance risk.
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/opentelemetry-instrumentation-anthropic/tests/test_semconv_span_attrs.py
max-deygin-traceloop left a comment:
Minor docstring nitpick, otherwise LGTM
…/instrumentation/anthropic/streaming.py

Thank you max-deygin-traceloop for the inconsistency callout.

Co-authored-by: max-deygin-traceloop <max@traceloop.com>
LGTM @carsonjc04. If you don't mind, please sign the CLA and we can merge it.
The streaming _handle_completion() and _complete_instrumentation() methods gated all token-usage recording behind `if Config.enrich_token_usage`. Since that flag defaults to False, streaming spans never received gen_ai.usage.input_tokens — even when the API provided real usage data in the SSE stream events.

Downstream tools (e.g. Langfuse) would then fall back to tokenizing the raw input content, which includes base64 image data, producing massively inflated token counts (e.g. 1,633 instead of 343).

Restructure the logic so that:
- API-provided usage is always read and recorded on the span
- The enrich_token_usage flag only gates the local estimation fallback (count_prompt_tokens_from_request) for when usage is absent from the response

This aligns the streaming path with the non-streaming create() path, which already sets token attributes unconditionally.

Closes #3949
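The described restructuring amounts to inverting the gating. A schematic sketch (not the actual diff; function and parameter names here are illustrative):

```python
def resolve_usage(complete_response, enrich_token_usage, estimate_prompt_tokens):
    usage = complete_response.get("usage")
    if usage:
        # API data is always read and recorded, regardless of the flag.
        return usage.get("input_tokens", 0), usage.get("output_tokens", 0)
    if enrich_token_usage:
        # The flag now gates only the local estimation fallback.
        return estimate_prompt_tokens(), 0
    return None, None


# API usage is recorded even with enrichment disabled:
print(resolve_usage({"usage": {"input_tokens": 343, "output_tokens": 12}}, False, lambda: 0))
# (343, 12)
```

Before this change, the entire body sat under the `enrich_token_usage` check, so the first branch was unreachable in the default configuration.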
Summary by CodeRabbit
Bug Fixes
Tests