Recipe: AWS Lambda triggered by SQS
This recipe wires Plumber to an AWS Lambda function triggered by an SQS queue. The Lambda receives an SQSEvent (a batch of messages) and returns an SQSBatchResponse listing the messages that failed. Lambda re-queues only those listed messages; everything else is acknowledged. The pipeline shape is (SQSEvent, SQSBatchResponse).
What makes this scenario interesting is partial-batch failure. A naive consumer that throws on the first bad message forces SQS to retry the entire batch — every successful message gets reprocessed, idempotency assumptions get violated, and a single poison pill blocks the whole queue. SQSBatchResponse.BatchItemFailures lets you fail individual messages while acknowledging the rest. Plumber's job is to coordinate per-message processing and collect failures into that response.
Start from the empty-function AWS Lambda template:
dotnet new install Amazon.Lambda.Templates
dotnet new lambda.EmptyFunction --name MyQueue.Lambda
cd MyQueue.Lambda/src/MyQueue.Lambda
Add the SQS event types and Plumber:
dotnet add package Amazon.Lambda.SQSEvents
dotnet add package MSL.Plumber.Pipeline
A representative MyQueue.Lambda.csproj:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <AWSProjectType>Lambda</AWSProjectType>
    <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Amazon.Lambda.Core" Version="2.7.0" />
    <PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.5" />
    <PackageReference Include="Amazon.Lambda.SQSEvents" Version="2.3.0" />
    <PackageReference Include="MSL.Plumber.Pipeline" Version="3.0.0" />
  </ItemGroup>
</Project>
In the AWS console (or your IaC tool of choice), the function's event source mapping must have ReportBatchItemFailures enabled in its function response types. Otherwise Lambda ignores the BatchItemFailures list: a handler that returns normally marks the whole batch as processed, so the messages you reported as failed are deleted instead of retried. With CDK this is reportBatchItemFailures: true on the SqsEventSource; with SAM it's FunctionResponseTypes: [ReportBatchItemFailures] on the SQS event; with the AWS CLI it's --function-response-types ReportBatchItemFailures on create-event-source-mapping.
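For the CDK route, the wiring might look like the following C# sketch. It assumes an AWS CDK v2 app (the Amazon.CDK.Lib package) with the function and queue defined elsewhere in the stack; QueueWiring and AddQueueTrigger are hypothetical names, and the batch size of 10 is only an example value.

```csharp
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;
using Amazon.CDK.AWS.SQS;

namespace MyQueue.Infrastructure;

// Hypothetical helper: attaches the queue to the function with partial-batch responses on.
internal static class QueueWiring
{
    public static void AddQueueTrigger(Function function, IQueue queue) =>
        function.AddEventSource(new SqsEventSource(queue, new SqsEventSourceProps
        {
            BatchSize = 10,
            // Without this flag Lambda ignores the BatchItemFailures list in the response.
            ReportBatchItemFailures = true,
        }));
}
```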
Two layers: an outer pipeline that handles the whole batch, and a per-message processor invoked by a fan-out middleware. The outer pipeline is what Plumber compiles; the inner processor can be a plain method, a service, or a second Plumber handler if you want middleware around individual messages.
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal static class Pipeline
{
    public static RequestHandlerBuilder<SQSEvent, SQSBatchResponse> CreateBuilder() =>
        RequestHandlerBuilder
            .Create<SQSEvent, SQSBatchResponse>()
            .AddDefaultConfigurationSources()
            .ConfigureLogging(logging => logging
                .SetMinimumLevel(LogLevel.Information)
                .AddJsonConsole())
            .ConfigureServices((services, configuration) => services
                .AddSingleton<IOrderProcessor, OrderProcessor>());

    public static RequestHandler<SQSEvent, SQSBatchResponse> Configure(
        RequestHandler<SQSEvent, SQSBatchResponse> handler) =>
        handler
            .Use<BatchLoggingMiddleware>()
            .Use<FanOutMiddleware>();

    public static RequestHandler<SQSEvent, SQSBatchResponse> Build() =>
        Configure(CreateBuilder().Build());
}
Two middleware in the outer pipeline: a logging wrapper and the fan-out that processes each message individually.
BatchLoggingMiddleware logs symmetrically before and after the whole batch. The post-batch entry includes the failure count read from context.Response.
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal sealed class BatchLoggingMiddleware(
    RequestMiddleware<SQSEvent, SQSBatchResponse> next,
    ILogger<BatchLoggingMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<SQSEvent, SQSBatchResponse> context)
    {
        var batchSize = context.Request.Records?.Count ?? 0;
        logger.LogInformation("batch {Id}: received {Count} messages", context.Id, batchSize);

        await next(context);

        var failed = context.Response?.BatchItemFailures?.Count ?? 0;
        logger.LogInformation(
            "batch {Id}: {Succeeded} succeeded, {Failed} failed in {Elapsed}ms",
            context.Id,
            batchSize - failed,
            failed,
            context.Elapsed.TotalMilliseconds);
    }
}
FanOutMiddleware processes each SQSEvent.SQSMessage independently. Successes are acknowledged; failures land in BatchItemFailures keyed by MessageId. The processor is method-injected rather than constructor-injected, so it is resolved from each invocation's DI scope and can depend on scoped services.
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal sealed class FanOutMiddleware(
    RequestMiddleware<SQSEvent, SQSBatchResponse> next)
{
    public async Task InvokeAsync(
        RequestContext<SQSEvent, SQSBatchResponse> context,
        IOrderProcessor processor,
        ILogger<FanOutMiddleware> logger)
    {
        var failures = new List<SQSBatchResponse.BatchItemFailure>();
        var records = context.Request.Records ?? [];

        foreach (var message in records)
        {
            context.ThrowIfCanceled();
            try
            {
                await processor.ProcessAsync(message, context.CancellationToken);
            }
            // Only our own token counts as batch cancellation. An HttpClient timeout, for
            // example, also surfaces as OperationCanceledException and should fail just this message.
            catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
            {
                throw; // let cancellation propagate; the rest of the batch will not be processed
            }
            catch (Exception ex)
            {
                logger.LogError(
                    ex,
                    "message {MessageId} failed; will be reported for retry",
                    message.MessageId);
                failures.Add(new SQSBatchResponse.BatchItemFailure { ItemIdentifier = message.MessageId });
            }
        }

        context.Response = new SQSBatchResponse(failures);
        await next(context);
    }
}
A few details worth calling out:
- Sequential per-message processing is the default. If your processor is I/O-bound and ordering does not matter, swap the loop for Parallel.ForEachAsync with a MaxDegreeOfParallelism tuned to your downstream throughput, and collect failures into a ConcurrentBag<SQSBatchResponse.BatchItemFailure> instead of a List, since several workers add to it concurrently.
- Cancellation is honoured, but coarsely. If the Lambda runs out of time (Function.cs derives the token from ILambdaContext.RemainingTime), the fan-out rethrows OperationCanceledException and the invocation fails without a BatchItemFailures list, so SQS redelivers the whole batch after the visibility timeout, including messages that already succeeded. The edge-case notes further down describe how to report partial progress instead.
- The processor is the boundary. Wrap external calls there; the middleware only collects results.
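A minimal sketch of the parallel variant from the first point, using plain string message IDs in place of SQSEvent.SQSMessage so it runs without the AWS or Plumber packages. FanOutParallelAsync, processAsync, and maxParallelism are hypothetical names; inside the real middleware you would turn the returned IDs into SQSBatchResponse.BatchItemFailure entries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Parallel fan-out sketch. Message IDs stand in for SQSEvent.SQSMessage and processAsync
// stands in for IOrderProcessor.ProcessAsync; both are hypothetical simplifications.
static async Task<List<string>> FanOutParallelAsync(
    IEnumerable<string> messageIds,
    Func<string, CancellationToken, ValueTask> processAsync,
    int maxParallelism,
    CancellationToken cancellationToken)
{
    // Several workers record failures at once, so use a thread-safe collection.
    var failures = new ConcurrentBag<string>();
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = maxParallelism,
        CancellationToken = cancellationToken,
    };

    await Parallel.ForEachAsync(messageIds, options, async (id, ct) =>
    {
        try
        {
            await processAsync(id, ct);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Same policy as the sequential loop: remember the failed ID and keep going.
            failures.Add(id);
        }
    });

    // Completion order is nondeterministic; sort so callers and logs see a stable list.
    return failures.OrderBy(id => id, StringComparer.Ordinal).ToList();
}
```

Parallel processing gives up ordering within the batch, which is why the sketch sorts its output; on a FIFO queue, where order within a message group matters, keep the sequential loop.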
If you want the processor itself to be a Plumber pipeline (say, with its own validation, retry, and persist middleware), build a second handler with the shape (SQSEvent.SQSMessage, Unit) and call it from IOrderProcessor.ProcessAsync. The same nested-handler approach underlies the dispatcher in the Multi-command CLI recipe.
IOrderProcessor is a plain interface. The implementation is whatever your domain needs; the only contract the fan-out relies on is that ProcessAsync either completes normally or throws.
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

public interface IOrderProcessor
{
    Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken);
}

internal sealed class OrderProcessor : IOrderProcessor
{
    public Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken)
    {
        // parse message.Body, do the work, throw on failure
        return Task.CompletedTask;
    }
}
The Lambda handler builds the Plumber handler once per cold start and reuses it for every warm invocation:
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Plumber;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MyQueue.Lambda;

public sealed class Function
{
    private static readonly RequestHandler<SQSEvent, SQSBatchResponse> Handler = Pipeline.Build();

    public async Task<SQSBatchResponse> FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
    {
        // Reserve a small buffer so the response can be serialized before Lambda times out the invocation.
        // Clamp at zero: a negative delay would throw (or, at exactly -1 ms, mean "never cancel").
        var budget = context.RemainingTime - TimeSpan.FromMilliseconds(500);
        using var cts = new CancellationTokenSource(budget > TimeSpan.Zero ? budget : TimeSpan.Zero);

        var response = await Handler.InvokeAsync(sqsEvent, cts.Token);
        return response ?? new SQSBatchResponse([]);
    }
}
The ?? new SQSBatchResponse([]) fallback covers the case where the pipeline didn't assign a response. For an SQS handler, "no response" should mean "all messages succeeded," so an empty failure list is the safe default, provided that any middleware which short-circuits before the fan-out throws instead of returning quietly.
If all-or-nothing acknowledgement is the right fit for your queue's redrive policy and visibility timeout (for example, when every message in a batch goes to the same downstream system, which rejects the whole batch on any failure), drop the SQSBatchResponse and use Unit as the response type:
// Function.cs (FanOutMiddleware here is the all-or-nothing variant, written against
// RequestMiddleware<SQSEvent, Unit> rather than the SQSBatchResponse version above)
private static readonly RequestHandler<SQSEvent, Unit> Handler = RequestHandlerBuilder
    .Create<SQSEvent, Unit>()
    .Build()
    .Use<FanOutMiddleware>();

public async Task FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
{
    _ = await Handler.InvokeAsync(sqsEvent);
}
The fan-out middleware in this variant rethrows on the first failure (or aggregates the failures into a single exception) so the Lambda invocation fails and SQS retries the whole batch. The SQSBatchResponse shape is strictly more flexible, so lead with it unless you have a specific reason to prefer all-or-nothing.
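A sketch of the aggregating flavour of that variant, again with string IDs standing in for SQSEvent.SQSMessage and a delegate standing in for IOrderProcessor. ProcessAllOrThrowAsync is a hypothetical name. Attempting every message costs more than stopping at the first failure (the successes are retried anyway), but it logs every cause in one exception.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// All-or-nothing sketch: attempt every message, then fail the whole invocation if any failed.
// Message IDs and processAsync are hypothetical stand-ins for SQSEvent.SQSMessage and
// IOrderProcessor.ProcessAsync.
static async Task ProcessAllOrThrowAsync(
    IReadOnlyList<string> messageIds,
    Func<string, CancellationToken, Task> processAsync,
    CancellationToken cancellationToken)
{
    var errors = new List<Exception>();
    foreach (var id in messageIds)
    {
        cancellationToken.ThrowIfCancellationRequested();
        try
        {
            await processAsync(id, cancellationToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Keep the message ID with the cause so the aggregated error is traceable.
            errors.Add(new InvalidOperationException($"message {id} failed", ex));
        }
    }

    if (errors.Count > 0)
    {
        // One exception for the whole batch: the invocation fails and SQS redelivers
        // every message in it once the visibility timeout expires.
        throw new AggregateException($"{errors.Count} of {messageIds.Count} messages failed", errors);
    }
}
```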
The same advice as the API Gateway recipe: build the handler once per container in a static readonly initializer. The compiled middleware dispatch, the DI container, and any singletons survive between invocations. Each InvokeAsync creates a fresh DI scope.
For SQS specifically, batch size matters for cold-start economics. With BatchSize = 10, one cold start is amortized across up to ten messages; with BatchSize = 1, every message that lands on a fresh execution environment pays the full cold start. If your workload is bursty and infrequent (so many invocations start cold), raise the batch size; partial-batch failure reporting keeps failures isolated to individual messages regardless.
The middleware-level pattern (catch in FanOutMiddleware, add to failures list, continue) covers the common case. A few edge cases:
Cancellation mid-batch. When the Lambda is about to time out, the cancellation token created from ILambdaContext.RemainingTime fires. The fan-out rethrows OperationCanceledException, which propagates past the logging middleware and out of the pipeline. The Lambda host sees the exception and, because no BatchItemFailures list was returned, treats the entire batch as failed. That is safe but wasteful: messages that were already processed successfully are redelivered after the visibility timeout. To report what was processed and let only the rest re-queue, catch OperationCanceledException in FanOutMiddleware instead of rethrowing, mark the in-flight message and the unprocessed remainder as failures, and assign context.Response.
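A sketch of that "report what we got" alternative, with string IDs standing in for SQSEvent.SQSMessage and a delegate standing in for IOrderProcessor. ProcessUntilCanceledAsync is a hypothetical name; in FanOutMiddleware each returned ID would become an SQSBatchResponse.BatchItemFailure assigned to context.Response.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// On cancellation, report the in-flight message and every unprocessed one as failed
// instead of rethrowing, so already-processed messages stay acknowledged.
static async Task<List<string>> ProcessUntilCanceledAsync(
    IReadOnlyList<string> messageIds,
    Func<string, CancellationToken, Task> processAsync,
    CancellationToken cancellationToken)
{
    var failures = new List<string>();
    for (var i = 0; i < messageIds.Count; i++)
    {
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            await processAsync(messageIds[i], cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Out of time: this message and everything after it go back to the queue.
            for (var j = i; j < messageIds.Count; j++)
            {
                failures.Add(messageIds[j]);
            }
            break;
        }
        catch (Exception)
        {
            // Ordinary per-message failure, handled as in FanOutMiddleware.
            failures.Add(messageIds[i]);
        }
    }

    return failures;
}
```

With msg-2 throwing and the token firing while msg-3 is in flight, the sketch returns msg-2, msg-3, and msg-4; msg-1 stays acknowledged.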
Idempotency. SQS guarantees at-least-once delivery. Whether you use partial-batch failure or all-or-nothing, your processor must be idempotent — design around the message ID (or a stable business key inside the body), not around the assumption that each message arrives exactly once.
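A minimal sketch of an idempotency guard keyed on the message ID or a business key. ProcessOnceAsync and the in-memory ConcurrentDictionary are hypothetical and for illustration only: the dictionary lives in one container and vanishes on cold start, and its check-then-act lets two concurrent duplicates both run, so a real deployment needs a shared store with an atomic conditional write.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Idempotency sketch keyed on a stable ID. The in-memory store is illustrative only:
// per-container, lost on cold start, and not atomic across concurrent duplicates.
static async Task<bool> ProcessOnceAsync(
    ConcurrentDictionary<string, bool> processed,
    string key,
    Func<CancellationToken, Task> work,
    CancellationToken cancellationToken)
{
    if (processed.ContainsKey(key))
    {
        return false; // duplicate delivery: acknowledge without redoing the work
    }

    await work(cancellationToken);

    // Record completion only after success, so a failed attempt can still be retried.
    processed.TryAdd(key, true);
    return true;
}
```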
Poison messages. A message that always fails comes back after every visibility timeout until the queue's retention period expires. Configure a redrive policy with a dead-letter queue and a maxReceiveCount so unprocessable messages eventually move out of the way. The pipeline does not need to know about the DLQ; that's an SQS-side concern.
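With the CDK, the redrive policy can be declared next to the queue. A C# sketch, assuming AWS CDK v2 (Amazon.CDK.Lib); OrderQueues and the construct IDs are hypothetical, and the retention period and MaxReceiveCount of 5 are example values rather than recommendations.

```csharp
using Amazon.CDK;
using Amazon.CDK.AWS.SQS;
using Constructs;

namespace MyQueue.Infrastructure;

// Hypothetical construct: a work queue whose poison messages move to a dead-letter
// queue after MaxReceiveCount receives.
internal sealed class OrderQueues : Construct
{
    public Queue WorkQueue { get; }

    public Queue DeadLetters { get; }

    public OrderQueues(Construct scope, string id) : base(scope, id)
    {
        // Keep dead letters long enough to inspect and redrive them.
        DeadLetters = new Queue(this, "OrdersDlq", new QueueProps
        {
            RetentionPeriod = Duration.Days(14),
        });

        WorkQueue = new Queue(this, "Orders", new QueueProps
        {
            DeadLetterQueue = new DeadLetterQueue
            {
                Queue = DeadLetters,
                MaxReceiveCount = 5,
            },
        });
    }
}
```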
Logging the failure cause. The BatchItemFailure shape only carries the message ID; the failure reason has to be logged separately. The logger.LogError(ex, ...) call in FanOutMiddleware does that. Include message.MessageId in the structured log fields so CloudWatch Logs Insights can join failures to individual messages.
The fan-out logic is the interesting code. Test it with a stub processor that fails on a known message ID:
using Plumber.Testing;
using Microsoft.Extensions.DependencyInjection;
using Amazon.Lambda.SQSEvents;
using Xunit;

namespace MyQueue.Lambda.Tests;

internal sealed class FailingProcessor(HashSet<string> failOn) : IOrderProcessor
{
    public Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken) =>
        failOn.Contains(message.MessageId)
            ? throw new InvalidOperationException("simulated failure")
            : Task.CompletedTask;
}

public sealed class FanOutTests
{
    [Fact]
    public async Task PartialFailureReportsOnlyTheBadMessageAsync()
    {
        // Pipeline is internal; expose it to the test project with InternalsVisibleTo.
        using var factory = new PlumberApplicationFactory<SQSEvent, SQSBatchResponse>(
                _ => Pipeline.CreateBuilder(),
                Pipeline.Configure)
            .WithServices(services => services
                .AddSingleton<IOrderProcessor>(new FailingProcessor(["msg-2"])));

        var batch = new SQSEvent
        {
            Records =
            [
                new SQSEvent.SQSMessage { MessageId = "msg-1", Body = "{}" },
                new SQSEvent.SQSMessage { MessageId = "msg-2", Body = "{}" },
                new SQSEvent.SQSMessage { MessageId = "msg-3", Body = "{}" },
            ],
        };

        var response = await factory.InvokeAsync(batch);

        Assert.NotNull(response);
        Assert.Single(response.BatchItemFailures);
        Assert.Equal("msg-2", response.BatchItemFailures[0].ItemIdentifier);
    }
}
See Testing for the full strategy and PlumberApplicationFactory for the customization hooks.
- .NET 10
- MSL.Plumber.Pipeline 3.0.0
- Amazon.Lambda.Core 2.7.0
- Amazon.Lambda.Serialization.SystemTextJson 2.4.5
- Amazon.Lambda.SQSEvents 2.3.0
- AWS Lambda .NET 10 managed runtime
Lambda runtime types and SDK versions move quickly. Check the Amazon.Lambda.* packages on NuGet before copying these versions into a new project. ReportBatchItemFailures is current as of writing; it is configured on the event source mapping (CloudFormation, CDK, SAM, or the AWS CLI), not through the Amazon.Lambda.* packages the function references.
- Recipe: AWS Lambda behind API Gateway (the request/response Lambda recipe)
- Recipe: SQS polling console (long-running consumer outside Lambda)
- Building a Pipeline (the full builder surface)
- Middleware (class vs delegate, method injection)
- Request Lifecycle (RequestContext, Unit, cancellation)
- Testing and PlumberApplicationFactory
Documents Plumber v4.x · Repository · MIT License · Report an issue