Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,7 @@ python/dotnet-ref

# Generated filtered solution files (created by eng/scripts/New-FilteredSolution.ps1)
dotnet/filtered-*.slnx

# Local tool state
.omc/
.omx/
113 changes: 93 additions & 20 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ async def _run_impl(
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
):
if event.type == "output" or event.type == "request_info":
if event.type in ("output", "request_info") or (
event.type == "data" and isinstance(event.data, (AgentResponse, AgentResponseUpdate))
):
Comment thread
moonbox3 marked this conversation as resolved.
output_events.append(event)

result = self._convert_workflow_events_to_agent_response(response_id, output_events)
Expand Down Expand Up @@ -527,7 +529,14 @@ def _convert_workflow_events_to_agent_response(
)
raw_representations.append(output_event)
else:
# `data` events carry intermediate participant responses (e.g., orchestration
# agents emitting via emit_data_events). Reframe their text content as
# `text_reasoning` so consumers can render them like agent thinking, mirroring
# how reasoning-capable agents (Claude thinking, OpenAI reasoning) already
# surface intermediate content. `output` events pass through unchanged.
as_reasoning = output_event.type == "data"
data = output_event.data

if isinstance(data, AgentResponseUpdate):
# We cannot support AgentResponseUpdate in non-streaming mode. This is because the message
# sequence cannot be guaranteed when there are streaming updates in between non-streaming
Expand All @@ -538,7 +547,7 @@ def _convert_workflow_events_to_agent_response(
)

if isinstance(data, AgentResponse):
messages.extend(data.messages)
messages.extend(self._msg_maybe_reasoning(m, as_reasoning=as_reasoning) for m in data.messages)
raw_representations.append(data.raw_representation)
merged_usage = add_usage_details(merged_usage, data.usage_details)
latest_created_at = (
Expand All @@ -549,16 +558,18 @@ def _convert_workflow_events_to_agent_response(
else latest_created_at
)
elif isinstance(data, Message):
messages.append(data)
messages.append(self._msg_maybe_reasoning(data, as_reasoning=as_reasoning))
raw_representations.append(data.raw_representation)
elif is_instance_of(data, list[Message]):
chat_messages = cast(list[Message], data)
messages.extend(chat_messages)
messages.extend(self._msg_maybe_reasoning(m, as_reasoning=as_reasoning) for m in chat_messages)
raw_representations.append(data)
else:
contents = self._extract_contents(data)
if not contents:
continue
if as_reasoning:
contents = self._rewrite_text_to_reasoning(contents)

messages.append(
Message(
Expand Down Expand Up @@ -618,33 +629,47 @@ def _convert_workflow_event_to_agent_response_updates(
) -> list[AgentResponseUpdate]:
"""Convert a workflow event to a list of AgentResponseUpdate objects.

Events with type='output' and type='request_info' are processed.
Other workflow events are ignored as they are workflow-internal.

For 'output' events, AgentExecutor yields AgentResponseUpdate for streaming updates
via ctx.yield_output(). This method converts those to agent response updates.
Processes `output` and `request_info` events, plus `data` events carrying
`AgentResponse` or `AgentResponseUpdate` (emitted by orchestrations to surface
Comment thread
moonbox3 marked this conversation as resolved.
intermediate participants when `intermediate_outputs=True`). Other event types
are workflow-internal and ignored.

Returns:
A list of AgentResponseUpdate objects. Empty list if the event is not relevant.
"""
if event.type == "output":
# Convert workflow output to agent response updates.
# Handle different data types appropriately.
data = event.data
data: Any = event.data
if event.type == "output" or (event.type == "data" and isinstance(data, (AgentResponse, AgentResponseUpdate))):
# `data` events carry intermediate participant content (e.g., orchestration agents
# via emit_data_events). Reframe their text content as `text_reasoning` so consumers
# render them as agent thinking. `output` events pass through unchanged.
as_reasoning = event.type == "data"
executor_id = event.executor_id

def _contents(src: Sequence[Content]) -> list[Content]:
return self._rewrite_text_to_reasoning(src) if as_reasoning else list(src)

if isinstance(data, AgentResponseUpdate):
# Pass through AgentResponseUpdate directly (streaming from AgentExecutor)
if not data.author_name:
data.author_name = executor_id
return [data]
# Construct a fresh AgentResponseUpdate so we don't mutate a payload
# that AgentExecutor (and the data-event publisher) still hold references
# to in their `updates` list / output channel.
return [
AgentResponseUpdate(
contents=_contents(data.contents),
role=data.role,
author_name=data.author_name or executor_id,
response_id=data.response_id,
message_id=data.message_id,
created_at=data.created_at,
raw_representation=data.raw_representation,
)
]
if isinstance(data, AgentResponse):
# Convert each message in AgentResponse to an AgentResponseUpdate
updates: list[AgentResponseUpdate] = []
for msg in data.messages:
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
contents=_contents(msg.contents),
role=msg.role,
author_name=msg.author_name or executor_id,
response_id=data.response_id or response_id,
Expand All @@ -658,7 +683,7 @@ def _convert_workflow_event_to_agent_response_updates(
if isinstance(data, Message):
return [
AgentResponseUpdate(
contents=list(data.contents),
contents=_contents(data.contents),
role=data.role,
author_name=data.author_name or executor_id,
response_id=response_id,
Expand All @@ -674,7 +699,7 @@ def _convert_workflow_event_to_agent_response_updates(
for msg in chat_messages:
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
contents=_contents(msg.contents),
role=msg.role,
author_name=msg.author_name or executor_id,
response_id=response_id,
Expand All @@ -687,6 +712,8 @@ def _convert_workflow_event_to_agent_response_updates(
contents = self._extract_contents(data)
if not contents:
return []
if as_reasoning:
contents = self._rewrite_text_to_reasoning(contents)
return [
AgentResponseUpdate(
contents=contents,
Expand Down Expand Up @@ -791,6 +818,52 @@ def _extract_contents(self, data: Any) -> list[Content]:
return [Content.from_text(text=data)]
return [Content.from_text(text=str(data))]

@staticmethod
def _rewrite_text_to_reasoning(contents: Sequence[Content]) -> list[Content]:
    """Return a copy of `contents` with every plain-text block reframed as reasoning text.

    WorkflowAgent uses this to present content arriving on the workflow's `data`
    channel — e.g., intermediate participants in an orchestration — as reasoning
    ("thinking") from the perspective of the wrapped workflow agent. This mirrors
    how reasoning-capable agents (Claude thinking, OpenAI reasoning) already emit
    thinking content, so consumers can use a single rendering path.

    Anything that is not plain text (function calls, results, text that is already
    reasoning, hosted files, etc.) is passed through untouched.
    """

    def _reframe(content: Content) -> Content:
        # Only plain "text" blocks are rewritten; all other content kinds pass through.
        if content.type != "text":
            return content
        return Content.from_text_reasoning(
            id=content.id,
            text=content.text,
            annotations=content.annotations,
            additional_properties=content.additional_properties,
            raw_representation=content.raw_representation,
        )

    return [_reframe(content) for content in contents]

@classmethod
def _msg_as_reasoning(cls, msg: Message) -> Message:
    """Return a copy of `msg` whose text content has been rewritten as reasoning content.

    All other message fields (role, author, id, extra properties, raw representation)
    are carried over unchanged.
    """
    carried_over = dict(
        role=msg.role,
        author_name=msg.author_name,
        message_id=msg.message_id,
        additional_properties=msg.additional_properties,
        raw_representation=msg.raw_representation,
    )
    return Message(contents=cls._rewrite_text_to_reasoning(msg.contents), **carried_over)

@classmethod
def _msg_maybe_reasoning(cls, msg: Message, *, as_reasoning: bool) -> Message:
    """Rewrite `msg`'s text as reasoning when `as_reasoning` is True; otherwise return it unchanged."""
    if not as_reasoning:
        return msg
    return cls._msg_as_reasoning(msg)

class _ResponseState(TypedDict):
"""State for grouping response updates by message_id."""

Expand Down
12 changes: 12 additions & 0 deletions python/packages/core/agent_framework/_workflows/_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .._types import AgentResponse, AgentResponseUpdate, Message, ResponseStream
from ._agent_utils import resolve_agent_id
from ._const import GLOBAL_KWARGS_KEY, WORKFLOW_RUN_KWARGS_KEY
from ._events import WorkflowEvent
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._request_info_mixin import response_handler
Expand Down Expand Up @@ -141,6 +142,7 @@ def __init__(
id: str | None = None,
context_mode: Literal["full", "last_agent", "custom"] | None = None,
context_filter: Callable[[list[Message]], list[Message]] | None = None,
emit_data_events: bool = False,
):
"""Initialize the executor with a unique identifier.

Expand All @@ -158,6 +160,10 @@ def __init__(
as context for the agent run.
context_filter: An optional function for filtering conversation context when context_mode is set
to "custom".
emit_data_events: When True, additionally emits `data` events (via
`WorkflowEvent.emit`) carrying each AgentResponse / AgentResponseUpdate alongside
the existing `output` events. Orchestrations use this to surface intermediate
participants while reserving `output` events for the workflow's final answer.
"""
# Prefer provided id; else use agent.name if present; else generate deterministic prefix
exec_id = id or resolve_agent_id(agent)
Expand All @@ -183,6 +189,8 @@ def __init__(
if self._context_mode == "custom" and not self._context_filter:
raise ValueError("context_filter must be provided when context_mode is set to 'custom'.")

self._emit_data_events = emit_data_events

@property
def agent(self) -> SupportsAgentRun:
"""Get the underlying agent wrapped by this executor."""
Expand Down Expand Up @@ -429,6 +437,8 @@ async def _run_agent(self, ctx: WorkflowContext[Never, AgentResponse]) -> AgentR
client_kwargs=client_kwargs,
)
await ctx.yield_output(response)
if self._emit_data_events:
await ctx.add_event(WorkflowEvent.emit(self.id, response))

# Handle any user input requests
if response.user_input_requests:
Expand Down Expand Up @@ -472,6 +482,8 @@ async def _run_agent_streaming(self, ctx: WorkflowContext[Never, AgentResponseUp
async for update in stream:
updates.append(update)
await ctx.yield_output(update)
if self._emit_data_events:
await ctx.add_event(WorkflowEvent.emit(self.id, update))
if update.user_input_requests:
streamed_user_input_requests.extend(update.user_input_requests)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,13 @@ async def _process_workflow_result(
else:
await asyncio.gather(*[ctx.send_message(output) for output in outputs])

# Forward data events from the sub-workflow so that intermediate
# observations (e.g. emit_data_events from AgentExecutor) are
# visible in the parent workflow's event stream.
data_events = [event for event in result if isinstance(event, WorkflowEvent) and event.type == "data"]
for data_event in data_events:
await ctx.add_event(WorkflowEvent.emit(data_event.executor_id or "", data_event.data))

# Process request info events
for event in request_info_events:
request_id = event.request_id
Expand Down
58 changes: 58 additions & 0 deletions python/packages/core/tests/workflow/test_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,61 @@ async def test_resolve_executor_kwargs_empty_per_executor_does_not_fallback_to_g
resolved = {"exec_a": {}, GLOBAL_KWARGS_KEY: {"global_key": "global_val"}}
result = executor._resolve_executor_kwargs(resolved) # pyright: ignore[reportPrivateUsage]
assert result == {}


async def test_emit_data_events_mirrors_yield_output_non_streaming() -> None:
    """When emit_data_events=True, AgentExecutor emits a data event with the AgentResponse."""
    agent = _CountingAgent(id="agent_a", name="AgentA")
    executor = AgentExecutor(agent, id="exec_a", emit_data_events=True)
    workflow = WorkflowBuilder(start_executor=executor).build()

    # Materialize the run result once, then partition by event type.
    events = list(await workflow.run("hello"))
    output_events: list[WorkflowEvent[Any]] = [e for e in events if e.type == "output"]
    data_events: list[WorkflowEvent[Any]] = [e for e in events if e.type == "data"]

    # The existing output event is still emitted (behavior unchanged)...
    assert len(output_events) == 1
    assert isinstance(output_events[0].data, AgentResponse)
    # ...plus a parallel data event carrying the same AgentResponse payload.
    assert len(data_events) == 1
    assert data_events[0].executor_id == "exec_a"
    assert isinstance(data_events[0].data, AgentResponse)
    assert data_events[0].data.messages[0].text == output_events[0].data.messages[0].text


async def test_emit_data_events_mirrors_yield_output_streaming() -> None:
    """When emit_data_events=True and streaming, data events accompany each AgentResponseUpdate."""
    agent = _CountingAgent(id="agent_a", name="AgentA")
    executor = AgentExecutor(agent, id="exec_a", emit_data_events=True)
    workflow = WorkflowBuilder(start_executor=executor).build()

    # Bucket streamed events by type; anything other than output/data is ignored.
    buckets: dict[str, list[WorkflowEvent[Any]]] = {"output": [], "data": []}
    async for event in workflow.run("hello", stream=True):
        if event.type in buckets:
            buckets[event.type].append(event)
    output_updates = buckets["output"]
    data_updates = buckets["data"]

    assert output_updates
    for update_event in output_updates:
        assert isinstance(update_event.data, AgentResponseUpdate)
    # Each streamed output update is mirrored by exactly one data event from the executor.
    assert len(data_updates) == len(output_updates)
    for update_event in data_updates:
        assert isinstance(update_event.data, AgentResponseUpdate)
        assert update_event.executor_id == "exec_a"


async def test_emit_data_events_default_false_no_data_events() -> None:
    """When emit_data_events is not set, no extra data events are emitted (default behavior)."""
    agent = _CountingAgent(id="agent_a", name="AgentA")
    executor = AgentExecutor(agent, id="exec_a")  # default: emit_data_events=False
    workflow = WorkflowBuilder(start_executor=executor).build()

    events = await workflow.run("hello")
    # No data events at all when the mirror flag is left at its default.
    assert [e for e in events if e.type == "data"] == []
Loading
Loading