# Workflows in Agent Framework

### Understanding Workflows in Agent Framework

**AI Agents**

An AI agent is powered by a large language model (LLM) and can access various tools to accomplish tasks. Its behavior is dynamic - the steps it takes are determined by the LLM based on the context of the conversation and the instructions provided. Agents are great for handling open-ended tasks where flexibility and reasoning are key.

**Workflows**

A workflow is a predefined sequence of operations designed to handle more structured processes. It can include AI agents as components, along with human-in-the-loop steps and integrations with external systems. Unlike an agent’s dynamic decision-making, a workflow’s execution path is explicitly defined, giving you greater control and predictability. Workflows are ideal for orchestrating complex business processes that require coordination across multiple agents and systems.

### Key Features
- **Type Safety**: Strong typing ensures messages flow correctly between components, with comprehensive validation that prevents runtime errors.
- **Flexible Control Flow**: Graph-based architecture allows for intuitive modeling of complex workflows with `executors` and `edges`. Conditional routing, parallel processing, and dynamic execution paths are all supported.
- **External Integration**: Built-in request/response patterns for seamless integration with external APIs, and human-in-the-loop scenarios.
- **Checkpointing**: Save workflow states via checkpoints, enabling recovery and resumption of long-running processes on server sides.
- **Multi-Agent Orchestration**: Built-in patterns for coordinating multiple AI agents, including sequential, concurrent, hand-off, and magentic.

### Core Concepts
1. **Executors**: represent individual processing units within a workflow. They can be AI agents or custom logic components. They receive input messages, perform specific tasks, and produce output messages.
2. **Edges**: define the connections between executors, determining the flow of messages. They can include conditions to control routing based on message contents.
3. **Workflows**: are directed graphs composed of executors and edges. They define the overall process, starting from an initial executor and proceeding through various paths based on conditions and logic defined in the edges.

### Basic Executor Structure
Executors are the fundamental building blocks that process messages in a workflow. They are autonomous processing units that receive typed messages, perform operations, and can produce output messages or events.

Executors inherit from the `Executor` base class. Each executor has a unique identifier and can handle specific message types using methods decorated with the `@handler` decorator. 
Handlers must have the proper type annotations to specify the type of messages they can process.


In [None]:
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    executor
)

# Executors inherit from the Executor base class. Each executor represents a node in the workflow graph.
# --- Approach 1: Class-based Executor ---
class UpperCase(Executor):

    @handler
    # Handler signature contract:
    # - First parameter is the typed input to this node (here: text: str)
    # - Second parameter is a WorkflowContext[T_Out], where T_Out is the type of data the node will emit via ctx.send_message  
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:    
        """Within a handler you typically:
        - Compute a result
        - Forward that result to downstream node(s) using ctx.send_message(result)"""
        await ctx.send_message(text.upper())    # Send the transformed message to the next executor in the workflow

# --- Approach 2: Function-based Executor --- 

# For simple steps you can skip subclassing and define an async function with the
# same signature pattern (typed input + WorkflowContext[T_Out, T_W_Out]) and decorate it with
# @executor. This creates a fully functional node that can be wired into a flow.
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Convert the input to uppercase and forward it to the next node.

    Note: The WorkflowContext is parameterized with the type this handler will
    emit. Here WorkflowContext[str] means downstream nodes should expect str.
    """
    await ctx.send_message(text.upper())

### The `WorkflowContext` Object
The `WorkflowContext` object provides methods for the handler to interact with the workflow during execution. The `WorkflowContext` is parameterized with the type of messages the handler will emit and the type of outputs it can yield.

The most commonly used method is `send_message`, which allows the handler to send messages to connected executors.
A handler can use yield_output to produce outputs that will be considered as workflow outputs and be returned/streamed to the caller as an output event.
If a handler neither sends messages nor yields outputs, no type parameter is needed for `WorkflowContext`


In [None]:
from agent_framework import WorkflowContext
from typing_extensions import Never

class SomeHandler(Executor):

    @handler
    async def send_message_example(self, message: str, ctx: WorkflowContext[str]) -> None:
        """
        Forward an intermediate message to the next executor.
        WorkflowContext[str] means downstream expects a string.
        """
        await ctx.send_message("Hello, World!")  # Sends to next step

    @handler
    async def yield_output_example(self, message: str, ctx: WorkflowContext[Never, str]) -> None:
        """
        Yield a final output without sending downstream.
        WorkflowContext[Never, str] means no intermediate messages, only final output of type str.
        """
        await ctx.yield_output("Hello, World!")  # Ends workflow

    @handler
    async def no_output_example(self, message: str, ctx: WorkflowContext) -> None:
        """
        Perform work without sending or yielding anything.
        WorkflowContext without type parameters means no strict typing.
        """
        print("Doing some work...")  # Side effect only

### Edge Patterns
Edges define how messages flow between executors with optional conditions. They represent the connections in the workflow graph and determine the data flow paths.

The framework supports several edge patterns:

1. **Direct Edges**: Simple one-to-one connections between executors
2. **Conditional Edges**: Edges with conditions that determine when messages should flow
3. **Fan-out Edges**: One executor sending messages to multiple targets
4. **Fan-in Edges**: Multiple executors sending messages to a single target

Explore implementation of all edge patterns in detail [here](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/core-concepts/edges?pivots=programming-language-python.) 

In [None]:
from agent_framework import WorkflowBuilder, WorkflowContext, executor

# Define three executors
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(text.upper())

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.yield_output(text[::-1])

@executor(id="append_exclamation_executor")
async def append_exclamation(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.yield_output(text + "!!!")

# Build a simple workflow with edges
workflow = (
    WorkflowBuilder()
    # Direct edge: always forward messages to reverse_text
    .add_edge(upper_case, reverse_text)
    # Conditional edge: forward to append_exclamation only if text length > 5
    .add_edge(upper_case, append_exclamation, condition=lambda msg: len(msg) > 5)
    .set_start_executor(upper_case)
    .build()
)

### Workflows
A **Workflow** ties everything together and manages execution. It's the orchestrator that coordinates executor execution, message routing, and event streaming.

Workflows are constructed using the `WorkflowBuilder` class, which provides a fluent API for defining the workflow structure. It is a general-purpose builder for creating workflows with any orchestration pattern.

#### Example - Create a Simple Sequential Workflow
Now let's put it all together and create our first workflow with two executors:

In [None]:
from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    handler,
)
from typing_extensions import Never

# Subclassing Executor lets you define a named node with lifecycle hooks if needed.
class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    # The work itself is implemented in an async method decorated with @handler.
    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)


# For simple steps you can skip subclassing and define an async function with the
# same signature pattern (typed input + WorkflowContext[T_Out, T_W_Out]) and decorate it with @executor.
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input string and yield the workflow output.

    This node yields the final output using ctx.yield_output(result).
    The workflow will complete when it becomes idle (no more work to do).

    """
    result = text[::-1]

    # Yield the output - the workflow will complete when idle
    await ctx.yield_output(result)

async def main():
    """Build and run a simple 2-step workflow using the fluent builder API."""

    upper_case = UpperCase(id="upper_case_executor")

    # Build the workflow using a fluent pattern:
    # 1) add_edge(from_node, to_node) defines a directed edge upper_case -> reverse_text
    # 2) set_start_executor(node) declares the entry point
    # 3) build() finalizes and returns an immutable Workflow object
    workflow = (WorkflowBuilder()
                .add_edge(upper_case, reverse_text)
                .set_start_executor(upper_case)
                .build()
    )

    # Run the workflow by sending the initial message to the start node.
    # The run(...) call returns an event collection; its get_outputs() method
    # retrieves the outputs yielded by any terminal nodes.
    events = await workflow.run("hello world")
    print(events.get_outputs())

    # Summarize the final run state (e.g., COMPLETED)
    print("Final state:", events.get_final_state())

await main()

### Workflow Events
In Agent Framework, workflows are not just about passing data between executors - when `workflow.run_stream(..)` is called, it emits events as it progresses.

These events allow developers to:

- Observe workflow progress (e.g., when an executor starts or completes).
- React to intermediate states (e.g., log outputs, trigger side effects).
- Handle errors gracefully (e.g., capture exceptions without breaking the workflow).

There are built-in events that provide observability into the workflow execution:

```python
# Workflow lifecycle events
WorkflowStartedEvent    # Workflow execution begins
WorkflowOutputEvent     # Fired when an executor calls ctx.yield_output, streams intermediate results
WorkflowStatusEvent     # Indicates a state change in the workflow
WorkflowErrorEvent      # Non-terminal error or warning during workflow execution
WorkflowFailedEvent     # Terminal event where workflow cannot continue

# Executor events
ExecutorInvokeEvent     # Fired when an executor is invoked
ExecutorCompleteEvent   # Fired when an executor finishes
ExecutorFailedEvent     # Signals that a specific executor node failed

# Request events
RequestInfoEvent        # A request is issued

```

Each event has its own properties. Below is a short reference to help you identify what you can extract from each event:

| Event Type              | Important Properties                                      |
|-------------------------|-----------------------------------------------------------|
| WorkflowStatusEvent     | `state`, `origin`                                        |
| WorkflowOutputEvent     | `data`, `source_executor_id`                             |
| WorkflowFailedEvent     | `details.error_type`, `details.message`, `origin`        |
| ExecutorFailedEvent     | `executor_id`, `details.error_type`, `details.message`   |
| WorkflowWarningEvent    | `data` (warning message), `origin`                       |
| WorkflowErrorEvent      | `data` (exception), `origin`                             |
| RequestInfoEvent        | `request_id`, `source_executor_id`, `request_type`, `response_type` |
| AgentRunUpdateEvent     | `executor_id`, `data` (streamed messages)                |
| AgentRunEvent           | `executor_id`, `data` (final agent response)             |

For more details, check the [implementation docs](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/using-agents?pivots=programming-language-python).

In addition to built-in events, you can define and emit your own custom events during workflow execution. This is useful for fine-grained monitoring, custom logging or metrics for specific steps and external action triggers. Learn more [here](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/core-concepts/events?pivots=programming-language-python).


In [None]:
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework import (
    WorkflowOutputEvent,
)

@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Transform the input to uppercase and forward it to the next step."""
    result = text.upper()

    # Send the intermediate result to the next executor
    await ctx.send_message(result)

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

workflow = (
    WorkflowBuilder()
    .add_edge(to_upper_case, reverse_text)
    .set_start_executor(to_upper_case)
    .build()
)

async def main():
    # Run the workflow, iterate over events and stream them
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")

        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

await main()


<details>
  <summary>See the solution</summary>
  
  ```python

from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework import (
    WorkflowOutputEvent,
)

@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Transform the input to uppercase and forward it to the next step."""
    result = text.upper()

    # Send the intermediate result to the next executor
    await ctx.send_message(result)

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

workflow = (
    WorkflowBuilder()
    .add_edge(to_upper_case, reverse_text)
    .set_start_executor(to_upper_case)
    .build()
)

async def main():
    # Run the workflow, iterate over events and stream them
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")

        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

await main()

```
</details>

### Exercise - Concurrent Workflows
Before we jump into using agents in a workflow, let's reinforce our understanding of workflow orchestration concepts and create a concurrent workflow. 
You'll learn to implement fan-out and fan-in patterns that enable parallel processing, allowing multiple executors or agents to work simultaneously and then aggregate their results.

You'll create a workflow that:

1. Takes a list of numbers as input
2. Distributes the list to two parallel executors (one calculating average, one calculating sum)
3. Aggregates the different result types (float and int) into a final output
4. Demonstrates how the framework handles different result types from concurrent executors

In [None]:
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

class Dispatcher(Executor):
    """Dispatch the input list to other executors."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)


class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors."""
        await ctx.yield_output(results)

async def main() -> None:

    numbers = [random.randint(1, 100) for _ in range(10)]
    print(f"Input numbers: {numbers}")
    print("=" * 60)


    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")


    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

    output: list[int | float] | None = None
    async for event in workflow.run_stream(numbers):
        print(f"{event}") 

        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print("\nFinal Aggregated Output:")
        print(f"Average: {output[0]} | Sum: {output[1]}")
        print("=" * 60)


await main()


<details>
  <summary>See the solution</summary>
  
  ```python

import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

class Dispatcher(Executor):
    """Dispatch the input list to other executors."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)


class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors."""
        await ctx.yield_output(results)

async def main() -> None:

    numbers = [random.randint(1, 100) for _ in range(10)]
    print(f"Input numbers: {numbers}")
    print("=" * 60)


    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")


    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

    output: list[int | float] | None = None
    async for event in workflow.run_stream(numbers):
        print(f"{event}") 

        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print("\nFinal Aggregated Output:")
        print(f"Average: {output[0]} | Sum: {output[1]}")
        print("=" * 60)


await main()

```
</details>

### Agents in a Workflow

To add intelligence to your workflows, you can use AI agents as part of your workflow execution. AI agents can be easily integrated into workflows, allowing you to create complex, intelligent solutions that were previously difficult to achieve.

#### What happens when you add an agent to a workflow

When you add an agent to a workflow in the Agent Framework (see example below), you’re essentially wrapping the agent inside a workflow executor so it can participate in the workflow orchestration. The executor is the one handling the communication of the agent with other workflow parts.
1. Whenever the executor receives a single or a list of chat messages, it will trigger the agent to respond with the response type of `AgentExecutorResponse` object. The class contains the following information about agent's response:
    - `executor_id` - the ID of the executor that produced the response
    - `agent_run_response` - the full response from the agent
    - `full_conversation` - the full conversation history up to this point
2. There are two possible event types related to agent responses that can be emitted:
    - `AgentRunUpdateEvent` - chunks of agent's responses as they are generated (streaming)
    - `AgentRunEvent` - full response from the agent (non-streaming)

By default, agents are wrapped in executors that run in streaming mode. You can customize this behavior by creating a custom executor. With custom executors, you can control invocation of the agent, message types it will handle, agent life cycle and other. 

Learn more on how to [create a Custom Agent Executor](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/using-agents?pivots=programming-language-python).



#### Example: Using Built-In Agent Executor

In this step, the agents we add to the workflow will be instantiated using the `AzureOpenAIChatClient` class that we already used earlier in *Section 01.1*:

In [None]:
import os
from dotenv import load_dotenv
from agent_framework.azure import AzureOpenAIChatClient

load_dotenv()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")

chat_client=AzureOpenAIChatClient(
        endpoint=endpoint,
        api_key=api_key,
        api_version=api_version,
        deployment_name=deployment,
    )

When you add an agent to a workflow:

- It behaves like an executor but runs with its own instructions and context.
- You can chain multiple agents together to create collaborative flows (e.g., Writer → Reviewer).
- The workflow orchestrates their interaction, passing messages and collecting outputs.

In the following example, two agents are created:
-  `writer_agent` generates and edits content and `reviewer_agent` reviews content and provides feedback
- The workflow setup uses `WorkflowBuilder` to set the start executor as the writer agent, add an edge from the writer to the reviewer and builds the workflow graph.

In [None]:
from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a simple two node agent workflow: Writer then Reviewer."""
    
    writer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer",
    )

    reviewer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer",
    )

    workflow = (WorkflowBuilder()
                .set_start_executor(writer_agent)
                .add_edge(writer_agent, reviewer_agent)
                .build())

    # Run the workflow with the user's initial message. For clarity, use run (non streaming) and print the terminal event.
    events = await workflow.run("Create a slogan for a new electric SUV that is affordable and fun to drive.")

    # Print detailed agent run events
    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    # Print final workflow state
    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)

await main()


<details>
  <summary>See the solution</summary>
  
  ```python

from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a simple two node agent workflow: Writer then Reviewer."""

    
    writer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer",
    )

    reviewer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer",
    )

    workflow = (WorkflowBuilder()
                .set_start_executor(writer_agent)
                .add_edge(writer_agent, reviewer_agent)
                .build())

    # Run the workflow with the user's initial message. For clarity, use run (non streaming) and print the terminal event.
    events = await workflow.run("Create a slogan for a new electric SUV that is affordable and fun to drive.")

    # Print detailed agent run events
    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    # Print final workflow state
    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)

await main()
```
</details>

### Exercise - Create a Simple Multi-Agent Workflow
In this exercise, you will design your own workflow by:

- Adding another agent to the process.
- Creating a scenario that reflects a real-world business process or planning task.
- Feel free to experiment with other concepts we have introduced so far:
    - Create an executor that performs deterministic tasks (sentiment check, calculations, pre-processing)
    - Add simple branching logic (execute two agent in parallel, for example)
    - Handle events or add simple middleware for observability

In [None]:
from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a three-node agent workflow: Analyst → Finance → Approval."""

    analyst_agent = chat_client.create_agent(
        instructions=(
            "You are a business analyst. Draft a short business case based on the provided idea. "
        ),
        name="analyst",
    )

    finance_agent = chat_client.create_agent(
        instructions=(
            "You are a finance expert. Review the business case and highlight any financial risks or constraints."
        ),
        name="finance",
    )

    approval_agent = chat_client.create_agent(
        instructions=(
            "You are a senior manager. Decide whether to approve the business case based on strategic alignment. "
            "Respond with 'Approved' or 'Needs Revision' and provide reasoning."
        ),
        name="approval",
    )


    workflow = (
        WorkflowBuilder()
        .set_start_executor(analyst_agent)
        .add_edge(analyst_agent, finance_agent)
        .add_edge(finance_agent, approval_agent)
        .build()
    )


    events = await workflow.run("Develop a business case for implementing an AI-driven customer support service for handling complaints.")


    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)


await main()

<details>
  <summary>See the solution</summary>
  
  ```python
from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a three-node agent workflow: Analyst → Finance → Approval."""

    analyst_agent = chat_client.create_agent(
        instructions=(
            "You are a business analyst. Draft a short business case based on the provided idea. "
        ),
        name="analyst",
    )

    finance_agent = chat_client.create_agent(
        instructions=(
            "You are a finance expert. Review the business case and highlight any financial risks or constraints."
        ),
        name="finance",
    )

    approval_agent = chat_client.create_agent(
        instructions=(
            "You are a senior manager. Decide whether to approve the business case based on strategic alignment. "
            "Respond with 'Approved' or 'Needs Revision' and provide reasoning."
        ),
        name="approval",
    )


    workflow = (
        WorkflowBuilder()
        .set_start_executor(analyst_agent)
        .add_edge(analyst_agent, finance_agent)
        .add_edge(finance_agent, approval_agent)
        .build()
    )


    events = await workflow.run("Develop a business case for implementing an AI-driven customer support service for handling complaints.")


    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)


await main()
```
</details>

### Example - Using Agents as Function Tools

In MAF, you can use agents as function tools by calling `.as_tool()` on an agent and providing it as a tool to another agent.
In this case, an agent's reasoning capability is wrapped so other agents can call it as part of their execution. This is useful for agent-to-agent collaboration - for example, one agent can delegate tasks to another specialised agent.

You can also customize the description, name and argument name when converting agent to a tool:
```python
refund_tool = refund_agent.as_tool(
    name="RefundAgent",
    description="Address and act on customer inquiries regarding refunds for damaged and returned items",
    arg_name="refund_inquiry",
)
```

In [None]:
import logging
from agent_framework import function_middleware
from collections.abc import Awaitable, Callable
from agent_framework import (
    FunctionInvocationContext
)

async def function_invocation_mw(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
    """A filter that will be called for each function call in the response"""
    print(f"---> Agent [{context.function.name}] called with messages: {context.arguments}")

    await next(context)
    
    print(f"---> Response from agent [{context.function.name}]: {context.result}")
    
    
refund_agent = chat_client.create_agent(
    name="refund_agent",
    instructions=(
        "You specialize in addressing customer inquiries regarding refunds for broken or returned products.\n"
        "For example, if required, you can refund a customer the price of a product, given a successful return.\n "
        "For this, you'll need the order id and for sake of now, just always say that the payment has been issued. "
        ),
)
logistics_agent = chat_client.create_agent(
    name="logistics_agent",
    instructions=(
        "You specialize in handling logistics for product shipments and returns.\n"
        "You can create an order for customer (requires customer id and product id(s)).\n"
        "You can also schedule the return of a product from a customer (requires order id and pickup date). "
        ),
)

customer_agent = chat_client.create_agent(
    name="customer_agent",
    instructions=(
        "Your role is triage the customer inquiries to the appropriate agent.\n"
        "When a user returns a product, schedule the logistics and refund.\n"
        "When a user wants to place an order, use the logistics agent.\n"
        ),
    tools=[refund_agent.as_tool(), logistics_agent.as_tool()], # Add the other two agents as tools
    middleware=[function_invocation_mw],
)

thread = customer_agent.get_new_thread()

messages = [
    "hi, i want to return an order",
    "the order id is 123 and please pick it up on May 10, 2025"
]


for message in messages:
    print("*** User:", message)
    response = await customer_agent.run(message, thread=thread)
    print("*** Agent:", response.text)


### Workflows as Agents
You can also turn workflows into MAF agents and interact with the workflow as if it were an agent. This allows to integrate workflows with APIs that support the agent interface and create more powerful agents.

To create an agent out of any workflow, use the `as_agent()` method:
```python
workflow_agent = workflow.as_agent(name="Workflow Agent")
```
You can use the workflow agent just as any other MAF agent.

### Example - Wrap agents in custom executors + stream workflow events

The following example builds a two‑node workflow using custom executors that each “own” a ChatAgent. A Writer agent generates content; a Reviewer agent refines and finalizes it. 
The workflow is run with streaming, so you can observe status and data events as they happen. 

Key objectives:
- **Learn how to wrap agents as executors** - Create role‑specific agents, attach them to workflow nodes by subclassing `Executor` and using `@handler`.
- **Type‑safe data flow with `WorkflowContext`** - Model contracts between nodes to explicitly define what each executor can receive and send.
- **Event‑driven observability** - Listen to the events emitted during execution to track status changes, capture live outputs and log errors.

In [None]:
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    ExecutorFailedEvent,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowFailedEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
)
from agent_framework._workflows._events import WorkflowOutputEvent
from agent_framework.azure import AzureOpenAIChatClient
from typing_extensions import Never


class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "writer"):

        self.agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and edit contents based on the feedback."
            ),
        )
        # Associate this agent with the executor node. The base Executor stores it on self.agent.
        super().__init__(id=id)

    @handler
    # Executor receives a single ChatMessage and sends downstream a list[ChatMessage] using ctx.send_message(messages).
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Generate content and forward the updated conversation."""
        # Start the conversation with the incoming user message.
        messages: list[ChatMessage] = [message]
        # Run the agent and extend the conversation with the agent's messages.
        response = await self.agent.run(messages)
        messages.extend(response.messages)

        await ctx.send_message(messages)


class Reviewer(Executor):
    """Custom executor that owns a review agent and completes the workflow."""

    agent: ChatAgent

    def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "reviewer"):

        self.agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content reviewer. You review the content and provide feedback to the writer."
            ),
        )
        super().__init__(id=id)

    @handler
    async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[Never, str]) -> None:

        response = await self.agent.run(messages)
        await ctx.yield_output(response.text)


async def main():
    """Build the two node workflow and run it with streaming to observe events."""

    writer = Writer(chat_client)
    reviewer = Reviewer(chat_client)


    workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()

    # This surfaces executor events, workflow outputs, run-state changes, and errors.
    async for event in workflow.run_stream(
        ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.")
    ):
        if isinstance(event, WorkflowStatusEvent):
            prefix = f"State ({event.origin.value}): "
            if event.state == WorkflowRunState.IN_PROGRESS:
                print(prefix + "IN_PROGRESS")
            elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS:
                print(prefix + "IN_PROGRESS_PENDING_REQUESTS (requests in flight)")
            elif event.state == WorkflowRunState.IDLE:
                print(prefix + "IDLE (no active work)")
            elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS:
                print(prefix + "IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)")
            else:
                print(prefix + str(event.state))
        elif isinstance(event, WorkflowOutputEvent):
            print(f"Workflow output ({event.origin.value}): {event.data}")
        elif isinstance(event, ExecutorFailedEvent):
            print(
                f"Executor failed ({event.origin.value}): "
                f"{event.executor_id} {event.details.error_type}: {event.details.message}"
            )
        elif isinstance(event, WorkflowFailedEvent):
            details = event.details
            print(f"Workflow failed ({event.origin.value}): {details.error_type}: {details.message}")
        else:
            # Fallback for other events that weren't explicitly handled
            print(f"{event.__class__.__name__} ({event.origin.value}): {event}")


await main()

### Exercise: Implement Your Own Workflow Scenario

Now that you’ve explored workflow basics, agents, and seen how custom executors and streaming events work, it’s time to design your own scenario. This exercise will help you apply everything you’ve learned and experiment with advanced features.

**Goal** - create a workflow that:

- Includes multiple agents with complementary roles.
- Wraps agents in custom executors for flexibility.
- Uses WorkflowContext to model type-safe data flow.
- Leverages function tools for richer capabilities.
- Implements rich logging for observability.
- Experiments with prompt variations to influence behaviour.

In [None]:

# Your implementation here

# 1. Define your scenario and specialized agents with complementary roles, ensure instructions are detailed and clear

# 2. Wrap agents in custom executors or turn them into tools

# 3. Configure WorkflowContext for type-safe data flow

# 4. Consider adding function tools

# 5. Implement streaming if applicable and an comprehensive logging strategy

# 6. Run and iterate