Recipe Sqs Polling Console
A long-running console application that polls an Amazon SQS queue and dispatches each message through a Plumber pipeline. No Lambda runtime — the process owns the loop, the receive call, the visibility timeout, and the delete-on-success step. The pipeline shape is (QueueMessage, Unit), where QueueMessage is a record carrying the SQS message and the receipt handle.
The interesting wrinkle is lifecycle ownership. The Lambda recipes lean on AWS to handle batching, scaling, and shutdown; the polling console does all of that itself. Three concerns dominate:
- Graceful shutdown. Ctrl+C and SIGTERM should drain the in-flight message, stop the receive loop, and exit cleanly. A CancellationTokenSource linked to Console.CancelKeyPress and AppDomain.ProcessExit is the standard pattern.
- Receipt handle handling. A successful message gets deleted; a failed message is left in the queue so SQS's visibility timeout puts it back in the available pool. The pipeline signals success or failure; the loop acts on that signal.
- Long polling. WaitTimeSeconds = 20 on ReceiveMessage collapses the request rate when the queue is idle and removes the need for a sleep loop.
A plain console template is enough:
dotnet new console --name MyQueue.Worker --framework net10.0
cd MyQueue.Worker
Add the SQS SDK and Plumber:
dotnet add package AWSSDK.SQS
dotnet add package MSL.Plumber.Pipeline
dotnet add package Microsoft.Extensions.Hosting
The Microsoft.Extensions.Hosting reference is optional. Reach for it when you want:
- IHostedService lifetime management
- Structured logging configured the same way as ASP.NET projects
- A graceful-shutdown signal the host wires up for you
A representative MyQueue.Worker.csproj:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="4.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.0" />
<PackageReference Include="MSL.Plumber.Pipeline" Version="3.*" />
</ItemGroup>
</Project>
The request type bundles the SQS message and its receipt handle into a single record. Bundling keeps the message body and the acknowledgement token together as the pipeline traverses middleware.
namespace MyQueue.Worker;
public sealed record QueueMessage(
string MessageId,
string ReceiptHandle,
string Body,
IReadOnlyDictionary<string, string> Attributes);
The pipeline itself follows the standard CreateBuilder / Configure / Build shape:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyQueue.Worker;
internal static class Pipeline
{
public static RequestHandlerBuilder<QueueMessage, Unit> CreateBuilder(string[] args) =>
RequestHandlerBuilder
.Create<QueueMessage, Unit>(args)
.AddDefaultConfigurationSources()
.ConfigureLogging(logging => logging
.SetMinimumLevel(LogLevel.Information)
.AddSimpleConsole(o => o.SingleLine = true))
.ConfigureServices((services, configuration) => services
.AddSingleton<IOrderProcessor, OrderProcessor>());
public static RequestHandler<QueueMessage, Unit> Configure(
RequestHandler<QueueMessage, Unit> handler) =>
handler
.Use<MessageLoggingMiddleware>()
.Use<ParseMiddleware>()
.Use<ProcessMiddleware>();
public static RequestHandler<QueueMessage, Unit> Build(string[] args) =>
Configure(CreateBuilder(args).Build());
}
Unit is the response type because a successful run produces no value: the loop deletes the message on success and the pipeline has nothing to return. Failure is signalled by a thrown exception that propagates out of InvokeAsync.
Three middleware:
- MessageLoggingMiddleware: symmetric logging around the message
- ParseMiddleware: body deserialization into a typed value
- ProcessMiddleware: the terminal business logic
Symmetric pre/post logging. The outer middleware sees the elapsed time even when an inner middleware throws.
using Microsoft.Extensions.Logging;
using Plumber;
namespace MyQueue.Worker;
internal sealed class MessageLoggingMiddleware(
RequestMiddleware<QueueMessage, Unit> next,
ILogger<MessageLoggingMiddleware> logger)
{
public async Task InvokeAsync(RequestContext<QueueMessage, Unit> context)
{
var msg = context.Request;
logger.LogInformation("processing {MessageId}", msg.MessageId);
try
{
await next(context);
logger.LogInformation(
"completed {MessageId} in {Elapsed}ms",
msg.MessageId,
context.Elapsed.TotalMilliseconds);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogError(
ex,
"failed {MessageId} after {Elapsed}ms",
msg.MessageId,
context.Elapsed.TotalMilliseconds);
throw;
}
}
}
OperationCanceledException is excluded from the catch because cancellation during shutdown is an expected control-flow signal, not a failure.
Deserializes the message body into a typed Order and stashes it in context.Data. A parse failure throws — the outer loop then leaves the message in-queue for SQS to retry until it hits the redrive policy.
using System.Text.Json;
using Plumber;
namespace MyQueue.Worker;
internal sealed class ParseMiddleware(
RequestMiddleware<QueueMessage, Unit> next)
{
public Task InvokeAsync(RequestContext<QueueMessage, Unit> context)
{
var order = JsonSerializer.Deserialize<Order>(context.Request.Body)
?? throw new InvalidOperationException("message body deserialized to null");
context.Data["order"] = order;
return next(context);
}
}
public sealed record Order(string OrderId, decimal Amount);
For a message you'd rather skip than retry (an unparseable payload that will always fail), catch the exception, log it, and assign context.Response = default(Unit) to short-circuit cleanly. The loop then deletes the message instead of leaving it for retry, a "send to a side log and move on" pattern. The right choice depends on whether you have a DLQ.
The terminal business logic. The processor is method-injected so it picks up scoped dependencies on every invocation.
using Plumber;
namespace MyQueue.Worker;
internal sealed class ProcessMiddleware(
RequestMiddleware<QueueMessage, Unit> next)
{
public async Task InvokeAsync(
RequestContext<QueueMessage, Unit> context,
IOrderProcessor processor)
{
var order = (Order)context.Data["order"]!;
await processor.ProcessAsync(order, context.CancellationToken);
await next(context);
}
}
public interface IOrderProcessor
{
Task ProcessAsync(Order order, CancellationToken cancellationToken);
}
internal sealed class OrderProcessor : IOrderProcessor
{
public Task ProcessAsync(Order order, CancellationToken cancellationToken) =>
Task.CompletedTask; // your real domain logic here
}
Build the handler once, register graceful-shutdown signals, and drive ReceiveMessageAsync in a loop until the cancellation token fires.
using Amazon.SQS;
using Amazon.SQS.Model;
using Plumber;
var queueUrl = Environment.GetEnvironmentVariable("QUEUE_URL")
?? throw new InvalidOperationException("QUEUE_URL must be set");
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent immediate process termination
Console.WriteLine("ctrl+c received; draining...");
cts.Cancel();
};
AppDomain.CurrentDomain.ProcessExit += (_, _) => cts.Cancel();
using var sqs = new AmazonSQSClient();
using var handler = MyQueue.Worker.Pipeline.Build(args);
await PollAsync(sqs, handler, queueUrl, cts.Token);
Console.WriteLine("worker exited cleanly");
static async Task PollAsync(
IAmazonSQS sqs,
RequestHandler<MyQueue.Worker.QueueMessage, Plumber.Unit> handler,
string queueUrl,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var receive = new ReceiveMessageRequest
{
QueueUrl = queueUrl,
MaxNumberOfMessages = 10,
WaitTimeSeconds = 20, // long polling — collapses the request rate when the queue is idle
VisibilityTimeout = 60,
MessageAttributeNames = ["All"],
};
ReceiveMessageResponse response;
try
{
response = await sqs.ReceiveMessageAsync(receive, cancellationToken);
}
catch (OperationCanceledException)
{
return; // shutdown requested mid-poll; drop the in-flight receive
}
if (response.Messages is null || response.Messages.Count == 0)
{
continue; // long-polling timed out with no messages; loop and try again
}
foreach (var sqsMessage in response.Messages)
{
if (cancellationToken.IsCancellationRequested) return;
var queueMessage = new MyQueue.Worker.QueueMessage(
MessageId: sqsMessage.MessageId,
ReceiptHandle: sqsMessage.ReceiptHandle,
Body: sqsMessage.Body,
Attributes: sqsMessage.MessageAttributes?
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.StringValue ?? string.Empty)
?? new Dictionary<string, string>());
try
{
_ = await handler.InvokeAsync(queueMessage, cancellationToken);
await sqs.DeleteMessageAsync(queueUrl, queueMessage.ReceiptHandle, cancellationToken);
}
catch (OperationCanceledException)
{
// shutdown — leave the message in the queue; visibility timeout returns it
return;
}
catch (Exception ex)
{
Console.Error.WriteLine($"message {queueMessage.MessageId} failed: {ex.Message}");
// leave the message; SQS visibility timeout returns it for retry
}
}
}
}
A few details worth calling out:
- Console.CancelKeyPress sets e.Cancel = true. With that flag in place the loop gets to drain. Leaving it false lets the runtime kill the process the moment the user presses Ctrl+C.
- AppDomain.ProcessExit covers SIGTERM. The runtime raises the event on any clean process exit; in Linux containers that includes the SIGTERM Kubernetes sends during pod termination.
- CancellationToken flows through everything: the receive call, the pipeline invocation, and the delete call. A cancelled receive throws OperationCanceledException, which the loop catches and treats as shutdown; a cancelled InvokeAsync throws OperationCanceledException (or TimeoutException if the handler-wide timeout fired first); a cancelled delete also throws, so the message stays in the queue and is redelivered after the visibility timeout.
- MaxNumberOfMessages = 10 is the SQS hard cap per receive call. The loop above processes the batch sequentially. For I/O-bound processors, swap the foreach for Parallel.ForEachAsync or push messages onto a Channel consumed by a worker pool.
- VisibilityTimeout = 60 overrides the queue's own visibility timeout (30 seconds by default) for the messages returned by this receive call. Tune it to a value comfortably greater than your slowest message processing time. If a message takes longer than the visibility timeout, SQS redelivers it while you're still working, and the delete call after success then uses a stale receipt handle: depending on the queue type it can fail or leave the redelivered copy in place, so the message may be processed twice.
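One caveat on the ProcessExit wiring in the loop listing: the runtime finishes shutting the process down once the ProcessExit handlers return, so a handler that only calls cts.Cancel() may not leave the loop enough time to drain. A minimal sketch of an alternative, assuming .NET 6 or later, intercepts SIGTERM with PosixSignalRegistration in the same way e.Cancel = true intercepts Ctrl+C. The helper name CancelOnSigterm is hypothetical and not part of Plumber or the AWS SDK.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;

using var cts = new CancellationTokenSource();
using var sigterm = CancelOnSigterm(cts);

// cts.Token would be passed to PollAsync, replacing the ProcessExit subscription.
Console.WriteLine($"SIGTERM handler registered; cancelled = {cts.IsCancellationRequested}");

// Hypothetical helper: cancels the token source when SIGTERM arrives.
// Setting Cancel = true asks the runtime to skip its default shutdown,
// so the process keeps running until the polling loop returns on its own.
static PosixSignalRegistration CancelOnSigterm(CancellationTokenSource cts) =>
    PosixSignalRegistration.Create(PosixSignal.SIGTERM, context =>
    {
        context.Cancel = true;
        cts.Cancel();
    });
```

The registration is disposable; keeping it in a using declaration next to the token source means the handler is removed before the source is disposed.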
This is a long-running process; "cold start" means application startup. Build the handler once at process start and keep it for the whole process lifetime. The using var handler = Pipeline.Build(args) line above does exactly that.
To embed the polling loop in IHostedService (so logging, configuration, and graceful shutdown are handled by the generic host instead of by hand), the shape is:
using Amazon.SQS;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateApplicationBuilder(args);
builder.Services
.AddSingleton<IAmazonSQS>(_ => new AmazonSQSClient())
.AddSingleton(sp => MyQueue.Worker.Pipeline.Configure(
Plumber.RequestHandler.Create<MyQueue.Worker.QueueMessage, Plumber.Unit>(sp)))
.AddHostedService<PollingService>();
await builder.Build().RunAsync();
PollingService : BackgroundService overrides ExecuteAsync(CancellationToken stoppingToken) and runs the same loop body. Plumber's host-mode factory (RequestHandler.Create(serviceProvider)) reuses the host's DI container; see ASP.NET Core integration for the same pattern in a web context.
The host approach gives you graceful shutdown for free: the host signals the stoppingToken on SIGTERM/Ctrl+C and waits for ExecuteAsync to return before exiting.
The loop pattern is simple: success deletes the message; failure leaves it. SQS's visibility timeout puts the message back in the available pool after the timeout expires, and the message gets re-delivered. After maxReceiveCount attempts, the redrive policy moves it to the dead-letter queue.
Three patterns layered on top.
A message that will always fail to parse is better dropped than retried. Set context.Response = default(Unit) in ParseMiddleware's catch, log the bad payload, and let the loop delete the message. This is "fail forward into a side log."
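The parse-or-skip decision can be sketched without any Plumber types. The helper below is a hypothetical TryDeserialize (not part of Plumber) that turns a JsonException or a null result into a false return value, leaving the caller to decide whether to drop the message. The demo deserializes into a dictionary only to stay self-contained; the recipe's Order record would work the same way.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Demo: one parseable body and one poison body.
var ok = TryDeserialize<Dictionary<string, JsonElement>>(
    """{"OrderId":"o-1","Amount":42.0}""", out var order, out _);
var bad = TryDeserialize<Dictionary<string, JsonElement>>(
    "not json", out _, out var error);
Console.WriteLine($"valid parsed: {ok}; poison parsed: {bad}; reason: {error}");

// Hypothetical helper: reports a parse failure instead of throwing,
// so the caller can log the payload and acknowledge the message.
static bool TryDeserialize<T>(string body, out T? value, out string? error)
    where T : class
{
    try
    {
        value = JsonSerializer.Deserialize<T>(body);
        error = value is null ? "message body deserialized to null" : null;
        return value is not null;
    }
    catch (JsonException ex)
    {
        value = null;
        error = ex.Message;
        return false;
    }
}
```

In ParseMiddleware, a false result would be the point to log the error, assign context.Response = default(Unit), and return without calling next, as described above.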
Some failures are transient (network blips, throttling). Wrap processor.ProcessAsync in a retry middleware (Polly's ResiliencePipeline, or hand-rolled) so transient failures resolve before the message is acknowledged or returned to the queue. The middleware sits between ParseMiddleware and ProcessMiddleware.
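For the hand-rolled option, a minimal sketch of retry with exponential backoff follows. RetryAsync and IsTransient are hypothetical names, and the choice of HttpRequestException and TimeoutException as "transient" is an assumption; the real set depends on what the processor calls. Polly's ResiliencePipeline is not shown here.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Demo: an operation that fails twice with a transient error, then succeeds.
var attempts = 0;
await RetryAsync(
    _ =>
    {
        attempts++;
        return attempts < 3
            ? Task.FromException(new HttpRequestException("simulated network blip"))
            : Task.CompletedTask;
    },
    maxAttempts: 4,
    baseDelay: TimeSpan.FromMilliseconds(10),
    cancellationToken: CancellationToken.None);
Console.WriteLine($"succeeded after {attempts} attempts");

// Hypothetical helper: retries transient failures with exponential backoff.
// Non-transient exceptions and the last failed attempt propagate to the caller,
// so the polling loop still leaves the message in the queue.
static async Task RetryAsync(
    Func<CancellationToken, Task> operation,
    int maxAttempts,
    TimeSpan baseDelay,
    CancellationToken cancellationToken)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await operation(cancellationToken);
            return;
        }
        catch (Exception ex) when (attempt < maxAttempts && IsTransient(ex))
        {
            // Wait 1x, 2x, 4x, ... the base delay; cancellation aborts the wait.
            await Task.Delay(baseDelay * Math.Pow(2, attempt - 1), cancellationToken);
        }
    }
}

// Assumption: which exceptions count as transient depends on the processor.
static bool IsTransient(Exception ex) =>
    ex is HttpRequestException or TimeoutException;
```

Inside a middleware placed between ParseMiddleware and ProcessMiddleware, the wrapped operation would be the call to next(context). Keep the total backoff well below the visibility timeout, otherwise SQS redelivers the message while the retries are still running.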
If a message takes longer than the configured visibility timeout, call ChangeMessageVisibilityAsync from a heartbeat task that runs alongside the pipeline invocation. The heartbeat is outside Plumber's purview; spawn it from the loop and link its cancellation to the pipeline's completion.
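A minimal sketch of that heartbeat shape, using delegates in place of the SQS client and the pipeline so it stays self-contained. RunWithHeartbeatAsync and HeartbeatAsync are hypothetical names; in the polling loop, work would be the handler.InvokeAsync call and extendVisibility would call ChangeMessageVisibilityAsync for the message's receipt handle.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo with stand-ins: the "work" takes about 120 ms, the heartbeat ticks every 25 ms.
var extensions = 0;
await RunWithHeartbeatAsync(
    work: ct => Task.Delay(120, ct),
    extendVisibility: _ => { Interlocked.Increment(ref extensions); return Task.CompletedTask; },
    interval: TimeSpan.FromMilliseconds(25),
    cancellationToken: CancellationToken.None);
Console.WriteLine($"visibility extended {extensions} time(s)");

// Hypothetical helper: runs `work` while a background task calls
// `extendVisibility` every `interval`. The heartbeat stops when the work
// completes (success or failure) or when the outer token is cancelled.
static async Task RunWithHeartbeatAsync(
    Func<CancellationToken, Task> work,
    Func<CancellationToken, Task> extendVisibility,
    TimeSpan interval,
    CancellationToken cancellationToken)
{
    using var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var heartbeat = HeartbeatAsync(extendVisibility, interval, heartbeatCts.Token);
    try
    {
        await work(cancellationToken);
    }
    finally
    {
        heartbeatCts.Cancel(); // work finished: stop extending
        await heartbeat;       // cancellation is swallowed inside HeartbeatAsync
    }
}

static async Task HeartbeatAsync(
    Func<CancellationToken, Task> extendVisibility,
    TimeSpan interval,
    CancellationToken cancellationToken)
{
    using var timer = new PeriodicTimer(interval);
    try
    {
        while (await timer.WaitForNextTickAsync(cancellationToken))
        {
            await extendVisibility(cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the work completes or shutdown is requested.
    }
}
```

Pick an interval comfortably shorter than the visibility timeout, so that one missed or slow extension does not let the message become visible again. An exception thrown by extendVisibility other than cancellation surfaces from RunWithHeartbeatAsync once the work has finished.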
PlumberApplicationFactory<TReq, TRes> builds the real pipeline once and lets you swap selected services for stubs. The polling loop itself is testable separately — fake the IAmazonSQS and assert on receive/delete calls. See Testing and PlumberApplicationFactory for the full surface.
using Plumber.Testing;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
namespace MyQueue.Worker.Tests;
public sealed class PipelineTests
{
[Fact]
public async Task ValidOrderInvokesProcessorAsync()
{
var processor = new RecordingProcessor();
using var factory = new PlumberApplicationFactory<QueueMessage, Plumber.Unit>(
Pipeline.CreateBuilder,
Pipeline.Configure)
.WithServices(services => services.AddSingleton<IOrderProcessor>(processor));
var message = new QueueMessage(
MessageId: "msg-1",
ReceiptHandle: "rh-1",
Body: """{"OrderId":"o-1","Amount":42.0}""",
Attributes: new Dictionary<string, string>());
_ = await factory.InvokeAsync(message);
Assert.Single(processor.Processed);
Assert.Equal("o-1", processor.Processed[0].OrderId);
}
private sealed class RecordingProcessor : IOrderProcessor
{
public List<Order> Processed { get; } = [];
public Task ProcessAsync(Order order, CancellationToken cancellationToken)
{
Processed.Add(order);
return Task.CompletedTask;
}
}
}
- .NET 10
- MSL.Plumber.Pipeline 3.*
- MSL.Plumber.Pipeline.Testing 3.* (test project)
- AWSSDK.SQS 4.0.0
- Microsoft.Extensions.Hosting 10.0.0
The AWS SDK for .NET v4 reorganized many namespaces; if you're on v3, IAmazonSQS lives in the same place but several response shapes (notably the message-attribute property names) differ. Check the AWSSDK.SQS package on NuGet before copying these versions into a new project.
- Recipe: AWS Lambda for SQS: partial-batch failure when AWS owns the loop
- Recipe: ASP.NET Core integration: host-mode RequestHandler.Create from a generic host
- Building a Pipeline: the full builder surface
- Middleware: class vs delegate, method injection
- Request Lifecycle: RequestContext, Unit, cancellation
- Testing and PlumberApplicationFactory
- Advanced: host-mode RequestHandler.Create(serviceProvider)
Documents Plumber v4.x · Repository · MIT License · Report an issue