diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 2bcc6d355e..626a02199b 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -59,6 +59,62 @@ class AgentExecutorResponse: agent_response: AgentResponse full_conversation: list[Message] + def with_text(self, text: str) -> "AgentExecutorResponse": + """Create a new AgentExecutorResponse with replaced text, preserving the conversation history. + + Use this in custom executors that transform agent output text (e.g. upper-casing, summarising) + when you need downstream AgentExecutors to still have access to the full prior conversation. + + Without this helper, sending a plain ``str`` from a custom executor breaks the context chain: + the downstream ``AgentExecutor.from_str`` handler only adds that one string to its cache and + loses all prior messages. By using ``with_text`` the response type stays + ``AgentExecutorResponse``, so ``AgentExecutor.from_response`` is invoked instead and the full + conversation is preserved. + + Args: + text: The replacement assistant message text. + + Returns: + A new ``AgentExecutorResponse`` whose ``agent_response`` contains a single assistant + message with ``text``, and whose ``full_conversation`` is the prior conversation + (everything before the original agent turn) followed by the new assistant message. + + Example: + .. code-block:: python + + from agent_framework import AgentExecutorResponse, WorkflowContext, executor + + + @executor( + id="upper_case_executor", + input=AgentExecutorResponse, + output=AgentExecutorResponse, + workflow_output=str, + ) + async def upper_case( + response: AgentExecutorResponse, + ctx: WorkflowContext[AgentExecutorResponse, str], + ) -> None: + upper_text = response.agent_response.text.upper() + await ctx.send_message(response.with_text(upper_text)) + await ctx.yield_output(upper_text) + """ + new_message = Message("assistant", [text]) + new_agent_response = AgentResponse(messages=[new_message]) + + # Strip off the original agent turn and replace with the new text. + n_agent_messages = len(self.agent_response.messages) + prior_messages = ( + self.full_conversation[:-n_agent_messages] if n_agent_messages else list(self.full_conversation) + ) + new_full_conversation = [*prior_messages, new_message] + + return AgentExecutorResponse( + executor_id=self.executor_id, + agent_response=new_agent_response, + full_conversation=new_full_conversation, + ) + class AgentExecutor(Executor): """built-in executor that wraps an agent for handling messages. @@ -183,7 +239,25 @@ async def from_str( """Accept a raw user prompt string and run the agent. The new string input will be added to the cache which is used as the conversation context for the agent run. + + Warning: + If the upstream executor received an ``AgentExecutorResponse`` but emits a plain + ``str``, this handler will be invoked instead of ``from_response``. This resets + the conversation context because only the new string is added to the cache and + all prior messages from the upstream agent are lost. + + To preserve the full conversation when transforming agent output in a custom + executor, use ``AgentExecutorResponse.with_text(...)`` so that the message type + stays ``AgentExecutorResponse`` and ``from_response`` is called instead. """ + if not self._cache and ctx.source_executor_ids != ["Workflow"]: + logger.warning( + "AgentExecutor '%s': from_str handler invoked with an empty cache. " + "If you are chaining from an AgentExecutor, the upstream custom executor may be " + "emitting a plain str instead of using AgentExecutorResponse.with_text(...), " + "which causes the full conversation context to be lost.", + self.id, + ) self._cache.extend(normalize_messages_input(text)) await self._run_agent_and_emit(ctx) diff --git a/python/packages/core/agent_framework/_workflows/_function_executor.py b/python/packages/core/agent_framework/_workflows/_function_executor.py index 038d12cf89..0d46c0daa3 100644 --- a/python/packages/core/agent_framework/_workflows/_function_executor.py +++ b/python/packages/core/agent_framework/_workflows/_function_executor.py @@ -268,6 +268,19 @@ async def process(self, data: str, ctx: WorkflowContext[str]): forward references. When provided, takes precedence over introspection from the ``WorkflowContext`` second generic parameter (W_OutT). + Warning: + When placing a custom ``@executor`` **between** two ``AgentExecutor`` nodes, be + careful about the output type. If the custom executor receives an + ``AgentExecutorResponse`` but emits a plain ``str``, the downstream + ``AgentExecutor.from_str`` handler is invoked instead of ``from_response``. + This resets the conversation context because only the new string is added to + the cache and all prior messages from the upstream agent are lost. + + To preserve the full conversation, use + ``AgentExecutorResponse.with_text(new_text)`` to create a new response that + keeps the prior history, and set ``output=AgentExecutorResponse`` on the + decorator. + Returns: A FunctionExecutor instance that can be wired into a Workflow. diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index b38b9400a2..5d9ce45018 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -23,6 +23,7 @@ WorkflowBuilder, WorkflowContext, WorkflowRunState, + executor, handler, ) from agent_framework.orchestrations import SequentialBuilder @@ -478,3 +479,90 @@ async def test_from_response_preserves_service_session_id() -> None: assert result.get_outputs() is not None assert spy_agent._captured_service_session_id == "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] + + +@executor( + id="upper_case_executor", + input=AgentExecutorResponse, + output=AgentExecutorResponse, + workflow_output=str, +) +async def _upper_case_executor( + response: AgentExecutorResponse, + ctx: WorkflowContext[AgentExecutorResponse, str], +) -> None: + upper_text = response.agent_response.text.upper() + await ctx.send_message(response.with_text(upper_text)) + await ctx.yield_output(upper_text) + + +async def test_with_text_preserves_full_conversation_through_custom_executor() -> None: + """Custom executor using with_text must preserve the full conversation chain.""" + # Mirrors the reproduction from issue #5246: + # agent1 ("User likes sky red") -> agent2 ("User likes sky blue") -> upper_case -> agent3 ("User likes sky green") + agent1 = AgentExecutor( + _SimpleAgent(id="agent1", name="ContextAgent1", reply_text="User likes sky red"), id="agent1" + ) + agent2 = AgentExecutor( + _SimpleAgent(id="agent2", name="ContextAgent2", reply_text="User likes sky blue"), id="agent2" + ) + agent3 = AgentExecutor( + _SimpleAgent(id="agent3", name="ContextAgent3", reply_text="User likes sky green"), id="agent3" + ) + capturer = _CaptureFullConversation(id="capture") + + wf = ( + WorkflowBuilder(start_executor=agent1, output_executors=[capturer]) + .add_chain([agent1, agent2, _upper_case_executor, agent3, capturer]) + .build() + ) + + result = await wf.run("") + payload = next(o for o in result.get_outputs() if isinstance(o, dict)) + + # The final agent must see the full conversation: user, agent1, UPPER(agent2), agent3 + assert payload["roles"] == ["user", "assistant", "assistant", "assistant"] + assert payload["texts"][1] == "User likes sky red" + assert payload["texts"][2] == "USER LIKES SKY BLUE" + assert payload["texts"][3] == "User likes sky green" + + +async def test_with_text_does_not_mutate_original() -> None: + """with_text returns a new instance; the original must be unmodified.""" + original = AgentExecutorResponse( + executor_id="test_exec", + agent_response=AgentResponse(messages=[Message("assistant", ["original reply"])]), + full_conversation=[Message("user", ["prompt"]), Message("assistant", ["original reply"])], + ) + + new = original.with_text("transformed reply") + + assert new is not original + assert new.agent_response.text == "transformed reply" + assert new.full_conversation[-1].text == "transformed reply" + assert new.full_conversation[-1].role == "assistant" + # Original unchanged + assert original.agent_response.text == "original reply" + assert original.full_conversation[-1].text == "original reply" + + +async def test_with_text_strips_multi_message_agent_turn() -> None: + """When the agent turn has multiple messages (tool calls), with_text strips all of them.""" + tool_call = Message("assistant", [""]) + tool_result = Message("tool", [""]) + final_reply = Message("assistant", ["actual answer"]) + user_msg = Message("user", ["question"]) + + original = AgentExecutorResponse( + executor_id="exec", + agent_response=AgentResponse(messages=[tool_call, tool_result, final_reply]), + full_conversation=[user_msg, tool_call, tool_result, final_reply], + ) + + new = original.with_text("summarised answer") + + # Only the pre-agent-turn messages should remain, plus the replacement + assert len(new.full_conversation) == 2 + assert new.full_conversation[0].text == "question" + assert new.full_conversation[1].text == "summarised answer" + assert new.agent_response.text == "summarised answer"