Integration MassTransit Extension

aryehcitron@gmail.com edited this page May 17, 2026 · 12 revisions

The Kronikol.Extensions.MassTransit package adds MassTransit message tracking to your test diagrams. Send, Publish, and Consume operations appear as classified events in your sequence diagrams — showing the complete message flow across your services.

Because MassTransit is transport-agnostic, this single extension covers RabbitMQ, Azure Service Bus, Amazon SQS, and all other MassTransit-supported transports.

Using a custom messaging abstraction on top of MassTransit? If your code reaches MassTransit through a shared messaging library or custom wrapper, rather than calling ISendEndpointProvider, IPublishEndpoint, or MassTransit consumers directly, this extension's observers should still fire automatically, because they hook into the MassTransit pipeline. If they do not fire (for example, because your abstraction bypasses MassTransit entirely), see Tracking Custom Dependencies for alternative approaches, including MessageTracker and TrackingProxy<T>.


How It Works

MassTransit provides observer interfaces (ISendObserver, IPublishObserver, IConsumeObserver) that fire on every message operation. The extension implements these observers to capture and classify each operation, then logs it to RequestResponseLogger with MetaType.Event.

A central MassTransitTracker (implementing ITrackingComponent) handles all logging. The three observer classes are thin adapters that extract context properties and delegate to the tracker.
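As a rough illustration of the thin-adapter pattern described above, the sketch below implements MassTransit's ISendObserver and forwards each callback to a delegate. SketchSendObserver and the record delegate are hypothetical names used only for this example; they are not the extension's actual types, and the real TrackingSendObserver may extract more context than shown here. The sketch assumes the MassTransit v8 namespace layout.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical adapter: forwards send callbacks to a delegate that stands in
// for the central tracker. Arguments: operation name, message type, destination.
public sealed class SketchSendObserver : ISendObserver
{
    private readonly Action<string, string, Uri> _record;

    public SketchSendObserver(Action<string, string, Uri> record) => _record = record;

    // Nothing to record before the message is sent.
    public Task PreSend<T>(SendContext<T> context) where T : class => Task.CompletedTask;

    // Fires after a successful send; classified as "Send".
    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        _record("Send", typeof(T).Name, context.DestinationAddress);
        return Task.CompletedTask;
    }

    // Fires when the send throws; classified as "SendFault".
    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        _record("SendFault", typeof(T).Name, context.DestinationAddress);
        return Task.CompletedTask;
    }
}
```

Publish and consume adapters would follow the same shape against IPublishObserver and IConsumeObserver.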


Install

dotnet add package Kronikol.Extensions.MassTransit

Verbosity Levels

| Level | Method shown | URI shown | Headers | Message body |
| --- | --- | --- | --- | --- |
| Raw | Send OrderCreated → rabbitmq://localhost/orders | Transport URI | N/A | Full JSON |
| Detailed | Send OrderCreated | masstransit:///orders-queue | N/A | Full JSON |
| Summarised | → OrderCreated (send/publish) or ← OrderCreated (consume) | masstransit:///OrderCreated | N/A | None |

The default is Detailed.

Diagram Label Examples

| Operation | Raw | Detailed | Summarised |
| --- | --- | --- | --- |
| Send message | Send OrderCreated → rabbitmq://localhost/orders | Send OrderCreated | → OrderCreated |
| Publish event | Publish UserRegistered → rabbitmq://localhost/events | Publish UserRegistered | → UserRegistered |
| Consume message | Consume OrderCreated → rabbitmq://localhost/orders | Consume OrderCreated | ← OrderCreated |
| Send fault | SendFault OrderCreated → rabbitmq://localhost/orders | Send Fault OrderCreated | SendFault |
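The label rules for the Send, Publish, and Consume rows above can be approximated in a few lines. This is a minimal sketch, not the extension's code: the Verbosity enum and LabelSketch class are hypothetical stand-ins, and the fault rows are deliberately left out because their labels follow a different pattern.

```csharp
using System;

// Hypothetical stand-in for MassTransitTrackingVerbosity.
public enum Verbosity { Raw, Detailed, Summarised }

public static class LabelSketch
{
    // operation is "Send", "Publish" or "Consume".
    // transportUri is only used at Raw verbosity.
    public static string Format(Verbosity verbosity, string operation, string messageType, string transportUri)
    {
        switch (verbosity)
        {
            case Verbosity.Raw:
                return $"{operation} {messageType} → {transportUri}";
            case Verbosity.Detailed:
                return $"{operation} {messageType}";
            default:
                // Summarised: arrow plus message type; consume points the other way.
                var arrow = operation == "Consume" ? "←" : "→";
                return $"{arrow} {messageType}";
        }
    }
}
```

For example, `LabelSketch.Format(Verbosity.Summarised, "Consume", "OrderCreated", "rabbitmq://localhost/orders")` yields `← OrderCreated`, matching the Consume row.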

Classified Operations

| Operation | Trigger |
| --- | --- |
| Send | ISendObserver.PostSend |
| Publish | IPublishObserver.PostPublish |
| Consume | IConsumeObserver.PostConsume |
| SendFault | ISendObserver.SendFault |
| PublishFault | IPublishObserver.PublishFault |
| ConsumeFault | IConsumeObserver.ConsumeFault |

Setup

Option A: Bus Configuration Extension (Recommended)

services.AddMassTransit(cfg =>
{
    cfg.UsingRabbitMq((ctx, rabbit) =>
    {
        rabbit.WithTestTracking(new MassTransitTrackingOptions
        {
            ServiceName = "RabbitMQ",
            CallerName = "My API",
            Verbosity = MassTransitTrackingVerbosity.Detailed,
            CurrentTestInfoFetcher = CurrentTestInfo.Fetcher
        });

        rabbit.ConfigureEndpoints(ctx);
    });
});

WithTestTracking connects all three observers (send, publish, consume) to the bus and returns a MassTransitTracker instance.

Option B: With Azure Service Bus Transport

services.AddMassTransit(cfg =>
{
    cfg.UsingAzureServiceBus((ctx, sb) =>
    {
        sb.WithTestTracking(new MassTransitTrackingOptions
        {
            ServiceName = "ServiceBus",
            CallerName = "My API",
            CurrentTestInfoFetcher = () => /* ... */
        });

        sb.ConfigureEndpoints(ctx);
    });
});

Option C: With In-Memory Transport (Test Harness)

services.AddMassTransit(cfg =>
{
    cfg.UsingInMemory((ctx, mem) =>
    {
        mem.WithTestTracking(new MassTransitTrackingOptions
        {
            ServiceName = "MessageBus",
            CallerName = "My API",
            CurrentTestInfoFetcher = () => /* ... */
        });

        mem.ConfigureEndpoints(ctx);
    });
});

Option D: Manual Observer Registration

If you need more control:

var tracker = new MassTransitTracker(new MassTransitTrackingOptions
{
    ServiceName = "RabbitMQ",
    CallerName = "My API",
    CurrentTestInfoFetcher = () => /* ... */
});

services.AddMassTransit(cfg =>
{
    cfg.UsingRabbitMq((ctx, rabbit) =>
    {
        rabbit.ConnectSendObserver(new TrackingSendObserver(tracker));
        rabbit.ConnectPublishObserver(new TrackingPublishObserver(tracker));
        rabbit.ConnectConsumeObserver(new TrackingConsumeObserver(tracker));

        rabbit.ConfigureEndpoints(ctx);
    });
});

Configuration Reference

MassTransitTrackingOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ServiceName | string | "MassTransit" | The participant name shown in the diagram for the message bus |
| CallerName | string | "Caller" | The participant name shown for the service making/receiving messages |
| Verbosity | MassTransitTrackingVerbosity | Detailed | Controls how much detail appears in the diagram (Raw, Detailed, Summarised) |
| CurrentTestInfoFetcher | Func<(string Name, string Id)>? | null | Returns the current test's name and ID. Required; if null, messages are not logged |
| CurrentStepTypeFetcher | Func<string?>? | null | Optional; returns the current BDD step type (Given/When/Then) |
| HttpContextAccessor | IHttpContextAccessor? | null | Optional; enables dual-resolution of test identity from HTTP headers. Auto-resolved by DI extensions (v2.26.3+). See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) |
| TrackSend | bool | true | Whether to track ISendObserver events |
| TrackPublish | bool | true | Whether to track IPublishObserver events |
| TrackConsume | bool | true | Whether to track IConsumeObserver events |
| LogMessageBody | bool | true | Whether to serialize and include message bodies in diagrams |
| LogFaults | bool | true | Whether to log fault (exception) events |
| SetupVerbosity | MassTransitTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking |
| ActionVerbosity | MassTransitTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking |
| TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking |
| TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |
| PropagateTestIdentity | bool | true | When true, send/publish observers inject kronikol-test-name / kronikol-test-id into transport headers and consume observers extract them to establish TestIdentityScope. See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) |

v2.23.0+ Dual-Resolution: MassTransitTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter for resolving test identity from HTTP request headers when running inside the SUT's request pipeline. v2.26.3+: Set HttpContextAccessor on MassTransitTrackingOptions instead; the tracker reads it automatically. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for details.
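Because CurrentTestInfoFetcher is required, it may help to see the delegate shape it expects. Option A above passes CurrentTestInfo.Fetcher; the sketch below only illustrates what a compatible delegate could look like if you supplied your own. AmbientTestInfo is a hypothetical helper invented for this example and is not part of Kronikol.

```csharp
using System;
using System.Threading;

// Hypothetical helper: keeps the current test's name and ID in an AsyncLocal
// so that the value flows with the async execution context of the test.
public static class AmbientTestInfo
{
    private static readonly AsyncLocal<(string Name, string Id)> Current = new();

    // Call from the test framework's per-test setup hook.
    public static void Set(string name, string id) => Current.Value = (name, id);

    // Matches the Func<(string Name, string Id)> shape of CurrentTestInfoFetcher.
    // Returns the default tuple (null, null) if Set was never called.
    public static (string Name, string Id) Fetch() => Current.Value;
}
```

Under these assumptions the option would be wired as `CurrentTestInfoFetcher = AmbientTestInfo.Fetch`. How the tracker treats a tuple with null members is not covered here.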

Selective Tracking

You can enable/disable individual observer types:

new MassTransitTrackingOptions
{
    TrackSend = true,
    TrackPublish = true,
    TrackConsume = false,  // Don't track consumer side
    LogFaults = false       // Don't log fault events
}

Message Direction

The extension correctly models message direction in diagrams:

  • Send/Publish — Shown as outgoing from CallerName to ServiceName
  • Consume — Shown as incoming: caller and service names are swapped so the arrow points from the bus to the consuming service

This means a publish followed by a consume will show the correct bidirectional flow in the sequence diagram.
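The swap described above comes down to a single conditional. The following sketch assumes hypothetical names (DirectionSketch, Participants) and only models the three non-fault operations; it is meant to show the idea, not the extension's implementation.

```csharp
public static class DirectionSketch
{
    // Returns the (From, To) participants for the diagram arrow.
    public static (string From, string To) Participants(string operation, string callerName, string serviceName)
        => operation == "Consume"
            ? (serviceName, callerName)   // bus → consuming service
            : (callerName, serviceName);  // caller → bus (Send, Publish)
}
```

With CallerName = "My API" and ServiceName = "RabbitMQ", a publish is drawn from "My API" to "RabbitMQ" and the matching consume from "RabbitMQ" back to "My API".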


Invocation Validation

MassTransitTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry on construction. At report generation time, unused components are automatically detected and surfaced as console warnings and in the diagnostic report (when DiagnosticMode=true). This never throws or fails tests.

See Diagnostics and Debugging for full details on the TrackingComponentRegistry API.
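To make the idea of register-on-construction and unused-component detection concrete, here is a minimal sketch. SketchRegistry and SketchComponent are hypothetical types written for this example; the actual TrackingComponentRegistry and ITrackingComponent members are not shown in this page and may differ. The sketch is also not thread-safe.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical registry: remembers every component that was constructed.
public sealed class SketchRegistry
{
    private readonly List<SketchComponent> _components = new();

    public void Register(SketchComponent component) => _components.Add(component);

    // Components that never recorded an invocation; a report could warn about these.
    public IReadOnlyList<string> UnusedComponentNames() =>
        _components.Where(c => c.InvocationCount == 0).Select(c => c.Name).ToList();
}

// Hypothetical tracking component that registers itself when constructed.
public sealed class SketchComponent
{
    public string Name { get; }
    public int InvocationCount { get; private set; }

    public SketchComponent(string name, SketchRegistry registry)
    {
        Name = name;
        registry.Register(this); // auto-register on construction
    }

    public void RecordInvocation() => InvocationCount++;
}
```

A report step could then list `UnusedComponentNames()` as warnings without failing the test run.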


Test Identity Propagation (v2.34.0+)

When your application consumes MassTransit messages on background threads, the test framework's TestContext is unavailable. The extension solves this automatically via transport header propagation.

How It Works

  1. Send/Publish side: TrackingSendObserver and TrackingPublishObserver inject kronikol-test-name and kronikol-test-id into transport headers via context.Headers.Set() in PreSend/PrePublish.
  2. Consume side: TrackingConsumeObserver reads these headers via context.Headers.Get<string>() in PreConsume and calls TestIdentityScope.SetFromMessage().
  3. All subsequent tracking within the consumer resolves to the originating test.
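The steps above can be sketched with MassTransit's observer and header APIs. The header names come from this page; everything else (the class names, the fetch delegate, and the onIdentity callback that stands in for TestIdentityScope.SetFromMessage) is a hypothetical simplification, assuming the MassTransit v8 namespace layout.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Step 1 (send side): copy the current test identity into transport headers.
public sealed class SketchPropagatingSendObserver : ISendObserver
{
    private readonly Func<(string Name, string Id)> _fetch;

    public SketchPropagatingSendObserver(Func<(string Name, string Id)> fetch) => _fetch = fetch;

    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        var (name, id) = _fetch();
        context.Headers.Set("kronikol-test-name", name);
        context.Headers.Set("kronikol-test-id", id);
        return Task.CompletedTask;
    }

    public Task PostSend<T>(SendContext<T> context) where T : class => Task.CompletedTask;

    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class => Task.CompletedTask;
}

// Step 2 (consume side): read the headers back and hand them to a callback.
public sealed class SketchExtractingConsumeObserver : IConsumeObserver
{
    // Hypothetical callback standing in for TestIdentityScope.SetFromMessage.
    private readonly Action<string, string> _onIdentity;

    public SketchExtractingConsumeObserver(Action<string, string> onIdentity) => _onIdentity = onIdentity;

    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        var name = context.Headers.Get<string>("kronikol-test-name");
        var id = context.Headers.Get<string>("kronikol-test-id");

        // Messages sent outside a tracked test carry no identity headers.
        if (name != null && id != null)
            _onIdentity(name, id);

        return Task.CompletedTask;
    }

    public Task PostConsume<T>(ConsumeContext<T> context) where T : class => Task.CompletedTask;

    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class => Task.CompletedTask;
}
```

A publish-side observer would mirror the send-side one using IPublishObserver.PrePublish.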

Disabling Propagation

var options = new MassTransitTrackingOptions
{
    PropagateTestIdentity = false,
    // ...
};

See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) for the full architecture overview.
