Command Inbox

A. Shafie edited this page May 29, 2026 · 5 revisions

The command inbox stores ICommand messages for deferred execution. ICommandMediator.SendAsync always executes the command in process. Inbox submission is explicit through ICommandScheduler.ScheduleAsync.

Contract

ScheduleAsync accepts only commands that implement ICommand without a result type. Passing a command that implements ICommand<TResult> throws ArgumentException at the call site because the processor discards the handler result when it replays commands later.

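// A command without a result type; this is the only shape ScheduleAsync accepts.
public sealed record ProcessPaymentCommand(Guid PaymentId, decimal Amount) : ICommand;

// Store the command in the inbox; the handler runs later, when the processor leases it.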
var receipt = await commandScheduler.ScheduleAsync(
    new ProcessPaymentCommand(paymentId, amount),
    new CommandScheduleOptions
    {
        IdempotencyKey = $"payment:{paymentId}",
        CorrelationId = correlationId
    },
    cancellationToken);

The receipt confirms acceptance into the inbox. It does not contain the business result from the command handler.

public sealed record CommandReceipt<TCommand>
    where TCommand : ICommand
{
    public required Guid CommandId { get; init; }
    public required Type CommandType { get; init; }
    public required string ContractName { get; init; }
    public required int ContractVersion { get; init; }
    public required DateTimeOffset AcceptedAt { get; init; }
    public string? CorrelationId { get; init; }
    public string? CausationId { get; init; }
    public string? TenantId { get; init; }
}

Use a query when a caller needs the final state later.

var status = await queryMediator.QueryAsync(
    new GetPaymentStatusQuery(paymentId),
    cancellationToken);

There is no attribute-based inbox path in v5. SendAsync executes now; ScheduleAsync stores for later execution.
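
The rule that result-returning commands are rejected can be illustrated with a small runtime check. This is a minimal sketch, not the scheduler's actual code: the two marker interfaces are local stand-ins so the example compiles on its own, and `ScheduleGuardSketch` and `CalculateFeeCommand` are hypothetical names.

```csharp
using System;
using System.Linq;

// Local stand-ins for the LiteBus marker interfaces (assumption: the real
// interfaces live in the library and may be shaped differently).
public interface ICommand { }
public interface ICommand<TResult> { }

public static class ScheduleGuardSketch
{
    // Throws ArgumentException when the runtime type implements ICommand<TResult>,
    // because a deferred execution has nowhere to return the result to.
    public static void EnsureNoResult(object command)
    {
        ArgumentNullException.ThrowIfNull(command);

        var returnsResult = command.GetType()
            .GetInterfaces()
            .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));

        if (returnsResult)
        {
            throw new ArgumentException(
                $"{command.GetType().Name} returns a result and cannot be scheduled.",
                nameof(command));
        }
    }
}

public sealed record ProcessPaymentCommand(Guid PaymentId, decimal Amount) : ICommand;

// Hypothetical result-returning command used only to trigger the guard.
public sealed record CalculateFeeCommand(decimal Amount) : ICommand<decimal>;
```

Calling `EnsureNoResult` with a `ProcessPaymentCommand` returns normally, while a `CalculateFeeCommand` raises `ArgumentException`. Failing at submission time surfaces the mistake to the caller instead of silently dropping a result during replay.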

Registration

Register command handlers, the inbox module, a store, and contracts.

builder.Services.AddLiteBus(liteBus =>
{
    liteBus.AddCommandModule(commands =>
    {
        commands.RegisterFromAssembly(typeof(ProcessPaymentCommand).Assembly);
    });

    liteBus.AddCommandInboxModule(inbox =>
    {
        inbox.Contracts.Register<ProcessPaymentCommand>(
            name: "payments.commands.process-payment",
            version: 1);
    });
});

The scheduler writes through ICommandInboxWriter. The processor leases through ICommandInboxLeaseStore and records execution results through ICommandInboxStateStore. Applications can provide their own stores or use the PostgreSQL package, whose store implements all three roles against one table.

var dataSource = NpgsqlDataSource.Create(connectionString);

builder.Services.AddLiteBus(liteBus =>
{
    liteBus.AddCommandInboxModule(inbox =>
    {
        inbox.Contracts.Register<ProcessPaymentCommand>(
            "payments.commands.process-payment",
            version: 1);
    });

    liteBus.AddPostgreSqlCommandInboxStore(postgres =>
    {
        postgres.UseDataSource(dataSource);
        postgres.UseOptions(new PostgreSqlInboxStoreOptions
        {
            SchemaName = "app",
            TableName = "litebus_inbox_commands"
        });
    });
});

await PostgreSqlInboxSchema.CreateIfNotExistsAsync(
    dataSource,
    new PostgreSqlInboxStoreOptions
    {
        SchemaName = "app",
        TableName = "litebus_inbox_commands"
    },
    cancellationToken);

The PostgreSQL store uses raw Npgsql commands, jsonb payloads, and FOR UPDATE SKIP LOCKED leases. It does not depend on EF Core.
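
To show what a FOR UPDATE SKIP LOCKED lease generally looks like, here is a sketch of a claim query held in a C# constant. It is not the SQL the package runs: every column name and parameter name is an assumption made for illustration, and only the schema and table names come from the options shown above.

```csharp
public static class LeaseSqlSketch
{
    // Hypothetical lease query. The inner SELECT locks up to @batch_size due rows
    // and skips rows another worker has already locked; the outer UPDATE stamps
    // the lease and returns the claimed rows in one round trip.
    public const string LeaseDueCommands = @"
UPDATE app.litebus_inbox_commands AS c
SET status = 'processing',
    lease_owner = @lease_owner,
    lease_expires_at = now() + @lease_duration,
    attempt_count = c.attempt_count + 1
WHERE c.command_id IN (
    SELECT command_id
    FROM app.litebus_inbox_commands
    WHERE (status IN ('pending', 'failed') AND visible_after <= now())
       OR (status = 'processing' AND lease_expires_at <= now())
    ORDER BY visible_after
    LIMIT @batch_size
    FOR UPDATE SKIP LOCKED
)
RETURNING c.*;";
}
```

Because locked rows are skipped rather than waited on, several workers can poll the same table without blocking each other, and a row whose lease has expired becomes claimable again after a crash.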

Processing Flow

  1. The caller schedules a command with ICommandScheduler.ScheduleAsync.
  2. The scheduler resolves a stable contract name and version.
  3. The scheduler serializes the command and stores an InboxCommandEnvelope.
  4. ICommandInboxProcessor.ProcessPendingAsync leases due commands.
  5. The processor resolves the command type from the contract registry.
  6. The processor deserializes the payload and calls ICommandMediator.SendAsync.
  7. The normal command pipeline runs: pre-handlers, handler, post-handlers, and error handlers.
  8. The processor marks the command completed, failed for retry, or dead-lettered.
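
The eight steps above can be sketched with an in-memory stand-in. Everything here is hypothetical (`ProcessorSketch`, `EnvelopeSketch`, `PingSketch`, the string statuses); the `send` delegate stands in for ICommandMediator.SendAsync, and retry delays and leases are left out to keep the flow visible.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

public sealed class EnvelopeSketch
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public string ContractName { get; init; } = "";
    public int ContractVersion { get; init; }
    public string Payload { get; init; } = "{}";
    public string Status { get; set; } = "pending";
    public int Attempts { get; set; }
    public string? LastError { get; set; }
}

public sealed record PingSketch(string Text);

public sealed class ProcessorSketch
{
    private readonly Dictionary<(string Name, int Version), Type> _contracts = new();
    private readonly List<EnvelopeSketch> _rows = new();
    private readonly Func<object, Task> _send;
    private readonly int _maxAttempts;

    public ProcessorSketch(Func<object, Task> send, int maxAttempts = 3)
    {
        _send = send;
        _maxAttempts = maxAttempts;
    }

    public void Register<T>(string name, int version) => _contracts[(name, version)] = typeof(T);

    // Steps 1-3: resolve the contract, serialize, store the envelope.
    // Throws InvalidOperationException when T has no registered contract.
    public EnvelopeSketch Schedule<T>(T command) where T : notnull
    {
        var contract = _contracts.First(kv => kv.Value == typeof(T)).Key;
        var row = new EnvelopeSketch
        {
            ContractName = contract.Name,
            ContractVersion = contract.Version,
            Payload = JsonSerializer.Serialize(command)
        };
        _rows.Add(row);
        return row;
    }

    // Steps 4-8: lease, resolve the CLR type, deserialize, dispatch, record the outcome.
    public async Task<int> ProcessPendingAsync()
    {
        var leased = _rows.Where(r => r.Status is "pending" or "failed").ToList();
        foreach (var row in leased)
        {
            row.Status = "processing";
            row.Attempts++;
            try
            {
                var type = _contracts[(row.ContractName, row.ContractVersion)];
                var command = JsonSerializer.Deserialize(row.Payload, type)!;
                await _send(command);
                row.Status = "completed";
            }
            catch (Exception ex)
            {
                row.LastError = ex.Message;
                row.Status = row.Attempts >= _maxAttempts ? "dead-lettered" : "failed";
            }
        }
        return leased.Count;
    }
}
```

Registering `PingSketch` under a contract name, scheduling one instance, and running one pass leaves the row in the completed state; a `send` delegate that always throws moves the row to dead-lettered after `maxAttempts` passes.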

Store Roles

| Interface | Used by | Responsibility |
| --- | --- | --- |
| ICommandInboxWriter | ICommandScheduler | Append a pending command envelope and return the stored row |
| ICommandInboxLeaseStore | ICommandInboxProcessor | Atomically claim due commands for one worker |
| ICommandInboxStateStore | ICommandInboxProcessor | Record completed, failed, or dead-lettered execution results |

The role split keeps application services from depending on processor-only operations. A single database implementation can still implement all roles when one table owns the transaction boundary.

Generic Commands

Closed generic commands are supported when each closed command type is registered with its own stable contract.

public sealed record ArchiveCommand<T> : ICommand
{
    public required T Value { get; init; }
}

inbox.Contracts.Register<ArchiveCommand<string>>(
    "archive.commands.string",
    version: 1);

Open generic inbox contracts are not supported. Do not register ArchiveCommand<> as a persisted contract because the processor must deserialize each row into one concrete CLR type.
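
The distinction between closed and open generic types can be checked with reflection. The registry below is a hypothetical sketch (`ContractRegistrySketch` and `ArchiveSketch<T>` are illustrative names, not library types); it relies only on `Type.ContainsGenericParameters`, which is true for `ArchiveSketch<>` and false for `ArchiveSketch<string>`.

```csharp
using System;
using System.Collections.Generic;

public sealed record ArchiveSketch<T>(T Value);

public sealed class ContractRegistrySketch
{
    private readonly Dictionary<(string Name, int Version), Type> _byContract = new();

    public void Register(Type commandType, string name, int version)
    {
        // An open generic has unbound type parameters, so a stored payload
        // could not be deserialized into one concrete CLR type.
        if (commandType.ContainsGenericParameters)
        {
            throw new ArgumentException(
                $"{commandType.Name} is an open generic type; register each closed type instead.",
                nameof(commandType));
        }

        if (!_byContract.TryAdd((name, version), commandType))
        {
            throw new InvalidOperationException($"Contract {name} v{version} is already registered.");
        }
    }

    // Maps a persisted contract name and version back to exactly one CLR type.
    public Type Resolve(string name, int version) => _byContract[(name, version)];
}
```

Registering `typeof(ArchiveSketch<string>)` and `typeof(ArchiveSketch<int>)` under two different contract names succeeds, while `typeof(ArchiveSketch<>)` is rejected.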

Storage Record

An inbox store persists non-generic envelopes because one table can contain many command types.

| Field | Purpose |
| --- | --- |
| command id | Unique stored command identity |
| contract name | Stable persisted command name |
| contract version | Payload version used for deserialization |
| serialized payload | Command data |
| created timestamp | Acceptance time |
| visible-after timestamp | Delayed processing and retry |
| attempt count | Retry policy input |
| status | pending, processing, completed, failed, dead-lettered |
| idempotency key | Duplicate submission detection |
| lease owner | Worker identity |
| lease expiration | Crash recovery |
| last error | Failure diagnostics |
| correlation id | Trace grouping |
| causation id | Parent operation tracking |
| tenant id | Tenant isolation when needed |
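
For readers who prefer types to tables, the fields above might map onto a record like the following. This is only a sketch: the property names, the enum, and the `IsDue` rule are assumptions, and the library's InboxCommandEnvelope may be shaped differently.

```csharp
using System;

public enum InboxStatusSketch { Pending, Processing, Completed, Failed, DeadLettered }

// Hypothetical non-generic row: one shape for every command type in the table.
public sealed record InboxRowSketch
{
    public Guid CommandId { get; init; } = Guid.NewGuid();
    public string ContractName { get; init; } = "";
    public int ContractVersion { get; init; } = 1;
    public string Payload { get; init; } = "{}";
    public DateTimeOffset CreatedAt { get; init; } = DateTimeOffset.UtcNow;
    public DateTimeOffset VisibleAfter { get; init; } = DateTimeOffset.UtcNow;
    public int AttemptCount { get; init; }
    public InboxStatusSketch Status { get; init; } = InboxStatusSketch.Pending;
    public string? IdempotencyKey { get; init; }
    public string? LeaseOwner { get; init; }
    public DateTimeOffset? LeaseExpiresAt { get; init; }
    public string? LastError { get; init; }
    public string? CorrelationId { get; init; }
    public string? CausationId { get; init; }
    public string? TenantId { get; init; }

    // A row can be claimed when it is waiting and visible, or when a worker
    // holding it has let the lease expire (crash recovery).
    public bool IsDue(DateTimeOffset now) =>
        ((Status is InboxStatusSketch.Pending or InboxStatusSketch.Failed) && VisibleAfter <= now)
        || (Status == InboxStatusSketch.Processing && LeaseExpiresAt <= now);
}
```

The `IsDue` helper shows how the visible-after timestamp and the lease expiration work together: the first delays new and retried work, the second returns abandoned work to the queue.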

Idempotency

Inbox processing is at-least-once: a handler can run more than once after a crash, timeout, or retry. Guard side effects with business keys or an idempotency store.

public sealed record SubmitPaymentCommand : IIdempotentCommand
{
    public required Guid PaymentId { get; init; }
    public required decimal Amount { get; init; }
    public required string IdempotencyKey { get; init; }
}

When CommandScheduleOptions.IdempotencyKey is not set, the scheduler reads IIdempotentCommand.IdempotencyKey.

A duplicate submission with the same idempotency key returns the first accepted command. Later payloads are not merged; design handlers to be safe when the same key is retried.
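
The key fallback and the first-write-wins behavior described above can be sketched in memory. All names here (`DedupWriterSketch`, `AcceptedSketch`, `IIdempotentSketch`, `SubmitPaymentSketch`) are hypothetical; a database-backed store would more likely enforce the same rule with a unique index on the key.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for IIdempotentCommand so the sketch compiles on its own.
public interface IIdempotentSketch
{
    string IdempotencyKey { get; }
}

public sealed record SubmitPaymentSketch(Guid PaymentId, decimal Amount, string IdempotencyKey)
    : IIdempotentSketch;

public sealed record AcceptedSketch(Guid CommandId, string? IdempotencyKey, object Command);

public sealed class DedupWriterSketch
{
    private readonly Dictionary<string, AcceptedSketch> _byKey = new();

    public AcceptedSketch Accept(object command, string? optionsKey = null)
    {
        // The explicit option wins; otherwise fall back to the key on the command.
        var key = optionsKey ?? (command as IIdempotentSketch)?.IdempotencyKey;

        if (key is null)
        {
            // No key: every submission is stored as a new command.
            return new AcceptedSketch(Guid.NewGuid(), null, command);
        }

        if (_byKey.TryGetValue(key, out var first))
        {
            // Duplicate: return the first accepted command; the new payload is not merged.
            return first;
        }

        var accepted = new AcceptedSketch(Guid.NewGuid(), key, command);
        _byKey[key] = accepted;
        return accepted;
    }
}
```

Submitting two `SubmitPaymentSketch` instances with the same key but different amounts returns the same command id both times, and the stored payload keeps the first amount.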

Optional Inbox Marker

IInboxCommand is an optional marker that extends ICommand. Scheduling still happens only through ICommandScheduler.ScheduleAsync; the marker makes inbox-intended commands visible in reviews and generic constraints.

Execution Context During Replay

When the inbox processor executes a leased command, it sets CommandInboxExecutionContextKeys.IsInboxExecution and copies stored correlation, causation, and tenant values into MessageTraceContextKeys on CommandMediationSettings.Items and the ambient execution context. Pre-handlers, handlers, and post-handlers can read those keys for logging, metrics, and idempotency policy.
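
A pre-handler or logger might branch on these values roughly as follows. This sketch assumes the items are exposed as a string-keyed dictionary, and the key strings in `SketchKeys` are placeholders; in real code the constants would come from CommandInboxExecutionContextKeys and MessageTraceContextKeys.

```csharp
using System.Collections.Generic;

// Placeholder keys; the real constants and their values are defined by the library.
public static class SketchKeys
{
    public const string IsInboxExecution = "sketch:is-inbox-execution";
    public const string CorrelationId = "sketch:correlation-id";
}

public static class ReplayContextSketch
{
    // Produces a log-friendly description of how the current command is running.
    public static string Describe(IReadOnlyDictionary<string, object?> items)
    {
        var isReplay = items.TryGetValue(SketchKeys.IsInboxExecution, out var flag) && flag is true;
        var correlation = items.TryGetValue(SketchKeys.CorrelationId, out var value)
            ? value as string
            : null;

        return isReplay
            ? $"inbox replay (correlation {correlation ?? "none"})"
            : "direct send";
    }
}
```

The same check lets an idempotency policy apply stricter duplicate detection only when a command arrives through the inbox.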

Retry and Dead Letter

CommandInboxProcessorOptions contains the batch size, lease duration, lease owner, and retry settings (RetryOptions). A failed command becomes visible again once its retry delay has elapsed. A command that exceeds the maximum number of attempts is moved to DeadLettered together with its last error.

inbox.UseProcessorOptions(new CommandInboxProcessorOptions
{
    BatchSize = 50,
    LeaseDuration = TimeSpan.FromMinutes(2),
    Retry = new RetryOptions
    {
        MaxAttempts = 10,
        InitialDelay = TimeSpan.FromSeconds(5),
        MaxDelay = TimeSpan.FromMinutes(5),
        Backoff = RetryBackoff.Exponential,
        UseJitter = true
    }
});
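
To make the interplay of InitialDelay, MaxDelay, and jitter concrete, here is one common way to compute an exponential backoff. The formula is an assumption, not necessarily what RetryBackoff.Exponential does: the delay doubles per attempt, is capped at the maximum, and jitter (when enabled) scales it to between 50% and 100% of the capped value.

```csharp
using System;

public static class BackoffSketch
{
    // attempt is 1-based: attempt 1 waits initialDelay, attempt 2 waits twice that, and so on.
    public static TimeSpan NextDelay(
        int attempt,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        Random? jitter = null)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt), "Attempts are 1-based.");
        }

        // Cap the exponent so very large attempt counts cannot overflow.
        var exponent = Math.Min(attempt - 1, 30);
        double ticks = Math.Min(initialDelay.Ticks * Math.Pow(2, exponent), maxDelay.Ticks);

        if (jitter is not null)
        {
            // Spread retries out so workers do not retry in lockstep.
            ticks *= 0.5 + (jitter.NextDouble() * 0.5);
        }

        return TimeSpan.FromTicks((long)ticks);
    }
}
```

With the options shown above and jitter disabled, this sketch yields 5 s, 10 s, 20 s, 40 s, 80 s, and 160 s for attempts one through six, then stays at the 5 minute cap from attempt seven onward.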

Transaction Boundary

Store an inbox command in the same database transaction as the state change that creates the need for the command. If the command is accepted directly from an HTTP request, the inbox row is the acceptance point for the request.

Use the outbox for facts that must be published after a state change. The inbox schedules future command execution; the outbox records events for later publication.

Processor Hosting

ICommandInboxProcessor.ProcessPendingAsync runs one pass. For a background loop inside a generic host, reference LiteBus.Inbox.Extensions.Microsoft.Hosting:

liteBus.AddCommandInboxModule(inbox =>
{
    inbox.UseProcessorOptions(new CommandInboxProcessorOptions { BatchSize = 50 });
});

liteBus.AddCommandInboxProcessorHosting(host => host.PollInterval = TimeSpan.FromSeconds(1));

Health checks are optional; to enable them, call AddHealthChecks().AddLiteBusCommandInboxProcessor().

See Processor Hosting for adaptive polling, lifecycle hooks, Autofac registration, and how this differs from v4 hosting packages.
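
For hosts that do not use the hosting package, a fixed-interval loop around the single-pass method could look like the sketch below. `PollingLoopSketch` is hypothetical, the `processPending` delegate stands in for ICommandInboxProcessor.ProcessPendingAsync, and the loop uses only `PeriodicTimer` from .NET 6 and later. It does not attempt adaptive polling or lifecycle hooks.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollingLoopSketch
{
    // Runs one pass immediately, then one pass per tick, until cancellation.
    // Returns the number of completed passes.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> processPending,
        TimeSpan pollInterval,
        CancellationToken stoppingToken)
    {
        var passes = 0;
        using var timer = new PeriodicTimer(pollInterval);

        try
        {
            do
            {
                await processPending(stoppingToken);
                passes++;
            }
            while (await timer.WaitForNextTickAsync(stoppingToken));
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation is the normal way to stop the loop.
        }

        return passes;
    }
}
```

Because at-least-once processing already tolerates interrupted passes, stopping the loop mid-batch is safe: any command whose lease was taken but not completed becomes claimable again when the lease expires.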
