# GraphFlow (Workflows)

In this section you’ll learn how to create an multi-agent workflow using [GraphFlow](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.GraphFlow), or simply “flow” for short. It uses structured execution and precisely controls how agents interact to accomplish a task.

We’ll first show you how to create and run a flow. We’ll then explain how to observe and debug flow behavior, and discuss important operations for managing execution.

AutoGen AgentChat provides a team for directed graph execution:

- [GraphFlow](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.GraphFlow): A team that follows a [DiGraph](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.DiGraph) to control the execution flow between agents. Supports sequential, parallel, conditional, and looping behaviors.

**When should you use [GraphFlow](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.GraphFlow)?** *Use Graph when you need strict control over the order in which agents act, or when different outcomes must lead to different next steps. Start with a simple team such as [RoundRobinGroupChat](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.RoundRobinGroupChat) or [SelectorGroupChat](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.SelectorGroupChat) if ad-hoc conversation flow is sufficient. Transition to a structured workflow when your task requires deterministic control, conditional branching, or handling complex multi-step processes with cycles.*

## Creating and Running a Flow

[DiGraphBuilder](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.DiGraphBuilder) is a fluent utility that lets you easily construct execution graphs for workflows. It supports building:

- Sequential chains

- Parallel fan-outs

- Conditional branching

- Loops with safe exit conditions

Each node in the graph represents an agent, and edges define the allowed execution paths. Edges can optionally have conditions based on agent messages.

### Sequential Flow

We will begin by creating a simple workflow where a **writer** drafts a paragraph and a **reviewer** provides feedback. This graph terminates after the reviewer comments on the writer.

Note, the flow automatically computes all the source and leaf nodes of the graph and the execution starts at all the source nodes in the graph and completes execution when no nodes are left to execute.

In [1]:
import os
from dotenv import load_dotenv
load_dotenv()

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create an OpenAI model client
client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

# Create the writer agent
writer = AssistantAgent("writer",
                        model_client=client,
                        system_message="Draft a short paragraph on climate change.")

# Create the reviewer agent
reviewer = AssistantAgent("reviewer",
                          model_client=client,
                          system_message="Review the draft and suggest improvements.")

# Build the graph
builder = DiGraphBuilder()
builder.add_node(writer).add_node(reviewer)
builder.add_edge(writer, reviewer)

# Build and validate the graph
graph = builder.build()

# Create the flow
flow = GraphFlow([writer, reviewer], graph=graph)

In [2]:
from autogen_agentchat.ui import Console

# Use `asyncio.run(...)` and wrap the below in a async function when running in a script.
# stream = flow.run_stream(task="Write a short paragraph about climate change.")
# async for event in stream:  # type: ignore
#    print(event)
#    print("-"*20)

await Console(flow.run_stream(task="Write a short paragraph (30 words) about climate change."))

---------- TextMessage (user) ----------
Write a short paragraph (30 words) about climate change.
---------- TextMessage (writer) ----------
Climate change, driven by greenhouse gas emissions, causes global warming, rising sea levels, and extreme weather events, threatening ecosystems, economies, and communities worldwide. Urgent action is essential to mitigate its impacts.
---------- TextMessage (reviewer) ----------
This is a solid paragraph. To improve clarity and flow, consider slight rephrasing for conciseness and impact:

"Climate change, caused by greenhouse gases, leads to global warming, rising sea levels, and extreme weather, threatening ecosystems and communities worldwide. Urgent action is vital to reduce its devastating effects."


TaskResult(messages=[TextMessage(id='ef614fa1-1c7a-4e5f-9902-8650acf14980', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 10, 35, 749620, tzinfo=datetime.timezone.utc), content='Write a short paragraph (30 words) about climate change.', type='TextMessage'), TextMessage(id='07aeb3f8-d756-40ad-b0b9-1154846e4054', source='writer', models_usage=RequestUsage(prompt_tokens=32, completion_tokens=41), metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 10, 36, 843736, tzinfo=datetime.timezone.utc), content='Climate change, driven by greenhouse gas emissions, causes global warming, rising sea levels, and extreme weather events, threatening ecosystems, economies, and communities worldwide. Urgent action is essential to mitigate its impacts.', type='TextMessage'), TextMessage(id='ae8a68c7-d5fc-4c65-899f-516ebcd871c5', source='reviewer', models_usage=RequestUsage(prompt_tokens=78, completion_tokens=63), metadata={}, created_at=datetime.datetime(2

### Parallel Flow with Join

We now create a slightly more complex flow:

- A **writer** drafts a paragraph.

- Two **editors** independently edit for grammar and style (parallel fan-out).

- A **final reviewer** consolidates their edits (join).

Execution starts at the **writer**, fans out to **editor1** and **editor2** simultaneously, and then both feed into the **final reviewer**.

In [3]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create an OpenAI model client
client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

# Create the writer agent
writer = AssistantAgent("writer", model_client=client, system_message="Draft a short paragraph on climate change.")

# Create two editor agents
editor1 = AssistantAgent("editor1", model_client=client, system_message="Edit the paragraph for grammar.")

editor2 = AssistantAgent("editor2", model_client=client, system_message="Edit the paragraph for style.")

# Create the final reviewer agent
final_reviewer = AssistantAgent(
    "final_reviewer",
    model_client=client,
    system_message="Consolidate the grammar and style edits into a final version.",
)

# Build the workflow graph
builder = DiGraphBuilder()
builder.add_node(writer).add_node(editor1).add_node(editor2).add_node(final_reviewer)

# Fan-out from writer to editor1 and editor2
builder.add_edge(writer, editor1)
builder.add_edge(writer, editor2)

# Fan-in both editors into final reviewer
builder.add_edge(editor1, final_reviewer)
builder.add_edge(editor2, final_reviewer)

# Build and validate the graph
graph = builder.build()

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
)

# Run the workflow
await Console(flow.run_stream(task="Write a short paragraph about climate change."))

---------- TextMessage (user) ----------
Write a short paragraph about climate change.
---------- TextMessage (writer) ----------
Climate change refers to long-term shifts in temperature, weather patterns, and other environmental conditions caused primarily by the increase in greenhouse gases such as carbon dioxide and methane in the atmosphere. Human activities, including burning fossil fuels, deforestation, and industrial processes, have accelerated these changes, leading to more frequent and severe weather events like hurricanes, droughts, and floods. Addressing climate change requires global cooperation to reduce emissions, transition to renewable energy sources, and implement sustainable practices to protect ecosystems and future generations.
---------- TextMessage (editor1) ----------
Climate change refers to long-term shifts in temperature, weather patterns, and other environmental conditions caused primarily by the increase in greenhouse gases such as carbon dioxide and methane

TaskResult(messages=[TextMessage(id='46ef4204-82b5-4c93-a7b7-f070eeaf7a82', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 9, 34, 285696, tzinfo=datetime.timezone.utc), content='Write a short paragraph about climate change.', type='TextMessage'), TextMessage(id='f7dc11aa-5cb7-45f6-9779-0485b3c237b2', source='writer', models_usage=RequestUsage(prompt_tokens=28, completion_tokens=100), metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 9, 36, 304008, tzinfo=datetime.timezone.utc), content='Climate change refers to long-term shifts in temperature, weather patterns, and other environmental conditions caused primarily by the increase in greenhouse gases such as carbon dioxide and methane in the atmosphere. Human activities, including burning fossil fuels, deforestation, and industrial processes, have accelerated these changes, leading to more frequent and severe weather events like hurricanes, droughts, and floods. Addressing climate chang

## Message Filtering

### Execution Graph vs. Message Graph

In [GraphFlow](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.GraphFlow), the execution graph is defined using [DiGraph](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.teams.html#autogen_agentchat.teams.DiGraph), which controls the order in which agents execute. However, the execution graph does not control what messages an agent receives from other agents. By default, all messages are sent to all agents in the graph.

**Message filtering** is a separate feature that allows you to filter the messages received by each agent and limiting their model context to only the relevant information. The set of message filters defines the message graph in the flow.

Specifying the **message graph** can help with:

- Reduce hallucinations

- Control memory load

- Focus agents only on relevant information

You can use [MessageFilterAgent](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.agents.html#autogen_agentchat.agents.MessageFilterAgent) together with [MessageFilterConfig](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.agents.html#autogen_agentchat.agents.MessageFilterConfig) and [PerSourceFilter](https://microsoft.github.io/autogen/stable/reference/python/autogen_agentchat.agents.html#autogen_agentchat.agents.PerSourceFilter) to define these rules.

In [3]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Model client
client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

# Create agents
researcher = AssistantAgent(
    "researcher", model_client=client, system_message="Summarize key facts about climate change."
)
analyst = AssistantAgent("analyst", model_client=client, system_message="Review the summary and suggest improvements.")
presenter = AssistantAgent(
    "presenter", model_client=client, system_message="Prepare a presentation slide based on the final summary."
)

# Apply message filtering
filtered_analyst = MessageFilterAgent(
    name="analyst",
    wrapped_agent=analyst,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="researcher", position="last", count=1)]),
)

filtered_presenter = MessageFilterAgent(
    name="presenter",
    wrapped_agent=presenter,
    filter=MessageFilterConfig(per_source=[PerSourceFilter(source="analyst", position="last", count=1)]),
)

# Build the flow
builder = DiGraphBuilder()
builder.add_node(researcher).add_node(filtered_analyst).add_node(filtered_presenter)
builder.add_edge(researcher, filtered_analyst).add_edge(filtered_analyst, filtered_presenter)

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=builder.build(),
)

# Run the flow
await Console(flow.run_stream(task="Summarize key facts about climate change."))

---------- TextMessage (user) ----------
Summarize key facts about climate change.
---------- TextMessage (researcher) ----------
Certainly! Here are some key facts about climate change:

1. **Global Warming**: The Earth's average surface temperature has increased significantly over the past century, primarily due to human activities such as burning fossil fuels.

2. **Greenhouse Gas Emissions**: Carbon dioxide (CO₂), methane (CH₄), and nitrous oxide (N₂O) are the main greenhouse gases, with CO₂ accounting for the majority of recent temperature rise.

3. **Human Influence**: The burning of coal, oil, and natural gas for energy, deforestation, and industrial processes have greatly contributed to increased greenhouse gas concentrations.

4. **Impacts**: Climate change leads to more frequent and severe weather events, rising sea levels, melting glaciers and polar ice, and disruptions to ecosystems and agriculture.

5. **Sea Level Rise**: Due to melting ice and thermal expansion of seawate

TaskResult(messages=[TextMessage(id='6b983218-225c-41a2-b55f-0dc573993e8f', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 19, 4, 178494, tzinfo=datetime.timezone.utc), content='Summarize key facts about climate change.', type='TextMessage'), TextMessage(id='4ed9b5f8-51cf-4246-9370-f1cbe375d290', source='researcher', models_usage=RequestUsage(prompt_tokens=30, completion_tokens=346), metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 19, 11, 227622, tzinfo=datetime.timezone.utc), content="Certainly! Here are some key facts about climate change:\n\n1. **Global Warming**: The Earth's average surface temperature has increased significantly over the past century, primarily due to human activities such as burning fossil fuels.\n\n2. **Greenhouse Gas Emissions**: Carbon dioxide (CO₂), methane (CH₄), and nitrous oxide (N₂O) are the main greenhouse gases, with CO₂ accounting for the majority of recent temperature rise.\n\n3. **Human Influence

## 🔁 Advanced Example: Conditional Loop + Filtered Summary

This example demonstrates:

- A loop between generator and reviewer (which exits when reviewer says “APPROVE”)

- A summarizer agent that only sees the first user input and the last reviewer message

In [4]:
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import (
    DiGraphBuilder,
    GraphFlow,
)
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

# Agents
generator = AssistantAgent("generator", model_client=model_client, system_message="Generate a list of creative ideas.")

reviewer = AssistantAgent(
    "reviewer",
    model_client=model_client,
    system_message="Review ideas and provide feedbacks, or just 'APPROVE' for final approval.",
)
summarizer_core = AssistantAgent(
    "summary", model_client=model_client, system_message="Summarize the user request and the final feedback."
)

# Filtered summarizer
filtered_summarizer = MessageFilterAgent(
    name="summary",
    wrapped_agent=summarizer_core,
    filter=MessageFilterConfig(
        per_source=[
            PerSourceFilter(source="user", position="first", count=1),
            PerSourceFilter(source="reviewer", position="last", count=1),
        ]
    ),
)

# Build graph with conditional loop
builder = DiGraphBuilder()
builder.add_node(generator).add_node(reviewer).add_node(filtered_summarizer)
builder.add_edge(generator, reviewer)
builder.add_edge(reviewer, filtered_summarizer, condition=lambda msg: "APPROVE" in msg.to_model_text())
builder.add_edge(reviewer, generator, condition=lambda msg: "APPROVE" not in msg.to_model_text())
builder.set_entry_point(generator)  # Set entry point to generator. Required if there are no source nodes.
graph = builder.build()

termination_condition = MaxMessageTermination(10)

# Create the flow
flow = GraphFlow(
    participants=builder.get_participants(),
    graph=graph,
    termination_condition=termination_condition
)

# Run the flow and pretty print the output in the console
await Console(flow.run_stream(task="Brainstorm ways to reduce plastic waste."))

---------- TextMessage (user) ----------
Brainstorm ways to reduce plastic waste.
---------- TextMessage (generator) ----------
Here are some creative ideas to reduce plastic waste:

1. **Community Plastic-Free Challenge**: Organize local challenges encouraging households to reduce or eliminate plastic usage for a month, providing resources and support.

2. **Upcycling Workshops**: Host workshops where participants can learn to repurpose plastic items into functional or decorative objects, such as turning plastic bottles into garden planters.

3. **Reusable Bulk Bins**: Set up a system in grocery stores where customers can bring their containers to fill with bulk items, reducing the need for plastic packaging.

4. **Plastic Credit System**: Create a rewards program for consumers who return or recycle plastic. Individuals can earn points for every piece of plastic they return to designated collection points.

5. **Eco-Friendly School Programs**: Implement educational programs in schools

TaskResult(messages=[TextMessage(id='ad62c29c-d6b5-45ad-a83a-3ab9628111d8', source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 23, 22, 545525, tzinfo=datetime.timezone.utc), content='Brainstorm ways to reduce plastic waste.', type='TextMessage'), TextMessage(id='e3e9c954-d905-490e-94bc-f7333e291e5a', source='generator', models_usage=RequestUsage(prompt_tokens=27, completion_tokens=555), metadata={}, created_at=datetime.datetime(2025, 7, 28, 9, 23, 34, 156346, tzinfo=datetime.timezone.utc), content='Here are some creative ideas to reduce plastic waste:\n\n1. **Community Plastic-Free Challenge**: Organize local challenges encouraging households to reduce or eliminate plastic usage for a month, providing resources and support.\n\n2. **Upcycling Workshops**: Host workshops where participants can learn to repurpose plastic items into functional or decorative objects, such as turning plastic bottles into garden planters.\n\n3. **Reusable Bulk Bins**: 

## 🔁 Advanced Example: Cycles With Activation Group Examples

The following examples demonstrate how to use `activation_group` and `activation_condition` to handle complex dependency patterns in cyclic graphs, especially when multiple paths lead to the same target node.

### Example 1: Loop with Multiple Paths - “All” Activation (A→B→C→B)

In this scenario, we have A → B → C → B, where B has two incoming edges (from A and from C). By default, B requires all its dependencies to be satisfied before executing.

This example shows a review loop where both the initial input (A) and the feedback (C) must be processed before B can execute again.

In [5]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Model client
client = OpenAIChatCompletionClient(model="gpt-4o-mini")

# Create agents for A→B→C→B→E scenario
agent_a = AssistantAgent("A", model_client=client, system_message="Start the process and provide initial input.")
agent_b = AssistantAgent(
    "B",
    model_client=client,
    system_message="Process input from A or feedback from C. Say 'CONTINUE' if it's from A or 'STOP' if it's from C.",
)
agent_c = AssistantAgent("C", model_client=client, system_message="Review B's output and provide feedback.")
agent_e = AssistantAgent("E", model_client=client, system_message="Finalize the process.")

# Build the graph with activation groups
builder = DiGraphBuilder()
builder.add_node(agent_a).add_node(agent_b).add_node(agent_c).add_node(agent_e)

# A → B (initial path)
builder.add_edge(agent_a, agent_b, activation_group="initial")

# B → C
builder.add_edge(agent_b, agent_c, condition="CONTINUE")

# C → B (loop back - different activation group)
builder.add_edge(agent_c, agent_b, activation_group="feedback")

# B → E (exit condition)
builder.add_edge(agent_b, agent_e, condition="STOP")

termination_condition = MaxMessageTermination(10)
# Build and create flow
graph = builder.build()
flow = GraphFlow(participants=[agent_a, agent_b, agent_c, agent_e], graph=graph, termination_condition=termination_condition)

print("=== Example 1: A→B→C→B with 'All' Activation ===")
print("B will exit when it receives a message from C")
# await Console(flow.run_stream(task="Start a review process for a document."))

=== Example 1: A→B→C→B with 'All' Activation ===
B will exit when it receives a message from C


### Example 2: Loop with Multiple Paths - “Any” Activation (A→B→(C1,C2)→B)

In this more complex scenario, we have A → B → (C1, C2) → B, where:

- B fans out to both C1 and C2 in parallel

- Both C1 and C2 feed back to B

- B uses “any” activation, meaning it executes as soon as either C1 or C2 completes

This is useful for scenarios where you want the fastest response to trigger the next step.

In [6]:
# Create agents for A→B→(C1,C2)→B scenario
agent_a2 = AssistantAgent("A", model_client=client, system_message="Initiate a task that needs parallel processing.")
agent_b2 = AssistantAgent(
    "B",
    model_client=client,
    system_message="Coordinate parallel tasks. Say 'PROCESS' to start parallel work or 'DONE' to finish.",
)
agent_c1 = AssistantAgent("C1", model_client=client, system_message="Handle task type 1. Say 'C1_COMPLETE' when done.")
agent_c2 = AssistantAgent("C2", model_client=client, system_message="Handle task type 2. Say 'C2_COMPLETE' when done.")
agent_e = AssistantAgent("E", model_client=client, system_message="Finalize the process.")

# Build the graph with "any" activation
builder2 = DiGraphBuilder()
builder2.add_node(agent_a2).add_node(agent_b2).add_node(agent_c1).add_node(agent_c2).add_node(agent_e)

# A → B (initial)
builder2.add_edge(agent_a2, agent_b2)

# B → C1 and B → C2 (parallel fan-out)
builder2.add_edge(agent_b2, agent_c1, condition="PROCESS")
builder2.add_edge(agent_b2, agent_c2, condition="PROCESS")

# B → E (exit condition)
builder2.add_edge(agent_b2, agent_e, condition=lambda msg: "DONE" in msg.to_model_text())

# C1 → B and C2 → B (both in same activation group with "any" condition)
builder2.add_edge(
    agent_c1, agent_b2, activation_group="loop_back_group", activation_condition="any", condition="C1_COMPLETE"
)

builder2.add_edge(
    agent_c2, agent_b2, activation_group="loop_back_group", activation_condition="any", condition="C2_COMPLETE"
)

# Build and create flow
graph2 = builder2.build()
flow2 = GraphFlow(participants=[agent_a2, agent_b2, agent_c1, agent_c2, agent_e], graph=graph2)

print("=== Example 2: A→B→(C1,C2)→B with 'Any' Activation ===")
print("B will execute as soon as EITHER C1 OR C2 completes (whichever finishes first)")
# await Console(flow2.run_stream(task="Start a parallel processing task."))

=== Example 2: A→B→(C1,C2)→B with 'Any' Activation ===
B will execute as soon as EITHER C1 OR C2 completes (whichever finishes first)


### Example 3: Mixed Activation Groups

This example shows how different activation groups can coexist in the same graph. We have a scenario where:

- Node D receives inputs from multiple sources with different activation requirements

- Some dependencies use “all” activation (must wait for all inputs)

- Other dependencies use “any” activation (proceed on first input)

This pattern is useful for complex workflows where different types of dependencies have different urgency levels.

In [7]:
# Create agents for mixed activation scenario
agent_a3 = AssistantAgent("A", model_client=client, system_message="Provide critical input that must be processed.")
agent_b3 = AssistantAgent("B", model_client=client, system_message="Provide secondary critical input.")
agent_c3 = AssistantAgent("C", model_client=client, system_message="Provide optional quick input.")
agent_d3 = AssistantAgent("D", model_client=client, system_message="Process inputs based on different priority levels.")

# Build graph with mixed activation groups
builder3 = DiGraphBuilder()
builder3.add_node(agent_a3).add_node(agent_b3).add_node(agent_c3).add_node(agent_d3)

# Critical inputs that must ALL be present (activation_group="critical", activation_condition="all")
builder3.add_edge(agent_a3, agent_d3, activation_group="critical", activation_condition="all")
builder3.add_edge(agent_b3, agent_d3, activation_group="critical", activation_condition="all")

# Optional input that can trigger execution on its own (activation_group="optional", activation_condition="any")
builder3.add_edge(agent_c3, agent_d3, activation_group="optional", activation_condition="any")

# Build and create flow
graph3 = builder3.build()
flow3 = GraphFlow(participants=[agent_a3, agent_b3, agent_c3, agent_d3], graph=graph3)

print("=== Example 3: Mixed Activation Groups ===")
print("D will execute when:")
print("- BOTH A AND B complete (critical group with 'all' activation), OR")
print("- C completes (optional group with 'any' activation)")
print("This allows for both required dependencies and fast-path triggers.")
# await Console(flow3.run_stream(task="Process inputs with mixed priority levels."))

=== Example 3: Mixed Activation Groups ===
D will execute when:
- BOTH A AND B complete (critical group with 'all' activation), OR
- C completes (optional group with 'any' activation)
This allows for both required dependencies and fast-path triggers.


### Key Takeaways for Activation Groups

1. `activation_group`: Groups edges that point to the same target node, allowing you to define different dependency patterns.

2. `activation_condition`:

- `"all"` (default): Target node waits for ALL edges in the group to be satisfied

- `"any"`: Target node executes as soon as ANY edge in the group is satisfied

3. Use Cases:

- Cycles with multiple entry points: Different activation groups prevent conflicts

- Priority-based execution: Mix “all” and “any” conditions for different urgency levels

- Parallel processing with early termination: Use “any” to proceed with the fastest result

4. Best Practices:

- Use descriptive group names (`"critical"`, `"optional"`, `"feedback"`, etc.)

- Keep activation conditions consistent within the same group

- Test your graph logic with different execution paths

These patterns enable sophisticated workflow control while maintaining clear, understandable execution semantics.
