Recipe: AWS Lambda with SQS
An AWS Lambda function triggered by an SQS queue that receives an SQSEvent (a batch of messages) and returns an SQSBatchResponse listing the failed messages. Lambda re-queues only the listed messages; everything else is acknowledged. The pipeline shape is (SQSEvent, SQSBatchResponse).
The interesting wrinkle is partial-batch failure. A consumer that throws on the first bad message forces SQS to retry the entire batch — every successful message gets reprocessed, idempotency assumptions get violated, and a single poison pill blocks the whole queue. SQSBatchResponse.BatchItemFailures lets you fail individual messages while acknowledging the rest. Plumber's job is to coordinate per-message processing and collect failures into that response.
Start from the AWS Lambda template and pick the empty function variant:
dotnet new install Amazon.Lambda.Templates
dotnet new lambda.EmptyFunction --name MyQueue.Lambda
cd MyQueue.Lambda/src/MyQueue.Lambda
Add the SQS event types and Plumber:
dotnet add package Amazon.Lambda.SQSEvents
dotnet add package MSL.Plumber.Pipeline
A representative MyQueue.Lambda.csproj:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <AWSProjectType>Lambda</AWSProjectType>
    <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Amazon.Lambda.Core" Version="2.7.0" />
    <PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.5" />
    <PackageReference Include="Amazon.Lambda.SQSEvents" Version="2.3.0" />
    <PackageReference Include="MSL.Plumber.Pipeline" Version="3.*" />
  </ItemGroup>
  <ItemGroup>
    <!-- Lets the test project reach the internal Pipeline class used by the tests. -->
    <InternalsVisibleTo Include="MyQueue.Lambda.Tests" />
  </ItemGroup>
</Project>
The function's event source mapping must list ReportBatchItemFailures in its function response types. Without it, Lambda ignores the BatchItemFailures list. An invocation that returns normally deletes the entire batch, including the messages you reported as failed. Only a failed invocation (an unhandled exception or a timeout) triggers a retry, and that retry covers the whole batch.
- CDK: reportBatchItemFailures: true on the SqsEventSource
- SAM: FunctionResponseTypes: [ReportBatchItemFailures]
- AWS CLI: --function-response-types ReportBatchItemFailures on create-event-source-mapping
Two layers: an outer pipeline that handles the whole batch, and a per-message processor invoked by a fan-out middleware. The outer pipeline is what Plumber compiles. The inner processor can be a plain method, a service, or a second Plumber handler if you want middleware around individual messages.
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal static class Pipeline
{
    public static RequestHandlerBuilder<SQSEvent, SQSBatchResponse> CreateBuilder() =>
        RequestHandlerBuilder
            .Create<SQSEvent, SQSBatchResponse>()
            .AddDefaultConfigurationSources()
            .ConfigureLogging(logging => logging
                .SetMinimumLevel(LogLevel.Information)
                .AddJsonConsole())
            .ConfigureServices((services, configuration) => services
                .AddSingleton<IOrderProcessor, OrderProcessor>());

    public static RequestHandler<SQSEvent, SQSBatchResponse> Configure(
        RequestHandler<SQSEvent, SQSBatchResponse> handler) =>
        handler
            .Use<BatchLoggingMiddleware>()
            .Use<FanOutMiddleware>();

    public static RequestHandler<SQSEvent, SQSBatchResponse> Build() =>
        Configure(CreateBuilder().Build());
}
Two middleware in the outer pipeline:
- BatchLoggingMiddleware: symmetric logging around the whole batch
- FanOutMiddleware: per-message processing with failure collection
Logs at entry and exit. The post-log includes the failure count from context.Response.
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal sealed class BatchLoggingMiddleware(
    RequestMiddleware<SQSEvent, SQSBatchResponse> next,
    ILogger<BatchLoggingMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<SQSEvent, SQSBatchResponse> context)
    {
        var batchSize = context.Request.Records?.Count ?? 0;
        logger.LogInformation("batch {Id}: received {Count} messages", context.Id, batchSize);

        await next(context);

        var failed = context.Response?.BatchItemFailures?.Count ?? 0;
        logger.LogInformation(
            "batch {Id}: {Succeeded} succeeded, {Failed} failed in {Elapsed}ms",
            context.Id,
            batchSize - failed,
            failed,
            context.Elapsed.TotalMilliseconds);
    }
}
Processes each SQSMessage independently. Successes are acknowledged; failures land in BatchItemFailures keyed by MessageId. The processor is method-injected so it picks up scoped dependencies on every batch.
using Microsoft.Extensions.Logging;
using Plumber;
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

internal sealed class FanOutMiddleware(
    RequestMiddleware<SQSEvent, SQSBatchResponse> next)
{
    public async Task InvokeAsync(
        RequestContext<SQSEvent, SQSBatchResponse> context,
        IOrderProcessor processor,
        ILogger<FanOutMiddleware> logger)
    {
        var failures = new List<SQSBatchResponse.BatchItemFailure>();
        var records = context.Request.Records ?? [];

        foreach (var message in records)
        {
            context.ThrowIfCanceled();
            try
            {
                await processor.ProcessAsync(message, context.CancellationToken);
            }
            catch (OperationCanceledException)
            {
                throw; // let cancellation propagate: the invocation fails and SQS redelivers the whole batch
            }
            catch (Exception ex)
            {
                logger.LogError(
                    ex,
                    "message {MessageId} failed; will be reported for retry",
                    message.MessageId);
                failures.Add(new SQSBatchResponse.BatchItemFailure { ItemIdentifier = message.MessageId });
            }
        }

        context.Response = new SQSBatchResponse(failures);
        await next(context);
    }
}
A few details worth calling out:
- Sequential per-message processing is the default. For I/O-bound processors where order does not matter, swap the loop for Parallel.ForEachAsync with a MaxDegreeOfParallelism tuned to your downstream throughput, and collect failures into a ConcurrentBag<SQSBatchResponse.BatchItemFailure> instead of a List.
- Cancellation is honoured, conservatively. If the Lambda runs out of time (the Function class below derives the token from ILambdaContext.RemainingTime), FanOutMiddleware rethrows OperationCanceledException and the invocation fails. Because no response is returned, Lambda treats the whole batch as failed. The in-flight message, the unprocessed remainder, and the messages already processed are all redelivered once their visibility timeout expires. The edge cases below show how to report only the unfinished messages instead.
- The processor is the boundary. Wrap external calls there; the middleware just collects results.
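The parallel swap described in the first bullet can be sketched with BCL types only. This is a minimal sketch under stated assumptions, not Plumber code. Each message is a hypothetical (MessageId, Body) tuple standing in for SQSEvent.SQSMessage. ProcessAsync is a simulated processor that fails on a "poison" body. The function returns failed IDs rather than BatchItemFailure objects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IOrderProcessor.ProcessAsync: the message is a
// (MessageId, Body) tuple, and a body of "poison" simulates a processing failure.
static async Task ProcessAsync((string MessageId, string Body) message, CancellationToken ct)
{
    await Task.Delay(10, ct); // simulated I/O
    if (message.Body == "poison")
        throw new InvalidOperationException($"cannot process {message.MessageId}");
}

// Processes up to maxDegreeOfParallelism messages at once and returns the IDs that failed.
// ConcurrentBag is used because several workers may record failures at the same time.
static async Task<List<string>> ProcessBatchInParallelAsync(
    IReadOnlyList<(string MessageId, string Body)> records,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken)
{
    var failures = new ConcurrentBag<string>();
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism,
        CancellationToken = cancellationToken,
    };

    await Parallel.ForEachAsync(records, options, async (message, ct) =>
    {
        try
        {
            await ProcessAsync(message, ct);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Per-message failure: record it and keep going; cancellation still propagates.
            failures.Add(message.MessageId);
        }
    });

    // Completion order is nondeterministic, so the returned list is unordered.
    return new List<string>(failures);
}
```

Inside FanOutMiddleware, the same loop would add a new SQSBatchResponse.BatchItemFailure { ItemIdentifier = message.MessageId } to the bag and finish by assigning context.Response = new SQSBatchResponse(failures.ToList()). The order of entries in BatchItemFailures does not matter.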
For a processor that is itself a pipeline — its own validation, retry, and persist middleware — build a second handler with (SQSMessage, Unit) and call it from IOrderProcessor.ProcessAsync. That is the same shape used in the CLI multi-command dispatcher recipe.
A plain interface. The implementation is whatever your domain needs; the relevant contract is that ProcessAsync either returns or throws.
using Amazon.Lambda.SQSEvents;

namespace MyQueue.Lambda;

public interface IOrderProcessor
{
    // SQSMessage is nested inside SQSEvent, so it is referenced as SQSEvent.SQSMessage.
    Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken);
}

internal sealed class OrderProcessor : IOrderProcessor
{
    public Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken)
    {
        // parse message.Body, do the work, throw on failure
        return Task.CompletedTask;
    }
}
Build the Plumber handler once per cold start in a static readonly initializer and reuse it for every warm invocation:
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Plumber;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MyQueue.Lambda;

public sealed class Function
{
    private static readonly RequestHandler<SQSEvent, SQSBatchResponse> Handler = Pipeline.Build();

    public async Task<SQSBatchResponse> FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
    {
        // Reserve a small buffer so the response can be serialized before the invocation times out.
        // Clamp at zero: CancellationTokenSource rejects negative delays (other than -1 ms, which means "never").
        var budget = context.RemainingTime - TimeSpan.FromMilliseconds(500);
        using var cts = new CancellationTokenSource(budget > TimeSpan.Zero ? budget : TimeSpan.Zero);

        var response = await Handler.InvokeAsync(sqsEvent, cts.Token);
        return response ?? new SQSBatchResponse([]);
    }
}
The ?? new SQSBatchResponse([]) fallback covers the case where the pipeline didn't assign a response. For an SQS handler, "no response" means "all messages succeeded," so an empty failure list is the safe default.
When all-or-nothing acknowledgement is the right fit — every message in a batch goes to the same downstream system that rejects the whole batch on any failure — drop the SQSBatchResponse and use Unit:
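// Function.cs (all-or-nothing variant)
// AllOrNothingFanOutMiddleware is a hypothetical (SQSEvent, Unit) counterpart of FanOutMiddleware
// that throws on failure; FanOutMiddleware itself is typed for (SQSEvent, SQSBatchResponse).
private static readonly RequestHandler<SQSEvent, Unit> Handler = RequestHandlerBuilder
    .Create<SQSEvent, Unit>()
    .Build()
    .Use<AllOrNothingFanOutMiddleware>();

public async Task FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
{
    _ = await Handler.InvokeAsync(sqsEvent);
}
The fan-out middleware in this variant rethrows on the first failure (or aggregates them into a single exception) so the Lambda invocation fails and SQS retries the whole batch. The SQSBatchResponse shape is strictly more flexible: lead with it unless you have a specific reason to prefer all-or-nothing.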
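The paragraph above leaves the body of the all-or-nothing middleware to the reader. What follows is a minimal sketch of the "aggregate" option using BCL types only. Messages are hypothetical (MessageId, Body) tuples standing in for SQSEvent.SQSMessage, and the processor is passed in as a delegate. ProcessAllOrNothingAsync is an illustrative name, not part of Plumber. In a real (SQSEvent, Unit) middleware, this loop would run inside InvokeAsync before calling next.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical all-or-nothing fan-out: runs every message, then throws a single
// AggregateException if any failed, so the invocation fails and SQS redelivers the batch.
static async Task ProcessAllOrNothingAsync(
    IEnumerable<(string MessageId, string Body)> records,
    Func<(string MessageId, string Body), CancellationToken, Task> process,
    CancellationToken cancellationToken)
{
    var errors = new List<Exception>();
    foreach (var message in records)
    {
        cancellationToken.ThrowIfCancellationRequested();
        try
        {
            await process(message, cancellationToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Keep the MessageId with the error so the aggregated exception is traceable.
            errors.Add(new InvalidOperationException($"message {message.MessageId} failed", ex));
        }
    }

    if (errors.Count > 0)
        throw new AggregateException(errors);
}
```

Aggregating runs every message before failing, so more completed work is repeated on redelivery than with rethrow-on-first-failure. In exchange, a single invocation surfaces every bad message in the batch.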
Build the handler once per container in a static readonly initializer. The compiled middleware dispatch, the DI container, and any singletons survive between invocations. Each InvokeAsync creates a fresh DI scope. (See the API Gateway recipe for the full cold-start playbook — same shape applies here.)
For SQS specifically, batch size matters for cold-start economics. A function with BatchSize = 10 amortizes the cold start across ten messages; a function with BatchSize = 1 pays the cold start per message. For bursty, infrequent workloads, raise the batch size — the partial-batch failure semantics keep failure isolation intact regardless.
The middleware pattern (catch in FanOutMiddleware, add to failures list, continue) covers the common case. A few edge cases follow.
When the Lambda is about to time out, the cancellation token created from ILambdaContext.RemainingTime fires. FanOutMiddleware rethrows OperationCanceledException, which propagates past BatchLoggingMiddleware and out of the pipeline. The invocation fails without returning a BatchItemFailures list, so Lambda treats the entire batch as failed. Every message, including the ones already processed successfully, becomes visible again when its visibility timeout expires and is redelivered. That is the conservative choice: it loses nothing, but it repeats completed work, which the processor must tolerate anyway (see idempotency below).
To prefer "report what we got and let the rest re-queue," catch OperationCanceledException in FanOutMiddleware instead of re-throwing, mark the unprocessed remainder as failures, and assign context.Response.
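Here is a minimal sketch of that strategy, again with BCL-only stand-ins: (MessageId, Body) tuples for SQSEvent.SQSMessage, a processor delegate, and a hypothetical helper name, ProcessReportingRemainderAsync. It returns the IDs to report. Inside FanOutMiddleware, each ID would become an SQSBatchResponse.BatchItemFailure before context.Response is assigned. The exception filter treats a cancellation as the deadline only when the invocation's own token has fired. An OperationCanceledException that the processor raises for its own reasons still counts as an ordinary per-message failure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Returns the MessageIds to report as BatchItemFailures. On a deadline-driven cancellation
// it stops early and reports the in-flight message plus every unprocessed message,
// instead of letting OperationCanceledException fail the whole invocation.
static async Task<List<string>> ProcessReportingRemainderAsync(
    IReadOnlyList<(string MessageId, string Body)> records,
    Func<(string MessageId, string Body), CancellationToken, Task> process,
    CancellationToken cancellationToken)
{
    var failures = new List<string>();
    for (var i = 0; i < records.Count; i++)
    {
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            await process(records[i], cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Out of time: messages i..end were not completed, so let SQS redeliver them.
            for (var j = i; j < records.Count; j++)
                failures.Add(records[j].MessageId);
            break;
        }
        catch (Exception)
        {
            // Ordinary per-message failure (including a processor's own internal cancellation).
            failures.Add(records[i].MessageId);
        }
    }
    return failures;
}
```

Messages completed before the deadline are absent from the returned list, so SQS deletes them; only the unfinished tail is redelivered.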
SQS guarantees at-least-once delivery. Whether you use partial-batch failure or all-or-nothing, the processor must be idempotent — design around the message ID (or a stable business key inside the body), not around the assumption that each message arrives exactly once.
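One way to honour at-least-once delivery is to record each idempotency key after its work succeeds and skip keys already seen. The following is a minimal sketch that uses an in-memory ConcurrentDictionary as a stand-in store. That store is an assumption of the sketch, not a recommendation: Lambda containers do not share memory and can be recycled at any time, so production code needs a durable store with an atomic conditional write. The check-then-record step below is also not atomic. Two concurrent deliveries of the same key can both run the work, which is why the work itself should tolerate repeats. ProcessOnceAsync is a hypothetical helper name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Runs `work` at most once per idempotency key (the MessageId, or a stable business key
// from the body). Returns false when the key was already processed, i.e. a redelivery.
// The key is recorded only after the work succeeds, so a failed attempt is retried.
// NOTE: in-memory stand-in for a durable store; check-then-record is not atomic.
static async Task<bool> ProcessOnceAsync(
    ConcurrentDictionary<string, byte> processedKeys,
    string idempotencyKey,
    Func<Task> work)
{
    if (processedKeys.ContainsKey(idempotencyKey))
        return false; // duplicate delivery: acknowledge without redoing the work

    await work();
    processedKeys.TryAdd(idempotencyKey, 0);
    return true;
}
```

Recording the key after the work, rather than before, trades an occasional duplicate run for never losing a message whose first attempt failed.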
A message that always fails will be re-queued forever. Configure a redrive policy with a dead-letter queue and a maxReceiveCount so unprocessable messages eventually move out of the way. The pipeline does not need to know about the DLQ — that's an SQS-side concern.
The BatchItemFailure shape only carries the message ID; the failure reason has to be logged separately. The logger.LogError(ex, ...) call in FanOutMiddleware does that. Include message.MessageId in the structured log fields so CloudWatch Logs Insights can join failures to individual messages.
The fan-out logic is the interesting code. PlumberApplicationFactory<TReq, TRes> builds the real pipeline once and lets you swap selected services for stubs — here, an IOrderProcessor that fails on a known message ID. See Testing and PlumberApplicationFactory for the full surface.
using Plumber.Testing;
using Microsoft.Extensions.DependencyInjection;
using Amazon.Lambda.SQSEvents;
using Xunit;

namespace MyQueue.Lambda.Tests;

internal sealed class FailingProcessor(HashSet<string> failOn) : IOrderProcessor
{
    public Task ProcessAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken) =>
        failOn.Contains(message.MessageId)
            ? throw new InvalidOperationException("simulated failure")
            : Task.CompletedTask;
}

public sealed class FanOutTests
{
    [Fact]
    public async Task PartialFailureReportsOnlyTheBadMessageAsync()
    {
        using var factory = new PlumberApplicationFactory<SQSEvent, SQSBatchResponse>(
            _ => Pipeline.CreateBuilder(),
            Pipeline.Configure)
            .WithServices(services => services
                .AddSingleton<IOrderProcessor>(new FailingProcessor(["msg-2"])));

        var batch = new SQSEvent
        {
            Records =
            [
                new SQSEvent.SQSMessage { MessageId = "msg-1", Body = "{}" },
                new SQSEvent.SQSMessage { MessageId = "msg-2", Body = "{}" },
                new SQSEvent.SQSMessage { MessageId = "msg-3", Body = "{}" },
            ],
        };

        var response = await factory.InvokeAsync(batch);

        Assert.NotNull(response);
        Assert.Single(response.BatchItemFailures);
        Assert.Equal("msg-2", response.BatchItemFailures[0].ItemIdentifier);
    }
}
- .NET 10
- MSL.Plumber.Pipeline 3.*
- MSL.Plumber.Pipeline.Testing 3.* (test project)
- Amazon.Lambda.Core 2.7.0
- Amazon.Lambda.Serialization.SystemTextJson 2.4.5
- Amazon.Lambda.SQSEvents 2.3.0
- AWS Lambda .NET 10 managed runtime
Lambda runtime types and SDK versions move quickly. Check the Amazon.Lambda.* packages on NuGet before copying these versions into a new project. ReportBatchItemFailures is a current-as-of-writing AWS feature. It is enabled on the event source mapping (CloudFormation, CDK, SAM, or the Lambda API and CLI) rather than through the Amazon.Lambda.* packages.
- Recipe: AWS Lambda behind API Gateway — the request/response Lambda recipe
- Recipe: SQS polling console — long-running consumer outside Lambda
- Building a Pipeline — the full builder surface
- Middleware — class vs delegate, method injection
- Request Lifecycle — RequestContext, Unit, cancellation
- Testing and PlumberApplicationFactory
Documents Plumber v4.x · Repository · MIT License · Report an issue