Outbox
The outbox stores events for publication after a state change commits. IEventPublisher.PublishAsync remains an in-process notification API. Outbox publication is explicit through IOutboxWriter or IIntegrationOutbox.
Use IIntegrationOutbox for events that cross a bounded-context or process boundary.

```csharp
public sealed record OrderSubmittedIntegrationEvent : IIntegrationEvent
{
    public required Guid OrderId { get; init; }
}
```

```csharp
var receipt = await integrationOutbox.AddAsync(
    new OrderSubmittedIntegrationEvent
    {
        OrderId = orderId
    },
    new OutboxOptions
    {
        MessageId = eventId,
        Topic = "orders",
        CorrelationId = correlationId
    },
    cancellationToken);
```

IOutboxWriter accepts any non-null event type. IIntegrationOutbox narrows the API to IIntegrationEvent so external publication is opt-in at the type level.
Use OutboxOptions.MessageId when the application already has a stable event id. v5 does not put that id on a marker interface; the event contract stays focused on domain data while the outbox envelope owns storage identity.
Use OutboxOptions.VisibleAfter for delayed first publication. Retry visibility after failures is still controlled by the processor and store state.
When UseLiteBusEventDispatcher() is registered, the dispatcher copies stored correlation, causation, and tenant values into MessageTraceContextKeys on EventMediationSettings.Items before PublishAsync.
Register contracts and an outbox store.

```csharp
builder.Services.AddLiteBus(liteBus =>
{
    liteBus.AddOutboxModule(outbox =>
    {
        outbox.Contracts.Register<OrderSubmittedIntegrationEvent>(
            name: "orders.events.order-submitted",
            version: 1);
    });
});
```

The processor dispatches through IOutboxDispatcher. To dispatch through LiteBus event handlers in the current process, opt in to the LiteBus event dispatcher and register the event module.

```csharp
builder.Services.AddLiteBus(liteBus =>
{
    liteBus.AddEventModule(events =>
    {
        events.Register<OrderSubmittedEventHandler>();
    });

    liteBus.AddOutboxModule(outbox =>
    {
        outbox.Contracts.Register<OrderSubmittedIntegrationEvent>(
            "orders.events.order-submitted",
            version: 1);
        outbox.UseLiteBusEventDispatcher();
    });
});
```

External broker dispatchers can implement IOutboxDispatcher without changing the writer or processor.
The PostgreSQL package provides a raw Npgsql store.

```csharp
var dataSource = NpgsqlDataSource.Create(connectionString);

builder.Services.AddLiteBus(liteBus =>
{
    liteBus.AddOutboxModule(outbox =>
    {
        outbox.Contracts.Register<OrderSubmittedIntegrationEvent>(
            "orders.events.order-submitted",
            version: 1);
    });

    liteBus.AddPostgreSqlOutboxStore(postgres =>
    {
        postgres.UseDataSource(dataSource);
        postgres.UseOptions(new PostgreSqlOutboxStoreOptions
        {
            SchemaName = "app",
            TableName = "litebus_outbox_messages"
        });
    });
});
```

When you do not already have an NpgsqlDataSource, pass a connection string and the module creates one for you:

```csharp
liteBus.AddPostgreSqlOutboxStore(postgres =>
{
    postgres.UseConnectionString(configuration.GetConnectionString("OrdersDb")!);
});
```

See PostgreSQL Schema Management for migration-owned SQL files, explicit bootstrap, and opt-in host schema creation.

```csharp
// Option A: explicit bootstrap during startup or a deploy job
await PostgreSqlOutboxSchema.EnsureAsync(dataSource, outboxOptions, cancellationToken);

// Option B: opt-in host bootstrap (register AddPostgreSqlOutboxSchemaHosting before processor hosting)
liteBus.AddPostgreSqlOutboxStore(postgres =>
{
    postgres.UseDataSource(dataSource);
    postgres.EnsureSchemaCreationOnStartup();
});
liteBus.AddPostgreSqlOutboxSchemaHosting();
```

CreateIfNotExistsAsync remains available as an alias for EnsureAsync.
The store uses jsonb payloads and FOR UPDATE SKIP LOCKED leases. It does not depend on EF Core.
- Application code stores an event through IOutboxWriter or IIntegrationOutbox in the same transaction as the state change.
- The writer resolves a stable contract name and version.
- The writer serializes the event and stores an OutboxMessageEnvelope.
- IOutboxProcessor.ProcessPendingAsync leases due messages.
- The processor calls IOutboxDispatcher.DispatchAsync for each message.
- The processor marks the message published, failed for retry, or dead-lettered.
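
The flow above can be sketched with a small in-memory model. This is not LiteBus code: SketchMessage, SketchStatus, and SketchProcessor are hypothetical stand-ins, and the real processor, lease store, and retry policy are more involved. The sketch only illustrates leasing due messages, dispatching them, and recording one of the three outcomes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the outbox flow; none of these types are LiteBus APIs.
var now = DateTimeOffset.UtcNow;
var messages = new List<SketchMessage>
{
    new() { Id = Guid.NewGuid(), ContractName = "orders.events.order-submitted", VisibleAfter = now },
    new() { Id = Guid.NewGuid(), ContractName = "orders.events.always-fails", VisibleAfter = now },
};

// Dispatcher stand-in: throws for one contract to exercise the failure path.
Action<SketchMessage> dispatch = m =>
{
    if (m.ContractName.EndsWith("always-fails", StringComparison.Ordinal))
        throw new InvalidOperationException("broker unavailable");
};

// Two passes with maxAttempts = 2: the failing message is retried once, then dead-lettered.
SketchProcessor.ProcessPending(messages, dispatch, now, maxAttempts: 2);
SketchProcessor.ProcessPending(messages, dispatch, now, maxAttempts: 2);

foreach (var m in messages)
    Console.WriteLine($"{m.ContractName}: {m.Status} after {m.Attempts} attempt(s)");

enum SketchStatus { Pending, Publishing, Published, Failed, DeadLettered }

sealed class SketchMessage
{
    public Guid Id { get; init; }
    public string ContractName { get; init; } = "";
    public DateTimeOffset VisibleAfter { get; set; }
    public SketchStatus Status { get; set; } = SketchStatus.Pending;
    public int Attempts { get; set; }
    public string LastError { get; set; } = "";
}

static class SketchProcessor
{
    public static void ProcessPending(
        List<SketchMessage> store,
        Action<SketchMessage> dispatch,
        DateTimeOffset now,
        int maxAttempts)
    {
        // Lease: pick messages that are due and not in a terminal state.
        var leased = store
            .Where(m => (m.Status is SketchStatus.Pending or SketchStatus.Failed) && m.VisibleAfter <= now)
            .ToList();

        foreach (var message in leased)
        {
            message.Status = SketchStatus.Publishing;
            message.Attempts++;
            try
            {
                // Dispatch: hand the message to the dispatcher.
                dispatch(message);
                message.Status = SketchStatus.Published;
            }
            catch (Exception ex)
            {
                message.LastError = ex.Message;
                // A real processor would also push VisibleAfter forward before a retry;
                // that is omitted here to keep the sketch deterministic.
                message.Status = message.Attempts >= maxAttempts
                    ? SketchStatus.DeadLettered
                    : SketchStatus.Failed;
            }
        }
    }
}
```

In this run the first message ends as Published after one attempt, and the second ends as DeadLettered after two.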
| Interface | Used by | Responsibility |
|---|---|---|
| IOutboxMessageWriter | IOutboxWriter | Append a pending outbox envelope and return the stored row |
| IOutboxMessageLeaseStore | IOutboxProcessor | Atomically claim due messages for one publisher |
| IOutboxMessageStateStore | IOutboxProcessor | Record published, failed, or dead-lettered publication results |
The role split keeps event writers from depending on publisher-only operations. A single database implementation can still implement all roles when one table owns the transaction boundary.
Closed generic events are supported when each closed event type is registered with its own stable contract.

```csharp
public sealed record ExportCompletedEvent<TPayload> : IIntegrationEvent
{
    public required TPayload Payload { get; init; }
}

outbox.Contracts.Register<ExportCompletedEvent<CustomerExport>>(
    "exports.events.customer-completed",
    version: 1);
```

Open generic outbox contracts are not supported. Do not register ExportCompletedEvent<> as a persisted contract because the processor or dispatcher must deserialize each row into one concrete CLR type.
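
A minimal sketch of why only closed generic types work as persisted contracts. It does not use LiteBus: SketchContractRegistry, SketchExportCompleted, and CustomerExport are hypothetical stand-ins, and the registry simply maps a contract name and version to one CLR type and deserializes with System.Text.Json.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

var registry = new SketchContractRegistry();
registry.Register(typeof(SketchExportCompleted<CustomerExport>), "exports.events.customer-completed", 1);

// A stored row carries only the contract name, version, and payload.
var json = "{\"Payload\":{\"CustomerCount\":42}}";
var evt = (SketchExportCompleted<CustomerExport>)registry.Deserialize(
    "exports.events.customer-completed", 1, json);
Console.WriteLine(evt.Payload.CustomerCount); // prints 42

// An open generic has no concrete shape to deserialize into, so the sketch rejects it.
try
{
    registry.Register(typeof(SketchExportCompleted<>), "exports.events.completed", 1);
}
catch (ArgumentException ex)
{
    Console.WriteLine(ex.Message);
}

// Hypothetical payload and event types, not part of LiteBus.
sealed class CustomerExport
{
    public int CustomerCount { get; set; }
}

sealed class SketchExportCompleted<TPayload> where TPayload : new()
{
    public TPayload Payload { get; set; } = new TPayload();
}

// Hypothetical registry: one (name, version) pair resolves to exactly one CLR type.
sealed class SketchContractRegistry
{
    private readonly Dictionary<(string Name, int Version), Type> _types = new();

    public void Register(Type eventType, string name, int version)
    {
        // typeof(SketchExportCompleted<>) still has an unbound type parameter.
        if (eventType.ContainsGenericParameters)
            throw new ArgumentException(
                $"{eventType.Name} is an open generic type and cannot be a persisted contract.");
        _types[(name, version)] = eventType;
    }

    public object Deserialize(string name, int version, string json)
    {
        var type = _types[(name, version)];
        return JsonSerializer.Deserialize(json, type)
            ?? throw new JsonException("Payload deserialized to null.");
    }
}
```

Each closed type, such as SketchExportCompleted<CustomerExport>, gets its own name so a stored row always resolves to a single type.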
| Field | Purpose |
|---|---|
| message id | Unique outbox message identity |
| contract name | Stable persisted event name |
| contract version | Payload version used by dispatchers |
| serialized payload | Event data |
| topic | Optional publication target |
| created timestamp | Storage time |
| visible-after timestamp | Delayed first publication and retry visibility |
| status | pending, publishing, published, failed, dead-lettered |
| attempt count | Retry policy input |
| lease owner | Publisher identity |
| lease expiration | Crash recovery |
| last error | Failure diagnostics |
| correlation id | Trace grouping |
| causation id | Parent operation tracking |
| tenant id | Tenant isolation when needed |
OutboxProcessorOptions uses the same RetryOptions type as the inbox. Failed messages are retried after a delay, then moved to DeadLettered when attempts are exhausted.

```csharp
outbox.UseProcessorOptions(new OutboxProcessorOptions
{
    BatchSize = 100,
    LeaseDuration = TimeSpan.FromMinutes(2),
    Retry = new RetryOptions
    {
        MaxAttempts = 12,
        InitialDelay = TimeSpan.FromSeconds(10),
        MaxDelay = TimeSpan.FromMinutes(10),
        Backoff = RetryBackoff.Exponential,
        UseJitter = true
    }
});
```

The outbox is not an event mediator replacement. Use PublishAsync when the current process should notify handlers now. Use IIntegrationOutbox.AddAsync when a fact must survive process failure and be published after commit.
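
To give a feel for the retry settings shown earlier in this section, the sketch below computes an exponential schedule from an initial delay and a cap. The doubling formula is an assumption about what RetryBackoff.Exponential means, not documented LiteBus behavior, and jitter is left out so the numbers are deterministic. SketchBackoff is a hypothetical helper.

```csharp
using System;

var initial = TimeSpan.FromSeconds(10);
var max = TimeSpan.FromMinutes(10);

// With these inputs the waits are 10, 20, 40, 80, 160, and 320 seconds,
// then the 10-minute cap from the seventh failed attempt onward.
for (var attempt = 1; attempt <= 8; attempt++)
    Console.WriteLine($"after failed attempt {attempt}: wait {SketchBackoff.Delay(attempt, initial, max)}");

static class SketchBackoff
{
    // Assumed formula: initial * 2^(attempt - 1), capped at max.
    public static TimeSpan Delay(int attempt, TimeSpan initial, TimeSpan max)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Work in floating-point seconds so a large attempt count saturates
        // to infinity (and therefore to the cap) instead of overflowing.
        var seconds = initial.TotalSeconds * Math.Pow(2, attempt - 1);
        return seconds >= max.TotalSeconds ? max : TimeSpan.FromSeconds(seconds);
    }
}
```

Under this assumed formula, most of a twelve-attempt budget is spent at the capped delay, so MaxDelay has more influence on total time to dead-letter than InitialDelay.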
IOutboxProcessor.ProcessPendingAsync runs one pass. For a generic-host background loop, reference LiteBus.Outbox.Extensions.Microsoft.Hosting:

```csharp
liteBus.AddOutboxModule(outbox =>
{
    outbox.UseProcessorOptions(new OutboxProcessorOptions { BatchSize = 100 });
});
liteBus.AddOutboxProcessorHosting(host => host.PollInterval = TimeSpan.FromSeconds(1));
```

Optional: AddHealthChecks().AddLiteBusOutboxProcessor(). Inbox and outbox hosts are registered separately. See Processor Hosting.