Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions skills/dotnet-microsoft-agent-framework/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: dotnet-microsoft-agent-framework
version: "1.4.0"
version: "1.5.0"
category: "AI"
description: "Build .NET AI agents and multi-agent workflows with Microsoft Agent Framework using the right agent type, threads, tools, workflows, hosting protocols, and enterprise guardrails."
compatibility: "Requires preview-era Microsoft Agent Framework packages and a .NET application that truly needs agentic or workflow orchestration."
Expand Down Expand Up @@ -54,6 +54,7 @@ flowchart LR
- `AgentResponse` and `AgentResponseUpdate` are not just text containers. They can include tool calls, tool results, structured output, reasoning-like updates, and response metadata.
- `ChatClientAgent` is the safest default when you already have an `IChatClient` and do not need a hosted-agent service.
- `Workflow` is an explicit graph of executors and edges. Use it when the control flow must stay inspectable, typed, resumable, or human-steerable.
- `AgentWorkflowBuilder` provides high-level factory methods such as `BuildConcurrent` for common agent orchestration patterns. Use it when you need concurrent or sequential agent pipelines without writing custom executor classes.
- Hosting layers such as OpenAI-compatible HTTP, A2A, and AG-UI are adapters over your in-process agent or workflow. They do not replace the core architecture choice.
- Durable agents are a hosting and persistence decision for Azure Functions. They are not the default answer for ordinary app-level orchestration.

Expand All @@ -65,7 +66,7 @@ flowchart LR
| OpenAI-style future-facing APIs, background responses, or richer response state | Responses-based agent | Better fit for new OpenAI-compatible integrations |
| Simple client-managed chat history | Chat Completions agent | Keeps request/response simple |
| Service-hosted agents and service-owned threads/tools | Azure AI Foundry Agent or other hosted agent | Managed runtime is the requirement |
| Typed multi-step orchestration | `Workflow` | Control flow stays explicit and testable |
| Typed multi-step orchestration | `Workflow` or `AgentWorkflowBuilder` helpers | Control flow stays explicit and testable; use `BuildConcurrent` for agent fan-out/fan-in |
| Week-long or failure-resilient Azure execution | Durable agent on Azure Functions | Durable Task gives replay and persisted state |
| Agent-to-agent interoperability | A2A hosting or A2A proxy agent | This is protocol-level delegation, not local inference |
| Browser or web UI protocol integration | AG-UI | Designed for remote UI sync and approval flows |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ zone_pivot_groups: programming-languages
author: TaoChenOSU
ms.topic: tutorial
ms.author: taochen
ms.date: 09/29/2025
ms.date: 03/17/2026
ms.service: agent-framework
---

# Create a Simple Concurrent Workflow

This tutorial demonstrates how to create a concurrent workflow using Agent Framework. You'll learn to implement fan-out and fan-in patterns that enable parallel processing, allowing multiple executors or agents to work simultaneously and then aggregate their results.
This tutorial demonstrates how to create a concurrent workflow using Agent Framework. You'll learn to implement fan-out and fan-in patterns that enable parallel processing, allowing multiple agents to work simultaneously on the same input and then aggregate their results.

::: zone pivot="programming-language-csharp"

## What You'll Build

You'll create a workflow that:

- Takes a question as input (for example, "What is temperature?")
- Sends the same question to two expert AI agents simultaneously (Physicist and Chemist)
- Collects and combines responses from both agents into a single output
- Demonstrates concurrent execution with AI agents using fan-out/fan-in patterns
- Takes a user message as input (for example, "Hello, world!")
- Sends the same message to multiple translation agents simultaneously
- Collects and aggregates responses from all agents into a single output
- Demonstrates concurrent execution with `AgentWorkflowBuilder.BuildConcurrent`

### Concepts Covered

Expand Down Expand Up @@ -50,7 +50,7 @@ dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
```

## Step 2: Setup Dependencies and Azure OpenAI
## Step 2: Set Up Dependencies and Azure OpenAI

Start by setting up your project with the required NuGet packages and Azure OpenAI client:

Expand All @@ -70,158 +70,149 @@ public static class Program
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
```

## Step 3: Create Expert AI Agents
## Step 3: Create Specialized AI Agents

Create two specialized AI agents that will provide expert perspectives:
Create multiple specialized agents that will each process the same input concurrently:

```csharp
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);

ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
// Helper method to create a translation agent for a target language
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient client) =>
new(client,
$"You are a translation assistant who only responds in {targetLanguage}. " +
$"Respond to any input by outputting the name of the input language and then " +
$"translating the input to {targetLanguage}.");

// Create translation agents for concurrent processing
var translationAgents = new[] { "French", "Spanish", "English" }
.Select(lang => GetTranslationAgent(lang, chatClient));
```

## Step 4: Create the Start Executor
## Step 4: Build the Concurrent Workflow

Create an executor that initiates the concurrent processing by sending input to multiple agents:
Use `AgentWorkflowBuilder.BuildConcurrent` to create the concurrent workflow from the agent collection. The builder automatically handles the fan-out and fan-in logic:

```csharp
var startExecutor = new ConcurrentStartExecutor();
// Build the concurrent workflow - fan-out and fan-in are handled automatically
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
```

The `ConcurrentStartExecutor` implementation:
## Step 5: Execute the Workflow

Run the workflow, send the turn token to kick off the agents, and capture the streaming output:

```csharp
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Execute the workflow in streaming mode
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);

// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
// Send a turn token to start agent processing
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Data}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = (List<ChatMessage>)outputEvt.Data!;
break;
}
}

Console.WriteLine("===== Final Aggregated Results =====");
foreach (var msg in result)
{
Console.WriteLine($"{msg.Role}: {msg.Content}");
}
}
}
```

## Step 5: Create the Aggregation Executor
## How It Works

Create an executor that collects and combines responses from multiple agents:
1. **Fan-Out**: `AgentWorkflowBuilder.BuildConcurrent` distributes the same input to all agents simultaneously.
2. **Parallel Processing**: All agents process the same message concurrently, each providing their unique perspective.
3. **Turn Token**: `TurnToken` signals agents to begin processing the queued message.
4. **Fan-In / Aggregation**: Results from all agents are automatically collected into a `List<ChatMessage>` output.

```csharp
var aggregationExecutor = new ConcurrentAggregationExecutor();
```
## Key Concepts

- **`AgentWorkflowBuilder.BuildConcurrent(agents)`**: High-level builder method that creates a concurrent workflow from an `IEnumerable<AIAgent>`. Handles fan-out and fan-in automatically without requiring custom executor classes.
- **Custom Aggregator**: An optional `Func<IList<List<ChatMessage>>, List<ChatMessage>>` overload lets you provide custom aggregation logic.
- **Turn Tokens**: Use `TurnToken` to signal agents to begin processing queued messages.
- **`AgentResponseUpdateEvent`**: Streaming event for real-time per-agent progress.
- **`WorkflowOutputEvent`**: Terminal event carrying the aggregated `List<ChatMessage>` from all agents.

The `ConcurrentAggregationExecutor` implementation:
## Advanced: Manual Fan-Out / Fan-In with Custom Executors

For scenarios requiring fine-grained control over the dispatcher or aggregation logic, you can build the concurrent graph directly with `WorkflowBuilder`:

```csharp
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
// Custom executor that dispatches the user message and turn token to all connected agents
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}

// Custom executor that aggregates individual ChatMessage responses from each agent
internal sealed class ConcurrentAggregationExecutor(int agentCount) :
Executor<ChatMessage>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];

/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);

if (this._messages.Count == 2)
this._messages.Add(message);
if (this._messages.Count == agentCount)
{
var formattedMessages = string.Join(Environment.NewLine,
var formatted = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
await context.YieldOutputAsync(formatted, cancellationToken);
}
}
}
```

## Step 6: Build the Workflow

Connect the executors and agents using fan-out and fan-in edge patterns:

```csharp
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
```

## Step 7: Execute the Workflow

Run the workflow and capture the streaming output:
Build the graph manually:

```csharp
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
var startExecutor = new ConcurrentStartExecutor();
var aggregationExecutor = new ConcurrentAggregationExecutor(agentCount: 2);

var physicistAgent = new ChatClientAgent(chatClient, name: "Physicist",
instructions: "You are an expert in physics.");
var chemistAgent = new ChatClientAgent(chatClient, name: "Chemist",
instructions: "You are an expert in chemistry.");

var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicistAgent, chemistAgent])
.AddFanInEdge(aggregationExecutor, sources: [physicistAgent, chemistAgent])
.WithOutputFrom(aggregationExecutor)
.Build();
```

## How It Works

1. **Fan-Out**: The `ConcurrentStartExecutor` receives the input question and the fan-out edge sends it to both the Physicist and Chemist agents simultaneously.
2. **Parallel Processing**: Both AI agents process the same question concurrently, each providing their expert perspective.
3. **Fan-In**: The `ConcurrentAggregationExecutor` collects `ChatMessage` responses from both agents.
4. **Aggregation**: Once both responses are received, the aggregator combines them into a formatted output.

## Key Concepts

- **Fan-Out Edges**: Use `AddFanOutEdge()` to distribute the same input to multiple executors or agents.
- **Fan-In Edges**: Use `AddFanInEdge()` to collect results from multiple source executors.
- **AI Agent Integration**: AI agents can be used directly as executors in workflows.
- **Executor Base Class**: Custom executors inherit from `Executor<TInput>` and override the `HandleAsync` method.
- **Turn Tokens**: Use `TurnToken` to signal agents to begin processing queued messages.
- **Streaming Execution**: Use `StreamAsync()` to get real-time updates as the workflow progresses.
Use this approach when:
- You need a custom dispatcher that does more than broadcast a message and turn token.
- Your aggregation logic requires domain-specific processing before yielding output.
- The agent count or structure is not known at build time and cannot be expressed with `BuildConcurrent`.

## Complete Implementation

For the complete working implementation of this concurrent workflow with AI agents, see the [Concurrent/Program.cs](https://github.com/microsoft/agent-framework/blob/main/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs) sample in the Agent Framework repository.
For the complete working implementation of this concurrent workflow with AI agents, see the [concurrent orchestration sample](https://github.com/microsoft/agent-framework/tree/main/dotnet/samples/GettingStarted/Workflows/Concurrent) in the Agent Framework repository.

::: zone-end

Expand Down
12 changes: 12 additions & 0 deletions skills/dotnet-microsoft-agent-framework/references/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ Use `AgentWorkflowBuilder` when:
- the orchestration matches built-in agent patterns
- you want sequential or concurrent pipeline helpers

### `AgentWorkflowBuilder.BuildConcurrent`

```csharp
var workflow = AgentWorkflowBuilder.BuildConcurrent(agents);
```

- Accepts `IEnumerable<AIAgent>` and an optional custom aggregator `Func<IList<List<ChatMessage>>, List<ChatMessage>>`.
- Handles fan-out and fan-in automatically without requiring custom executor classes.
- Default aggregator returns the last message from each responding agent.
- After `InProcessExecution.StreamAsync`, send `TurnToken(emitEvents: true)` via `run.TrySendMessageAsync` to kick off agents.
- Use manual `WorkflowBuilder` with `AddFanOutEdge`/`AddFanInEdge` only when you need a custom dispatcher or aggregation logic beyond what the built-in overload supports.

## Workflow Patterns

| Pattern | Best For | Main Risk |
Expand Down
Loading