-
Notifications
You must be signed in to change notification settings - Fork 16
Command Inbox
The command inbox stores ICommand messages for deferred execution. ICommandMediator.SendAsync always executes the command in process. Inbox submission is explicit through ICommandScheduler.ScheduleAsync.
ScheduleAsync accepts only commands that implement ICommand without a result type. Passing a command that implements ICommand<TResult> throws ArgumentException at the call site because the processor discards the handler result when it replays commands later.
public sealed record ProcessPaymentCommand(Guid PaymentId, decimal Amount) : ICommand;var receipt = await commandScheduler.ScheduleAsync(
new ProcessPaymentCommand(paymentId, amount),
new CommandScheduleOptions
{
IdempotencyKey = $"payment:{paymentId}",
CorrelationId = correlationId
},
cancellationToken);The receipt confirms acceptance into the inbox. It does not contain the business result from the command handler.
public sealed record CommandReceipt<TCommand>
where TCommand : ICommand
{
public required Guid CommandId { get; init; }
public required Type CommandType { get; init; }
public required string ContractName { get; init; }
public required int ContractVersion { get; init; }
public required DateTimeOffset AcceptedAt { get; init; }
public string? CorrelationId { get; init; }
public string? CausationId { get; init; }
public string? TenantId { get; init; }
}Use a query when a caller needs the final state later.
var status = await queryMediator.QueryAsync(
new GetPaymentStatusQuery(paymentId),
cancellationToken);There is no attribute-based inbox path in v5. SendAsync executes now; ScheduleAsync stores for later execution.
Register command handlers, the inbox module, a store, and contracts.
builder.Services.AddLiteBus(liteBus =>
{
liteBus.AddCommandModule(commands =>
{
commands.RegisterFromAssembly(typeof(ProcessPaymentCommand).Assembly);
});
liteBus.AddCommandInboxModule(inbox =>
{
inbox.Contracts.Register<ProcessPaymentCommand>(
name: "payments.commands.process-payment",
version: 1);
});
});The scheduler writes through ICommandInboxWriter. The processor leases through ICommandInboxLeaseStore and records execution results through ICommandInboxStateStore. Applications can provide their own stores or use the PostgreSQL package, whose store implements all three roles against one table.
var dataSource = NpgsqlDataSource.Create(connectionString);
builder.Services.AddLiteBus(liteBus =>
{
liteBus.AddCommandInboxModule(inbox =>
{
inbox.Contracts.Register<ProcessPaymentCommand>(
"payments.commands.process-payment",
version: 1);
});
liteBus.AddPostgreSqlCommandInboxStore(postgres =>
{
postgres.UseDataSource(dataSource);
postgres.UseOptions(new PostgreSqlInboxStoreOptions
{
SchemaName = "app",
TableName = "litebus_inbox_commands"
});
});
});When you do not already have an NpgsqlDataSource, pass a connection string and the module creates one for you:
liteBus.AddPostgreSqlCommandInboxStore(postgres =>
{
postgres.UseConnectionString(configuration.GetConnectionString("OrdersDb")!);
});See PostgreSQL Schema Management for migration-owned SQL files, explicit bootstrap, and opt-in host schema creation.
// Option A: explicit bootstrap during startup or a deploy job
await PostgreSqlInboxSchema.EnsureAsync(dataSource, inboxOptions, cancellationToken);
// Option B: opt-in host bootstrap (register AddPostgreSqlCommandInboxSchemaHosting before processor hosting)
liteBus.AddPostgreSqlCommandInboxStore(postgres =>
{
postgres.UseDataSource(dataSource);
postgres.EnsureSchemaCreationOnStartup();
});
liteBus.AddPostgreSqlCommandInboxSchemaHosting();CreateIfNotExistsAsync remains available as an alias for EnsureAsync.
The PostgreSQL store uses raw Npgsql commands, jsonb payloads, and FOR UPDATE SKIP LOCKED leases. It does not depend on EF Core.
- The caller schedules a command with
ICommandScheduler.ScheduleAsync. - The scheduler resolves a stable contract name and version.
- The scheduler serializes the command and stores an
InboxCommandEnvelope. -
ICommandInboxProcessor.ProcessPendingAsyncleases due commands. - The processor resolves the command type from the contract registry.
- The processor deserializes the payload and calls
ICommandMediator.SendAsync. - The normal command pipeline runs: pre-handlers, handler, post-handlers, and error handlers.
- The processor marks the command completed, failed for retry, or dead-lettered.
| Interface | Used by | Responsibility |
|---|---|---|
ICommandInboxWriter |
ICommandScheduler |
Append a pending command envelope and return the stored row |
ICommandInboxLeaseStore |
ICommandInboxProcessor |
Atomically claim due commands for one worker |
ICommandInboxStateStore |
ICommandInboxProcessor |
Record completed, failed, or dead-lettered execution results |
The role split keeps application services from depending on processor-only operations. A single database implementation can still implement all roles when one table owns the transaction boundary.
Closed generic commands are supported when each closed command type is registered with its own stable contract.
public sealed record ArchiveCommand<T> : ICommand
{
public required T Value { get; init; }
}
inbox.Contracts.Register<ArchiveCommand<string>>(
"archive.commands.string",
version: 1);Open generic inbox contracts are not supported. Do not register ArchiveCommand<> as a persisted contract because the processor must deserialize each row into one concrete CLR type.
An inbox store persists non-generic envelopes because one table can contain many command types.
| Field | Purpose |
|---|---|
| command id | Unique stored command identity |
| contract name | Stable persisted command name |
| contract version | Payload version used for deserialization |
| serialized payload | Command data |
| created timestamp | Acceptance time |
| visible-after timestamp | Delayed processing and retry |
| attempt count | Retry policy input |
| status | pending, processing, completed, failed, dead-lettered |
| idempotency key | Duplicate submission detection |
| lease owner | Worker identity |
| lease expiration | Crash recovery |
| last error | Failure diagnostics |
| correlation id | Trace grouping |
| causation id | Parent operation tracking |
| tenant id | Tenant isolation when needed |
Inbox processing is at least once. A handler can run more than once after a crash, timeout, or retry. Use business keys or an idempotency store around side effects.
public sealed record SubmitPaymentCommand : IIdempotentCommand
{
public required Guid PaymentId { get; init; }
public required decimal Amount { get; init; }
public required string IdempotencyKey { get; init; }
}When CommandScheduleOptions.IdempotencyKey is not set, the scheduler reads IIdempotentCommand.IdempotencyKey.
A duplicate submission with the same idempotency key returns the first accepted command. Later payloads are not merged; design handlers to be safe when the same key is retried.
IInboxCommand is an optional marker that extends ICommand. Scheduling still happens only through ICommandScheduler.ScheduleAsync; the marker makes inbox-intended commands visible in reviews and generic constraints.
When the inbox processor executes a leased command, it sets CommandInboxExecutionContextKeys.IsInboxExecution and copies stored correlation, causation, and tenant values into MessageTraceContextKeys on CommandMediationSettings.Items and the ambient execution context. Pre-handlers, handlers, and post-handlers can read those keys for logging, metrics, and idempotency policy.
CommandInboxProcessorOptions contains batch size, lease duration, lease owner, and RetryOptions. Failed commands are made visible after the retry delay. Commands that exceed the maximum attempts are moved to DeadLettered with the last error.
inbox.UseProcessorOptions(new CommandInboxProcessorOptions
{
BatchSize = 50,
LeaseDuration = TimeSpan.FromMinutes(2),
Retry = new RetryOptions
{
MaxAttempts = 10,
InitialDelay = TimeSpan.FromSeconds(5),
MaxDelay = TimeSpan.FromMinutes(5),
Backoff = RetryBackoff.Exponential,
UseJitter = true
}
});Store an inbox command in the same database transaction as the state change that creates the need for the command. If the command is accepted directly from an HTTP request, the inbox row is the acceptance point for the request.
Use the outbox for facts that must be published after a state change. The inbox schedules future command execution; the outbox records events for later publication.
ICommandInboxProcessor.ProcessPendingAsync runs one pass. For a background loop inside a generic host, reference LiteBus.Inbox.Extensions.Microsoft.Hosting:
liteBus.AddCommandInboxModule(inbox =>
{
inbox.UseProcessorOptions(new CommandInboxProcessorOptions { BatchSize = 50 });
});
liteBus.AddCommandInboxProcessorHosting(host => host.PollInterval = TimeSpan.FromSeconds(1));Optional health checks: AddHealthChecks().AddLiteBusCommandInboxProcessor().
See Processor Hosting for adaptive polling, lifecycle hooks, Autofac registration, and how this differs from v4 hosting packages.