diff --git a/1.png b/1.png
new file mode 100644
index 0000000000..26e10d1050
Binary files /dev/null and b/1.png differ
diff --git a/2.png b/2.png
new file mode 100644
index 0000000000..e115576916
Binary files /dev/null and b/2.png differ
diff --git a/3.png b/3.png
new file mode 100644
index 0000000000..a1a40fc420
Binary files /dev/null and b/3.png differ
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2337bac404..7fa8a5a463 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## Unreleased
+
+### Fix
+
+- **openai**: use monotonic timing for streaming spans, redact headers, add prompt hash + receipt attributes
+
 ## v0.47.3 (2025-09-21)
 
 ### Fix
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
index 904b0b38d2..fe288856dc 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py
@@ -26,6 +26,30 @@
 logger = logging.getLogger(__name__)
 
+SENSITIVE_HEADER_KEYS = {"authorization", "proxy-authorization"}
+
+
+def _scrub_headers(headers):
+    if not headers:
+        return None
+
+    try:
+        items = headers.items()
+    except AttributeError:
+        return None
+
+    cleaned = {}
+    for key, value in items:
+        if key is None:
+            continue
+        lowered = key.lower()
+        if lowered in SENSITIVE_HEADER_KEYS:
+            cleaned[key] = ""
+        else:
+            cleaned[key] = value
+
+    return cleaned if cleaned else None
+
 
 def _set_span_attribute(span, name, value):
     if value is None or value == "":
         return
@@ -132,12 +156,23 @@ def _set_request_attributes(span, kwargs, instance=None):
         span, SpanAttributes.LLM_PRESENCE_PENALTY, kwargs.get("presence_penalty")
     )
     _set_span_attribute(span, SpanAttributes.LLM_USER, kwargs.get("user"))
-    _set_span_attribute(span, SpanAttributes.LLM_HEADERS, str(kwargs.get("headers")))
-    # The new OpenAI SDK removed the `headers` and create new field called `extra_headers`
-    if kwargs.get("extra_headers") is not None:
+
+    scrubbed_headers = _scrub_headers(kwargs.get("headers"))
+    extra_headers = _scrub_headers(kwargs.get("extra_headers"))
+    headers_payload = {}
+    if scrubbed_headers:
+        headers_payload.update(scrubbed_headers)
+    if extra_headers:
+        headers_payload.update(extra_headers)
+
+    if headers_payload:
         _set_span_attribute(
-            span, SpanAttributes.LLM_HEADERS, str(kwargs.get("extra_headers"))
+            span,
+            SpanAttributes.LLM_HEADERS,
+            json.dumps(headers_payload, sort_keys=True),
         )
+    if kwargs.get("seed") is not None:
+        _set_span_attribute(span, SpanAttributes.LLM_REPRODUCIBLE_RUN, True)
     _set_span_attribute(
         span, SpanAttributes.LLM_IS_STREAMING, kwargs.get("stream") or False
     )
@@ -203,7 +238,9 @@ def _set_response_attributes(span, response):
     if response_model:
         response_model = _extract_model_name_from_provider_format(response_model)
     _set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response_model)
-    _set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id"))
+    response_id = response.get("id")
+    _set_span_attribute(span, GEN_AI_RESPONSE_ID, response_id)
+    _set_span_attribute(span, SpanAttributes.LLM_RECEIPT_ID, response_id)
 
     _set_span_attribute(
         span,
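For reference (illustrative sketch, not part of the patch): the header handling added above blanks Authorization-style values, merges `headers` with `extra_headers`, and serializes the result deterministically before it is stored on the `llm.headers` attribute. A self-contained approximation, where the helper name `scrub_headers` and the sample header values are hypothetical stand-ins for the instrumented call:

    import json

    SENSITIVE_HEADER_KEYS = {"authorization", "proxy-authorization"}


    def scrub_headers(headers):
        # Blank out sensitive values; return None for empty or non-mapping input.
        if not headers:
            return None
        try:
            items = headers.items()
        except AttributeError:
            return None
        cleaned = {}
        for key, value in items:
            if key is None:
                continue
            cleaned[key] = "" if key.lower() in SENSITIVE_HEADER_KEYS else value
        return cleaned or None


    # Merge request headers with extra_headers and serialize with sorted keys,
    # mirroring the json.dumps(..., sort_keys=True) call in the patch.
    payload = {}
    for source in (
        scrub_headers({"Authorization": "Bearer sk-...", "X-Request-Id": "abc"}),
        scrub_headers({"X-Trace": "1"}),
    ):
        if source:
            payload.update(source)

    print(json.dumps(payload, sort_keys=True))
    # -> {"Authorization": "", "X-Request-Id": "abc", "X-Trace": "1"}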
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
index 0b8c9d6b84..8af50017c4 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
@@ -1,8 +1,9 @@
 import copy
+import hashlib
 import json
 import logging
 import threading
-import time
+from time import perf_counter
 from functools import singledispatch
 from typing import List, Optional, Union
 
@@ -61,6 +62,19 @@
 logger = logging.getLogger(__name__)
 
 
+PROMPT_PREVIEW_LIMIT = 128
+
+
+def _monotonic_now() -> float:
+    return perf_counter()
+
+
+def _prompt_preview_and_hash(content: str) -> tuple[str, str]:
+    encoded = content.encode("utf-8")
+    digest = hashlib.sha256(encoded).hexdigest()
+    return content[:PROMPT_PREVIEW_LIMIT], digest
+
+
 @_with_chat_telemetry_wrapper
 def chat_wrapper(
     tracer: Tracer,
@@ -91,11 +105,11 @@ def chat_wrapper(
     with trace.use_span(span, end_on_exit=False):
         run_async(_handle_request(span, kwargs, instance))
         try:
-            start_time = time.time()
+            start_time = _monotonic_now()
             response = wrapped(*args, **kwargs)
-            end_time = time.time()
+            end_time = _monotonic_now()
         except Exception as e:  # pylint: disable=broad-except
-            end_time = time.time()
+            end_time = _monotonic_now()
             duration = end_time - start_time if "start_time" in locals() else 0
 
             attributes = {
@@ -190,11 +204,11 @@ async def achat_wrapper(
         await _handle_request(span, kwargs, instance)
 
         try:
-            start_time = time.time()
+            start_time = _monotonic_now()
             response = await wrapped(*args, **kwargs)
-            end_time = time.time()
+            end_time = _monotonic_now()
         except Exception as e:  # pylint: disable=broad-except
-            end_time = time.time()
+            end_time = _monotonic_now()
             duration = end_time - start_time if "start_time" in locals() else 0
 
             common_attributes = Config.get_common_metrics_attributes()
@@ -448,7 +462,7 @@ async def _set_prompts(span, messages):
        if msg.get("content"):
            content = copy.deepcopy(msg.get("content"))
            if isinstance(content, list):
-                content = [
+                processed_content = [
                    (
                        await _process_image_item(
                            item, span.context.trace_id, span.context.span_id, i, j
@@ -458,9 +472,19 @@ async def _set_prompts(span, messages):
                    )
                    for j, item in enumerate(content)
                ]
+                serialized_content = json.dumps(processed_content)
+            elif isinstance(content, dict):
+                serialized_content = json.dumps(content)
+            else:
+                serialized_content = str(content)
 
-            content = json.dumps(content)
-            _set_span_attribute(span, f"{prefix}.content", content)
+            preview, digest = _prompt_preview_and_hash(serialized_content)
+            _set_span_attribute(span, f"{prefix}.content", preview)
+            _set_span_attribute(
+                span,
+                f"{prefix}.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}",
+                digest,
+            )
        if msg.get("tool_call_id"):
            _set_span_attribute(
                span, f"{prefix}.tool_call_id", msg.get("tool_call_id"))
@@ -510,11 +534,11 @@ def _set_completions(span, choices):
 
            _set_span_attribute(span, f"{prefix}.role", "assistant")
            _set_span_attribute(span, f"{prefix}.content", "FILTERED")
-            return
+            continue
 
        message = choice.get("message")
        if not message:
-            return
+            continue
 
        _set_span_attribute(span, f"{prefix}.role", message.get("role"))
 
@@ -714,7 +738,7 @@ def _process_item(self, item):
                name=f"{SpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}")
 
        if self._first_token and self._streaming_time_to_first_token:
-            self._time_of_first_token = time.time()
+            self._time_of_first_token = _monotonic_now()
            self._streaming_time_to_first_token.record(
                self._time_of_first_token - self._start_time,
                attributes=self._shared_attributes(),
@@ -753,7 +777,7 @@ def _process_complete_response(self):
 
        # duration metrics
        if self._start_time and isinstance(self._start_time, (float, int)):
-            duration = time.time() - self._start_time
+            duration = _monotonic_now() - self._start_time
        else:
            duration = None
        if duration and isinstance(duration, (float, int)) and self._duration_histogram:
@@ -762,7 +786,7 @@ def _process_complete_response(self):
            )
        if self._streaming_time_to_generate and self._time_of_first_token:
            self._streaming_time_to_generate.record(
-                time.time() - self._time_of_first_token,
+                _monotonic_now() - self._time_of_first_token,
                attributes=self._shared_attributes(),
            )
 
@@ -822,7 +846,7 @@ def _record_partial_metrics(self):
        """Record metrics based on available partial data"""
        # Always record duration if we have start time
        if self._start_time and isinstance(self._start_time, (float, int)) and self._duration_histogram:
-            duration = time.time() - self._start_time
+            duration = _monotonic_now() - self._start_time
            self._duration_histogram.record(
                duration, attributes=self._shared_attributes()
            )
@@ -877,7 +901,7 @@ def _build_from_streaming_response(
        item_to_yield = item
 
        if first_token and streaming_time_to_first_token:
-            time_of_first_token = time.time()
+            time_of_first_token = _monotonic_now()
            streaming_time_to_first_token.record(
                time_of_first_token - start_time)
            first_token = False
@@ -904,13 +928,13 @@ def _build_from_streaming_response(
 
    # duration metrics
    if start_time and isinstance(start_time, (float, int)):
-        duration = time.time() - start_time
+        duration = _monotonic_now() - start_time
    else:
        duration = None
    if duration and isinstance(duration, (float, int)) and duration_histogram:
        duration_histogram.record(duration, attributes=shared_attributes)
    if streaming_time_to_generate and time_of_first_token:
-        streaming_time_to_generate.record(time.time() - time_of_first_token)
+        streaming_time_to_generate.record(_monotonic_now() - time_of_first_token)
 
    _set_response_attributes(span, complete_response)
    if should_emit_events():
@@ -948,7 +972,7 @@ async def _abuild_from_streaming_response(
        item_to_yield = item
 
        if first_token and streaming_time_to_first_token:
-            time_of_first_token = time.time()
+            time_of_first_token = _monotonic_now()
            streaming_time_to_first_token.record(
                time_of_first_token - start_time)
            first_token = False
@@ -975,13 +999,13 @@ async def _abuild_from_streaming_response(
 
    # duration metrics
    if start_time and isinstance(start_time, (float, int)):
-        duration = time.time() - start_time
+        duration = _monotonic_now() - start_time
    else:
        duration = None
    if duration and isinstance(duration, (float, int)) and duration_histogram:
        duration_histogram.record(duration, attributes=shared_attributes)
    if streaming_time_to_generate and time_of_first_token:
-        streaming_time_to_generate.record(time.time() - time_of_first_token)
+        streaming_time_to_generate.record(_monotonic_now() - time_of_first_token)
 
    _set_response_attributes(span, complete_response)
    if should_emit_events():
@@ -1125,8 +1149,12 @@ def _accumulate_stream_items(item, complete_response):
    if is_openai_v1():
        item = model_as_dict(item)
 
-    complete_response["model"] = item.get("model")
-    complete_response["id"] = item.get("id")
+    model = item.get("model")
+    if model:
+        complete_response["model"] = model
+    response_id = item.get("id")
+    if response_id:
+        complete_response["id"] = response_id
 
    # capture usage information from the last stream chunks
    if item.get("usage"):
delta.get("content"): - complete_choice["message"]["content"] += delta.get("content") + chunk = delta.get("content") + if isinstance(chunk, list): + chunk_text = "".join( + part.get("text", "") if isinstance(part, dict) else str(part) + for part in chunk + ) + else: + chunk_text = chunk + complete_choice["message"]["content"] += chunk_text if delta and delta.get("role"): complete_choice["message"]["role"] = delta.get("role") diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py index 254608068c..ba18e64bb2 100644 --- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py +++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py @@ -1,3 +1,4 @@ +import hashlib import logging from opentelemetry import context as context_api @@ -41,6 +42,14 @@ logger = logging.getLogger(__name__) +PROMPT_PREVIEW_LIMIT = 128 + + +def _prompt_preview_and_hash(content: str) -> tuple[str, str]: + encoded = content.encode("utf-8") + digest = hashlib.sha256(encoded).hexdigest() + return content[:PROMPT_PREVIEW_LIMIT], digest + @_with_tracer_wrapper def completion_wrapper(tracer, wrapped, instance, args, kwargs): @@ -158,11 +167,25 @@ def _set_prompts(span, prompt): if not span.is_recording() or not prompt: return - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.user", - prompt[0] if isinstance(prompt, list) else prompt, - ) + if isinstance(prompt, list): + for idx, value in enumerate(prompt): + preview, digest = _prompt_preview_and_hash(str(value)) + prefix = f"{SpanAttributes.LLM_PROMPTS}.{idx}" + _set_span_attribute(span, f"{prefix}.user", preview) + _set_span_attribute( + span, + f"{prefix}.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}", + digest, + ) + else: + preview, digest = _prompt_preview_and_hash(str(prompt)) + prefix = f"{SpanAttributes.LLM_PROMPTS}.0" + _set_span_attribute(span, f"{prefix}.user", preview) + _set_span_attribute( + span, + f"{prefix}.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}", + digest, + ) @dont_throw @@ -254,8 +277,12 @@ def _accumulate_streaming_response(complete_response, item): if is_openai_v1(): item = model_as_dict(item) - complete_response["model"] = item.get("model") - complete_response["id"] = item.get("id") + model = item.get("model") + if model: + complete_response["model"] = model + response_id = item.get("id") + if response_id: + complete_response["id"] = response_id # capture usage information from the stream chunks if item.get("usage"): diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py b/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py new file mode 100644 index 0000000000..593d720c4f --- /dev/null +++ b/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py @@ -0,0 +1,148 @@ +import hashlib +import json +from typing import Any, Dict + +import pytest + +from opentelemetry.semconv_ai import SpanAttributes + +import opentelemetry.instrumentation.openai.shared as shared +from opentelemetry.instrumentation.openai.shared.chat_wrappers import ( + _accumulate_stream_items, + _set_prompts as set_chat_prompts, +) +from opentelemetry.instrumentation.openai.shared.completion_wrappers import ( + _accumulate_streaming_response, + _set_prompts as 
diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py b/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py
new file mode 100644
index 0000000000..593d720c4f
--- /dev/null
+++ b/packages/opentelemetry-instrumentation-openai/tests/traces/test_chat_wrappers_unit.py
@@ -0,0 +1,148 @@
+import hashlib
+import json
+from typing import Any, Dict
+
+import pytest
+
+from opentelemetry.semconv_ai import SpanAttributes
+
+import opentelemetry.instrumentation.openai.shared as shared
+from opentelemetry.instrumentation.openai.shared.chat_wrappers import (
+    _accumulate_stream_items,
+    _set_prompts as set_chat_prompts,
+)
+from opentelemetry.instrumentation.openai.shared.completion_wrappers import (
+    _accumulate_streaming_response,
+    _set_prompts as set_completion_prompts,
+)
+
+
+class RecordingSpan:
+    def __init__(self) -> None:
+        self.attributes: Dict[str, Any] = {}
+
+    def set_attribute(self, key: str, value: Any) -> None:
+        self.attributes[key] = value
+
+    def is_recording(self) -> bool:
+        return True
+
+@pytest.fixture(autouse=True)
+def reset_shared_helpers(monkeypatch):
+    monkeypatch.setattr(shared, "_set_api_attributes", lambda *args, **kwargs: None)
+    monkeypatch.setattr(shared, "_get_openai_base_url", lambda instance=None: "")
+    monkeypatch.setattr(shared, "_get_vendor_from_url", lambda base_url: "openai")
+    monkeypatch.setattr(shared, "_cross_region_check", lambda value: value)
+    monkeypatch.setattr(shared, "_extract_model_name_from_provider_format", lambda value: value)
+    monkeypatch.setattr(
+        "opentelemetry.instrumentation.openai.shared.is_openai_v1", lambda: False
+    )
+    yield
+
+
+def test_accumulate_stream_items_preserves_model(monkeypatch):
+    monkeypatch.setattr(
+        "opentelemetry.instrumentation.openai.shared.chat_wrappers.is_openai_v1",
+        lambda: False,
+    )
+
+    complete = {"choices": [], "model": "", "id": ""}
+    first = {
+        "model": "gpt-4o",
+        "id": "resp-123",
+        "usage": None,
+        "choices": [
+            {"index": 0, "delta": {"content": "Hello"}, "content_filter_results": None},
+        ],
+    }
+    second = {
+        "choices": [
+            {"index": 0, "delta": {"content": " there"}, "content_filter_results": None},
+        ],
+    }
+
+    _accumulate_stream_items(first, complete)
+    _accumulate_stream_items(second, complete)
+
+    assert complete["model"] == "gpt-4o"
+    assert complete["id"] == "resp-123"
+    assert complete["choices"][0]["message"]["content"] == "Hello there"
+
+
+@pytest.mark.asyncio
+async def test_set_prompts_truncates_and_hashes():
+    span = RecordingSpan()
+    long_prompt = "A" * 200
+
+    await set_chat_prompts(span, [{"role": "user", "content": long_prompt}])
+
+    preview = span.attributes[f"{SpanAttributes.LLM_PROMPTS}.0.content"]
+    digest = span.attributes[
+        f"{SpanAttributes.LLM_PROMPTS}.0.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}"
+    ]
+
+    assert len(preview) == 128
+    assert preview == long_prompt[:128]
+    assert digest == hashlib.sha256(long_prompt.encode("utf-8")).hexdigest()
+
+
+def test_set_completion_prompts_hashes_each_prompt():
+    span = RecordingSpan()
+    prompts = ["foo", "bar"]
+
+    set_completion_prompts(span, prompts)
+
+    first_hash = span.attributes[
+        f"{SpanAttributes.LLM_PROMPTS}.0.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}"
+    ]
+    second_hash = span.attributes[
+        f"{SpanAttributes.LLM_PROMPTS}.1.{SpanAttributes.LLM_CONTENT_HASH_ATTRIBUTE}"
+    ]
+
+    assert first_hash == hashlib.sha256("foo".encode("utf-8")).hexdigest()
+    assert second_hash == hashlib.sha256("bar".encode("utf-8")).hexdigest()
+
+
+def test_request_headers_are_scrubbed_and_sorted():
+    span = RecordingSpan()
+    kwargs = {
+        "model": "gpt-4",
+        "headers": {"Authorization": "Bearer 123", "X-Request-Id": "abc"},
+        "seed": 42,
+    }
+
+    shared._set_request_attributes(span, kwargs)
+
+    headers_json = span.attributes[SpanAttributes.LLM_HEADERS]
+    headers = json.loads(headers_json)
+
+    assert headers["Authorization"] == ""
+    assert headers["X-Request-Id"] == "abc"
+    assert span.attributes[SpanAttributes.LLM_REPRODUCIBLE_RUN] is True
+
+
+def test_response_receipt_id_is_set():
+    span = RecordingSpan()
+    response = {"id": "resp-456", "model": "gpt-4-turbo"}
+
+    shared._set_response_attributes(span, response)
+
+    assert span.attributes[SpanAttributes.LLM_RECEIPT_ID] == "resp-456"
+    assert span.attributes[SpanAttributes.LLM_RESPONSE_MODEL] == "gpt-4-turbo"
+
+
+def test_accumulate_streaming_response_preserves_model():
+    complete = {"choices": [], "model": "", "id": ""}
+    first = {
+        "model": "text-davinci",
+        "id": "resp-789",
+        "choices": [{"index": 0, "text": "Hi"}],
+    }
+    second = {"choices": [{"index": 0, "text": "!*"}]}
+
+    _accumulate_streaming_response(complete, first)
+    _accumulate_streaming_response(complete, second)
+
+    assert complete["model"] == "text-davinci"
+    assert complete["id"] == "resp-789"
+    assert complete["choices"][0]["text"] == "Hi!*"
diff --git a/packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py b/packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py
index a080ef2d90..3ca6cd9b44 100644
--- a/packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py
+++ b/packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py
@@ -92,6 +92,9 @@ class SpanAttributes:
     LLM_USAGE_TOKEN_TYPE = "llm.usage.token_type"
     LLM_USER = "llm.user"
     LLM_HEADERS = "llm.headers"
+    LLM_RECEIPT_ID = "llm.receipt_id"
+    LLM_REPRODUCIBLE_RUN = "llm.reproducible_run"
+    LLM_CONTENT_HASH_ATTRIBUTE = "content_hash"
     LLM_TOP_K = "llm.top_k"
     LLM_IS_STREAMING = "llm.is_streaming"
     LLM_FREQUENCY_PENALTY = "llm.frequency_penalty"
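For reference (illustrative sketch, not part of the patch): the wrappers switch from `time.time()` to `time.perf_counter()` because wall-clock time can be stepped by NTP or manual clock changes, so a duration computed from it can come out negative or wildly wrong, while `perf_counter()` is monotonic and is only meaningful as a difference between two reads, which is exactly how the streaming metrics use it:

    import time
    from time import perf_counter

    start_wall = time.time()
    start_mono = perf_counter()

    time.sleep(0.05)  # stand-in for waiting on streamed chunks

    # If the system clock were adjusted during the wait, wall_duration could be
    # negative or far too large; mono_duration cannot go backwards.
    wall_duration = time.time() - start_wall
    mono_duration = perf_counter() - start_mono
    print(f"wall={wall_duration:.4f}s mono={mono_duration:.4f}s")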