You can refer to agent-framework-dotnet 1.0.0 -> Samples -> 03-workflows -> Concurrent -> MapReduce example.
c#
using Microsoft.Agents.AI.Workflows;
// One instance of each executor that participates in the workflow.
var startExecutor = new StartExecutor();
var uppercaseExecutor = new UppercaseExecutor();
var reverseExecutor = new ReverseExecutor();
var completeExecutor = new CompleteExecutor();

// Graph wiring: a fan-out edge from the start executor to the two transform
// executors, a fan-in barrier edge from both transforms into the completing
// executor, and workflow output taken from the completing executor.
var workflow = new WorkflowBuilder(startExecutor)
    .AddFanOutEdge(startExecutor, [reverseExecutor, uppercaseExecutor])
    .AddFanInBarrierEdge([reverseExecutor, uppercaseExecutor], completeExecutor)
    .WithOutputFrom(completeExecutor)
    .Build();

// Start a streaming in-process run with a fixed input string.
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: "Hello, World!");

// Print the three event kinds of interest; every other event is ignored.
await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync())
{
    switch (workflowEvent)
    {
        case ExecutorCompletedEvent completed:
            Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
            break;

        case WorkflowOutputEvent output:
            Console.WriteLine($"Workflow Output: {output.Data}");
            break;

        case WorkflowErrorEvent error:
            Console.WriteLine($"Workflow Error: {error.Data}");
            break;
    }
}
/// <summary>
/// Entry executor of the workflow: takes the input string and sends it on,
/// unchanged, through the workflow context.
/// </summary>
[SendsMessage(typeof(string))]
internal sealed partial class StartExecutor() : Executor<string>("StartExecutor")
{
    /// <summary>Forwards <paramref name="message"/> as-is via <paramref name="context"/>.</summary>
    /// <param name="message">The string handed to this executor.</param>
    /// <param name="context">Workflow context used to send the message onward.</param>
    /// <param name="cancellationToken">Token passed through to the send operation.</param>
    public override async ValueTask HandleAsync(
        string message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var forwarding = context.SendMessageAsync(message, cancellationToken);
        await forwarding;
    }
}
/// <summary>
/// Transform executor that upper-cases the incoming string (invariant culture),
/// simulates one second of work, and sends the result wrapped in a
/// <see cref="TransformComplete"/> message.
/// </summary>
[SendsMessage(typeof(TransformComplete))]
internal sealed partial class UppercaseExecutor() : Executor<string>("UppercaseExecutor")
{
    // Length of the artificial delay that stands in for real work.
    private const int SimulatedWorkMilliseconds = 1000;

    /// <summary>Upper-cases <paramref name="message"/> and sends it as a <see cref="TransformComplete"/>.</summary>
    /// <param name="message">The string to transform.</param>
    /// <param name="context">Workflow context used to send the transformed message.</param>
    /// <param name="cancellationToken">
    /// Token observed by both the simulated delay and the send operation.
    /// </param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled during the simulated delay.
    /// </exception>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Executor: {this.Id} starting");
        var str = message.ToUpperInvariant();

        // Flow the token into the delay so a cancelled run does not sit out the full wait.
        await Task.Delay(SimulatedWorkMilliseconds, cancellationToken);

        Console.WriteLine($"Executor: {this.Id} completed");
        await context.SendMessageAsync(new TransformComplete(str), cancellationToken: cancellationToken);
    }
}
/// <summary>
/// Transform executor that reverses the incoming string, simulates one second
/// of work, and sends the result wrapped in a <see cref="TransformComplete"/> message.
/// </summary>
[SendsMessage(typeof(TransformComplete))]
internal sealed partial class ReverseExecutor() : Executor<string>("ReverseExecutor")
{
    // Length of the artificial delay that stands in for real work.
    private const int SimulatedWorkMilliseconds = 1000;

    /// <summary>Reverses <paramref name="message"/> and sends it as a <see cref="TransformComplete"/>.</summary>
    /// <param name="message">The string to transform.</param>
    /// <param name="context">Workflow context used to send the transformed message.</param>
    /// <param name="cancellationToken">
    /// Token observed by both the simulated delay and the send operation.
    /// </param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled during the simulated delay.
    /// </exception>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Executor: {this.Id} starting");

        // Reverses the sequence of chars (UTF-16 code units) and joins them back into a string.
        var str = string.Concat(message.Reverse());

        // Flow the token into the delay so a cancelled run does not sit out the full wait.
        await Task.Delay(SimulatedWorkMilliseconds, cancellationToken);

        Console.WriteLine($"Executor: {this.Id} completed");
        await context.SendMessageAsync(new TransformComplete(str), cancellationToken: cancellationToken);
    }
}
/// <summary>
/// Final executor: joins the <see cref="TransformComplete.Value"/> of every item
/// it is handed into a single string and yields that string as workflow output.
/// </summary>
// NOTE(review): this handler is typed to receive a List<TransformComplete>, while the
// upstream executors each send a single TransformComplete. Whether the fan-in barrier
// edge aggregates them into a list or delivers them one by one is not visible here —
// confirm against the framework's fan-in documentation.
[YieldsOutput(typeof(string))]
internal sealed partial class CompleteExecutor() : Executor<List<TransformComplete>>("CompleteExecutor")
{
    /// <summary>Concatenates the values in <paramref name="message"/> in list order and yields the result.</summary>
    /// <param name="message">The transform results to combine.</param>
    /// <param name="context">Workflow context used to yield the combined output.</param>
    /// <param name="cancellationToken">Token passed through to the yield operation.</param>
    public override async ValueTask HandleAsync(
        List<TransformComplete> message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        var values = message.Select(item => item.Value);
        var combined = string.Join(string.Empty, values);

        await context.YieldOutputAsync(combined, cancellationToken);
    }
}
/// <summary>
/// Message carrying the string produced by one of the transform executors.
/// </summary>
internal sealed class TransformComplete : WorkflowEvent
{
    /// <summary>Creates the message around the transformed string.</summary>
    /// <param name="value">The transformed string to carry.</param>
    public TransformComplete(string value)
    {
        this.Value = value;
    }

    /// <summary>The transformed string supplied at construction.</summary>
    public string Value { get; }
}
Description
You can refer to agent-framework-dotnet 1.0.0 -> Samples -> 03-workflows -> Concurrent -> MapReduce example.
Code Sample
c# using Microsoft.Agents.AI.Workflows; var startExecutor = new StartExecutor(); var uppercaseExecutor = new UppercaseExecutor(); var reverseExecutor = new ReverseExecutor(); var completeExecutor = new CompleteExecutor(); var workflow = new WorkflowBuilder(startExecutor) .AddFanOutEdge(startExecutor, [reverseExecutor, uppercaseExecutor]) .AddFanInBarrierEdge([reverseExecutor, uppercaseExecutor], completeExecutor) .WithOutputFrom(completeExecutor) .Build(); await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: "Hello, World!"); await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"{executorCompleted.ExecutorId}: {executorCompleted.Data}"); } else if (evt is WorkflowOutputEvent outputEvent) { Console.WriteLine($"Workflow Output: {outputEvent.Data}"); } else if (evt is WorkflowErrorEvent errorEvent) { Console.WriteLine($"Workflow Error: {errorEvent.Data}"); } } [SendsMessage(typeof(string))] internal sealed partial class StartExecutor() : Executor<string>("StartExecutor") { public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) { await context.SendMessageAsync(message, cancellationToken); } } [SendsMessage(typeof(TransformComplete))] internal sealed partial class UppercaseExecutor() : Executor<string>("UppercaseExecutor") { public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) { Console.WriteLine($"Executor: {this.Id} starting"); var str = message.ToUpperInvariant(); await Task.Delay(1000); Console.WriteLine($"Executor: {this.Id} completed"); await context.SendMessageAsync(new TransformComplete(str), cancellationToken: cancellationToken); } } [SendsMessage(typeof(TransformComplete))] internal sealed partial class ReverseExecutor() : Executor<string>("ReverseExecutor") { public override async 
ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) { Console.WriteLine($"Executor: {this.Id} starting"); var str = string.Concat(message.Reverse()); await Task.Delay(1000); Console.WriteLine($"Executor: {this.Id} completed"); await context.SendMessageAsync(new TransformComplete(str), cancellationToken: cancellationToken); } } [YieldsOutput(typeof(string))] internal sealed partial class CompleteExecutor() : Executor<List<TransformComplete>>("CompleteExecutor") { public override async ValueTask HandleAsync(List<TransformComplete> message, IWorkflowContext context, CancellationToken cancellationToken = default) { var str = string.Concat(message.Select(m => m.Value)); await context.YieldOutputAsync(str, cancellationToken); } } internal sealed class TransformComplete(string value) : WorkflowEvent { public string Value { get; } = value; }
Error Messages / Stack Traces
Package Versions
Microsoft.Agents.AI:1.0.0
.NET Version
.NET 10.0
Additional Context
No response