diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index e7af9fde9a..14ea3f7fc1 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -29,6 +29,7 @@ import logging import sys import uuid +from enum import Enum from collections.abc import Mapping from dataclasses import dataclass from decimal import Decimal as _Decimal @@ -36,6 +37,7 @@ from agent_framework import ( Executor, + Message, WorkflowContext, ) from agent_framework._workflows._state import State @@ -120,7 +122,20 @@ def _make_powerfx_safe(value: Any) -> Any: Returns: A PowerFx-safe representation of the value """ - if value is None or isinstance(value, _POWERFX_SAFE_TYPES): + if value is None: + return value + + # Enum coercion must run BEFORE the primitive type check: many MAF + # enums (e.g. MessageRole) are ``str``-subclass enums, so they pass + # ``isinstance(v, str)`` but pythonnet refuses to convert them to + # ``System.String`` and raises ``'MessageRole' value cannot be + # converted to System.'`` for every PowerFx primitive type. Reduce + # to the underlying value (or its string form) so PowerFx sees a + # plain ``str``/``int``. + if isinstance(value, Enum): + return _make_powerfx_safe(value.value) + + if isinstance(value, _POWERFX_SAFE_TYPES): return value if isinstance(value, dict): @@ -873,6 +888,15 @@ async def _ensure_state_initialized( Follows .NET's DefaultTransform pattern - accepts any input type: - dict/Mapping: Used directly as workflow.inputs - str: Converted to {"input": value} + - list[Message]: Treated as the agent-facing message contract + (e.g. from WorkflowAgent / as_agent()). The full message list is + stored in ``Conversation.messages``/``Conversation.history`` and + mirrored to ``System.conversations.{id}.messages`` so workflows + that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent) + see the complete history including assistant turns and non-text + content. The last user message's text is also used as the string + input (``Inputs.input``) and surfaced via ``System.LastMessage*`` + for backward compatibility with simple text-only workflows. - DeclarativeMessage: Internal message, no initialization needed - Any other type: Converted via str() to {"input": str(value)} @@ -888,6 +912,77 @@ async def _ensure_state_initialized( if isinstance(trigger, dict): # Structured inputs - use directly state.initialize(trigger) # type: ignore + elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger): + # list[Message] (e.g. from WorkflowAgent / as_agent()). + # Populate the full conversation rather than collapsing to a + # single string, so workflows that operate on the message list + # (InvokeAzureAgent with =Conversation.messages, history-aware + # agents, multi-modal content, etc.) see the complete input. + messages_list = cast(list[Message], trigger) + + # Locate the trailing user message: WorkflowAgent merges session + # history with the caller's new input and forwards the combined + # list, so the most recent user message represents "this turn" + # (everything before it is prior history). InvokeAzureAgent's + # contract is that Conversation.messages holds PRIOR turns only - + # the executor appends the new user input itself before invoking + # the agent. To avoid duplicating the latest user turn we split + # the trigger at that boundary. + last_user_index = -1 + for idx in range(len(messages_list) - 1, -1, -1): + if str(messages_list[idx].role).lower() == "user": + last_user_index = idx + break + + if last_user_index >= 0: + last_user_msg = messages_list[last_user_index] + last_user_text = last_user_msg.text or "" + last_user_id = getattr(last_user_msg, "message_id", "") or "" + # Prior history excludes the latest user turn; trailing + # non-user messages (e.g. tool results) are preserved so + # later actions still see them in Conversation.messages. + history_messages = ( + messages_list[:last_user_index] + messages_list[last_user_index + 1:] + ) + else: + # No user message in the list - rare path (e.g. resume after + # an assistant-only sequence). Treat the whole list as prior + # history and surface the last message's text for backwards + # compatibility with =System.LastMessageText. + history_messages = list(messages_list) + tail = messages_list[-1] if messages_list else None + last_user_text = (tail.text or "") if tail is not None else "" + last_user_id = ( + getattr(tail, "message_id", "") or "" if tail is not None else "" + ) + + # Initialize state. Using the last user text as Inputs.input + # keeps simple yamls (=inputs.input / =System.LastMessageText) + # working, and matches what InvokeAzureAgent expects to find via + # its input_text fallback chain. + state.initialize({"input": last_user_text}) + + # Populate Conversation.messages/.history with PRIOR turns only + # (matching the executor contract above). Raw Message objects + # are stored - matching what agent executors append at runtime. + for msg in history_messages: + state.append("Conversation.messages", msg) + state.append("Conversation.history", msg) + + # Mirror to System.conversations.{ConversationId}.messages so + # actions resolving conversation-scoped paths see the same + # history. + conversation_id = state.get("System.ConversationId") + if conversation_id: + conv_path = f"System.conversations.{conversation_id}.messages" + for msg in history_messages: + state.append(conv_path, msg) + + # System.LastMessage* mirrors the most recent USER message + # (matching .NET DefaultTransform semantics for agent input). + state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id}) + state.set("System.LastMessageText", last_user_text) + state.set("System.LastMessageId", last_user_id) elif isinstance(trigger, str): # String input - wrap in dict and populate System.LastMessage.Text # so YAML expressions like =System.LastMessage.Text see the user input diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py index 0aa660b3ea..f5baf80a9d 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py @@ -17,6 +17,7 @@ from typing import Any, cast from agent_framework import ( + Message, WorkflowContext, handler, ) @@ -492,7 +493,13 @@ class JoinExecutor(DeclarativeActionExecutor): @handler async def handle_action( self, - trigger: dict[str, Any] | str | ActionTrigger | ActionComplete | ConditionResult | LoopIterationResult, + trigger: dict[str, Any] + | str + | list[Message] + | ActionTrigger + | ActionComplete + | ConditionResult + | LoopIterationResult, ctx: WorkflowContext[ActionComplete], ) -> None: """Simply pass through to continue the workflow.""" diff --git a/python/packages/declarative/tests/test_workflow_factory.py b/python/packages/declarative/tests/test_workflow_factory.py index f08f5993e5..e9988ea97c 100644 --- a/python/packages/declarative/tests/test_workflow_factory.py +++ b/python/packages/declarative/tests/test_workflow_factory.py @@ -228,6 +228,34 @@ async def test_entry_join_executor_initializes_workflow_inputs_string(self): outputs = result.get_outputs() assert any("hello-world" in str(o) for o in outputs), f"Expected 'hello-world' in outputs but got: {outputs}" + @pytest.mark.asyncio + async def test_as_agent_round_trip_with_last_message_text(self): + """Regression test: a declarative workflow built via WorkflowFactory must be + consumable as an AIAgent via Workflow.as_agent(). + + Specifically, the declarative start executor must accept list[Message] + (the input passed by WorkflowAgent) and populate System.LastMessageText + so =System.LastMessageText is resolvable in the YAML. + """ + factory = WorkflowFactory() + workflow = factory.create_workflow_from_yaml(""" +name: as-agent-roundtrip-test +actions: + - kind: SetVariable + variable: Local.echo + value: =System.LastMessageText + - kind: SendActivity + activity: + text: =Local.echo +""") + + agent = workflow.as_agent(name="echo-agent") + response = await agent.run("Hello there") + + assert "Hello there" in response.text, ( + f"Expected 'Hello there' in agent response text but got: {response.text!r}" + ) + class TestWorkflowFactoryAgentRegistration: """Tests for agent registration.""" diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index 9078c59d22..a6238b746a 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -256,6 +256,19 @@ async def _handle_inner_workflow( input_messages = _items_to_messages(input_items) is_streaming_request = request.stream is not None and request.stream is True + # Fetch prior conversation history from Foundry storage so workflow + # agents see the same history their non-workflow counterparts get + # (see _handle_inner_agent which builds messages from history + + # current input). Without this, declarative workflows triggered via + # WorkflowAgent.as_agent only ever see the latest user turn, even + # though the host's checkpoint replay restores the workflow's + # internal state - declarative workflows reset Conversation.messages + # on every new run, so cross-turn context has to come from the + # message list passed in, not from checkpointed workflow state. + history = await context.get_history() + history_messages = _output_items_to_messages(history) + full_messages = [*history_messages, *input_messages] + _, are_options_set = _to_chat_options(request) if are_options_set: logger.warning("Workflow agent doesn't support runtime options. They will be ignored.") @@ -307,7 +320,7 @@ async def _handle_inner_workflow( if not is_streaming_request: # Run the agent in non-streaming mode - response = await self._agent.run(input_messages, stream=False, checkpoint_storage=checkpoint_storage) + response = await self._agent.run(full_messages, stream=False, checkpoint_storage=checkpoint_storage) for message in response.messages: for content in message.contents: @@ -323,7 +336,7 @@ async def _handle_inner_workflow( tracker = _OutputItemTracker(response_event_stream) # Run the workflow agent in streaming mode - async for update in self._agent.run(input_messages, stream=True, checkpoint_storage=checkpoint_storage): + async for update in self._agent.run(full_messages, stream=True, checkpoint_storage=checkpoint_storage): for content in update.contents: for event in tracker.handle(content): yield event