Skip to content

Commit 44f1253

Browse files
authored
Python: Agent response callbacks to provide full context (#12501)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Currently, the agent response callbacks in the `Orchestrations` are triggered only when the final responses from the agents become available. This limits developers to create solutions that can expose the internal processing of the agents in multi agent orchestrations. ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> The callbacks will now be triggered by the intermediate messages of the agents: 1. A new sample 2. Tests P.s. Fix a bug where the name of the intermediate messages is not assigned. ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone 😄
1 parent 8d1b3fd commit 44f1253

File tree

13 files changed

+441
-45
lines changed

13 files changed

+441
-45
lines changed

python/samples/getting_started_with_agents/multi_agent_orchestration/step2b_sequential_streaming_agent_response_callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from semantic_kernel.agents import Agent, ChatCompletionAgent, SequentialOrchestration
66
from semantic_kernel.agents.runtime import InProcessRuntime
77
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
8-
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
8+
from semantic_kernel.contents import StreamingChatMessageContent
99

1010
"""
1111
The following sample demonstrates how to create a sequential orchestration for

python/samples/getting_started_with_agents/multi_agent_orchestration/step4_handoff.py

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
66
from semantic_kernel.agents.runtime import InProcessRuntime
77
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
8-
from semantic_kernel.contents import AuthorRole, ChatMessageContent
8+
from semantic_kernel.contents import AuthorRole, ChatMessageContent, FunctionCallContent, FunctionResultContent
99
from semantic_kernel.functions import kernel_function
1010

1111
"""
@@ -120,8 +120,18 @@ def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:
120120

121121

122122
def agent_response_callback(message: ChatMessageContent) -> None:
123-
"""Observer function to print the messages from the agents."""
123+
"""Observer function to print the messages from the agents.
124+
125+
Please note that this function is called whenever the agent generates a response,
126+
including the internal processing messages (such as tool calls) that are not visible
127+
to other agents in the orchestration.
128+
"""
124129
print(f"{message.name}: {message.content}")
130+
for item in message.items:
131+
if isinstance(item, FunctionCallContent):
132+
print(f"Calling '{item.name}' with arguments '{item.arguments}'")
133+
if isinstance(item, FunctionResultContent):
134+
print(f"Result from '{item.name}' is '{item.result}'")
125135

126136

127137
def human_response_function() -> ChatMessageContent:
@@ -147,7 +157,7 @@ async def main():
147157

148158
# 3. Invoke the orchestration with a task and the runtime
149159
orchestration_result = await handoff_orchestration.invoke(
150-
task="A customer is on the line.",
160+
task="Greet the customer who is reaching out for support.",
151161
runtime=runtime,
152162
)
153163

@@ -160,24 +170,48 @@ async def main():
160170

161171
"""
162172
Sample output:
163-
TriageAgent: Hello! Thank you for reaching out. How can I assist you today?
173+
TriageAgent: Hello! Thank you for reaching out for support. How can I assist you today?
164174
User: I'd like to track the status of my order
165-
OrderStatusAgent: Sure, I can help you with that. Could you please provide me with your order ID?
175+
TriageAgent:
176+
Calling 'Handoff-transfer_to_OrderStatusAgent' with arguments '{}'
177+
TriageAgent:
178+
Result from 'Handoff-transfer_to_OrderStatusAgent' is 'None'
179+
OrderStatusAgent: Could you please provide me with your order ID so I can check the status for you?
166180
User: My order ID is 123
167-
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. Is there anything
168-
else I can assist you with?
181+
OrderStatusAgent:
182+
Calling 'OrderStatusPlugin-check_order_status' with arguments '{"order_id":"123"}'
183+
OrderStatusAgent:
184+
Result from 'OrderStatusPlugin-check_order_status' is 'Order 123 is shipped and will arrive in 2-3 days.'
185+
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. If you have any
186+
more questions, feel free to ask!
169187
User: I want to return another order of mine
170-
OrderReturnAgent: I can help you with returning your order. Could you please provide the order ID for the return
171-
and the reason you'd like to return it?
188+
OrderStatusAgent: I can help you with that. Could you please provide me with the order ID of the order you want
189+
to return?
172190
User: Order ID 321
173-
OrderReturnAgent: Please provide the reason for returning the order with ID 321.
191+
OrderStatusAgent:
192+
Calling 'Handoff-transfer_to_TriageAgent' with arguments '{}'
193+
OrderStatusAgent:
194+
Result from 'Handoff-transfer_to_TriageAgent' is 'None'
195+
TriageAgent:
196+
Calling 'Handoff-transfer_to_OrderReturnAgent' with arguments '{}'
197+
TriageAgent:
198+
Result from 'Handoff-transfer_to_OrderReturnAgent' is 'None'
199+
OrderReturnAgent: Could you please provide me with the reason for the return for order ID 321?
174200
User: Broken item
175201
Processing return for order 321 due to: Broken item
176-
OrderReturnAgent: The return for your order with ID 321 has been successfully processed due to the broken item.
177-
Is there anything else I can assist you with?
202+
OrderReturnAgent:
203+
Calling 'OrderReturnPlugin-process_return' with arguments '{"order_id":"321","reason":"Broken item"}'
204+
OrderReturnAgent:
205+
Result from 'OrderReturnPlugin-process_return' is 'Return for order 321 has been processed successfully.'
206+
OrderReturnAgent: The return for order ID 321 has been processed successfully due to a broken item. If you need
207+
further assistance or have any other questions, feel free to let me know!
178208
User: No, bye
179-
Task is completed with summary: Handled order return for order ID 321 due to a broken item, and successfully
180-
processed the return.
209+
Task is completed with summary: Processed the return request for order ID 321 due to a broken item.
210+
OrderReturnAgent:
211+
Calling 'Handoff-complete_task' with arguments '{"task_summary":"Processed the return request for order ID 321
212+
due to a broken item."}'
213+
OrderReturnAgent:
214+
Result from 'Handoff-complete_task' is 'None'
181215
"""
182216

183217

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
# Copyright (c) Microsoft. All rights reserved.
2+
3+
import asyncio
4+
5+
from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
6+
from semantic_kernel.agents.runtime import InProcessRuntime
7+
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
8+
from semantic_kernel.contents import (
9+
AuthorRole,
10+
ChatMessageContent,
11+
FunctionCallContent,
12+
FunctionResultContent,
13+
StreamingChatMessageContent,
14+
)
15+
from semantic_kernel.functions import kernel_function
16+
17+
"""
18+
The following sample demonstrates how to create a handoff orchestration that represents
19+
a customer support triage system. The orchestration consists of 4 agents, each specialized
20+
in a different area of customer support: triage, refunds, order status, and order returns.
21+
22+
The orchestration is configured with a streaming agent response callback that prints the
23+
messages from the agents as they are generated.
24+
25+
Depending on the customer's request, agents can hand off the conversation to the appropriate
26+
agent.
27+
28+
Human in the loop is achieved via a callback function similar to the one used in group chat
29+
orchestration. Except that in the handoff orchestration, all agents have access to the
30+
human response function, whereas in the group chat orchestration, only the manager has access
31+
to the human response function.
32+
33+
This sample demonstrates the basic steps of creating and starting a runtime, creating
34+
a handoff orchestration, invoking the orchestration, and finally waiting for the results.
35+
"""
36+
37+
38+
class OrderStatusPlugin:
39+
@kernel_function
40+
def check_order_status(self, order_id: str) -> str:
41+
"""Check the status of an order."""
42+
# Simulate checking the order status
43+
return f"Order {order_id} is shipped and will arrive in 2-3 days."
44+
45+
46+
class OrderRefundPlugin:
47+
@kernel_function
48+
def process_refund(self, order_id: str, reason: str) -> str:
49+
"""Process a refund for an order."""
50+
# Simulate processing a refund
51+
print(f"Processing refund for order {order_id} due to: {reason}")
52+
return f"Refund for order {order_id} has been processed successfully."
53+
54+
55+
class OrderReturnPlugin:
56+
@kernel_function
57+
def process_return(self, order_id: str, reason: str) -> str:
58+
"""Process a return for an order."""
59+
# Simulate processing a return
60+
print(f"Processing return for order {order_id} due to: {reason}")
61+
return f"Return for order {order_id} has been processed successfully."
62+
63+
64+
def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:
65+
"""Return a list of agents that will participate in the Handoff orchestration and the handoff relationships.
66+
67+
Feel free to add or remove agents and handoff connections.
68+
"""
69+
support_agent = ChatCompletionAgent(
70+
name="TriageAgent",
71+
description="A customer support agent that triages issues.",
72+
instructions="Handle customer requests.",
73+
service=AzureChatCompletion(),
74+
)
75+
76+
refund_agent = ChatCompletionAgent(
77+
name="RefundAgent",
78+
description="A customer support agent that handles refunds.",
79+
instructions="Handle refund requests.",
80+
service=AzureChatCompletion(),
81+
plugins=[OrderRefundPlugin()],
82+
)
83+
84+
order_status_agent = ChatCompletionAgent(
85+
name="OrderStatusAgent",
86+
description="A customer support agent that checks order status.",
87+
instructions="Handle order status requests.",
88+
service=AzureChatCompletion(),
89+
plugins=[OrderStatusPlugin()],
90+
)
91+
92+
order_return_agent = ChatCompletionAgent(
93+
name="OrderReturnAgent",
94+
description="A customer support agent that handles order returns.",
95+
instructions="Handle order return requests.",
96+
service=AzureChatCompletion(),
97+
plugins=[OrderReturnPlugin()],
98+
)
99+
100+
# Define the handoff relationships between agents
101+
handoffs = (
102+
OrchestrationHandoffs()
103+
.add_many(
104+
source_agent=support_agent.name,
105+
target_agents={
106+
refund_agent.name: "Transfer to this agent if the issue is refund related",
107+
order_status_agent.name: "Transfer to this agent if the issue is order status related",
108+
order_return_agent.name: "Transfer to this agent if the issue is order return related",
109+
},
110+
)
111+
.add(
112+
source_agent=refund_agent.name,
113+
target_agent=support_agent.name,
114+
description="Transfer to this agent if the issue is not refund related",
115+
)
116+
.add(
117+
source_agent=order_status_agent.name,
118+
target_agent=support_agent.name,
119+
description="Transfer to this agent if the issue is not order status related",
120+
)
121+
.add(
122+
source_agent=order_return_agent.name,
123+
target_agent=support_agent.name,
124+
description="Transfer to this agent if the issue is not order return related",
125+
)
126+
)
127+
128+
return [support_agent, refund_agent, order_status_agent, order_return_agent], handoffs
129+
130+
131+
# Flag to indicate if a new message is being received
132+
is_new_message = True
133+
134+
135+
def streaming_agent_response_callback(message: StreamingChatMessageContent, is_final: bool) -> None:
136+
"""Observer function to print the messages from the agents.
137+
138+
Please note that this function is called whenever the agent generates a response,
139+
including the internal processing messages (such as tool calls) that are not visible
140+
to other agents in the orchestration.
141+
142+
In streaming mode, the FunctionCallContent and FunctionResultContent are provided as a
143+
complete message.
144+
145+
Args:
146+
message (StreamingChatMessageContent): The streaming message content from the agent.
147+
is_final (bool): Indicates if this is the final part of the message.
148+
"""
149+
global is_new_message
150+
if is_new_message:
151+
print(f"{message.name}: ", end="", flush=True)
152+
is_new_message = False
153+
print(message.content, end="", flush=True)
154+
155+
for item in message.items:
156+
if isinstance(item, FunctionCallContent):
157+
print(f"Calling '{item.name}' with arguments '{item.arguments}'", end="", flush=True)
158+
if isinstance(item, FunctionResultContent):
159+
print(f"Result from '{item.name}' is '{item.result}'", end="", flush=True)
160+
161+
if is_final:
162+
print()
163+
is_new_message = True
164+
165+
166+
def human_response_function() -> ChatMessageContent:
167+
"""Observer function to print the messages from the agents."""
168+
user_input = input("User: ")
169+
return ChatMessageContent(role=AuthorRole.USER, content=user_input)
170+
171+
172+
async def main():
173+
"""Main function to run the agents."""
174+
# 1. Create a handoff orchestration with multiple agents
175+
agents, handoffs = get_agents()
176+
handoff_orchestration = HandoffOrchestration(
177+
members=agents,
178+
handoffs=handoffs,
179+
streaming_agent_response_callback=streaming_agent_response_callback,
180+
human_response_function=human_response_function,
181+
)
182+
183+
# 2. Create a runtime and start it
184+
runtime = InProcessRuntime()
185+
runtime.start()
186+
187+
# 3. Invoke the orchestration with a task and the runtime
188+
orchestration_result = await handoff_orchestration.invoke(
189+
task="Greet the customer who is reaching out for support.",
190+
runtime=runtime,
191+
)
192+
193+
# 4. Wait for the results
194+
value = await orchestration_result.get()
195+
print(value)
196+
197+
# 5. Stop the runtime after the invocation is complete
198+
await runtime.stop_when_idle()
199+
200+
"""
201+
Sample output:
202+
TriageAgent: Hello! Thank you for reaching out for support. How can I assist you today?
203+
User: I'd like to track the status of my order
204+
TriageAgent: Calling 'Handoff-transfer_to_OrderStatusAgent' with arguments '{}'
205+
TriageAgent: Result from 'Handoff-transfer_to_OrderStatusAgent' is 'None'
206+
OrderStatusAgent: Could you please provide me with your order ID? This will help me check the status of your order.
207+
User: My order ID is 123
208+
OrderStatusAgent: Calling 'OrderStatusPlugin-check_order_status' with arguments '{"order_id":"123"}'
209+
OrderStatusAgent: Result from 'OrderStatusPlugin-check_order_status' is 'Order 123 is shipped and will arrive in
210+
2-3 days.'
211+
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. If you have any
212+
more questions, feel free to ask!
213+
User: I want to return another order of mine
214+
OrderStatusAgent: Calling 'Handoff-transfer_to_TriageAgent' with arguments '{}'
215+
OrderStatusAgent: Result from 'Handoff-transfer_to_TriageAgent' is 'None'
216+
TriageAgent: Calling 'Handoff-transfer_to_OrderReturnAgent' with arguments '{}'
217+
TriageAgent: Result from 'Handoff-transfer_to_OrderReturnAgent' is 'None'
218+
OrderReturnAgent: Could you please provide me with the order ID for the order you would like to return, as well
219+
as the reason for the return?
220+
User: Order ID 321
221+
OrderReturnAgent: What is the reason for returning order ID 321?
222+
User: Broken item
223+
Processing return for order 321 due to: Broken item
224+
OrderReturnAgent: Calling 'OrderReturnPlugin-process_return' with arguments '{"order_id":"321","reason":"Broken
225+
item"}'
226+
OrderReturnAgent: Result from 'OrderReturnPlugin-process_return' is 'Return for order 321 has been processed
227+
successfully.'
228+
OrderReturnAgent: Task is completed with summary: Processed return for order ID 321 due to a broken item.
229+
Calling 'Handoff-complete_task' with arguments '{"task_summary":"Processed return for order ID 321 due to a
230+
broken item."}'
231+
OrderReturnAgent: Result from 'Handoff-complete_task' is 'None'
232+
"""
233+
234+
235+
if __name__ == "__main__":
236+
asyncio.run(main())

python/semantic_kernel/agents/chat_completion/chat_completion_agent.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ async def _drain_mutated_messages(
596596
drained: list[ChatMessageContent] = []
597597
for i in range(start, len(history)):
598598
msg: ChatMessageContent = history[i] # type: ignore
599+
msg.name = self.name
599600
await thread.on_new_message(msg)
600601
drained.append(msg)
601602
return drained

python/semantic_kernel/agents/orchestration/agent_actor_base.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,12 @@ async def _invoke_agent(self, additional_messages: DefaultTypeAlias | None = Non
111111
streaming_message_buffer: list[StreamingChatMessageContent] = []
112112
messages = self._create_messages(additional_messages)
113113

114-
async for response_item in self._agent.invoke_stream(messages=messages, thread=self._agent_thread, **kwargs): # type: ignore[arg-type]
114+
async for response_item in self._agent.invoke_stream(
115+
messages, # type: ignore[arg-type]
116+
thread=self._agent_thread,
117+
on_intermediate_message=self._handle_intermediate_message,
118+
**kwargs,
119+
):
115120
# Buffer message chunks and stream them with correct is_final flag.
116121
streaming_message_buffer.append(response_item.message)
117122
if len(streaming_message_buffer) > 1:
@@ -149,3 +154,24 @@ def _create_messages(self, additional_messages: DefaultTypeAlias | None = None)
149154
if isinstance(additional_messages, list):
150155
return base_messages + additional_messages
151156
return [*base_messages, additional_messages]
157+
158+
async def _handle_intermediate_message(self, message: ChatMessageContent) -> None:
159+
"""Handle intermediate messages from the agent.
160+
161+
This method is called with messages produced during streaming agent responses.
162+
Although the parameter is typed as `ChatMessageContent` (to match the `invoke_stream` callback signature),
163+
the actual object will always be a `StreamingChatMessageContent` (a subclass of `ChatMessageContent`).
164+
165+
The agent response callback expects a `ChatMessageContent`, so we can pass the message directly.
166+
However, the streaming agent response callback specifically requires a `StreamingChatMessageContent`.
167+
To avoid type errors from the static type checker due to down casting (from `ChatMessageContent` to
168+
`StreamingChatMessageContent`), we check that the message is of the correct type before calling the callbacks.
169+
Since it will always be a `StreamingChatMessageContent`, this check is safe.
170+
"""
171+
if not isinstance(message, StreamingChatMessageContent):
172+
raise TypeError(
173+
f"Expected message to be of type 'StreamingChatMessageContent', "
174+
f"but got '{type(message).__name__}' instead."
175+
)
176+
await self._call_agent_response_callback(message)
177+
await self._call_streaming_agent_response_callback(message, is_final=True)

0 commit comments

Comments
 (0)