10 changes: 5 additions & 5 deletions packages/opentelemetry-instrumentation-alephalpha/poetry.lock

@@ -27,7 +27,7 @@ python = ">=3.9,<4"
opentelemetry-api = "^1.27.0"
opentelemetry-instrumentation = "^0.48b0"
opentelemetry-semantic-conventions = "^0.48b0"
opentelemetry-semantic-conventions-ai = "0.4.1"
opentelemetry-semantic-conventions-ai = "0.4.2"

[tool.poetry.group.dev.dependencies]
autopep8 = "^2.2.0"
@@ -16,8 +16,10 @@
build_from_streaming_response,
)
from opentelemetry.instrumentation.anthropic.utils import (
acount_prompt_tokens_from_request,
dont_throw,
error_metrics_attributes,
count_prompt_tokens_from_request,
set_span_attribute,
shared_metrics_attributes,
should_send_prompts,
@@ -59,6 +61,18 @@
"method": "stream",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.prompt_caching.messages",
"object": "Messages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.prompt_caching.messages",
"object": "Messages",
"method": "stream",
"span_name": "anthropic.chat",
},
]
WRAPPED_AMETHODS = [
{
@@ -79,6 +93,18 @@
"method": "stream",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.prompt_caching.messages",
"object": "AsyncMessages",
"method": "create",
"span_name": "anthropic.chat",
},
{
"package": "anthropic.resources.beta.prompt_caching.messages",
"object": "AsyncMessages",
"method": "stream",
"span_name": "anthropic.chat",
},
]


Expand All @@ -102,6 +128,11 @@ async def _dump_content(message_index, content, span):
if isinstance(content, str):
return content
elif isinstance(content, list):
# If the content is a list of text blocks, concatenate them.
# This is more commonly used in prompt caching.
if all([item.get("type") == "text" for item in content]):
return "".join([item.get("text") for item in content])

content = [
(
await _process_image_item(
@@ -220,27 +251,23 @@ async def _aset_token_usage(
if usage := response.get("usage"):
prompt_tokens = usage.input_tokens
else:
prompt_tokens = 0
if hasattr(anthropic, "count_tokens"):
if request.get("prompt"):
prompt_tokens = await anthropic.count_tokens(request.get("prompt"))
elif request.get("messages"):
prompt_tokens = 0
for m in request.get("messages"):
content = m.get("content")
if isinstance(content, str):
prompt_tokens += await anthropic.count_tokens(content)
elif isinstance(content, list):
for item in content:
# TODO: handle image tokens
if isinstance(item, dict) and item.get("type") == "text":
prompt_tokens += await anthropic.count_tokens(
item.get("text", "")
)

if token_histogram and type(prompt_tokens) is int and prompt_tokens >= 0:
prompt_tokens = await acount_prompt_tokens_from_request(anthropic, request)

if usage := response.get("usage"):
cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0)
else:
cache_read_tokens = 0

if usage := response.get("usage"):
cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0)
else:
cache_creation_tokens = 0

input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens

if token_histogram and type(input_tokens) is int and input_tokens >= 0:
token_histogram.record(
prompt_tokens,
input_tokens,
Collaborator Author: I am open to discussing whether this should be input tokens (i.e. all tokens sent by the user) or prompt tokens (i.e. the new tokens that Anthropic has neither written to nor read from the cache).
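
A minimal sketch of the two candidate quantities (illustrative only; the helper name is hypothetical and not part of this PR), using the usage fields that Anthropic returns for prompt-caching requests and that this diff reads elsewhere:

    # Hypothetical helper: the two quantities under discussion, given the
    # usage payload from an Anthropic response as a plain dict.
    def candidate_token_counts(usage: dict) -> tuple[int, int]:
        prompt_tokens = usage.get("input_tokens", 0)           # uncached tokens only
        input_tokens = (
            prompt_tokens
            + usage.get("cache_read_input_tokens", 0)          # read from the prompt cache
            + usage.get("cache_creation_input_tokens", 0)      # written to the prompt cache
        )
        return prompt_tokens, input_tokens
        # The histogram in this diff now records the second quantity, input_tokens.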

attributes={
**metric_attributes,
SpanAttributes.LLM_TOKEN_TYPE: "input",
@@ -268,7 +295,7 @@ async def _aset_token_usage(
},
)

total_tokens = prompt_tokens + completion_tokens
total_tokens = input_tokens + completion_tokens

choices = 0
if type(response.get("content")) is list:
@@ -285,12 +312,19 @@ async def _aset_token_usage(
},
)

set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens)
set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens)
set_span_attribute(
span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens
)
set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens)

set_span_attribute(
span, SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, cache_read_tokens
)
set_span_attribute(
span, SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, cache_creation_tokens
)


@dont_throw
def _set_token_usage(
@@ -308,27 +342,23 @@ def _set_token_usage(
if usage := response.get("usage"):
prompt_tokens = usage.input_tokens
else:
prompt_tokens = 0
if hasattr(anthropic, "count_tokens"):
if request.get("prompt"):
prompt_tokens = anthropic.count_tokens(request.get("prompt"))
elif request.get("messages"):
prompt_tokens = 0
for m in request.get("messages"):
content = m.get("content")
if isinstance(content, str):
prompt_tokens += anthropic.count_tokens(content)
elif isinstance(content, list):
for item in content:
# TODO: handle image tokens
if isinstance(item, dict) and item.get("type") == "text":
prompt_tokens += anthropic.count_tokens(
item.get("text", "")
)

if token_histogram and type(prompt_tokens) is int and prompt_tokens >= 0:
prompt_tokens = count_prompt_tokens_from_request(anthropic, request)

if usage := response.get("usage"):
cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0)
else:
cache_read_tokens = 0

if usage := response.get("usage"):
cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0)
else:
cache_creation_tokens = 0

input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens

Member: Aren't you double-counting the input tokens here? From my understanding, cache_read_tokens + cache_creation_tokens should be exactly the number of tokens in the input. Or is it the case that either prompt_tokens is set (for non-cached requests) OR cache_read_tokens + cache_creation_tokens?

Collaborator Author: Nope, Anthropic fills in all three. If it is a cache write, cache_read_tokens == 0; if it is a cache read, cache_creation_tokens == 0. prompt_tokens are the tokens from the uncached parts of the messages; for cache writes and reads it is always about 3-4, which I assume is some control tokens or stop sequences or something. You can see some numbers I hard-coded in the tests here.

For example, if I send two text blocks of sizes 1200 and 100 tokens in one message and only direct Anthropic to cache the first one, the usage will be:

  • {"cache_read_input_tokens": 0, "cache_creation_input_tokens": 1200, "input_tokens": 104, "output_tokens": ...} for the first call, and
  • {"cache_read_input_tokens": 1200, "cache_creation_input_tokens": 0, "input_tokens": 104, "output_tokens": ...} for the second

Member: My intuition is that we should keep input_tokens consistent across providers: it should always be the total number of tokens in the input, regardless of whether some of them were cached and some weren't.

Member: (Which is what I think you did, right?)

Collaborator Author: Yes, except for one place where I accidentally added just the uncached tokens. Fixed in the last commit now.


if token_histogram and type(input_tokens) is int and input_tokens >= 0:
token_histogram.record(
prompt_tokens,
input_tokens,
Collaborator Author: ditto (same comment as above about input vs. prompt tokens).

attributes={
**metric_attributes,
SpanAttributes.LLM_TOKEN_TYPE: "input",
@@ -354,7 +384,7 @@ def _set_token_usage(
},
)

total_tokens = prompt_tokens + completion_tokens
total_tokens = input_tokens + completion_tokens

choices = 0
if type(response.get("content")) is list:
@@ -371,12 +401,19 @@ def _set_token_usage(
},
)

set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens)
set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens)
set_span_attribute(
span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens
)
set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens)

set_span_attribute(
span, SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, cache_read_tokens
)
set_span_attribute(
span, SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, cache_creation_tokens
)


@dont_throw
def _set_response_attributes(span, response):
@@ -687,7 +724,7 @@ def __init__(
get_common_metrics_attributes: Callable[[], dict] = lambda: {},
upload_base64_image: Optional[
Callable[[str, str, str, str], Coroutine[None, None, str]]
] = lambda *args: "",
] = None,
):
super().__init__()
Config.exception_logger = exception_logger
@@ -771,8 +808,9 @@ def _uninstrument(self, **kwargs):
wrapped_method.get("method"),
)
for wrapped_method in WRAPPED_AMETHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
unwrap(
f"anthropic.resources.completions.{wrap_object}",
f"{wrap_package}.{wrap_object}",
wrapped_method.get("method"),
)