Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,19 @@ async def _run_stream_impl(

session_messages: list[Message] = session_context.get_messages(include_input=True)
all_updates: list[AgentResponseUpdate] = []
emitted_message_ids: set[str] = set()
async for event in self._run_core(
session_messages, checkpoint_id, checkpoint_storage, streaming=True, **kwargs
):
updates = self._convert_workflow_event_to_agent_response_updates(response_id, event)
for update in updates:
# Deduplicate: orchestrations (e.g. HandoffBuilder) may yield the full
# conversation at termination, re-emitting messages that were already
# streamed individually. Skip updates whose message_id was already sent.
if update.message_id and update.message_id in emitted_message_ids:
continue
if update.message_id:
emitted_message_ids.add(update.message_id)
all_updates.append(update)
yield update

Expand Down Expand Up @@ -449,6 +457,7 @@ def _convert_workflow_events_to_agent_response(
raw_representations: list[object] = []
merged_usage: UsageDetails | None = None
latest_created_at: str | None = None
seen_message_ids: set[str] = set()

for output_event in output_events:
if output_event.type == "request_info":
Expand All @@ -475,8 +484,17 @@ def _convert_workflow_events_to_agent_response(
)

if isinstance(data, AgentResponse):
messages.extend(data.messages)
raw_representations.append(data.raw_representation)
non_user_messages = [
msg for msg in data.messages
if msg.role != "user"
and not (msg.message_id and msg.message_id in seen_message_ids)
]
for msg in non_user_messages:
if msg.message_id:
seen_message_ids.add(msg.message_id)
messages.extend(non_user_messages)
if non_user_messages:
raw_representations.append(data.raw_representation)
merged_usage = add_usage_details(merged_usage, data.usage_details)
latest_created_at = (
data.created_at
Expand All @@ -486,12 +504,24 @@ def _convert_workflow_events_to_agent_response(
else latest_created_at
)
elif isinstance(data, Message):
messages.append(data)
raw_representations.append(data.raw_representation)
if data.role != "user" and not (data.message_id and data.message_id in seen_message_ids):
if data.message_id:
seen_message_ids.add(data.message_id)
messages.append(data)
raw_representations.append(data.raw_representation)
elif is_instance_of(data, list[Message]):
chat_messages = cast(list[Message], data)
messages.extend(chat_messages)
raw_representations.append(data)
non_user_messages = [
msg for msg in chat_messages
if msg.role != "user"
and not (msg.message_id and msg.message_id in seen_message_ids)
]
for msg in non_user_messages:
if msg.message_id:
seen_message_ids.add(msg.message_id)
messages.extend(non_user_messages)
if non_user_messages:
raw_representations.append(data)
else:
contents = self._extract_contents(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue: user messages are filtered from messages but raw_representations.append(data) on the next line still runs, even if every message in the list was user-role. Either filter data before appending to raw_representations, or centralise the filtering after collection so both lists stay consistent.

if not contents:
Expand Down Expand Up @@ -571,14 +601,20 @@ def _convert_workflow_event_to_agent_response_updates(
executor_id = event.executor_id

if isinstance(data, AgentResponseUpdate):
# Pass through AgentResponseUpdate directly (streaming from AgentExecutor)
# Pass through AgentResponseUpdate directly (streaming from AgentExecutor).
# Filter user-role updates: orchestrations (e.g. HandoffBuilder) may emit the
# full conversation including user messages, which should not be echoed back.
if data.role == "user":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New early-return for AgentResponseUpdate with role='user' has no test coverage. An executor could emit an AgentResponseUpdate directly — add a test that yields one with role='user' and verifies it is dropped from the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning early for user-role AgentResponseUpdate silently swallows the event. If a future workflow intentionally emits a user-role update (e.g., for a 'thinking' UI that replays the user prompt), this will be very hard to debug. Consider filtering at the handoff layer instead of the generic conversion layer, or at minimum add a comment explaining why user-role updates are universally suppressed.

return []
if not data.author_name:
data.author_name = executor_id
return [data]
if isinstance(data, AgentResponse):
# Convert each message in AgentResponse to an AgentResponseUpdate
updates: list[AgentResponseUpdate] = []
for msg in data.messages:
if msg.role == "user":
continue
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
Expand All @@ -593,6 +629,8 @@ def _convert_workflow_event_to_agent_response_updates(
)
return updates
if isinstance(data, Message):
if data.role == "user":
return []
return [
AgentResponseUpdate(
contents=list(data.contents),
Expand All @@ -609,6 +647,8 @@ def _convert_workflow_event_to_agent_response_updates(
chat_messages = cast(list[Message], data)
updates = []
for msg in chat_messages:
if msg.role == "user":
continue
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
Expand Down
194 changes: 184 additions & 10 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,11 @@ async def raw_yielding_executor(
assert updates[2].raw_representation.value == 42

async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None:
"""Test that yield_output with list[Message] extracts contents from all messages.
"""Test that yield_output with list[Message] extracts contents from non-user messages.

Note: Content items are coalesced by _finalize_response, so multiple text contents
become a single merged Content in the final response.
User-role messages are filtered out since agent responses should only contain
assistant/tool output. Content items are coalesced by _finalize_response, so
multiple text contents become a single merged Content in the final response.
"""

@executor
Expand All @@ -491,25 +492,198 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N
workflow = WorkflowBuilder(start_executor=list_yielding_executor).build()
agent = workflow.as_agent("list-msg-agent")

# Verify streaming returns the update with all 4 contents before coalescing
# Verify streaming returns updates for non-user messages only
updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

assert len(updates) == 3
assert len(updates) == 2
full_response = AgentResponse.from_updates(updates)
assert len(full_response.messages) == 3
assert len(full_response.messages) == 2
texts = [message.text for message in full_response.messages]
# Note: `from_agent_run_response_updates` coalesces multiple text contents into one content
assert texts == ["first message", "second message", "thirdfourth"]
# Note: AgentResponse.from_updates coalesces multiple text contents into a single merged Content
assert texts == ["second message", "thirdfourth"]

# Verify run()
result = await agent.run("test")

assert isinstance(result, AgentResponse)
assert len(result.messages) == 3
assert len(result.messages) == 2
texts = [message.text for message in result.messages]
assert texts == ["first message", "second message", "third fourth"]
assert texts == ["second message", "third fourth"]

async def test_workflow_as_agent_filters_user_role_agent_response_update(self) -> None:
"""Test that AgentResponseUpdate with role='user' is dropped from the stream."""

@executor
async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None:
# Emit a user-role AgentResponseUpdate directly
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="echoed user input")],
role="user",
author_name="test",
response_id="resp-1",
message_id="msg-1",
created_at="2026-01-01T00:00:00Z",
)
)
# Emit a valid assistant-role update
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="assistant reply")],
role="assistant",
author_name="test",
response_id="resp-1",
message_id="msg-2",
created_at="2026-01-01T00:00:00Z",
)
)

workflow = WorkflowBuilder(start_executor=user_update_executor).build()
agent = workflow.as_agent("user-update-agent")

updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

# User-role update should be filtered out
assert len(updates) == 1
assert updates[0].role == "assistant"
assert updates[0].contents[0].text == "assistant reply"

async def test_workflow_as_agent_deduplicates_streaming_messages(self) -> None:
"""Test that duplicate messages are deduplicated in streaming mode.

Orchestrations like HandoffBuilder emit messages individually during streaming
then re-emit the full conversation at termination. The WorkflowAgent should
deduplicate so the caller doesn't see repeated messages.
"""
msg_id_1 = str(uuid.uuid4())
msg_id_2 = str(uuid.uuid4())

@executor
async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None:
# Simulate streaming: emit individual AgentResponseUpdate messages
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="first reply")],
role="assistant",
author_name="agent-a",
response_id="resp-1",
message_id=msg_id_1,
created_at="2026-01-01T00:00:00Z",
)
)
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="second reply")],
role="assistant",
author_name="agent-b",
response_id="resp-2",
message_id=msg_id_2,
created_at="2026-01-01T00:00:01Z",
)
)
# Simulate termination: re-emit the full conversation as list[Message]
# (this is what HandoffBuilder._check_terminate_and_yield does)
await ctx.yield_output([
Message(role="user", text="user input", message_id="user-msg-1"),
Message(role="assistant", text="first reply", message_id=msg_id_1),
Message(role="assistant", text="second reply", message_id=msg_id_2),
])

workflow = WorkflowBuilder(start_executor=dedup_executor).build()
agent = workflow.as_agent("dedup-agent")

updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

# Should have exactly 2 assistant updates — no duplicates from the list[Message] yield
assert len(updates) == 2
assert updates[0].contents[0].text == "first reply"
assert updates[1].contents[0].text == "second reply"

async def test_workflow_as_agent_deduplicates_non_streaming_messages(self) -> None:
"""Test that duplicate messages are deduplicated in non-streaming mode."""
msg_id_1 = str(uuid.uuid4())
msg_id_2 = str(uuid.uuid4())

@executor
async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None:
# Emit individual AgentResponse with messages
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="assistant", text="first reply", message_id=msg_id_1),
],
response_id="resp-1",
)
)
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="assistant", text="second reply", message_id=msg_id_2),
],
response_id="resp-2",
)
)
# Re-emit the full conversation at termination
await ctx.yield_output([
Message(role="user", text="user input", message_id="user-msg-1"),
Message(role="assistant", text="first reply", message_id=msg_id_1),
Message(role="assistant", text="second reply", message_id=msg_id_2),
])

workflow = WorkflowBuilder(start_executor=dedup_executor).build()
agent = workflow.as_agent("dedup-agent")

result = await agent.run("test")

# Should have exactly 2 assistant messages — no duplicates from the list[Message] yield
assert len(result.messages) == 2
assert result.messages[0].text == "first reply"
assert result.messages[1].text == "second reply"

async def test_workflow_as_agent_agent_response_raw_repr_consistency(self) -> None:
"""Test that AgentResponse with only user messages does not add orphan raw_representations."""

@executor
async def mixed_response_executor(
messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
) -> None:
# Emit an AgentResponse with only user messages
await ctx.yield_output(
AgentResponse(
messages=[Message(role="user", text="user only")],
response_id="resp-user-only",
)
)
# Emit an AgentResponse with mixed messages
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="user", text="user msg"),
Message(role="assistant", text="assistant msg"),
],
response_id="resp-mixed",
)
)

workflow = WorkflowBuilder(start_executor=mixed_response_executor).build()
agent = workflow.as_agent("mixed-response-agent")

result = await agent.run("test")

# Only the assistant message from the mixed response should appear
assert len(result.messages) == 1
assert result.messages[0].text == "assistant msg"

# raw_representation should only contain the mixed response's raw_representation,
# not the user-only response's (which produced no output messages)
assert isinstance(result.raw_representation, list)
assert len(result.raw_representation) == 1

async def test_session_conversation_history_included_in_workflow_run(self) -> None:
"""Test that messages provided to agent.run() are passed through to the workflow."""
Expand Down
45 changes: 45 additions & 0 deletions python/packages/orchestrations/tests/test_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
from agent_framework import (
Agent,
AgentResponseUpdate,
BaseContextProvider,
ChatResponse,
ChatResponseUpdate,
Expand Down Expand Up @@ -1117,3 +1118,47 @@ def get_session(self, *, service_session_id, **kwargs):

with pytest.raises(TypeError, match="Participants must be Agent instances"):
HandoffBuilder().participants([fake])


async def test_handoff_as_agent_run_stream_does_not_echo_user_input() -> None:
"""WorkflowAgent wrapping a handoff workflow must not echo user input in streamed updates.

When HandoffAgentExecutor emits the full conversation via ctx.yield_output() on
termination, user-role messages from that list should not appear as
AgentResponseUpdate items in the stream returned by WorkflowAgent.run(..., stream=True).
"""
agent = MockHandoffAgent(name="single_agent")

workflow = (
HandoffBuilder(
participants=[agent],
# Terminate immediately after the agent responds (user msg + assistant msg = 2).
termination_condition=lambda conv: len(conv) >= 2,
)
.with_start_agent(agent)
.build()
)

workflow_agent = workflow.as_agent(name="test_workflow_agent")

user_input = "Hi! Can you help me with something?"
updates: list[AgentResponseUpdate] = []
async for update in workflow_agent.run(user_input, stream=True):
updates.append(update)

assert updates, "Expected at least one streaming update"

# The core assertion: no update should carry the user role.
user_role_updates = [u for u in updates if u.role == "user"]
assert not user_role_updates, (
f"User input was echoed back in the stream as {len(user_role_updates)} update(s). "
"Expected only assistant-role updates."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only verifies the streaming path. The non-streaming path through _convert_workflow_events_to_agent_response is a separate code path that also received user-filtering changes. Add a non-streaming assertion to cover it.

Suggested change
)
assert updates, "Expected at least one streaming update"
# The core assertion: no update should carry the user role.
user_role_updates = [u for u in updates if u.role == "user"]
assert not user_role_updates, (
f"User input was echoed back in the stream as {len(user_role_updates)} update(s). "
"Expected only assistant-role updates."
)
# Also verify non-streaming path filters user messages
result = await workflow_agent.run(user_input)
user_role_messages = [m for m in result.messages if m.role == "user"]
assert not user_role_messages, (
f"User input was echoed back in non-streaming result as {len(user_role_messages)} message(s). "
"Expected only assistant-role messages."
)


# Also verify non-streaming path filters user messages
result = await workflow_agent.run(user_input)
user_role_messages = [m for m in result.messages if m.role == "user"]
assert not user_role_messages, (
f"User input was echoed back in non-streaming result as {len(user_role_messages)} message(s). "
"Expected only assistant-role messages."
)
Loading