Recipe Background Service Worker

Mark Lauter edited this page Jun 13, 2026 · 3 revisions

Background Service Worker

A long-running worker that consumes from a queue (or fires on a timer) and dispatches each unit of work through a Plumber pipeline. The worker lives inside a Generic Host — Host.CreateApplicationBuilder — and resolves the handler from the host's DI as a singleton, then invokes it once per item inside BackgroundService.ExecuteAsync.

The host-mode DI pattern is the same one Recipe-Aspnet-Host-Integration sets up: register the handler as a singleton via RequestHandler.Create<TReq, TRes>(sp), and each InvokeAsync call gets its own scope from the host's IServiceScopeFactory. This recipe shows the worker shape end-to-end — registration, the consume loop, graceful shutdown, error handling, and how to test the pipeline independently of the worker.

When this recipe applies

You want a long-running worker that:

  • Pulls items off a queue or channel (or wakes on a schedule) and processes them one at a time, or in small batches.
  • Runs each unit of work through several composable steps — validation, parsing, dispatching, logging.
  • Cancels cleanly when the host shuts down.
  • Shares services with the host (config, logger, db context, http client factory).

If the worker exists outside of a host — a console app polling SQS until you Ctrl-C it — see Recipe-Sqs-Polling-Console. That's the standalone variant: same shape but Plumber owns its own DI and lifecycle. This recipe assumes the host is doing the lifecycle work for you.

Project setup

dotnet new worker -n Queue.Worker
cd Queue.Worker
dotnet add package MSL.Plumber.Pipeline

The worker template gives you a Program.cs using Host.CreateApplicationBuilder and a Worker.cs with a BackgroundService. We'll replace the worker and add a pipeline alongside it.

The request type

Pick a request type that represents one unit of work. For a queue worker that's typically the deserialized message:

using System.Text.Json;

public sealed record QueueMessage(
    string Id,
    string Type,
    JsonElement Payload,
    int DeliveryCount);

Use Unit as the response type — a worker pipeline exists to do work, not to return a value:

using Plumber;

public sealed class HandleMessageMiddleware(
    RequestMiddleware<QueueMessage, Unit> next,
    ILogger<HandleMessageMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<QueueMessage, Unit> context)
    {
        logger.LogInformation(
            "handling message {MessageId} of type {Type}",
            context.Request.Id,
            context.Request.Type);

        // ... do work ...

        await next(context);
    }
}

See the Unit section of the README for the rationale.

Pipeline registration

Same shape as the ASP.NET Core integration: a static Configure method that adds middleware to the host-mode-created handler, plus a singleton registration:

internal static class WorkerPipeline
{
    public static RequestHandler<QueueMessage, Unit> Configure(
        RequestHandler<QueueMessage, Unit> handler) =>
        handler
            .Use<ValidateMessageMiddleware>()
            .Use<DeserializePayloadMiddleware>()
            .Use<DispatchMiddleware>();
}

In Program.cs:

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddSingleton<IMessageStore, SqlMessageStore>();
builder.Services.AddScoped<AppDbContext>();

builder.Services.AddSingleton(sp =>
    WorkerPipeline.Configure(
        RequestHandler.Create<QueueMessage, Unit>(sp)));

builder.Services.AddHostedService<QueueWorker>();

var host = builder.Build();
await host.RunAsync();

The handler is a singleton because it's reusable. The middleware dispatch is compiled once at registration time and that compiled delegate is reused for every invocation; each call to InvokeAsync creates its own DI scope from the host's IServiceScopeFactory, so per-request services are fresh per message. See Recipe-Aspnet-Host-Integration for the full lifetime rationale.

The BackgroundService

The worker resolves the handler from constructor injection (it's a singleton in the host's DI) and an IMessageSource that produces messages. ExecuteAsync loops until the host signals shutdown, reading messages and invoking the pipeline:

public sealed class QueueWorker(
    RequestHandler<QueueMessage, Unit> handler,
    IMessageSource source,
    ILogger<QueueWorker> logger)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("queue worker started");

        try
        {
            await foreach (var message in source.ReadAllAsync(stoppingToken))
            {
                try
                {
                    await handler.InvokeAsync(message, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // shutdown in progress: let the loop exit.
                    break;
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "failed to handle message {MessageId}", message.Id);
                    // continue to the next message; recovery strategy is queue-specific.
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // the source was waiting for the next message when shutdown was signaled.
        }

        logger.LogInformation("queue worker stopping");
    }
}

The stoppingToken flows into Plumber as the cancellation token for each call. Middleware that respects cancellation (context.ThrowIfCanceled, context.CancellationToken passed to async APIs) shuts down promptly when the host signals stop.

Source-of-work options

Two common shapes. The first is an in-memory Channel<T> — the right choice for tests, demos, or any case where another part of the same process produces messages. The second is a real broker; the example shows where to plug it in.

In-memory channel

System.Threading.Channels gives you a thread-safe producer/consumer queue with backpressure built in:

using System.Threading.Channels;

public interface IMessageSource
{
    IAsyncEnumerable<QueueMessage> ReadAllAsync(CancellationToken cancellationToken);
    ValueTask WriteAsync(QueueMessage message, CancellationToken cancellationToken);
}

public sealed class ChannelMessageSource : IMessageSource
{
    private readonly Channel<QueueMessage> channel = Channel.CreateBounded<QueueMessage>(
        new BoundedChannelOptions(capacity: 1024)
        {
            FullMode = BoundedChannelFullMode.Wait,
        });

    public IAsyncEnumerable<QueueMessage> ReadAllAsync(CancellationToken cancellationToken) =>
        channel.Reader.ReadAllAsync(cancellationToken);

    public ValueTask WriteAsync(QueueMessage message, CancellationToken cancellationToken) =>
        channel.Writer.WriteAsync(message, cancellationToken);
}

Register the concrete type and the IMessageSource interface against the same singleton instance so that producer and consumer share one channel:

builder.Services.AddSingleton<ChannelMessageSource>();
builder.Services.AddSingleton<IMessageSource>(sp => sp.GetRequiredService<ChannelMessageSource>());
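
To make the backpressure behavior concrete, here is a minimal standalone sketch that uses only System.Threading.Channels. The BackpressureDemo class, the capacity of 2, and the int payloads are illustrative assumptions, not part of the recipe; the point is only to show what FullMode = Wait does to a producer when the consumer falls behind.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo type; not part of the recipe.
public static class BackpressureDemo
{
    public static async Task<(bool ThirdWriteAccepted, bool PendingBeforeRead, int FirstItem)> RunAsync()
    {
        // Capacity of 2 is illustrative; the recipe uses 1024.
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity: 2)
            {
                FullMode = BoundedChannelFullMode.Wait,
            });

        await channel.Writer.WriteAsync(1);
        await channel.Writer.WriteAsync(2);

        // The channel is full: TryWrite reports failure instead of dropping an item.
        var thirdWriteAccepted = channel.Writer.TryWrite(3);

        // WriteAsync does not fail; it returns a pending operation that
        // completes once a consumer makes room.
        var pendingWrite = channel.Writer.WriteAsync(3).AsTask();
        var pendingBeforeRead = !pendingWrite.IsCompleted;

        var firstItem = await channel.Reader.ReadAsync(); // frees one slot
        await pendingWrite;                                // the waiting write now completes

        return (thirdWriteAccepted, pendingBeforeRead, firstItem);
    }
}
```

In the worker, this means a producer that awaits WriteAsync slows down to the consumer's pace instead of growing the queue without bound.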

Real broker

Swap ChannelMessageSource for an implementation backed by your actual queue client. The skeleton:

using System.Runtime.CompilerServices; // [EnumeratorCancellation]

public sealed class BrokerMessageSource(
    /* IBrokerClient client */)
    : IMessageSource
{
    public async IAsyncEnumerable<QueueMessage> ReadAllAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Poll until the host signals shutdown.
        while (!cancellationToken.IsCancellationRequested)
        {
            // Pseudocode: swap for your broker client (SQS, Service Bus, RabbitMQ, Kafka).
            // var batch = await client.ReceiveAsync(cancellationToken);
            // foreach (var raw in batch) yield return Map(raw);

            // Placeholder so the skeleton compiles; it ends the stream immediately.
            // Remove both lines once the receive call above is real.
            await Task.Yield();
            yield break;
        }
    }

    public ValueTask WriteAsync(QueueMessage message, CancellationToken cancellationToken) =>
        // Pseudocode: most workers only consume; the producer side is optional.
        ValueTask.CompletedTask;
}

Ack/nack semantics live inside the source, not inside Plumber. The handler's job is to throw on failure; the source decides what that means for the underlying queue.
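
One way to let the source decide is for the worker to report each outcome back to it. The sketch below is an assumption about how that could look, not something Plumber or this recipe defines: IMessageSettlement, RecordingSettlement, and SettlingLoop are hypothetical names, and the in-memory implementation only records outcomes where a real source would call the broker's delete, abandon, or dead-letter operations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical contract a message source could implement in addition to IMessageSource.
public interface IMessageSettlement
{
    ValueTask CompleteAsync(string messageId, CancellationToken cancellationToken);
    ValueTask AbandonAsync(string messageId, Exception reason, CancellationToken cancellationToken);
}

// In-memory stand-in that records outcomes instead of talking to a broker.
public sealed class RecordingSettlement : IMessageSettlement
{
    public List<string> Completed { get; } = new();
    public List<string> Abandoned { get; } = new();

    public ValueTask CompleteAsync(string messageId, CancellationToken cancellationToken)
    {
        Completed.Add(messageId);
        return ValueTask.CompletedTask;
    }

    public ValueTask AbandonAsync(string messageId, Exception reason, CancellationToken cancellationToken)
    {
        Abandoned.Add(messageId);
        return ValueTask.CompletedTask;
    }
}

public static class SettlingLoop
{
    // Runs one unit of work and reports the outcome to the source.
    // 'handle' stands in for handler.InvokeAsync(message, token).
    public static async Task ProcessAsync(
        string messageId,
        Func<CancellationToken, Task> handle,
        IMessageSettlement settlement,
        CancellationToken cancellationToken)
    {
        try
        {
            await handle(cancellationToken);
            await settlement.CompleteAsync(messageId, cancellationToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is deliberately not caught: the message stays unsettled
            // and the broker's own redelivery rules apply.
            await settlement.AbandonAsync(messageId, ex, cancellationToken);
        }
    }
}
```

With this split, the pipeline still only throws or returns; whether a failure means "retry later" or "dead-letter now" stays a decision of the source implementation.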

Graceful shutdown

The host's stop sequence is well-defined: IHostApplicationLifetime.ApplicationStopping fires, every BackgroundService receives cancellation on its stoppingToken, and the host waits up to HostOptions.ShutdownTimeout (default 30 seconds) for them to return.
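
If the default window is too short for your longest unit of work, the timeout can be raised through HostOptions. This fragment assumes it sits in the Program.cs shown earlier, before builder.Build(); the 60-second value is illustrative, not a recommendation.

```csharp
// Program.cs, before builder.Build().
// 60 seconds is an illustrative value; size it to your longest in-flight unit of work.
builder.Services.Configure<HostOptions>(options =>
{
    options.ShutdownTimeout = TimeSpan.FromSeconds(60);
});
```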

Three things to make this work cleanly:

  1. Pass stoppingToken to InvokeAsync. The worker example above does this — the token threads into Plumber as context.CancellationToken and through to whatever async APIs your middleware calls.

  2. Catch OperationCanceledException at the worker boundary. When the token cancels, an in-flight InvokeAsync throws OperationCanceledException as soon as a middleware or an awaited API observes the token. The worker should treat that as "shutdown in progress" and exit the loop, not as "the message failed."

  3. Let middleware respect the token. Long-running middleware (a downstream HTTP call, a database query) should accept context.CancellationToken and pass it through. The terminal middleware in Plumber checks cancellation before invoking, so a token cancelled before the chain runs short-circuits cleanly.

If your work is genuinely uninterruptible (a payment has either committed or it hasn't), wrap the critical section in a try/finally that completes the message regardless of the outcome. The shutdown timeout exists for exactly this case: it gives in-flight work a bounded window to finish after the stop signal.
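
A minimal sketch of such a critical section follows. It is one possible reading of the advice above, not the recipe's own code: CriticalSection, chargeAsync, and settleAsync are hypothetical names, and passing CancellationToken.None to the uninterruptible call is an assumption about how you would keep a shutdown from interrupting it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; chargeAsync is the uninterruptible operation and
// settleAsync is whatever "completing the message" means for your queue.
public static class CriticalSection
{
    public static async Task RunAsync(
        Func<CancellationToken, Task> chargeAsync,
        Func<Task> settleAsync,
        CancellationToken stoppingToken)
    {
        // Last cheap exit: nothing has been committed yet.
        stoppingToken.ThrowIfCancellationRequested();

        try
        {
            // CancellationToken.None: once started, a host shutdown does not interrupt this call.
            await chargeAsync(CancellationToken.None);
        }
        finally
        {
            // Runs whether the charge succeeded or threw, so the message is always settled.
            await settleAsync();
        }
    }
}
```

The work inside still has to finish within HostOptions.ShutdownTimeout; past that point the host stops waiting.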

Error handling

Exceptions from the pipeline propagate out of InvokeAsync. The worker's outer try/catch decides what happens next.

The example above logs and continues. That's the right call for a queue with retry and dead-letter semantics — the broker will redeliver after a visibility timeout, and after enough redeliveries, the message lands in the DLQ.

Retry inside the pipeline

For transient failures you want to handle inline rather than wait for the broker, add a retry middleware near the top of the chain:

internal sealed class RetryMiddleware(
    RequestMiddleware<QueueMessage, Unit> next,
    int maxAttempts,
    TimeSpan baseDelay)
{
    public async Task InvokeAsync(RequestContext<QueueMessage, Unit> context)
    {
        for (var attempt = 1; attempt <= maxAttempts; ++attempt)
        {
            try
            {
                await next(context);
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts && IsTransient(ex))
            {
                var delay = baseDelay * Math.Pow(2, attempt - 1);
                await Task.Delay(delay, context.CancellationToken);
            }
        }
    }

    private static bool IsTransient(Exception ex) =>
        ex is HttpRequestException or TimeoutException;
}

Register with the constructor parameters:

handler.Use<RetryMiddleware>(3, TimeSpan.FromMilliseconds(200));
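
To see what those arguments mean in practice, the delay schedule can be computed on its own. BackoffSchedule is a hypothetical helper written for this illustration; it applies the same formula as RetryMiddleware, baseDelay * 2^(attempt - 1), to every attempt that is followed by a retry.

```csharp
using System;
using System.Linq;

// Hypothetical helper; mirrors the delay formula used in RetryMiddleware.
public static class BackoffSchedule
{
    // Attempts 1 .. maxAttempts - 1 are followed by a delay;
    // a failure on the final attempt propagates instead of waiting.
    public static TimeSpan[] Delays(int maxAttempts, TimeSpan baseDelay) =>
        Enumerable.Range(1, Math.Max(0, maxAttempts - 1))
            .Select(attempt => baseDelay * Math.Pow(2, attempt - 1))
            .ToArray();
}
```

BackoffSchedule.Delays(3, TimeSpan.FromMilliseconds(200)) gives two delays, 200 ms and 400 ms. A third consecutive transient failure is not delayed; it propagates to the worker.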

Wrap with an error boundary — a top-of-chain middleware that catches and logs everything below it — if you want every exception logged before it reaches the worker. Register the boundary first so it sees everything downstream throws.
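
A boundary of that kind could look like the sketch below. ErrorBoundaryMiddleware is a hypothetical name; the constructor and InvokeAsync shape simply mirror the other middleware in this recipe, and the sketch assumes the logger is injected the same way as in HandleMessageMiddleware.

```csharp
using Microsoft.Extensions.Logging;
using Plumber;

// Hypothetical middleware; the shape mirrors HandleMessageMiddleware above.
internal sealed class ErrorBoundaryMiddleware(
    RequestMiddleware<QueueMessage, Unit> next,
    ILogger<ErrorBoundaryMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<QueueMessage, Unit> context)
    {
        try
        {
            await next(context);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is not an error, so it passes through unlogged.
            logger.LogError(
                ex,
                "pipeline failed for message {MessageId} of type {Type}",
                context.Request.Id,
                context.Request.Type);

            throw; // rethrow so the worker's catch still sees the failure
        }
    }
}
```

Registered as the first Use<ErrorBoundaryMiddleware>() call in WorkerPipeline.Configure, it wraps everything after it, including RetryMiddleware, so it logs only failures that survive the retries. Because the worker's catch logs as well, each failure appears twice unless you drop one of the two log calls.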

Testing

The pipeline is testable independently of the worker. Both are wired by DI, so the middleware chain can be exercised directly without spinning up a real BackgroundService.

For pipeline-only tests, use WebApplicationFactory<Program> if your host is a web app, PlumberApplicationFactory for standalone pipelines, or — for a worker host — build a minimal test host:

[Fact]
public async Task ValidMessageRunsToCompletionAsync()
{
    var hostBuilder = Host.CreateApplicationBuilder();
    hostBuilder.Services.AddSingleton<IMessageStore, FakeMessageStore>();
    hostBuilder.Services.AddSingleton(sp =>
        WorkerPipeline.Configure(
            RequestHandler.Create<QueueMessage, Unit>(sp)));

    using var host = hostBuilder.Build();
    var handler = host.Services.GetRequiredService<RequestHandler<QueueMessage, Unit>>();

    var message = new QueueMessage(
        Id: "msg-1",
        Type: "order.created",
        Payload: JsonDocument.Parse("""{"orderId": 42}""").RootElement,
        DeliveryCount: 1);

    await handler.InvokeAsync(message, TestContext.Current.CancellationToken);

    var store = Assert.IsType<FakeMessageStore>(
        host.Services.GetRequiredService<IMessageStore>());
    Assert.Contains(store.Saved, m => m.Id == "msg-1");
}

Worker-level tests — that the worker drains the source, that it exits on cancellation, that it logs and continues on exceptions — go in a separate test class with a fake IMessageSource that you can drive from the test.
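
A fake source for those tests can be a thin wrapper over an unbounded channel. The sketch below is an assumption about how you might build it: FakeSource<T> is a hypothetical name, and it is generic only so the example stands alone. In the recipe, T would be QueueMessage and the class would implement IMessageSource.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical test double; in the recipe this would implement IMessageSource.
public sealed class FakeSource<T>
{
    private readonly Channel<T> channel = Channel.CreateUnbounded<T>();

    public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken) =>
        channel.Reader.ReadAllAsync(cancellationToken);

    public ValueTask WriteAsync(T message, CancellationToken cancellationToken) =>
        channel.Writer.WriteAsync(message, cancellationToken);

    // Completing the writer ends ReadAllAsync once the buffered items are drained,
    // which lets a test wait for the consume loop to finish on its own.
    public void Complete() => channel.Writer.Complete();
}
```

A drain test writes a few messages, calls Complete(), and awaits the worker. A cancellation test leaves the source open and cancels the token instead.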

For the strategy overview, see Testing. For the standalone factory pattern, see PlumberApplicationFactory.

Tested against

  • .NET 10
  • Microsoft.Extensions.Hosting 10.0.0
  • MSL.Plumber.Pipeline 3.*
