# Microsoft Agent Framework - Learning Notebook

**Purpose:** This notebook is designed to explore all the capabilities of the Microsoft Agent Framework for learning and experimentation.

---

## What is Microsoft Agent Framework?

Microsoft Agent Framework is an **open-source development kit** for building AI agents and multi-agent workflows for .NET and Python. It brings together and extends ideas from **Semantic Kernel** and **AutoGen** projects, combining their strengths while adding new capabilities.

### Key Capabilities

| Category | Description |
|----------|-------------|
| **AI Agents** | Individual agents that use LLMs to process user inputs, call tools and MCP servers to perform actions, and generate responses |
| **Workflows** | Graph-based workflows that connect multiple agents and functions to perform complex, multi-step tasks |

### Building Blocks

The framework provides foundational components:
- **Model Clients** - Chat completions and responses (Azure OpenAI, OpenAI, Azure AI)
- **Agent Session** - State management for conversations
- **Context Providers** - Agent memory capabilities
- **Middleware** - Intercepting agent actions
- **MCP Clients** - Tool integration via Model Context Protocol

### When to Use AI Agents

AI agents excel at:
- üéß **Customer Support** - Multi-modal queries with tool lookups
- üìö **Education & Tutoring** - Personalized learning with knowledge bases
- üíª **Code Generation** - Implementation, reviews, and debugging
- üî¨ **Research Assistance** - Web search, document summarization

### When NOT to Use AI Agents

> *"If you can write a function to handle the task, do that instead of using an AI agent."*

Avoid agents for:
- Highly structured tasks with predefined rules
- Well-defined sequences of operations
- Tasks requiring more than ~20 tools (use workflows instead)

---

## Prerequisites

Before running this notebook:

1. ‚úÖ **Azure subscription** with access to Azure OpenAI
2. ‚úÖ **Azure OpenAI resource** with a deployed model (e.g., `gpt-4o-mini`)
3. ‚úÖ **Azure CLI** installed and authenticated (`az login`)
4. ‚úÖ **`.env` file** with your configuration (see README.md)

## Install Python Packages

To use Microsoft Agent Framework with Azure OpenAI, install the following Python packages:

```bash
pip install agent-framework --pre python-dotenv nest_asyncio
```

> **Note:** `nest_asyncio` is required for Python 3.10 compatibility to allow nested event loops in Jupyter notebooks.

In [None]:
%pip install agent-framework --pre python-dotenv nest_asyncio

## Load Environment Variables

The `.env` file contains your Azure OpenAI configuration. The `python-dotenv` library loads these variables into the environment so the SDK can access them automatically.

Required variables:
- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` - Your deployed model name
- `AZURE_OPENAI_ENDPOINT` - Your Azure OpenAI endpoint URL
- `AZURE_OPENAI_API_KEY` - Your API key (optional if using Azure CLI auth)
- `API_VERSION` - The API version to use

In [None]:
import nest_asyncio
nest_asyncio.apply()

from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

print("Environment variables loaded successfully")

## Create the Agent

First, create a chat client for communicating with Azure OpenAI using the environment variables configured earlier.

Then, create the agent by providing instructions and a name for the agent.

In [None]:
import asyncio
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are good at telling jokes.",
    name="Joker"
)

## Run the Agent

To run the agent, call the `run` method on the agent instance, providing the user input. The agent will return a response object, and accessing the `.text` property provides the text result from the agent.

In [None]:
async def run_agent():
    result = await agent.run("Tell me a joke about a pirate.")
    print(result.text)

asyncio.run(run_agent())

```markdown
## Run the Agent with Streaming

To run the agent with streaming, call the `run_stream` method on the agent instance, providing the user input. The agent will stream a list of update objects, and accessing the `.text` property on each update object provides the part of the text result contained in that update.
```

In [None]:
# Stream the response token by token
async def stream_agent():
    async for update in agent.run_stream("Tell me a joke about a pirate."):
        if update.text:
            print(update.text, end="", flush=True)
    print()  # New line after streaming is complete

asyncio.run(stream_agent())

## Multimodal Input with ChatMessage

You can pass `ChatMessage` objects with multiple content types, including text and images, to enable multimodal interactions with the agent. This is useful when you want the agent to analyze or respond to visual content along with textual instructions.

In [None]:
from agent_framework import ChatMessage, Content, Role

message = ChatMessage(
    role=Role.USER,
    contents=[
        Content.from_text("Tell me a joke about this image?"),
        Content.from_uri("https://media.gettyimages.com/id/1195994877/vector/democratic-donkey-and-republican-elephant-in-tv-debate.jpg?s=612x612&w=gi&k=20&c=1K-OwflyABXdG_xIbo_n7Ph3CRzI63vGx5G_sKmQz-Y=", media_type="image/jpg")
    ]
)

result = await agent.run(message)
print(result.text)

## Multi-Turn Conversations

Agents are **stateless** and do not maintain any state internally between calls. To have a multi-turn conversation with an agent, you need to create an object to hold the conversation state and pass this object to the agent when running it.

### Creating a Thread

To create the conversation state object, call the `get_new_thread()` method on the agent instance:

In [None]:
thread = agent.get_new_thread()

### Running with a Thread

You can then pass this thread object to the `run` and `run_stream` methods on the agent instance, along with the user input. This maintains the conversation state between calls, allowing the agent to refer to previous messages:

In [None]:
# First turn
result1 = await agent.run("Tell me a joke about a pirate.", thread=thread)
print("Turn 1:")
print(result1.text)
print()

# Second turn - agent remembers the previous joke
result2 = await agent.run("Now add some emojis to the joke and tell it in the voice of a pirate's parrot.", thread=thread)
print("Turn 2:")
print(result2.text)

### How Conversation History Works

> ‚ö†Ô∏è **Important:** The type of service used by the agent determines how conversation history is stored:
> 
> - **Chat Completion service** (like this example): Conversation history is stored in the `AgentThread` object and sent to the service on each call.
> - **Azure AI Agent service**: Conversation history is stored in the Azure AI Agent service and only a reference to the conversation is sent on each call.

### Multiple Independent Conversations

It's possible to have multiple, independent conversations with the same agent instance by creating multiple `AgentThread` objects. Since the agent doesn't maintain any state internally, these conversations will be fully independent:

In [None]:
# Create two separate conversation threads
thread1 = agent.get_new_thread()
thread2 = agent.get_new_thread()

# Start conversation 1 - about pirates
result1 = await agent.run("Tell me a joke about a pirate.", thread=thread1)
print("Thread 1 - Turn 1:")
print(result1.text)
print()

# Start conversation 2 - about robots (independent)
result2 = await agent.run("Tell me a joke about a robot.", thread=thread2)
print("Thread 2 - Turn 1:")
print(result2.text)
print()

# Continue conversation 1 - agent remembers the pirate joke
result3 = await agent.run("Now add some emojis to the joke and tell it in the voice of a pirate's parrot.", thread=thread1)
print("Thread 1 - Turn 2:")
print(result3.text)
print()

# Continue conversation 2 - agent remembers the robot joke
result4 = await agent.run("Now add some emojis to the joke and tell it in the voice of a robot.", thread=thread2)
print("Thread 2 - Turn 2:")
print(result4.text)

## Using Function Tools with an Agent

Function tools allow agents to call custom code when needed. You can turn any Python function into a function tool by passing it to the agent's `tools` parameter.

> ‚ö†Ô∏è **Important:** Not all agent types support function tools. Some might only support custom built-in tools. Agents created via chat clients (like this example) do support function tools.

### Creating a Simple Function Tool

Use Python's type annotations with `Annotated` and Pydantic's `Field` to provide descriptions that help the agent choose between different functions:

In [None]:
from typing import Annotated
from pydantic import Field

def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    return f"The weather in {location} is cloudy with a high of 15¬∞C."

### Using the @tool Decorator

You can also use the `@tool` decorator to explicitly specify the function's name and description. If you don't specify them, the framework will automatically use the function's name and docstring as fallbacks:

In [None]:
from agent_framework import tool

@tool(name="weather_tool", description="Retrieves weather information for any location")
def get_weather_with_decorator(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    return f"The weather in {location} is cloudy with a high of 15¬∞C."

### Creating an Agent with Tools

When creating the agent, pass the function tool to the `tools` parameter:

In [None]:
# Create an agent with the weather tool
weather_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are a helpful assistant that can check the weather.",
    tools=get_weather
)

### Running the Agent with Tools

The agent will automatically call the `get_weather` function when needed:

In [None]:
# Ask about the weather - the agent will call our function tool
result = await weather_agent.run("What is the weather like in Amsterdam?")
print(result.text)

### Class with Multiple Function Tools

You can create a class containing multiple function tools as methods. This is useful for organizing related functions together or when you want to share state between them:

In [None]:
class WeatherTools:
    def __init__(self):
        self.last_location = None

    def get_weather(
        self,
        location: Annotated[str, Field(description="The location to get the weather for.")],
    ) -> str:
        """Get the weather for a given location."""
        self.last_location = location
        return f"The weather in {location} is cloudy with a high of 15¬∞C."

    def get_weather_details(self) -> str:
        """Get the detailed weather for the last requested location."""
        if self.last_location is None:
            return "No location specified yet."
        return f"The detailed weather in {self.last_location} is cloudy with a high of 15¬∞C, low of 7¬∞C, and 60% humidity."

### Using Class Methods as Tools

Create an instance of the class and pass its methods to the agent's `tools` parameter:

In [None]:
# Create instance and agent with multiple tools
weather_tools = WeatherTools()

multi_tool_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are a helpful weather assistant. You can get basic weather info and detailed weather info.",
    tools=[weather_tools.get_weather, weather_tools.get_weather_details]
)

# Test the multi-tool agent
result = await multi_tool_agent.run("What's the weather in Tokyo? Then give me the detailed forecast.")
print(result.text)

## Human-in-the-Loop: Function Tools Requiring Approval

When agents require user input (e.g., to approve a function call), this is called a **human-in-the-loop** pattern. An agent run that requires user input will complete with a response indicating what input is needed, instead of completing with a final answer.

The caller is responsible for getting the required input from the user and passing it back to the agent in a new run.

### Creating Tools with Approval Requirements

Use the `approval_mode="always_require"` parameter in the `@tool` decorator to indicate a function requires human approval:

In [None]:
# Standard tool - no approval needed
@tool
def get_basic_weather(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
    """Get the current weather for a given location."""
    return f"The weather in {location} is cloudy with a high of 15¬∞C."

# Tool that requires approval before execution
@tool(approval_mode="always_require")
def get_detailed_weather(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
    """Get detailed weather information for a given location (requires approval)."""
    return f"The weather in {location} is cloudy with a high of 15¬∞C, humidity 88%, wind 12 km/h NW."

### Creating the Agent with Approval-Required Tools

In [None]:
# Create an agent with both regular and approval-required tools
approval_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are a helpful weather assistant. Use get_detailed_weather for detailed forecasts.",
    tools=[get_basic_weather, get_detailed_weather]
)

### Checking for Approval Requests

When the agent needs to call an approval-required function, it returns a response with `user_input_requests` instead of executing the function directly:

In [None]:
# Ask for detailed weather - this will trigger an approval request
result = await approval_agent.run("What is the detailed weather like in Amsterdam?")

# Check if approval is needed
if result.user_input_requests:
    print("üîí Approval Required!")
    for user_input_needed in result.user_input_requests:
        print(f"  Function: {user_input_needed.function_call.name}")
        print(f"  Arguments: {user_input_needed.function_call.arguments}")
else:
    print(result.text)

### Providing Approval and Continuing

Once you have the approval request, use `to_function_approval_response()` on the Content object - pass `True` to approve or `False` to reject. Then continue the conversation with the approval:

In [None]:
# If there was an approval request, provide approval and continue
if result.user_input_requests:
    user_input_needed = result.user_input_requests[0]
    
    # Simulate user approval (in a real app, this would be interactive)
    user_approval = True  # Set to False to reject
    print(f"‚úÖ User approved: {user_approval}")
    
    # Create the approval response message using to_function_approval_response()
    approval_message = ChatMessage(
        role=Role.USER,
        contents=[user_input_needed.to_function_approval_response(user_approval)]
    )
    
    # Continue the conversation with the approval
    final_result = await approval_agent.run([
        "What is the detailed weather like in Amsterdam?",
        ChatMessage(role=Role.ASSISTANT, contents=[user_input_needed]),
        approval_message
    ])
    print(f"\nüìä Final Result:\n{final_result.text}")

## Adding Middleware to Agents

Middleware allows you to **intercept and modify agent interactions** at different points in the execution pipeline. This is a powerful pattern for implementing cross-cutting concerns without modifying your core agent logic.

### Why Use Middleware?

| Use Case | Description |
|----------|-------------|
| **Logging & Monitoring** | Track all agent runs and function calls for debugging and analytics |
| **Security** | Validate inputs, sanitize outputs, enforce access controls |
| **Rate Limiting** | Control how often agents or functions can be called |
| **Caching** | Cache expensive function results to improve performance |
| **Error Handling** | Implement retry logic or graceful degradation |
| **Metrics & Telemetry** | Collect timing data and usage statistics |

### Real-World Example Use Cases

1. **Production Logging**: Log every agent interaction to a monitoring system (e.g., Azure Monitor, Datadog)
2. **Cost Control**: Track token usage and enforce budget limits per user
3. **Compliance**: Audit all AI interactions for regulated industries (healthcare, finance)
4. **A/B Testing**: Route requests to different model versions and compare results

### Creating Agent Middleware

Agent middleware intercepts the entire agent execution. It receives an `AgentRunContext` and a `next` function to continue execution:

In [None]:
from typing import Callable, Awaitable
from agent_framework import AgentRunContext
import time

async def logging_agent_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
    """Simple middleware that logs agent execution with timing."""
    # context.messages contains the input messages
    print(f"üöÄ Agent starting... (messages: {len(context.messages)} message(s))")
    start_time = time.time()

    # Continue to agent execution
    await next(context)

    elapsed = time.time() - start_time
    print(f"‚úÖ Agent finished! (took {elapsed:.2f}s)")

### Adding Middleware to Your Agent

Pass the middleware function to the `middleware` parameter when creating your agent:

In [None]:
# Create an agent with the logging middleware
middleware_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="GreetingAgent",
    instructions="You are a friendly greeting assistant.",
    middleware=[logging_agent_middleware]  # Add middleware here (must be a list)
)

# Test the agent - you'll see the middleware logs
result = await middleware_agent.run("Hello! How are you today?")
print(f"\nüí¨ Response: {result.text}")

### Function Middleware

If your agent uses function tools, you can intercept function calls with `FunctionInvocationContext`. This is useful for logging function usage, validating inputs, or caching results:

In [None]:
from agent_framework import FunctionInvocationContext
from datetime import datetime

def get_current_time() -> str:
    """Get the current time."""
    return datetime.now().strftime("%H:%M:%S")

async def logging_function_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
    """Middleware that logs function calls with inputs and outputs."""
    print(f"üìû Calling function: {context.function.name}")
    print(f"   Arguments: {context.arguments}")

    await next(context)

    print(f"   Result: {context.result}")

# Create agent with both the function tool and function middleware
time_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="TimeAgent",
    instructions="You can tell the current time when asked.",
    tools=[get_current_time],
    middleware=[logging_function_middleware]  # Function middleware
)

# Test the agent - you'll see function call logs
result = await time_agent.run("What time is it right now?")
print(f"\nüí¨ Response: {result.text}")

## Adding Memory to an Agent

Memory allows agents to **remember information across conversations** and provide personalized responses. This is implemented using `ContextProvider`, which can run custom logic before and after agent invocations.

### Why Use Memory?

| Use Case | Description |
|----------|-------------|
| **User Personalization** | Remember user preferences, name, settings |
| **Conversation Context** | Track topics discussed across sessions |
| **Learning & Adaptation** | Build knowledge about user over time |
| **State Management** | Persist important information between runs |

### Real-World Example Use Cases

1. **Customer Support Bot** - Remember customer's order history and preferences
2. **Personal Assistant** - Learn user's schedule, preferences, and communication style
3. **Educational Tutor** - Track student's progress and areas needing improvement
4. **Healthcare Assistant** - Remember patient's medical history and medications

> ‚ö†Ô∏è **Important:** Not all agent types support `ContextProvider`. The `ChatAgent` used in this example does support it.

### How ContextProvider Works

The `ContextProvider` class has two key methods:

| Method | When Called | Purpose |
|--------|-------------|---------|
| `invoking` | Before inference | Provide additional context (instructions, tools, messages) |
| `invoked` | After inference | Inspect request/response and update state |

### Step 1: Create a Model for Memories

First, define a Pydantic model to hold the information you want to remember:

In [None]:
from pydantic import BaseModel

class UserInfo(BaseModel):
    """Model to store user information in memory."""
    name: str | None = None
    age: int | None = None

### Step 2: Implement the ContextProvider

Create a custom `ContextProvider` that:
- **Extracts** user information from messages after each call (`invoked`)
- **Provides** remembered context before each call (`invoking`)
- **Serializes** state for persistence (`serialize`)

In [None]:
from collections.abc import MutableSequence, Sequence
from typing import Any
import re

from agent_framework import ContextProvider, Context, ChatAgent, ChatClientProtocol, ChatMessage


class UserInfoMemory(ContextProvider):
    """A memory component that remembers user's name and age."""
    
    def __init__(self, chat_client: ChatClientProtocol, user_info: UserInfo | None = None, **kwargs: Any):
        """Create the memory.
        
        Args:
            chat_client: The chat client to use for extracting information
            user_info: Optional pre-existing user info
            **kwargs: Optional fields to initialize UserInfo with
        """
        self._chat_client = chat_client
        if user_info:
            self.user_info = user_info
        elif kwargs:
            self.user_info = UserInfo.model_validate(kwargs)
        else:
            self.user_info = UserInfo()

    def _extract_name(self, text: str) -> str | None:
        """Extract name from text using patterns."""
        patterns = [
            r"(?:my name is|i'm|i am|call me)\s+([A-Z][a-z]+)",
            r"(?:name is|name's)\s+([A-Z][a-z]+)",
        ]
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(1).capitalize()
        return None

    def _extract_age(self, text: str) -> int | None:
        """Extract age from text using patterns."""
        patterns = [
            r"(?:i'm|i am|am)\s+(\d{1,3})\s*(?:years? old|yrs? old)?",
            r"(\d{1,3})\s*years? old",
            r"age\s*(?:is)?\s*(\d{1,3})",
        ]
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                age = int(match.group(1))
                if 0 < age < 150:  # Reasonable age range
                    return age
        return None

    async def invoked(
        self,
        request_messages: ChatMessage | Sequence[ChatMessage],
        response_messages: ChatMessage | Sequence[ChatMessage] | None = None,
        invoke_exception: Exception | None = None,
        **kwargs: Any,
    ) -> None:
        """Extract user information from messages after each agent call."""
        # Ensure request_messages is a list
        messages_list = [request_messages] if isinstance(request_messages, ChatMessage) else list(request_messages)

        # Look for user messages and extract info
        for msg in messages_list:
            if msg.role.value == "user":
                # Get the text content from the message
                text = ""
                if msg.contents:
                    for content in msg.contents:
                        if hasattr(content, 'text'):
                            text += content.text + " "
                
                # Try to extract name if not already known
                if self.user_info.name is None:
                    extracted_name = self._extract_name(text)
                    if extracted_name:
                        self.user_info.name = extracted_name
                        print(f"   üß† Memory updated: name = {extracted_name}")
                
                # Try to extract age if not already known
                if self.user_info.age is None:
                    extracted_age = self._extract_age(text)
                    if extracted_age:
                        self.user_info.age = extracted_age
                        print(f"   üß† Memory updated: age = {extracted_age}")

    async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], **kwargs: Any) -> Context:
        """Provide user information context before each agent call."""
        instructions: list[str] = []

        if self.user_info.name is None:
            instructions.append(
                "Ask the user for their name and politely decline to answer any questions until they provide it."
            )
        else:
            instructions.append(f"The user's name is {self.user_info.name}.")

        if self.user_info.age is None:
            instructions.append(
                "Ask the user for their age and politely decline to answer any questions until they provide it."
            )
        else:
            instructions.append(f"The user's age is {self.user_info.age}.")

        # Return context with additional instructions
        return Context(instructions=" ".join(instructions))

    def serialize(self) -> str:
        """Serialize the user info for thread persistence."""
        return self.user_info.model_dump_json()

print("‚úÖ UserInfoMemory class defined with regex-based extraction")

### Step 3: Create an Agent with Memory

Attach the `ContextProvider` to the agent using the `context_providers` parameter:

In [None]:
# Create a chat client for the memory provider to use
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

# Create the memory provider
memory_provider = UserInfoMemory(chat_client)

# Create the agent with memory using ChatAgent
memory_agent = ChatAgent(
    chat_client=chat_client,
    instructions="You are a friendly assistant. Always address the user by their name when known.",
    context_providers=memory_provider,
)

print("‚úÖ Agent with memory created!")

### Step 4: Test the Memory in Action

Watch how the agent remembers information across the conversation:

In [None]:
# Create a new thread for the conversation
memory_thread = memory_agent.get_new_thread()

# Turn 1: Ask a question without providing name/age
print("Turn 1: Asking a question without providing info")
print("-" * 50)
result1 = await memory_agent.run("Hello, what is the square root of 9?", thread=memory_thread)
print(f"Agent: {result1.text}")
print()

# Turn 2: Provide name
print("Turn 2: Providing name")
print("-" * 50)
result2 = await memory_agent.run("My name is Alex", thread=memory_thread)
print(f"Agent: {result2.text}")
print()

# Turn 3: Provide age
print("Turn 3: Providing age")
print("-" * 50)
result3 = await memory_agent.run("I am 25 years old", thread=memory_thread)
print(f"Agent: {result3.text}")
print()

# Turn 4: Now ask the original question - agent should use our name!
print("Turn 4: Asking the original question again")
print("-" * 50)
result4 = await memory_agent.run("Now can you tell me the square root of 9?", thread=memory_thread)
print(f"Agent: {result4.text}")

### Step 5: Inspect the Memory State

You can access the memory provider through the thread to see what was remembered:

In [None]:
# Access the memory component and inspect what was remembered
print("üß† Memory State:")
print("=" * 50)
print(f"   Name: {memory_provider.user_info.name}")
print(f"   Age: {memory_provider.user_info.age}")
print()

# Show serialized state (useful for persistence)
print("üì¶ Serialized State (for persistence):")
print(f"   {memory_provider.serialize()}")

### Pre-Populated Memory

You can also create a memory provider with pre-existing information:

In [None]:
# Create memory with pre-existing user info
pre_populated_memory = UserInfoMemory(
    chat_client=chat_client,
    user_info=UserInfo(name="Jordan", age=30)
)

# Create agent with pre-populated memory
pre_populated_agent = ChatAgent(
    chat_client=chat_client,
    instructions="You are a friendly assistant. Always address the user by their name.",
    context_providers=pre_populated_memory,
)

# The agent already knows the user!
result = await pre_populated_agent.run("What's 2 + 2?")
print(f"Agent (with pre-populated memory): {result.text}")

## Sequential Workflows

Workflows connect multiple **executors** (processing nodes) into a pipeline.

| Concept | Description |
|---------|-------------|
| **Executor** | A unit of work (class with `@handler` OR function with `@executor`) |
| **WorkflowBuilder** | Connects executors with `add_edge()` |
| `ctx.send_message()` | Pass data to next executor |
| `ctx.yield_output()` | Return final workflow result |

### Two Ways to Define Executors

1. **Class-based** - `class MyExecutor(Executor)` with `@handler` method
2. **Function-based** - `@executor` decorator on async function

In [None]:
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, Executor, executor, handler

# Method 1: Class-based executor
class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert text to uppercase and send to next executor."""
        result = text.upper()
        await ctx.send_message(result)  # Pass to next node

# Method 2: Function-based executor
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse text and yield final output."""
    result = text[::-1]
    await ctx.yield_output(result)  # Final workflow output

print("‚úÖ Executors defined")

### Build & Run the Workflow

In [None]:
# Build the workflow: UpperCase -> ReverseText
upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)  # Connect executors
    .set_start_executor(upper_case)       # Entry point
    .build()
)

# Run with streaming events
async for event in workflow.run_stream("hello world"):
    print(f"Event: {event}")
    if isinstance(event, WorkflowOutputEvent):
        print(f"\nüéØ Final Result: {event.data}")

### Quick Reference

```
WorkflowContext[T_Out]           ‚Üí sends T_Out to next node via send_message()
WorkflowContext[T_Out, T_W_Out]  ‚Üí also yields T_W_Out as workflow output
WorkflowContext[Never, str]      ‚Üí terminal node, yields str output only
```

**Events:** `ExecutorInvokedEvent` ‚Üí `ExecutorCompletedEvent` ‚Üí `WorkflowOutputEvent`

## Concurrent Workflows (Fan-Out/Fan-In)

Process data in **parallel** using fan-out/fan-in patterns:

| Pattern | Method | Description |
|---------|--------|-------------|
| **Fan-Out** | `add_fan_out_edges(src, [a, b])` | Send same input to multiple executors |
| **Fan-In** | `add_fan_in_edges([a, b], dest)` | Collect results into a list |

```
         ‚îå‚îÄ‚ñ∫ Average ‚îÄ‚îê
Input ‚îÄ‚ñ∫ Dispatcher ‚îÄ‚î§              ‚îú‚îÄ‚ñ∫ Aggregator ‚îÄ‚ñ∫ Output
         ‚îî‚îÄ‚ñ∫ Sum ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
```

In [None]:
import random

# Dispatcher: sends input to parallel executors
class Dispatcher(Executor):
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        await ctx.send_message(numbers)

# Parallel executors (run concurrently)
class Average(Executor):
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        await ctx.send_message(sum(numbers) / len(numbers))

class Sum(Executor):
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        await ctx.send_message(sum(numbers))

# Aggregator: collects results as list[int | float]
class Aggregator(Executor):
    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        await ctx.yield_output(results)

print("‚úÖ Concurrent executors defined")

### Build & Run Concurrent Workflow

In [None]:
# Create executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="sum")
aggregator = Aggregator(id="aggregator")

# Build workflow with fan-out and fan-in
concurrent_workflow = (
    WorkflowBuilder()
    .set_start_executor(dispatcher)
    .add_fan_out_edges(dispatcher, [average, summation])  # Fan-out: 1 ‚Üí many
    .add_fan_in_edges([average, summation], aggregator)   # Fan-in: many ‚Üí 1
    .build()
)

# Run with random numbers
numbers = [random.randint(1, 100) for _ in range(5)]
print(f"Input: {numbers}")

async for event in concurrent_workflow.run_stream(numbers):
    if isinstance(event, WorkflowOutputEvent):
        print(f"üéØ Results: {event.data}")  # [average, sum]

## Agents in Workflows

Combine **AI agents** with workflows for complex multi-agent collaboration.

### Why Use Agents in Workflows?

| Problem | Solution |
|---------|----------|
| Single agent can't do everything well | **Specialized agents** - each agent excels at one task |
| Need quality control on AI output | **Review pipelines** - Writer ‚Üí Reviewer ‚Üí Editor |
| Complex tasks need multiple perspectives | **Collaborative agents** - Research ‚Üí Analyze ‚Üí Summarize |
| Repetitive multi-step AI processes | **Automated pipelines** - consistent, scalable execution |

### Use Cases

| Use Case | Agents | Flow |
|----------|--------|------|
| **Content Creation** | Writer ‚Üí Reviewer ‚Üí Publisher | Draft ‚Üí Feedback ‚Üí Final |
| **Code Review** | Developer ‚Üí Reviewer ‚Üí Security | Write ‚Üí Review ‚Üí Audit |
| **Research** | Researcher ‚Üí Analyst ‚Üí Summarizer | Gather ‚Üí Analyze ‚Üí Report |
| **Customer Support** | Classifier ‚Üí Specialist ‚Üí QA | Route ‚Üí Handle ‚Üí Verify |

> **Key Insight:** Agents can be used directly as workflow executors!

### Example: Writer ‚Üí Reviewer Pipeline

In [None]:
from agent_framework import AgentRunUpdateEvent

# Create specialized agents
writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Writer",
    instructions="You are an excellent content writer. Create engaging content based on prompts."
)

reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Reviewer", 
    instructions="You are a content reviewer. Provide concise, actionable feedback on the content."
)

# Build workflow: Writer ‚Üí Reviewer (agents ARE executors!)
agent_workflow = (
    WorkflowBuilder()
    .set_start_executor(writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

print("‚úÖ Agent workflow created: Writer ‚Üí Reviewer")

### Run with Streaming

In [None]:
# Run the agent workflow with streaming
last_executor_id = None

async for event in agent_workflow.run_stream("Create a slogan for an affordable electric SUV"):
    if isinstance(event, AgentRunUpdateEvent):
        # Show which agent is responding
        if event.executor_id != last_executor_id:
            if last_executor_id:
                print("\n")
            print(f"üìù {event.executor_id}:", end=" ", flush=True)
            last_executor_id = event.executor_id
        if event.data:
            print(event.data, end="", flush=True)
    elif isinstance(event, WorkflowOutputEvent):
        print("\n\nüéØ Final Output:")
        print(event.data)

## 13. Workflow with Branching Logic

**Why Branching?** Real workflows need decisions - route emails differently if spam, escalate tickets by priority, process orders based on type. Branching logic enables dynamic routing based on runtime conditions.

### Three Routing Patterns:
| Pattern | Use Case | Targets |
|---------|----------|---------|
| **Conditional Edge** | Binary decision (if/else) | Exactly 1 |
| **Switch-Case** | Multi-way routing (enum values) | Exactly 1 |
| **Multi-Selection** | Dynamic fan-out (parallel paths) | 1 or more |

### 13.1 Conditional Edges - Binary Routing

**Scenario:** Email spam detection ‚Üí route to spam handler OR email assistant

In [None]:
# Conditional Edges - Email Spam Detection Workflow
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
)

# Data Models
class DetectionResult(BaseModel):
    """Spam detection result with routing flag."""
    is_spam: bool
    reason: str
    email_content: str  # Pass original content downstream

class EmailResponse(BaseModel):
    """Email assistant response."""
    response: str

# Condition Function - creates predicate for routing
def get_condition(expected_result: bool):
    """Factory: returns predicate matching is_spam value."""
    def condition(message: Any) -> bool:
        # Handle list wrapper if present
        if isinstance(message, list) and len(message) > 0:
            message = message[0]
        if not isinstance(message, AgentExecutorResponse):
            return False
        try:
            # AgentExecutorResponse has .agent_response (not agent_run_response)
            detection = DetectionResult.model_validate_json(message.agent_response.text)
            return detection.is_spam == expected_result
        except Exception:
            return False
    return condition

# Handler Executors
@executor(id="send_email")
async def handle_email_response(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Handle legitimate emails - draft response."""
    if isinstance(response, list):
        response = response[0]
    email_response = EmailResponse.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"‚úâÔ∏è Email sent:\n{email_response.response}")

@executor(id="handle_spam")
async def handle_spam_response(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam emails - mark as spam."""
    if isinstance(response, list):
        response = response[0]
    detection = DetectionResult.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"üö´ Marked as SPAM: {detection.reason}")

@executor(id="to_email_assistant_request")
async def to_email_assistant_request(response: Any, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Transform spam detection response ‚Üí email assistant request."""
    if isinstance(response, list):
        response = response[0]
    detection = DetectionResult.model_validate_json(response.agent_response.text)
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=detection.email_content)],
        should_respond=True
    )
    await ctx.send_message(request)

print("‚úÖ Conditional edge components defined")

In [None]:
# Build and run the Conditional Edge workflow
from agent_framework import WorkflowOutputEvent
from agent_framework._workflows._events import ExecutorCompletedEvent

async def run_conditional_workflow():
    # Create agents with structured output
    spam_detector = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are a spam detection assistant. "
                "Return JSON with: is_spam (bool), reason (string), email_content (string). "
                "Always include the original email in email_content."
            ),
            response_format=DetectionResult,
        ),
        id="spam_detector",
    )
    
    email_assistant = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are an email assistant that drafts professional responses. "
                "Return JSON with: response (string) containing the drafted reply."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant",
    )
    
    # Build workflow with conditional routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(spam_detector)
        # Not spam ‚Üí transform ‚Üí assistant ‚Üí send
        .add_edge(spam_detector, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant)
        .add_edge(email_assistant, handle_email_response)
        # Spam ‚Üí handle directly
        .add_edge(spam_detector, handle_spam_response, condition=get_condition(True))
        .build()
    )
    
    # Test with legitimate email
    legit_email = "Hi, I wanted to follow up on our meeting yesterday. Can we schedule a call this week?"
    print("üìß Testing LEGITIMATE email...")
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=legit_email)], should_respond=True)
    events = await workflow.run(request)
    
    # Show agent responses and final output
    for event in events:
        if isinstance(event, ExecutorCompletedEvent) and event.data:
            data = event.data[0] if isinstance(event.data, list) else event.data
            if hasattr(data, 'agent_response'):
                print(f"  [{event.executor_id}]: {data.agent_response.text[:150]}...")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"\n‚úÖ {event.data}")
    
    print("\n" + "="*50 + "\n")
    
    # Test with spam email
    spam_email = "CONGRATULATIONS! You've won $1,000,000! Click here NOW to claim your prize!"
    print("üìß Testing SPAM email...")
    request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=spam_email)], should_respond=True)
    events = await workflow.run(request)
    
    for event in events:
        if isinstance(event, ExecutorCompletedEvent) and event.data:
            data = event.data[0] if isinstance(event.data, list) else event.data
            if hasattr(data, 'agent_response'):
                print(f"  [{event.executor_id}]: {data.agent_response.text[:150]}...")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"\n‚úÖ {event.data}")

await run_conditional_workflow()

### 13.2 Switch-Case Edges - Multi-Way Routing

**Why Switch-Case?** When you need 3+ routing options (not just if/else). Cleaner than multiple conditional edges.

**Scenario:** Email classification ‚Üí NotSpam | Spam | Uncertain (needs human review)

In [None]:
# Switch-Case Edges - Three-Way Email Classification
from agent_framework import Case, Default

# Enhanced models for 3-way classification
class ThreeWayDetectionResult(BaseModel):
    """Three-way spam classification."""
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

@dataclass
class DetectionPayload:
    """Internal payload for routing."""
    spam_decision: str
    reason: str
    email_id: str

@dataclass
class StoredEmail:
    """Email stored in shared state."""
    email_id: str
    content: str

# Shared state keys
EMAIL_PREFIX = "email:"
CURRENT_EMAIL_KEY = "current_email_id"

# Condition factory for switch-case
def get_case(expected_decision: str):
    """Factory: returns predicate matching spam_decision value."""
    def condition(message: Any) -> bool:
        return isinstance(message, DetectionPayload) and message.spam_decision == expected_decision
    return condition

# Executors for switch-case workflow
@executor(id="store_and_analyze")
async def store_and_analyze(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email in shared state and send for analysis."""
    email = StoredEmail(email_id=str(uuid4()), content=email_text)
    await ctx.set_shared_state(f"{EMAIL_PREFIX}{email.email_id}", email)
    await ctx.set_shared_state(CURRENT_EMAIL_KEY, email.email_id)
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.content)], should_respond=True)
    )

@executor(id="to_detection_payload")
async def to_detection_payload(response: Any, ctx: WorkflowContext[DetectionPayload]) -> None:
    """Transform agent response ‚Üí typed payload for routing."""
    if isinstance(response, list):
        response = response[0]
    # Use agent_response (not agent_run_response)
    parsed = ThreeWayDetectionResult.model_validate_json(response.agent_response.text)
    email_id = await ctx.get_shared_state(CURRENT_EMAIL_KEY)
    await ctx.send_message(DetectionPayload(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_id=email_id
    ))

@executor(id="handle_not_spam_sc")
async def handle_not_spam(detection: DetectionPayload, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle NotSpam ‚Üí forward to email assistant."""
    email: StoredEmail = await ctx.get_shared_state(f"{EMAIL_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.content)], should_respond=True)
    )

@executor(id="finalize_response")
async def finalize_response(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Finalize email assistant response."""
    if isinstance(response, list):
        response = response[0]
    parsed = EmailResponse.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"‚úâÔ∏è Response drafted: {parsed.response}")

@executor(id="handle_spam_sc")
async def handle_spam_sc(detection: DetectionPayload, ctx: WorkflowContext[Never, str]) -> None:
    """Handle Spam emails."""
    await ctx.yield_output(f"üö´ Marked as SPAM: {detection.reason}")

@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionPayload, ctx: WorkflowContext[Never, str]) -> None:
    """Handle Uncertain - flag for human review."""
    email: StoredEmail = await ctx.get_shared_state(f"{EMAIL_PREFIX}{detection.email_id}")
    await ctx.yield_output(f"‚ö†Ô∏è NEEDS REVIEW: {detection.reason}\nContent: {email.content[:100]}...")

print("‚úÖ Switch-case components defined")

In [None]:
# Build and run the Switch-Case workflow
async def run_switch_case_workflow():
    # Three-way classification agent
    classifier = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "You are an email classifier. Be LESS confident - use Uncertain when not sure. "
                "Return JSON with: spam_decision (NotSpam, Spam, or Uncertain), reason (string)."
            ),
            response_format=ThreeWayDetectionResult,
        ),
        id="classifier",
    )
    
    email_assistant = AgentExecutor(
        chat_client.as_agent(
            instructions="Draft professional email responses. Return JSON with: response (string).",
            response_format=EmailResponse,
        ),
        id="email_assistant_sc",
    )
    
    # Build workflow with switch-case routing
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_and_analyze)
        .add_edge(store_and_analyze, classifier)
        .add_edge(classifier, to_detection_payload)
        # Switch-case: one edge group, multiple targets
        .add_switch_case_edge_group(
            to_detection_payload,
            [
                Case(condition=get_case("NotSpam"), target=handle_not_spam),
                Case(condition=get_case("Spam"), target=handle_spam_sc),
                Default(target=handle_uncertain),  # Catches Uncertain + unexpected values
            ],
        )
        # Continue NotSpam path
        .add_edge(handle_not_spam, email_assistant)
        .add_edge(email_assistant, finalize_response)
        .build()
    )
    
    # Test all three paths
    test_emails = [
        ("LEGITIMATE", "Hi team, please review the Q4 report attached. Thanks!"),
        ("SPAM", "URGENT: Your account will be SUSPENDED! Click NOW to verify!!!"),
        ("AMBIGUOUS", "Hey, saw your profile and thought we could connect. Let me know if interested."),
    ]
    
    for label, email in test_emails:
        print(f"üìß Testing {label} email...")
        events = await workflow.run(email)
        for event in events:
            if isinstance(event, ExecutorCompletedEvent) and event.data:
                data = event.data[0] if isinstance(event.data, list) else event.data
                if hasattr(data, 'agent_response'):
                    print(f"  [{event.executor_id}]: {data.agent_response.text[:100]}...")
            elif isinstance(event, WorkflowOutputEvent):
                print(f"  ‚úÖ {event.data}")
        print()

await run_switch_case_workflow()

### 13.3 Multi-Selection Edges - Dynamic Fan-Out

**Why Multi-Selection?** When one input should trigger **multiple parallel paths** based on content. Unlike switch-case (exactly 1 target), multi-selection can activate 0 to N targets.

**Scenario:** Long emails ‚Üí email assistant + summarizer (parallel). Short emails ‚Üí email assistant only.

In [None]:
# Multi-Selection Edges - Parallel Processing for Long Emails
from agent_framework._workflows._events import WorkflowEvent

# Additional models
class AnalysisResult(BaseModel):
    """Analysis result from classifier."""
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str

class EmailSummary(BaseModel):
    """Summary of long email."""
    summary: str

@dataclass
class AnalysisPayload:
    """Enriched payload with email metadata for routing."""
    spam_decision: str
    reason: str
    email_length: int  # Used for conditional fan-out
    email_id: str

# Custom event for tracking
class DatabaseEvent(WorkflowEvent):
    """Track database operations."""
    pass

# Selection function - heart of multi-selection
LONG_EMAIL_THRESHOLD = 100  # characters

def select_targets(payload: AnalysisPayload, target_ids: list[str]) -> list[str]:
    """
    Intelligent routing based on spam decision + email length.
    Returns LIST of target IDs to activate (can be multiple).
    """
    # Target order: [handle_spam_ms, respond_to_email, summarize_email, handle_uncertain_ms]
    handle_spam_id, respond_id, summarize_id, uncertain_id = target_ids
    
    if payload.spam_decision == "Spam":
        return [handle_spam_id]
    elif payload.spam_decision == "NotSpam":
        targets = [respond_id]  # Always respond
        if payload.email_length > LONG_EMAIL_THRESHOLD:
            targets.append(summarize_id)  # Also summarize if long
        return targets
    else:  # Uncertain
        return [uncertain_id]

# Executors for multi-selection
@executor(id="analyze_email")
async def analyze_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and start analysis."""
    email = StoredEmail(email_id=str(uuid4()), content=email_text)
    await ctx.set_shared_state(f"{EMAIL_PREFIX}{email.email_id}", email)
    await ctx.set_shared_state(CURRENT_EMAIL_KEY, email.email_id)
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.content)], should_respond=True)
    )

@executor(id="to_analysis_payload")
async def to_analysis_payload(response: Any, ctx: WorkflowContext[AnalysisPayload]) -> None:
    """Create enriched payload with email metadata."""
    if isinstance(response, list):
        response = response[0]
    # Use agent_response (not agent_run_response)
    parsed = AnalysisResult.model_validate_json(response.agent_response.text)
    email_id = await ctx.get_shared_state(CURRENT_EMAIL_KEY)
    email: StoredEmail = await ctx.get_shared_state(f"{EMAIL_PREFIX}{email_id}")
    await ctx.send_message(AnalysisPayload(
        spam_decision=parsed.spam_decision,
        reason=parsed.reason,
        email_length=len(email.content),
        email_id=email_id
    ))

@executor(id="respond_to_email")
async def respond_to_email(payload: AnalysisPayload, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Forward to email assistant (always for NotSpam)."""
    email: StoredEmail = await ctx.get_shared_state(f"{EMAIL_PREFIX}{payload.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.content)], should_respond=True)
    )

@executor(id="finalize_email_ms")
async def finalize_email_ms(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Output email response."""
    if isinstance(response, list):
        response = response[0]
    parsed = EmailResponse.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"‚úâÔ∏è Response: {parsed.response[:100]}...")

@executor(id="summarize_email")
async def summarize_email(payload: AnalysisPayload, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Summarize long emails (parallel with respond)."""
    email: StoredEmail = await ctx.get_shared_state(f"{EMAIL_PREFIX}{payload.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=f"Summarize: {email.content}")], should_respond=True)
    )

@executor(id="output_summary")
async def output_summary(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Output email summary."""
    if isinstance(response, list):
        response = response[0]
    parsed = EmailSummary.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"üìã Summary: {parsed.summary}")

@executor(id="handle_spam_ms")
async def handle_spam_ms(payload: AnalysisPayload, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam."""
    await ctx.yield_output(f"üö´ SPAM: {payload.reason}")

@executor(id="handle_uncertain_ms")
async def handle_uncertain_ms(payload: AnalysisPayload, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain."""
    await ctx.yield_output(f"‚ö†Ô∏è UNCERTAIN: {payload.reason}")

print("‚úÖ Multi-selection components defined")

In [None]:
# Build and run the Multi-Selection workflow
async def run_multi_selection_workflow():
    # Classifier agent
    analyzer = AgentExecutor(
        chat_client.as_agent(
            instructions=(
                "Classify emails. Return JSON with: spam_decision (NotSpam, Spam, Uncertain), reason (string)."
            ),
            response_format=AnalysisResult,
        ),
        id="analyzer",
    )
    
    # Email assistant
    responder = AgentExecutor(
        chat_client.as_agent(
            instructions="Draft professional responses. Return JSON with: response (string).",
            response_format=EmailResponse,
        ),
        id="responder",
    )
    
    # Summarizer
    summarizer = AgentExecutor(
        chat_client.as_agent(
            instructions="Summarize emails concisely. Return JSON with: summary (string).",
            response_format=EmailSummary,
        ),
        id="summarizer",
    )
    
    # Build workflow with multi-selection
    workflow = (
        WorkflowBuilder()
        .set_start_executor(analyze_email)
        .add_edge(analyze_email, analyzer)
        .add_edge(analyzer, to_analysis_payload)
        # Multi-selection: one input ‚Üí multiple targets based on selection_func
        .add_multi_selection_edge_group(
            to_analysis_payload,
            [handle_spam_ms, respond_to_email, summarize_email, handle_uncertain_ms],
            selection_func=select_targets,
        )
        # Response path
        .add_edge(respond_to_email, responder)
        .add_edge(responder, finalize_email_ms)
        # Summary path (parallel with response for long emails)
        .add_edge(summarize_email, summarizer)
        .add_edge(summarizer, output_summary)
        .build()
    )
    
    # Test: Short email (only response)
    short_email = "Quick question: What time is the meeting?"
    print(f"üìß SHORT email ({len(short_email)} chars < {LONG_EMAIL_THRESHOLD} threshold)")
    print("Expected: Response ONLY\n")
    events = await workflow.run(short_email)
    for event in events:
        if isinstance(event, ExecutorCompletedEvent) and event.data:
            data = event.data[0] if isinstance(event.data, list) else event.data
            if hasattr(data, 'agent_response'):
                print(f"  [{event.executor_id}]: {data.agent_response.text[:80]}...")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"  ‚úÖ {event.data}")
    
    print("\n" + "="*50 + "\n")
    
    # Test: Long email (response + summary in parallel)
    long_email = """
    Dear Team,
    
    I wanted to provide a comprehensive update on our Q4 initiatives. First, the product launch 
    has been rescheduled to November 15th due to supply chain delays. Second, our marketing 
    campaign will begin two weeks prior with a focus on social media engagement. Third, the 
    budget allocation has been approved by finance with a 15% increase for digital advertising.
    
    Additionally, we need to finalize the vendor contracts by end of this week. Please review
    the attached proposals and provide your feedback. The legal team has already completed their
    preliminary review and flagged a few items that need attention.
    
    Let's schedule a follow-up meeting to discuss action items.
    
    Best regards,
    Sarah
    """
    print(f"üìß LONG email ({len(long_email)} chars > {LONG_EMAIL_THRESHOLD} threshold)")
    print("Expected: Response AND Summary (parallel)\n")
    events = await workflow.run(long_email)
    for event in events:
        if isinstance(event, ExecutorCompletedEvent) and event.data:
            data = event.data[0] if isinstance(event.data, list) else event.data
            if hasattr(data, 'agent_response'):
                print(f"  [{event.executor_id}]: {data.agent_response.text[:80]}...")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"  ‚úÖ {event.data}")

await run_multi_selection_workflow()