-
Notifications
You must be signed in to change notification settings - Fork 0
Recipe Cli Multi Command Dispatcher
A git-style CLI called mytool with three subcommands — sync, export, and cleanup — each driven by its own Plumber pipeline. Three pipelines share one service container, one configuration root, one logger, and one cancellation flow.
The point of the recipe is to show why dependency injection earns its keep when a CLI grows past one command. Every subcommand picks up the same scoped DbContext per invocation, the same logger with the command name on the scope, the same Ctrl-C cancellation through every middleware. None of it is threaded by hand.
Real CLIs hit this shape early. git has dozens of subcommands. dotnet has more. kubectl has so many the help text scrolls. Each subcommand does its own thing, but they all reach for the same handful of services.
Three commands:
-
mytool sync— fetches records from an external HTTP API and writes them to a local SQLite database. -
mytool export— reads records from the database and writes a JSON file to disk. -
mytool cleanup— deletes records older than a configured number of days.
Three subcommands, three pipelines, one shared service container.
- Configuration loads once.
- The HTTP client lifetime is owned by
IHttpClientFactory. - Each invocation gets its own DI scope, so each invocation gets a fresh
DbContext. - Ctrl-C cancels whatever's in flight, and the cancellation token reaches every middleware.
Spin up a console project and add the packages:
dotnet new console -n MyTool
cd MyTool
dotnet add package MSL.Plumber.Pipeline
dotnet add package Microsoft.EntityFrameworkCore.Sqlite
dotnet add package Microsoft.EntityFrameworkCore.Design
dotnet add package Microsoft.Extensions.Http
dotnet add package Microsoft.Extensions.Logging.ConsoleYou'll also want an appsettings.json for connection strings and the API base URL:
{
"ConnectionStrings": {
"Default": "Data Source=mytool.db"
},
"Api": {
"BaseUrl": "https://api.example.com/"
},
"Cleanup": {
"RetentionDays": 30
}
}Mark it Copy if newer so it lands in the bin directory:
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewer</CopyToOutputDirectory>
</None>
</ItemGroup>Each subcommand has a different request type. Keeping them separate lets each command name describe what it carries. Unit stands in for "no meaningful response" — these commands run for their side effects:
namespace MyTool;
internal record SyncRequest(string? Since);
internal record ExportRequest(string OutputPath);
internal record CleanupRequest(int? OverrideDays);The model and DbContext are mundane:
using Microsoft.EntityFrameworkCore;
namespace MyTool;
internal sealed class Record
{
public int Id { get; init; }
public string ExternalId { get; init; } = "";
public string Payload { get; init; } = "";
public DateTimeOffset FetchedAt { get; init; }
}
internal sealed class MyDbContext(DbContextOptions<MyDbContext> options)
: DbContext(options)
{
public DbSet<Record> Records => Set<Record>();
}And a typed HTTP client to talk to the API:
using System.Net.Http.Json;
namespace MyTool;
internal sealed class ApiClient(HttpClient http)
{
public async Task<IReadOnlyList<RecordDto>> FetchAsync(
string? since,
CancellationToken cancellationToken)
{
var url = since is null ? "records" : $"records?since={Uri.EscapeDataString(since)}";
var dtos = await http.GetFromJsonAsync<RecordDto[]>(url, cancellationToken);
return dtos ?? [];
}
}
internal record RecordDto(string Id, string Payload);The centerpiece. Pipelines builds one shared RequestHandlerBuilder recipe per command. Shared parts — configuration sources, logging, DbContext, typed HTTP client — live in a helper that every builder calls. The pipeline-specific middleware list goes in each command's Configure method.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyTool;
internal static class Pipelines
{
public static async Task<int> RunSyncAsync(string[] args, CancellationToken cancellationToken)
{
using var handler = BuildSync(args);
await handler.InvokeAsync(new SyncRequest(Since: ParseSince(args)), cancellationToken);
return 0;
}
public static async Task<int> RunExportAsync(string[] args, CancellationToken cancellationToken)
{
using var handler = BuildExport(args);
var output = ParseOutput(args) ?? "export.json";
await handler.InvokeAsync(new ExportRequest(output), cancellationToken);
return 0;
}
public static async Task<int> RunCleanupAsync(string[] args, CancellationToken cancellationToken)
{
using var handler = BuildCleanup(args);
await handler.InvokeAsync(new CleanupRequest(ParseDays(args)), cancellationToken);
return 0;
}
private static RequestHandler<SyncRequest, Unit> BuildSync(string[] args) =>
SharedBuilder<SyncRequest>(args)
.Build()
.Use<LoggingMiddleware<SyncRequest, Unit>>()
.Use<SyncMiddleware>();
private static RequestHandler<ExportRequest, Unit> BuildExport(string[] args) =>
SharedBuilder<ExportRequest>(args)
.Build()
.Use<LoggingMiddleware<ExportRequest, Unit>>()
.Use<ExportMiddleware>();
private static RequestHandler<CleanupRequest, Unit> BuildCleanup(string[] args) =>
SharedBuilder<CleanupRequest>(args)
.Build()
.Use<LoggingMiddleware<CleanupRequest, Unit>>()
.Use<CleanupMiddleware>();
private static RequestHandlerBuilder<TRequest, Unit> SharedBuilder<TRequest>(string[] args)
where TRequest : notnull =>
RequestHandlerBuilder.Create<TRequest, Unit>(args)
.AddJsonFile("appsettings.json", optional: true)
.AddEnvironmentVariables("MYTOOL_")
.ConfigureLogging(logging => logging
.SetMinimumLevel(LogLevel.Information)
.AddSimpleConsole(o => o.SingleLine = true))
.ConfigureServices((services, configuration) =>
{
var connection = configuration.GetConnectionString("Default")
?? "Data Source=mytool.db";
services.AddDbContext<MyDbContext>(o => o.UseSqlite(connection));
var apiBase = configuration["Api:BaseUrl"]
?? throw new InvalidOperationException("Api:BaseUrl is required");
services
.AddHttpClient<ApiClient>(c => c.BaseAddress = new Uri(apiBase));
services.Configure<CleanupOptions>(configuration.GetSection("Cleanup"));
});
// illustrative arg parsing — replace with System.CommandLine if you need flags
private static string? ParseSince(string[] args) => GetFlag(args, "--since");
private static string? ParseOutput(string[] args) => GetFlag(args, "--out");
private static int? ParseDays(string[] args) =>
int.TryParse(GetFlag(args, "--days"), out var d) ? d : null;
private static string? GetFlag(string[] args, string name)
{
var i = Array.IndexOf(args, name);
return i >= 0 && i + 1 < args.Length ? args[i + 1] : null;
}
}
internal sealed class CleanupOptions
{
public int RetentionDays { get; init; } = 30;
}A few things worth pointing out:
-
SharedBuilder<TRequest>is generic over the request type so the same registration code applies to all three pipelines. The response type is alwaysUnit— these commands run for effects, not return values. -
AddDbContext<MyDbContext>registersMyDbContextas scoped (the EF Core default). Since eachInvokeAsynccreates a new DI scope, each invocation sees a freshDbContext. -
AddHttpClient<ApiClient>registers the typed client and letsIHttpClientFactorymanage the underlyingHttpMessageHandlerpool. If you later add a retry policy or a circuit breaker, this is the line you chain it onto. - Logging is configured once, shared across all three pipelines.
- Configuration is opt-in — explicit
AddJsonFileandAddEnvironmentVariablescalls. Plumber v3 loads nothing automatically; you pick the sources you want.
Three command-specific middleware plus one shared logging wrapper. Logging is generic so it works for all three pipelines without copy/paste:
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyTool;
internal sealed class LoggingMiddleware<TRequest, TResponse>(
RequestMiddleware<TRequest, TResponse> next,
ILogger<LoggingMiddleware<TRequest, TResponse>> logger)
where TRequest : notnull
{
public async Task InvokeAsync(RequestContext<TRequest, TResponse> context)
{
using var scope = logger.BeginScope("{Command} {RequestId}",
typeof(TRequest).Name, context.Id);
logger.LogInformation("started");
try
{
await next(context);
logger.LogInformation("completed in {ElapsedMs}ms",
context.Elapsed.TotalMilliseconds);
}
catch (OperationCanceledException)
{
logger.LogWarning("canceled after {ElapsedMs}ms",
context.Elapsed.TotalMilliseconds);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "failed after {ElapsedMs}ms",
context.Elapsed.TotalMilliseconds);
throw;
}
}
}ILogger<T> is a singleton, so constructor injection is the right place for it. The middleware itself is constructed once at registration and reused for every invocation, which matches the singleton lifetime of the logger. Generic over (TRequest, TResponse) means one definition serves all three pipelines.
SyncMiddleware calls the typed HTTP client and writes results to the database. Both dependencies are method-injected because both have lifetimes that need a fresh resolution per request — MyDbContext is scoped, and ApiClient (registered via AddHttpClient<T>) is transient with a pooled handler underneath.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyTool;
internal sealed class SyncMiddleware(RequestMiddleware<SyncRequest, Unit> next)
{
public async Task InvokeAsync(
RequestContext<SyncRequest, Unit> context,
ApiClient api,
MyDbContext db,
ILogger<SyncMiddleware> logger)
{
await db.Database.EnsureCreatedAsync(context.CancellationToken);
var dtos = await api.FetchAsync(context.Request.Since, context.CancellationToken);
logger.LogInformation("fetched {Count} record(s)", dtos.Count);
var existing = await db.Records
.Select(r => r.ExternalId)
.ToHashSetAsync(context.CancellationToken);
var fresh = dtos
.Where(d => !existing.Contains(d.Id))
.Select(d => new Record
{
ExternalId = d.Id,
Payload = d.Payload,
FetchedAt = DateTimeOffset.UtcNow,
})
.ToList();
if (fresh.Count > 0)
{
db.Records.AddRange(fresh);
await db.SaveChangesAsync(context.CancellationToken);
}
logger.LogInformation("inserted {Count} new record(s)", fresh.Count);
await next(context);
}
}ExportMiddleware reads from the database and writes JSON to a file:
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyTool;
internal sealed class ExportMiddleware(RequestMiddleware<ExportRequest, Unit> next)
{
public async Task InvokeAsync(
RequestContext<ExportRequest, Unit> context,
MyDbContext db,
ILogger<ExportMiddleware> logger)
{
var records = await db.Records
.OrderBy(r => r.FetchedAt)
.ToListAsync(context.CancellationToken);
await using var file = File.Create(context.Request.OutputPath);
await JsonSerializer.SerializeAsync(file, records, cancellationToken: context.CancellationToken);
logger.LogInformation("wrote {Count} record(s) to {Path}",
records.Count, context.Request.OutputPath);
await next(context);
}
}CleanupMiddleware runs a delete query against the database. IOptions<CleanupOptions> is method-injected too, so the registered options instance reaches the middleware on every invocation:
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Plumber;
namespace MyTool;
internal sealed class CleanupMiddleware(RequestMiddleware<CleanupRequest, Unit> next)
{
public async Task InvokeAsync(
RequestContext<CleanupRequest, Unit> context,
MyDbContext db,
IOptions<CleanupOptions> options,
ILogger<CleanupMiddleware> logger)
{
var days = context.Request.OverrideDays ?? options.Value.RetentionDays;
var cutoff = DateTimeOffset.UtcNow.AddDays(-days);
var deleted = await db.Records
.Where(r => r.FetchedAt < cutoff)
.ExecuteDeleteAsync(context.CancellationToken);
logger.LogInformation("deleted {Count} record(s) older than {Cutoff:O}",
deleted, cutoff);
await next(context);
}
}Four payoffs across the three pipelines:
-
Fresh
DbContextper invocation, automatically. Scoped registration plus a per-invocation DI scope. -
HTTP client lifetime managed by the framework.
IHttpClientFactoryowns the handler pool. -
Configuration loaded once and shared. Every middleware reads from one
IConfiguration. - Cancellation flows uniformly. Ctrl-C reaches every middleware, EF call, and HTTP request.
- Structured logging with the command name on the scope. One generic middleware serves all three pipelines.
Each is unpacked below.
EF Core's AddDbContext registers MyDbContext as scoped. Plumber creates a new DI scope on every InvokeAsync call. Method injection on InvokeAsync resolves MyDbContext from that scope.
Run the sync command twice in the same process and each call sees a clean DbContext with no leaked tracking state — without writing a single using block.
AddHttpClient<ApiClient> registers ApiClient as transient and hands the underlying HttpMessageHandler pool to IHttpClientFactory. The handler gets recycled on a schedule. The pooled connections get reused across invocations. The DNS-pinning bug that bites every hand-rolled HttpClient is solved.
You picked it up by writing one line.
AddJsonFile, AddEnvironmentVariables, and the implicit AddCommandLine each run once during Build(). The resulting IConfiguration is registered with the service provider so any middleware can pull options from it.
Override the API URL with MYTOOL_Api__BaseUrl=... and every command sees it.
Ctrl-C trips the CancellationTokenSource in Program.Main, which gets passed to InvokeAsync, which becomes context.CancellationToken inside every middleware. EF Core, HttpClient, and JsonSerializer all accept the token. The cancel propagates without anyone wiring it up.
LoggingMiddleware opens a BeginScope with the request type's name and the per-request Id. Every log line emitted downstream gets that scope automatically. The same middleware works for all three pipelines because it's generic.
Every one of those would be possible without DI. Each would also be a place to forget something. The pipeline shape gives you somewhere to register the wiring once and forget about it — and DI gives you a way to share that wiring across pipelines that would otherwise have nothing in common.
The dispatch is plain. Parse args[0], hand off to the right pipeline, return its exit code. Ctrl-C cancels via a CancellationTokenSource:
using MyTool;
var args = Environment.GetCommandLineArgs();
var command = args.Length > 1 ? args[1] : "help";
var commandArgs = args.Length > 2 ? args[2..] : [];
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};
try
{
return command switch
{
"sync" => await Pipelines.RunSyncAsync(commandArgs, cts.Token),
"export" => await Pipelines.RunExportAsync(commandArgs, cts.Token),
"cleanup" => await Pipelines.RunCleanupAsync(commandArgs, cts.Token),
"help" or "--help" or "-h" => PrintHelp(),
_ => UnknownCommand(command),
};
}
catch (OperationCanceledException)
{
Console.Error.WriteLine("canceled");
return 130;
}
static int PrintHelp()
{
Console.WriteLine("""
usage: mytool <command> [options]
commands:
sync [--since <iso-date>] fetch records from the API and store locally
export [--out <path>] write all records to a JSON file
cleanup [--days <n>] delete records older than N days
""");
return 0;
}
static int UnknownCommand(string command)
{
Console.Error.WriteLine($"unknown command: {command}");
return 64;
}System.CommandLine would give you proper option parsing, help generation, and tab completion. It's also a learning curve. For a recipe of this size, a switch on args[1] plus a couple of Array.IndexOf calls reads cleaner. Reach for System.CommandLine when the flag count starts to climb.
You could, in principle, define a single base class or sum-type request and route inside the pipeline:
// illustrative — shows the alternative shape, not the recommended approach
internal abstract record CommandRequest;
internal record SyncCommand(string? Since) : CommandRequest;
internal record ExportCommand(string OutputPath) : CommandRequest;
internal record CleanupCommand(int? OverrideDays) : CommandRequest;
handler.Use(async (context, next) =>
{
switch (context.Request)
{
case SyncCommand: await syncSteps(context); break;
case ExportCommand: await exportSteps(context); break;
case CleanupCommand: await cleanupSteps(context); break;
}
});This works. It also forces every middleware to either care about every command or guard with type checks. The pipeline becomes a router and the middleware become subroutines, which is a pattern MediatR fits better than Plumber.
Plumber's strength is composing the steps that handle one request shape. When you have three request shapes, three pipelines is usually the cleaner read. The shared builder helper keeps the duplication down to "which middleware do you want."
Each pipeline can be tested in isolation with PlumberApplicationFactory from Plumber.Testing. Swap the ApiClient for a stub, point MyDbContext at an in-memory SQLite database, and the rest of the pipeline runs unchanged.
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Plumber.Testing;
public sealed class SyncPipelineTests
{
[Fact]
public async Task SyncWritesNewRecordsToDatabaseAsync()
{
await using var connection = new SqliteConnection("DataSource=:memory:");
await connection.OpenAsync();
using var factory = new PlumberApplicationFactory<SyncRequest, Unit>(
args => Pipelines.SharedBuilder<SyncRequest>(args),
handler => handler
.Use<LoggingMiddleware<SyncRequest, Unit>>()
.Use<SyncMiddleware>())
.WithServices(services =>
{
services.RemoveAll<DbContextOptions<MyDbContext>>();
services.AddDbContext<MyDbContext>(o => o.UseSqlite(connection));
services.AddSingleton<ApiClient>(_ => new StubApiClient(
new RecordDto("a", "1"),
new RecordDto("b", "2")));
});
await factory.InvokeAsync(new SyncRequest(Since: null));
// assert through the pipeline's own provider — DbContext is scoped, so scope first
using var scope = factory.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<MyDbContext>();
Assert.Equal(2, await db.Records.CountAsync());
}
}The assertion resolves the pipeline's own MyDbContext through factory.Services — scope first, since DbContext is scoped.
Expose SharedBuilder<TRequest> as internal if your test project has InternalsVisibleTo (the sample declares it private), and the test reuses the production wiring directly. Full guidance lives in Testing — strategy, fixtures, and helpers — and PlumberApplicationFactory — the test factory's full surface.
- .NET 10
-
MSL.Plumber.Pipeline3.* -
MSL.Plumber.Pipeline.Testing3.* (test project) -
Microsoft.EntityFrameworkCore.Sqlite10.x -
Microsoft.Extensions.Http10.x -
Microsoft.Extensions.Logging.Console10.x
- Building a Pipeline — the full builder surface
- Middleware — delegate vs class, method injection vs constructor injection
-
Request Lifecycle — what
RequestContextcarries through the pipeline - Sample.Cli walkthrough — single-pipeline CLI in the repo
-
Testing — testing strategy and
PlumberApplicationFactory - Recipe: File Watcher — long-running standalone variant
- Home
Documents Plumber v4.x · Repository · MIT License · Report an issue
Getting Started
Pipeline (core)
Testing
Serilog Extensions
Diagnostics
Recipes
- AWS Lambda — API Gateway
- AWS Lambda — SQS
- Azure Functions — HTTP
- SQS polling console
- ASP.NET Core integration
- BackgroundService worker
- Webhook receiver
- Multi-command CLI
- File watcher
- Configuration reload
Repo · NuGet · NuGet — Testing · NuGet — Serilog · NuGet — Diagnostics