Skip to content

Commit e14fb8f

Browse files
authored
OTel GenAI Traces for Agent and Tool (#6653)
Add OTel GenAI traces: - `create_agent` - `invoke_agnet` - `execute_tool` Introduces context manager helpers to create these traces. The helpers also serve as instrumentation points for other instrumentation libraries. Resolves #6644
1 parent 892492f commit e14fb8f

File tree

22 files changed

+813
-597
lines changed

22 files changed

+813
-597
lines changed

python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1323,7 +1323,7 @@ async def _execute_tool_call(
13231323
for handoff_tool in handoff_tools:
13241324
if tool_call.name == handoff_tool.name:
13251325
# Run handoff tool call.
1326-
result = await handoff_tool.run_json(arguments, cancellation_token)
1326+
result = await handoff_tool.run_json(arguments, cancellation_token, call_id=tool_call.id)
13271327
result_as_str = handoff_tool.return_value_as_string(result)
13281328
return (
13291329
tool_call,
@@ -1343,6 +1343,7 @@ async def _execute_tool_call(
13431343
name=tool_call.name,
13441344
arguments=arguments,
13451345
cancellation_token=cancellation_token,
1346+
call_id=tool_call.id,
13461347
)
13471348
return (
13481349
tool_call,

python/packages/autogen-agentchat/src/autogen_agentchat/agents/_base_chat_agent.py

Lines changed: 81 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from abc import ABC, abstractmethod
22
from typing import Any, AsyncGenerator, List, Mapping, Sequence
33

4-
from autogen_core import CancellationToken, ComponentBase
4+
from autogen_core import CancellationToken, ComponentBase, trace_create_agent_span, trace_invoke_agent_span
55
from pydantic import BaseModel
66

77
from ..base import ChatAgent, Response, TaskResult
@@ -39,10 +39,15 @@ class BaseChatAgent(ChatAgent, ABC, ComponentBase[BaseModel]):
3939
component_type = "agent"
4040

4141
def __init__(self, name: str, description: str) -> None:
42-
self._name = name
43-
if self._name.isidentifier() is False:
44-
raise ValueError("The agent name must be a valid Python identifier.")
45-
self._description = description
42+
"""Initialize the agent with a name and description."""
43+
with trace_create_agent_span(
44+
agent_name=name,
45+
agent_description=description,
46+
):
47+
self._name = name
48+
if self._name.isidentifier() is False:
49+
raise ValueError("The agent name must be a valid Python identifier.")
50+
self._description = description
4651

4752
@property
4853
def name(self) -> str:
@@ -110,34 +115,38 @@ async def run(
110115
cancellation_token: CancellationToken | None = None,
111116
) -> TaskResult:
112117
"""Run the agent with the given task and return the result."""
113-
if cancellation_token is None:
114-
cancellation_token = CancellationToken()
115-
input_messages: List[BaseChatMessage] = []
116-
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
117-
if task is None:
118-
pass
119-
elif isinstance(task, str):
120-
text_msg = TextMessage(content=task, source="user")
121-
input_messages.append(text_msg)
122-
output_messages.append(text_msg)
123-
elif isinstance(task, BaseChatMessage):
124-
input_messages.append(task)
125-
output_messages.append(task)
126-
else:
127-
if not task:
128-
raise ValueError("Task list cannot be empty.")
129-
# Task is a sequence of messages.
130-
for msg in task:
131-
if isinstance(msg, BaseChatMessage):
132-
input_messages.append(msg)
133-
output_messages.append(msg)
134-
else:
135-
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
136-
response = await self.on_messages(input_messages, cancellation_token)
137-
if response.inner_messages is not None:
138-
output_messages += response.inner_messages
139-
output_messages.append(response.chat_message)
140-
return TaskResult(messages=output_messages)
118+
with trace_invoke_agent_span(
119+
agent_name=self.name,
120+
agent_description=self.description,
121+
):
122+
if cancellation_token is None:
123+
cancellation_token = CancellationToken()
124+
input_messages: List[BaseChatMessage] = []
125+
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
126+
if task is None:
127+
pass
128+
elif isinstance(task, str):
129+
text_msg = TextMessage(content=task, source="user")
130+
input_messages.append(text_msg)
131+
output_messages.append(text_msg)
132+
elif isinstance(task, BaseChatMessage):
133+
input_messages.append(task)
134+
output_messages.append(task)
135+
else:
136+
if not task:
137+
raise ValueError("Task list cannot be empty.")
138+
# Task is a sequence of messages.
139+
for msg in task:
140+
if isinstance(msg, BaseChatMessage):
141+
input_messages.append(msg)
142+
output_messages.append(msg)
143+
else:
144+
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
145+
response = await self.on_messages(input_messages, cancellation_token)
146+
if response.inner_messages is not None:
147+
output_messages += response.inner_messages
148+
output_messages.append(response.chat_message)
149+
return TaskResult(messages=output_messages)
141150

142151
async def run_stream(
143152
self,
@@ -147,42 +156,46 @@ async def run_stream(
147156
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
148157
"""Run the agent with the given task and return a stream of messages
149158
and the final task result as the last item in the stream."""
150-
if cancellation_token is None:
151-
cancellation_token = CancellationToken()
152-
input_messages: List[BaseChatMessage] = []
153-
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
154-
if task is None:
155-
pass
156-
elif isinstance(task, str):
157-
text_msg = TextMessage(content=task, source="user")
158-
input_messages.append(text_msg)
159-
output_messages.append(text_msg)
160-
yield text_msg
161-
elif isinstance(task, BaseChatMessage):
162-
input_messages.append(task)
163-
output_messages.append(task)
164-
yield task
165-
else:
166-
if not task:
167-
raise ValueError("Task list cannot be empty.")
168-
for msg in task:
169-
if isinstance(msg, BaseChatMessage):
170-
input_messages.append(msg)
171-
output_messages.append(msg)
172-
yield msg
173-
else:
174-
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
175-
async for message in self.on_messages_stream(input_messages, cancellation_token):
176-
if isinstance(message, Response):
177-
yield message.chat_message
178-
output_messages.append(message.chat_message)
179-
yield TaskResult(messages=output_messages)
159+
with trace_invoke_agent_span(
160+
agent_name=self.name,
161+
agent_description=self.description,
162+
):
163+
if cancellation_token is None:
164+
cancellation_token = CancellationToken()
165+
input_messages: List[BaseChatMessage] = []
166+
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
167+
if task is None:
168+
pass
169+
elif isinstance(task, str):
170+
text_msg = TextMessage(content=task, source="user")
171+
input_messages.append(text_msg)
172+
output_messages.append(text_msg)
173+
yield text_msg
174+
elif isinstance(task, BaseChatMessage):
175+
input_messages.append(task)
176+
output_messages.append(task)
177+
yield task
180178
else:
181-
yield message
182-
if isinstance(message, ModelClientStreamingChunkEvent):
183-
# Skip the model client streaming chunk events.
184-
continue
185-
output_messages.append(message)
179+
if not task:
180+
raise ValueError("Task list cannot be empty.")
181+
for msg in task:
182+
if isinstance(msg, BaseChatMessage):
183+
input_messages.append(msg)
184+
output_messages.append(msg)
185+
yield msg
186+
else:
187+
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
188+
async for message in self.on_messages_stream(input_messages, cancellation_token):
189+
if isinstance(message, Response):
190+
yield message.chat_message
191+
output_messages.append(message.chat_message)
192+
yield TaskResult(messages=output_messages)
193+
else:
194+
yield message
195+
if isinstance(message, ModelClientStreamingChunkEvent):
196+
# Skip the model client streaming chunk events.
197+
continue
198+
output_messages.append(message)
186199

187200
@abstractmethod
188201
async def on_reset(self, cancellation_token: CancellationToken) -> None:

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_chat_agent_container.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Any, List, Mapping
22

3-
from autogen_core import DefaultTopicId, MessageContext, event, rpc
3+
from autogen_core import DefaultTopicId, MessageContext, event, rpc, trace_invoke_agent_span
44

55
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, MessageFactory
66

@@ -73,36 +73,41 @@ async def handle_reset(self, message: GroupChatReset, ctx: MessageContext) -> No
7373
async def handle_request(self, message: GroupChatRequestPublish, ctx: MessageContext) -> None:
7474
"""Handle a content request event by passing the messages in the buffer
7575
to the delegate agent and publish the response."""
76-
try:
77-
# Pass the messages in the buffer to the delegate agent.
78-
response: Response | None = None
79-
async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token):
80-
if isinstance(msg, Response):
81-
await self._log_message(msg.chat_message)
82-
response = msg
83-
else:
84-
await self._log_message(msg)
85-
if response is None:
86-
raise ValueError(
87-
"The agent did not produce a final response. Check the agent's on_messages_stream method."
76+
with trace_invoke_agent_span(
77+
agent_name=self._agent.name,
78+
agent_description=self._agent.description,
79+
agent_id=str(self.id),
80+
):
81+
try:
82+
# Pass the messages in the buffer to the delegate agent.
83+
response: Response | None = None
84+
async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token):
85+
if isinstance(msg, Response):
86+
await self._log_message(msg.chat_message)
87+
response = msg
88+
else:
89+
await self._log_message(msg)
90+
if response is None:
91+
raise ValueError(
92+
"The agent did not produce a final response. Check the agent's on_messages_stream method."
93+
)
94+
# Publish the response to the group chat.
95+
self._message_buffer.clear()
96+
await self.publish_message(
97+
GroupChatAgentResponse(agent_response=response, agent_name=self._agent.name),
98+
topic_id=DefaultTopicId(type=self._parent_topic_type),
99+
cancellation_token=ctx.cancellation_token,
88100
)
89-
# Publish the response to the group chat.
90-
self._message_buffer.clear()
91-
await self.publish_message(
92-
GroupChatAgentResponse(agent_response=response, agent_name=self._agent.name),
93-
topic_id=DefaultTopicId(type=self._parent_topic_type),
94-
cancellation_token=ctx.cancellation_token,
95-
)
96-
except Exception as e:
97-
# Publish the error to the group chat.
98-
error_message = SerializableException.from_exception(e)
99-
await self.publish_message(
100-
GroupChatError(error=error_message),
101-
topic_id=DefaultTopicId(type=self._parent_topic_type),
102-
cancellation_token=ctx.cancellation_token,
103-
)
104-
# Raise the error to the runtime.
105-
raise
101+
except Exception as e:
102+
# Publish the error to the group chat.
103+
error_message = SerializableException.from_exception(e)
104+
await self.publish_message(
105+
GroupChatError(error=error_message),
106+
topic_id=DefaultTopicId(type=self._parent_topic_type),
107+
cancellation_token=ctx.cancellation_token,
108+
)
109+
# Raise the error to the runtime.
110+
raise
106111

107112
def _buffer_message(self, message: BaseChatMessage) -> None:
108113
if not self._message_factory.is_registered(message.__class__):
Lines changed: 2 additions & 2 deletions
Loading

0 commit comments

Comments
 (0)