# fan_out_fan_in_edges.py - ELI5 Walkthrough
This notebook walks through `python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py` with diagrams and commentary.


## Big Picture
One dispatcher fans out a marketing brief to researcher, marketer, and legal agents. Their responses are fanned back in and aggregated into a single report.


## Key Ingredients
- Agents created with `AzureOpenAIChatClient.create_agent(...)` and wrapped in `AgentExecutor` nodes.
- Fan-out edges from the dispatcher to multiple experts.
- Fan-in edges into an aggregator that merges structured agent responses.


### Workflow Diagram
```mermaid
flowchart LR
    Start(["Launch Prompt"]) --> Dispatcher[[DispatchToExperts]]
    Dispatcher --> Researcher[[researcher AgentExecutor]]
    Dispatcher --> Marketer[[marketer AgentExecutor]]
    Dispatcher --> Legal[[legal AgentExecutor]]
    Researcher --> Aggregator[[AggregateInsights]]
    Marketer --> Aggregator
    Legal --> Aggregator
    Aggregator --> Output(["Consolidated Insights"])
```


### Step 1: Imports and scenario description
We load environment variables from `.env`, import the Agent Framework classes along with the Azure OpenAI chat client and the Azure CLI credential, and keep the docstring explaining the parallel expert pattern.


In [None]:
# Copyright (c) Microsoft. All rights reserved.
from dotenv import load_dotenv
load_dotenv()

import asyncio
from dataclasses import dataclass

from agent_framework import (  # Core chat primitives to build LLM requests
    AgentExecutor,  # Wraps an LLM agent for use inside a workflow
    AgentExecutorRequest,  # The message bundle sent to an AgentExecutor
    AgentExecutorResponse,  # The structured result returned by an AgentExecutor
    AgentRunEvent,  # Tracing event for agent execution steps
    ChatMessage,  # Chat message structure
    Executor,  # Base class for custom Python executors
    Role,  # Enum of chat roles (user, assistant, system)
    WorkflowBuilder,  # Fluent builder for wiring the workflow graph
    WorkflowContext,  # Per run context and event bus
    WorkflowOutputEvent,  # Event emitted when workflow yields output
    handler,  # Decorator to mark an Executor method as invokable
)
from agent_framework.azure import AzureOpenAIChatClient  # Client wrapper for Azure OpenAI chat models
from azure.identity import AzureCliCredential  # Uses your az CLI login for credentials
from typing_extensions import Never

"""
Sample: Concurrent fan out and fan in with three domain agents

A dispatcher fans out the same user prompt to research, marketing, and legal AgentExecutor nodes.
An aggregator then fans in their responses and produces a single consolidated report.

Purpose:
Show how to construct a parallel branch pattern in workflows. Demonstrate:
- Fan out by targeting multiple AgentExecutor nodes from one dispatcher.
- Fan in by collecting a list of AgentExecutorResponse objects and reducing them to a single result.
- Simple tracing using AgentRunEvent to observe execution order and progress.

Prerequisites:
- Familiarity with WorkflowBuilder, executors, edges, events, and streaming runs.
- Azure OpenAI access configured for AzureOpenAIChatClient. Log in with Azure CLI and set any required environment variables.
- Comfort reading AgentExecutorResponse.agent_run_response.text for assistant output aggregation.
"""




### Step 2: Dispatcher fans out to expert agents
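`DispatchToExperts` wraps the user prompt in a single user `ChatMessage` and sends it, in a separate request, to each expert `AgentExecutor` by id. This cell also defines the `AggregatedInsights` dataclass that the aggregator in Step 3 uses to hold the per-domain text.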


In [None]:
class DispatchToExperts(Executor):
    """Fan-out node that forwards one incoming prompt to every configured expert executor.

    Args:
        expert_ids: Ids of the executors that should each receive the prompt.
        id: Optional id for this executor; "dispatch_to_experts" is used when it is omitted or falsy.
    """

    def __init__(self, expert_ids: list[str], id: str | None = None):
        executor_id = id or "dispatch_to_experts"
        super().__init__(id=executor_id)
        self._expert_ids = expert_ids

    @handler
    async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Wrap the prompt as a user chat message and send it to each expert id in order."""
        # One ChatMessage is built up front and reused; every expert gets its own request object.
        user_message = ChatMessage(Role.USER, text=prompt)
        for target in self._expert_ids:
            # Addressing each request by target_id routes it to one specific expert branch.
            request = AgentExecutorRequest(messages=[user_message], should_respond=True)
            await ctx.send_message(request, target_id=target)


@dataclass
class AggregatedInsights:
    """Per-domain expert text collected by the aggregator ahead of report formatting."""

    # Text attributed to the research expert.
    research: str
    # Text attributed to the marketing expert.
    marketing: str
    # Text attributed to the legal/compliance expert.
    legal: str




### Step 3: Aggregator fans responses back in
`AggregateInsights` turns the list of `AgentExecutorResponse` objects into a consolidated report.


In [None]:
class AggregateInsights(Executor):
    """Fan-in node that merges the expert responses into one consolidated text report.

    Args:
        expert_ids: Ids of the expert executors; stored on the instance but not read by `aggregate`.
        id: Optional id for this executor; "aggregate_insights" is used when it is omitted or falsy.
    """

    def __init__(self, expert_ids: list[str], id: str | None = None):
        executor_id = id or "aggregate_insights"
        super().__init__(id=executor_id)
        self._expert_ids = expert_ids

    @handler
    async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
        """Build the consolidated report from the collected responses and yield it as workflow output."""
        # Index each response's assistant text by the id of the executor that produced it.
        # If an id appears more than once, the last response wins.
        text_by_executor: dict[str, str] = {
            response.executor_id: response.agent_run_response.text for response in results
        }

        # A missing expert yields an empty section rather than an error.
        insights = AggregatedInsights(
            research=text_by_executor.get("researcher", ""),
            marketing=text_by_executor.get("marketer", ""),
            legal=text_by_executor.get("legal", ""),
        )

        # Each section ends with a newline; joining on "\n" leaves one blank line between sections.
        sections = (
            "Consolidated Insights\n====================\n",
            f"Research Findings:\n{insights.research}\n",
            f"Marketing Angle:\n{insights.marketing}\n",
            f"Legal/Compliance Notes:\n{insights.legal}\n",
        )
        report = "\n".join(sections)

        await ctx.yield_output(report)




### Step 4: Build and stream the workflow
`main()` configures each expert agent, wires the graph, and streams events so you can watch agent execution before the final output.


In [None]:
async def main() -> None:
    """Create the three expert agents, wire the fan-out/fan-in workflow, and stream one run to stdout."""
    # 1) Create agent executors for domain experts, keyed by executor id.
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    instructions_by_expert = {
        "researcher": (
            "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
            " opportunities, and risks."
        ),
        "marketer": (
            "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
            " aligned to the prompt."
        ),
        "legal": (
            "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
            " based on the prompt."
        ),
    }
    # Dict insertion order keeps the experts in researcher, marketer, legal order.
    experts = [
        AgentExecutor(chat_client.create_agent(instructions=instructions), id=expert_id)
        for expert_id, instructions in instructions_by_expert.items()
    ]
    expert_ids = [expert.id for expert in experts]

    dispatcher = DispatchToExperts(expert_ids=expert_ids, id="dispatcher")
    aggregator = AggregateInsights(expert_ids=expert_ids, id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    builder = WorkflowBuilder()
    builder = builder.set_start_executor(dispatcher)
    builder = builder.add_fan_out_edges(dispatcher, experts)  # Parallel branches
    builder = builder.add_fan_in_edges(experts, aggregator)  # Join at the aggregator
    workflow = builder.build()

    # 3) Run with a single prompt and print progress plus the final consolidated output
    prompt = "We are launching a new budget-friendly electric bike for urban commuters."
    async for event in workflow.run_stream(prompt):
        if isinstance(event, AgentRunEvent):
            # Agent progress events are printed as-is for lightweight observability.
            print(event)
            continue
        if isinstance(event, WorkflowOutputEvent):
            print("===== Final Aggregated Output =====")
            print(event.data)




### Step 5: Try it yourself
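Use the helper below. In notebooks it awaits `main()` on the already-running event loop; when no loop is running it falls back to `asyncio.run(main())`. Note that the top-level `await` only parses under IPython/Jupyter, so in a plain `.py` script keep just `asyncio.run(main())` (ideally under an `if __name__ == "__main__":` guard).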


In [None]:
import asyncio

# Helper for notebooks vs. scripts
loop = asyncio.get_event_loop()
if loop.is_running():
    # Jupyter/VS Code notebooks already have an event loop, so await directly.
    await main()
else:
    asyncio.run(main())
