Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import logging
import sys
import uuid
from enum import Enum
from collections.abc import Mapping
from dataclasses import dataclass
from decimal import Decimal as _Decimal
from typing import Any, Literal, cast

from agent_framework import (
Executor,
Message,
WorkflowContext,
)
from agent_framework._workflows._state import State
Expand Down Expand Up @@ -120,7 +122,20 @@ def _make_powerfx_safe(value: Any) -> Any:
Returns:
A PowerFx-safe representation of the value
"""
if value is None or isinstance(value, _POWERFX_SAFE_TYPES):
if value is None:
return value

# Enum coercion must run BEFORE the primitive type check: many MAF
# enums (e.g. MessageRole) are ``str``-subclass enums, so they pass
# ``isinstance(v, str)`` but pythonnet refuses to convert them to
# ``System.String`` and raises ``'MessageRole' value cannot be
# converted to System.<X>'`` for every PowerFx primitive type. Reduce
# to the underlying value (or its string form) so PowerFx sees a
# plain ``str``/``int``.
if isinstance(value, Enum):
return _make_powerfx_safe(value.value)

if isinstance(value, _POWERFX_SAFE_TYPES):
return value

if isinstance(value, dict):
Expand Down Expand Up @@ -873,6 +888,15 @@ async def _ensure_state_initialized(
Follows .NET's DefaultTransform pattern - accepts any input type:
- dict/Mapping: Used directly as workflow.inputs
- str: Converted to {"input": value}
- list[Message]: Treated as the agent-facing message contract
(e.g. from WorkflowAgent / as_agent()). The full message list is
stored in ``Conversation.messages``/``Conversation.history`` and
mirrored to ``System.conversations.{id}.messages`` so workflows
that reference ``=Conversation.messages`` (e.g. InvokeAzureAgent)
see the complete history including assistant turns and non-text
content. The last user message's text is also used as the string
input (``Inputs.input``) and surfaced via ``System.LastMessage*``
for backward compatibility with simple text-only workflows.
- DeclarativeMessage: Internal message, no initialization needed
- Any other type: Converted via str() to {"input": str(value)}

Expand All @@ -888,6 +912,77 @@ async def _ensure_state_initialized(
if isinstance(trigger, dict):
# Structured inputs - use directly
state.initialize(trigger) # type: ignore
elif isinstance(trigger, list) and all(isinstance(m, Message) for m in trigger):
# list[Message] (e.g. from WorkflowAgent / as_agent()).
# Populate the full conversation rather than collapsing to a
# single string, so workflows that operate on the message list
# (InvokeAzureAgent with =Conversation.messages, history-aware
# agents, multi-modal content, etc.) see the complete input.
messages_list = cast(list[Message], trigger)

# Locate the trailing user message: WorkflowAgent merges session
# history with the caller's new input and forwards the combined
# list, so the most recent user message represents "this turn"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reduces the newest user turn to last_user_msg.text, but Message.text only concatenates text content. Downstream, InvokeAzureAgentExecutor reconstructs the turn as Message(role="user", contents=[input_text]), so images, tool responses, approvals, or any other non-text content on the current turn are silently dropped. A safer design is to keep the full Message in state and only derive System.LastMessageText as a compatibility view.

# (everything before it is prior history). InvokeAzureAgent's
# contract is that Conversation.messages holds PRIOR turns only -
# the executor appends the new user input itself before invoking
# the agent. To avoid duplicating the latest user turn we split
# the trigger at that boundary.
last_user_index = -1
for idx in range(len(messages_list) - 1, -1, -1):
if str(messages_list[idx].role).lower() == "user":
last_user_index = idx
break

if last_user_index >= 0:
last_user_msg = messages_list[last_user_index]
last_user_text = last_user_msg.text or ""
last_user_id = getattr(last_user_msg, "message_id", "") or ""
# Prior history excludes the latest user turn; trailing
# non-user messages (e.g. tool results) are preserved so
# later actions still see them in Conversation.messages.
history_messages = (
messages_list[:last_user_index] + messages_list[last_user_index + 1:]
)
else:
# No user message in the list - rare path (e.g. resume after
# an assistant-only sequence). Treat the whole list as prior
# history and surface the last message's text for backwards
# compatibility with =System.LastMessageText.
history_messages = list(messages_list)
tail = messages_list[-1] if messages_list else None
last_user_text = (tail.text or "") if tail is not None else ""
last_user_id = (
getattr(tail, "message_id", "") or "" if tail is not None else ""
)

# Initialize state. Using the last user text as Inputs.input
# keeps simple yamls (=inputs.input / =System.LastMessageText)
# working, and matches what InvokeAzureAgent expects to find via
# its input_text fallback chain.
state.initialize({"input": last_user_text})

# Populate Conversation.messages/.history with PRIOR turns only
# (matching the executor contract above). Raw Message objects
# are stored - matching what agent executors append at runtime.
for msg in history_messages:
state.append("Conversation.messages", msg)
state.append("Conversation.history", msg)

# Mirror to System.conversations.{ConversationId}.messages so
# actions resolving conversation-scoped paths see the same
# history.
conversation_id = state.get("System.ConversationId")
if conversation_id:
conv_path = f"System.conversations.{conversation_id}.messages"
for msg in history_messages:
state.append(conv_path, msg)

# System.LastMessage* mirrors the most recent USER message
# (matching .NET DefaultTransform semantics for agent input).
state.set("System.LastMessage", {"Text": last_user_text, "Id": last_user_id})
state.set("System.LastMessageText", last_user_text)
state.set("System.LastMessageId", last_user_id)
elif isinstance(trigger, str):
# String input - wrap in dict and populate System.LastMessage.Text
# so YAML expressions like =System.LastMessage.Text see the user input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any, cast

from agent_framework import (
Message,
WorkflowContext,
handler,
)
Expand Down Expand Up @@ -492,7 +493,13 @@ class JoinExecutor(DeclarativeActionExecutor):
@handler
async def handle_action(
self,
trigger: dict[str, Any] | str | ActionTrigger | ActionComplete | ConditionResult | LoopIterationResult,
trigger: dict[str, Any]
| str
| list[Message]
| ActionTrigger
| ActionComplete
| ConditionResult
| LoopIterationResult,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Simply pass through to continue the workflow."""
Expand Down
28 changes: 28 additions & 0 deletions python/packages/declarative/tests/test_workflow_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,34 @@ async def test_entry_join_executor_initializes_workflow_inputs_string(self):
outputs = result.get_outputs()
assert any("hello-world" in str(o) for o in outputs), f"Expected 'hello-world' in outputs but got: {outputs}"

@pytest.mark.asyncio
async def test_as_agent_round_trip_with_last_message_text(self):
"""Regression test: a declarative workflow built via WorkflowFactory must be
consumable as an AIAgent via Workflow.as_agent().

Specifically, the declarative start executor must accept list[Message]
(the input passed by WorkflowAgent) and populate System.LastMessageText
so =System.LastMessageText is resolvable in the YAML.
"""
factory = WorkflowFactory()
workflow = factory.create_workflow_from_yaml("""
name: as-agent-roundtrip-test
actions:
- kind: SetVariable
variable: Local.echo
value: =System.LastMessageText
- kind: SendActivity
activity:
text: =Local.echo
""")

agent = workflow.as_agent(name="echo-agent")
response = await agent.run("Hello there")

assert "Hello there" in response.text, (
f"Expected 'Hello there' in agent response text but got: {response.text!r}"
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only exercises the simplest case: a single user string through as_agent(). The new _ensure_state_initialized branch has ~70 lines of branching logic for multi-message history splitting, the no-user-message fallback, and Conversation.messages/history population — none of which are covered. Consider adding unit-level tests that call handle_action with multi-message list[Message] inputs and inspect state, e.g.: (1) a three-message sequence [user, assistant, user] verifying System.LastMessageText equals the last user text and Conversation.messages contains only the first two; (2) an assistant-only list verifying the no-user-message fallback populates System.LastMessageText from the tail.



class TestWorkflowFactoryAgentRegistration:
"""Tests for agent registration."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ async def _handle_inner_workflow(
input_messages = _items_to_messages(input_items)
is_streaming_request = request.stream is not None and request.stream is True

# Fetch prior conversation history from Foundry storage so workflow
# agents see the same history their non-workflow counterparts get
# (see _handle_inner_agent which builds messages from history +
# current input). Without this, declarative workflows triggered via
# WorkflowAgent.as_agent only ever see the latest user turn, even
# though the host's checkpoint replay restores the workflow's
# internal state - declarative workflows reset Conversation.messages
# on every new run, so cross-turn context has to come from the
# message list passed in, not from checkpointed workflow state.
history = await context.get_history()
history_messages = _output_items_to_messages(history)
full_messages = [*history_messages, *input_messages]

_, are_options_set = _to_chat_options(request)
if are_options_set:
logger.warning("Workflow agent doesn't support runtime options. They will be ignored.")
Expand Down Expand Up @@ -307,7 +320,7 @@ async def _handle_inner_workflow(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing full_messages here breaks the restored-checkpoint path. _handle_inner_workflow() restores the latest checkpoint just above, and WorkflowAgent uses that to rebuild pending_requests. On resume, _process_pending_requests() calls _extract_function_responses(), which explicitly rejects non-response history content while requests are pending. A resumed approval/tool-call flow will now receive the entire transcript and fail by design. The host should pass only the new response items when resuming pending requests, and reserve full_messages for fresh turns.

if not is_streaming_request:
# Run the agent in non-streaming mode
response = await self._agent.run(input_messages, stream=False, checkpoint_storage=checkpoint_storage)
response = await self._agent.run(full_messages, stream=False, checkpoint_storage=checkpoint_storage)

for message in response.messages:
for content in message.contents:
Expand All @@ -323,7 +336,7 @@ async def _handle_inner_workflow(
tracker = _OutputItemTracker(response_event_stream)

# Run the workflow agent in streaming mode
async for update in self._agent.run(input_messages, stream=True, checkpoint_storage=checkpoint_storage):
async for update in self._agent.run(full_messages, stream=True, checkpoint_storage=checkpoint_storage):
for content in update.contents:
for event in tracker.handle(content):
yield event
Expand Down
Loading