Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions examples/tracing/openai/openai_tracing.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,40 @@
")"
]
},
{
"cell_type": "markdown",
"id": "09d39983",
"metadata": {},
"source": [
"#### Parse method (Structured Outputs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a86642c",
"metadata": {},
"outputs": [],
"source": [
"from pydantic import BaseModel\n",
"\n",
"\n",
"class CalendarEvent(BaseModel):\n",
" name: str\n",
" date: str\n",
" participants: list[str]\n",
"\n",
"\n",
"completion = openai_client.chat.completions.parse(\n",
" model=\"gpt-4o-mini\",\n",
" messages=[\n",
" {\"role\": \"system\", \"content\": \"Extract the event information.\"},\n",
" {\"role\": \"user\", \"content\": \"Alice and Bob are going to a science fair on Friday.\"},\n",
" ],\n",
" response_format=CalendarEvent,\n",
")\n"
]
},
{
"cell_type": "markdown",
"id": "4e6fb396",
Expand All @@ -131,7 +165,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "bedrock-test",
"display_name": "base",
"language": "python",
"name": "python3"
},
Expand All @@ -145,7 +179,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.12.7"
}
},
"nbformat": 4,
Expand Down
211 changes: 211 additions & 0 deletions src/openlayer/lib/integrations/async_openai_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
create_trace_args,
add_to_trace,
parse_non_streaming_output_data,
parse_structured_output_data,
# Import Responses API helper functions
extract_responses_chunk_data,
extract_responses_inputs,
Expand Down Expand Up @@ -98,6 +99,33 @@ async def traced_chat_create_func(*args, **kwargs):

client.chat.completions.create = traced_chat_create_func

# Wrap the structured-output ``parse`` method, but only when the client
# object actually exposes one.
if hasattr(client.chat.completions, "parse"):
    parse_func = client.chat.completions.parse

    @wraps(parse_func)
    async def traced_parse_func(*args, **kwargs):
        # ``inference_id`` is consumed here so that it is not part of the
        # kwargs later forwarded to the wrapped ``parse`` call.
        tracing_options = {
            "parse_func": parse_func,
            "inference_id": kwargs.pop("inference_id", None),
            "is_azure_openai": is_azure_openai,
        }

        if not kwargs.get("stream", False):
            # Non-streaming: await the handler and hand back its result.
            return await handle_async_non_streaming_parse(
                *args,
                **kwargs,
                **tracing_options,
            )

        # Streaming: return the async generator itself, un-awaited, so the
        # caller can iterate over it.
        return handle_async_streaming_parse(
            *args,
            **kwargs,
            **tracing_options,
        )

    client.chat.completions.parse = traced_parse_func

# Patch Responses API (if available)
if hasattr(client, "responses"):
responses_create_func = client.responses.create
Expand Down Expand Up @@ -466,3 +494,186 @@ async def handle_async_responses_non_streaming_create(
logger.error("Failed to trace the Responses API request with Openlayer. %s", e)

return response


async def handle_async_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> AsyncIterator[Any]:
    """Handles the parse method when streaming is enabled.

    Every chunk produced by ``parse_func`` is forwarded to the caller
    unchanged. While forwarding, the text content (or the function/tool call
    name and arguments) is accumulated so that, once the stream ends, a single
    step can be added to the trace.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    AsyncIterator[Any]
        A generator that yields the chunks of the completion.

    Notes
    -----
    Errors raised while iterating over the stream are logged and end the
    generator; they are not re-raised. Errors raised while building or adding
    the trace step are logged as well, so tracing never breaks the caller.
    """
    chunks = await parse_func(*args, **kwargs)

    # State accumulated across chunks and reported in the trace step.
    collected_output_data = []
    collected_function_call = {
        "name": "",
        "arguments": "",
    }
    raw_outputs = []
    start_time = time.time()
    end_time = None
    first_token_time = None
    num_of_completion_tokens = None
    latency = None
    try:
        i = 0
        async for chunk in chunks:
            raw_outputs.append(chunk.model_dump())
            if i == 0:
                first_token_time = time.time()
            if i > 0:
                # The number of chunks seen so far is used as the completion
                # token count; it stays None for single-chunk streams.
                num_of_completion_tokens = i + 1
            i += 1

            # Some chunks carry no choices (e.g. the usage-only chunk sent
            # when ``stream_options={"include_usage": True}``). Indexing
            # ``choices[0]`` on them would raise and, because of the broad
            # ``except`` below, silently cut the caller's stream short.
            if chunk.choices:
                delta = chunk.choices[0].delta

                if delta.content:
                    collected_output_data.append(delta.content)
                elif delta.function_call:
                    if delta.function_call.name:
                        collected_function_call["name"] += delta.function_call.name
                    if delta.function_call.arguments:
                        collected_function_call[
                            "arguments"
                        ] += delta.function_call.arguments
                elif delta.tool_calls:
                    # Only the first tool call of each delta is accumulated.
                    tool_function = delta.tool_calls[0].function
                    if tool_function.name:
                        collected_function_call["name"] += tool_function.name
                    if tool_function.arguments:
                        collected_function_call["arguments"] += tool_function.arguments

            yield chunk

        end_time = time.time()
        latency = (end_time - start_time) * 1000
    # pylint: disable=broad-except
    except Exception as e:
        logger.error("Failed yield chunk. %s", e)
    finally:
        # If the stream did not run to completion, the timing fields were
        # never set; fill them in so the step is still recorded with a
        # usable end time and latency.
        if end_time is None:
            end_time = time.time()
            latency = (end_time - start_time) * 1000

        # Try to add step to the trace
        try:
            collected_output_data = [
                message for message in collected_output_data if message is not None
            ]
            if collected_output_data:
                output_data = "".join(collected_output_data)
            else:
                raw_arguments = collected_function_call["arguments"]
                if raw_arguments:
                    try:
                        collected_function_call["arguments"] = json.loads(
                            raw_arguments
                        )
                    except json.JSONDecodeError:
                        # A truncated stream can leave incomplete JSON; keep
                        # the raw text instead of dropping the whole step.
                        logger.debug(
                            "Streamed function call arguments are not valid JSON; "
                            "keeping the raw string."
                        )
                output_data = collected_function_call

            trace_args = create_trace_args(
                end_time=end_time,
                inputs={"prompt": kwargs["messages"]},
                output=output_data,
                latency=latency,
                tokens=num_of_completion_tokens,
                prompt_tokens=0,
                completion_tokens=num_of_completion_tokens,
                model=kwargs.get("model"),
                model_parameters=get_model_parameters(kwargs),
                raw_output=raw_outputs,
                id=inference_id,
                metadata={
                    "timeToFirstToken": (
                        (first_token_time - start_time) * 1000
                        if first_token_time
                        else None
                    ),
                    "method": "parse",
                    "response_format": kwargs.get("response_format"),
                },
            )
            add_to_trace(
                **trace_args,
                is_azure_openai=is_azure_openai,
            )

        # pylint: disable=broad-except
        except Exception as e:
            logger.error(
                "Failed to trace the parse chat completion request with Openlayer. %s",
                e,
            )


async def handle_async_non_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> Any:
    """Handles the parse method when streaming is disabled.

    The wrapped ``parse_func`` is awaited and timed, then a step describing
    the call is added to the trace. Any failure while building or adding that
    step is logged and does not affect the value returned to the caller.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    Any
        The parsed completion response, exactly as returned by ``parse_func``.
    """
    started_at = time.time()
    response = await parse_func(*args, **kwargs)
    finished_at = time.time()

    # Tracing is best-effort: everything below is guarded so that a tracing
    # failure never prevents the response from being returned.
    try:
        structured_output = parse_structured_output_data(response)
        prompt_inputs = {"prompt": kwargs["messages"]}
        elapsed_ms = (finished_at - started_at) * 1000
        usage = response.usage

        step_fields = {
            "end_time": finished_at,
            "inputs": prompt_inputs,
            "output": structured_output,
            "latency": elapsed_ms,
            "tokens": usage.total_tokens,
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "model": response.model,
            "model_parameters": get_model_parameters(kwargs),
            "raw_output": response.model_dump(),
            "id": inference_id,
            "metadata": {
                "method": "parse",
                "response_format": kwargs.get("response_format"),
            },
        }
        trace_args = create_trace_args(**step_fields)

        add_to_trace(is_azure_openai=is_azure_openai, **trace_args)
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(
            "Failed to trace the parse chat completion request with Openlayer. %s", e
        )

    return response
Loading