Skip to content

Event Driven Architecture Testing

aryehcitron@gmail.com edited this page May 17, 2026 · 2 revisions

This page covers how to use Kronikol in event-driven architectures — applications where:

  1. There are no direct HTTP calls from the test project to the SUT
  2. Tests trigger behaviour by publishing an event that the SUT consumes
  3. The SUT processes events in a background service (hosted service, consumer, subscriber)
  4. The SUT then makes outbound calls to dependencies and databases

This is the most complex tracking scenario because the test thread never directly communicates with the SUT via HTTP — instead, a message broker sits between the test and the SUT's entry point.


How It Works

┌──────────┐    ┌──────────────┐    ┌──────────────────────┐    ┌────────────────┐
│   Test   │───▶│ Message Bus  │───▶│  SUT Background Svc  │───▶│  Dependencies  │
│  Thread  │    │ (Kafka, SB,  │    │  (Consumer/Handler)  │    │  (HTTP, DB,    │
│          │    │  MassTransit)│    │                      │    │   Cosmos, etc) │
└──────────┘    └──────────────┘    └──────────────────────┘    └────────────────┘
     │                                         │                        │
     │  kronikol-test-name                          │  AsyncLocal flows      │
     │  kronikol-test-id     ─── headers ───▶       │  to all subsequent     │
     │  (injected into                         │  tracking calls        │
     │   message metadata)                     │                        │

The Identity Propagation Chain

Step What Happens Mechanism
1. Test publishes event Test identity injected into message metadata Messaging extension producer (e.g. TrackingKafkaProducer, TrackingSendObserver)
2. Message delivered Headers travel with the message through the broker Native transport headers (Kafka Headers, SB ApplicationProperties, etc.)
3. SUT consumer receives Headers extracted, TestIdentityScope.SetFromMessage() called Messaging extension consumer (e.g. TrackingConsumeObserver, TrackingKafkaConsumer)
4. SUT makes HTTP calls TestTrackingMessageHandler reads TestIdentityScope.Current Background Thread Correlation#scenario-resolution-reference
5. SUT writes to database Database extension reads TestIdentityScope.Current Same resolution chain as HTTP

Because SetFromMessage() sets an AsyncLocal<T> value, all subsequent async operations on the same execution context inherit the test identity. This includes outbound HTTP calls, database writes, further message publishes, and any other tracked interaction.


Configuration

Prerequisites

You need the messaging extension registered on both sides:

Side What to Register Purpose
Test project (producer) Messaging extension with PropagateTestIdentity = true Injects kronikol-test-name / kronikol-test-id headers into published messages
SUT (consumer) Same messaging extension with PropagateTestIdentity = true Extracts headers and establishes TestIdentityScope on the consumer thread
SUT (outbound HTTP) TestTrackingMessageHandler on all HttpClient instances Intercepts outbound HTTP calls and resolves test identity from TestIdentityScope.Current
SUT (databases) Database tracking extensions (CosmosDB, SQL, MongoDB, etc.) Tracks database interactions and resolves test identity from TestIdentityScope.Current

Example: MassTransit

// Test project — WebApplicationFactory ConfigureTestServices
builder.ConfigureTestServices(services =>
{
    // Track outbound HTTP calls from the SUT
    services.TrackDependenciesForDiagrams(new XUnitTestTrackingMessageHandlerOptions
    {
        PortsToServiceNames = new Dictionary<int, string>
        {
            [5001] = "Payment Service",
            [5002] = "Inventory Service",
        },
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // Track messages with identity propagation
    services.TrackMessagesForDiagrams(new MessageTrackerOptions
    {
        CallerName = "Order Service",
        ServiceName = "RabbitMQ",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // MassTransit tracking — propagates test identity through message headers
    services.AddMassTransitTracking(new MassTransitTrackingOptions
    {
        PropagateTestIdentity = true, // Enabled by default
        CallerName = "Order Service",
        ServiceName = "RabbitMQ",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // Database tracking (e.g. CosmosDB)
    services.AddCosmosTracking(new CosmosTrackingOptions
    {
        ServiceName = "CosmosDB",
        CallerName = "Order Service",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });
});

Example: Kafka

services.AddKafkaTracking(new KafkaTrackingOptions
{
    PropagateTestIdentity = true,
    CallerName = "My Service",
    ServiceName = "Kafka",
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
});

Example: Azure Service Bus

services.AddServiceBusTracking(new ServiceBusTrackingOptions
{
    PropagateTestIdentity = true,
    CallerName = "My Service",
    ServiceName = "Service Bus",
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
});

All messaging extensions support PropagateTestIdentity. See the specific extension page for full configuration: Integration Kafka Extension, Integration MassTransit Extension, Integration ServiceBus Extension, Integration EventHubs Extension, Integration PubSub Extension, Integration SQS Extension, Integration SNS Extension, Integration EventBridge Extension, Integration StorageQueues Extension.


Test Structure

In an event-driven test, the test body typically:

  1. Publishes an event (triggering the SUT)
  2. Waits for the SUT to complete processing (polling, waiting for a side-effect, or using a completion signal)
  3. Asserts on the outcome
[Fact]
public async Task When_order_placed_event_received_Then_payment_is_processed()
{
    // Arrange — publish the event that triggers the SUT
    var orderEvent = new OrderPlacedEvent { OrderId = Guid.NewGuid(), Amount = 99.99m };
    await _bus.Publish(orderEvent);

    // Act — wait for the SUT to process the event and call dependencies
    var payment = await WaitForPaymentProcessed(orderEvent.OrderId);

    // Assert
    payment.Status.Should().Be(PaymentStatus.Completed);
}

The generated sequence diagram will show:

Test ──▶ RabbitMQ : Publish (OrderPlacedEvent)
         RabbitMQ ──▶ Order Service : Consume (OrderPlacedEvent)
                      Order Service ──▶ Payment Service : POST /payments
                      Order Service ◀── Payment Service : 200 OK
                      Order Service ──▶ CosmosDB : Upsert (PaymentRecord)

Parallel Test Execution

Event-driven tracking is inherently parallel-safe because each message carries its own test identity:

  • Test A publishes Event A → SUT consumer thread 1 gets TestIdentityScope = Test A
  • Test B publishes Event B → SUT consumer thread 2 gets TestIdentityScope = Test B

Each consumer invocation sets its own AsyncLocal value independently. There is no shared state between parallel test executions — each message processing context is isolated.

Important: This parallel safety only applies when the SUT processes messages using standard async/await patterns. If the consumer dispatches work to pre-existing threads that don't participate in the consumer's async context, see Background Thread Correlation#solution-3-globalfallback-v22816 and the #Pre-Existing Thread Correlation section below.


Pre-Existing Thread Correlation

If the SUT's consumer dispatches work to threads that were started before the message arrived (e.g., a long-lived Task.Run loop, a Change Feed Processor polling thread, or a Hangfire worker), those threads cannot inherit the AsyncLocal value set by SetFromMessage().

Solutions (in order of preference)

Approach Parallel-Safe? Use When
Background Thread Correlation#solution-1b-automatic-message-header-propagation-v2340 ✅ Yes The pre-existing thread processes messages that carry Kronikol headers
#Work-Item Correlation (v2.36.0+) ✅ Yes The thread processes data items that can carry a correlation ID
Background Thread Correlation#solution-2-instance-scoped-test-tracker ✅ Yes (per-collection) Tests within a collection run sequentially
Background Thread Correlation#solution-3-globalfallback-v22816 ❌ No Last resort — serial execution only

Work-Item Correlation (v2.36.0+)

For scenarios where a pre-existing background thread processes data items (e.g., Change Feed documents, queue messages from a custom poller), the TestCorrelationStore system provides parallel-safe correlation without any production code changes:

// The CosmosDB tracking extension auto-populates the correlation store
// on every Create/Upsert/Replace when AutoCorrelateWrites = true (default).
// Just wrap your Change Feed delegate:
var processor = container.GetChangeFeedProcessorBuilder<Order>("processor",
    ChangeFeedCorrelation.Wrap<Order>(
        async (changes, ct) =>
        {
            foreach (var order in changes)
                await ProcessOrder(order);
        },
        serviceName: "CosmosDB",
        idSelector: order => order.Id))
    .Build();

For decoupled consumers (channel/queue handoff):

// Processing side — wrap with key selector
var wrapped = ProcessingCorrelation.Wrap<OrderEvent>(
    async (order, ct) => await ProcessOrder(order),
    order => CorrelationKeys.Kafka("Kafka", order.OrderId));

See Parallel-Safe Background Correlation for the full implementation guide, migration instructions, and all supported patterns.


Verification

Use the Diagnostics and Debugging report to verify tracking is working:

  1. Enable diagnostic mode: DiagnosticMode = true in report options
  2. Run your event-driven test
  3. Check the diagnostic report for:
    • Zero "Unknown" entries for operations triggered by the event
    • Correct test attribution — all operations appear under the correct test
    • Complete sequence — publish → consume → dependencies all visible

Common Issues

Symptom Cause Fix
No diagram generated PropagateTestIdentity not enabled on producer side Enable PropagateTestIdentity = true on the messaging extension options
Consume appears but no dependency calls TestTrackingMessageHandler not registered on SUT's HttpClients Register via TrackDependenciesForDiagrams() or Pattern 5/8 from Tracking Dependencies
All operations show as "Unknown" Consumer extension not extracting headers Verify the consumer-side messaging extension is registered with PropagateTestIdentity = true
Diagram shows dependencies but under wrong test Multiple messages processed on same thread without identity reset Ensure consumer extension calls SetFromMessage() per message (this is automatic with Kronikol extensions)
Database operations not tracked Database extension not registered Add the appropriate database extension (e.g. Integration CosmosDB Extension, Integration SqlClient Extension)

Complete Example

A full working example of event-driven architecture testing:

// ═══════════════════════════════════════════════════════════════
// Collection Fixture — shared infrastructure
// ═══════════════════════════════════════════════════════════════

[CollectionDefinition("OrderProcessing")]
public class OrderProcessingCollection : ICollectionFixture<OrderProcessingFixture>;

public class OrderProcessingFixture : IAsyncLifetime
{
    public WebApplicationFactory<Program> Factory { get; private set; } = null!;
    public IBus Bus => Factory.Services.GetRequiredService<IBus>();

    public async ValueTask InitializeAsync()
    {
        Factory = new WebApplicationFactory<Program>()
            .WithWebHostBuilder(builder =>
            {
                builder.ConfigureTestServices(services =>
                {
                    // HTTP dependency tracking
                    services.TrackDependenciesForDiagrams(
                        new XUnitTestTrackingMessageHandlerOptions
                        {
                            CallerName = "Order Service",
                            CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                            PortsToServiceNames = new Dictionary<int, string>
                            {
                                [5001] = "Payment Gateway",
                                [5002] = "Notification Service",
                            },
                        });

                    // MassTransit tracking with identity propagation
                    services.AddMassTransitTracking(new MassTransitTrackingOptions
                    {
                        PropagateTestIdentity = true,
                        CallerName = "Order Service",
                        ServiceName = "RabbitMQ",
                        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                    });

                    // Database tracking
                    services.AddCosmosTracking(new CosmosTrackingOptions
                    {
                        ServiceName = "Orders DB",
                        CallerName = "Order Service",
                        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                    });
                });
            });

        // Ensure the host is started (background consumers begin listening)
        _ = Factory.Server;
    }

    public async ValueTask DisposeAsync() => await Factory.DisposeAsync();
}

// ═══════════════════════════════════════════════════════════════
// Test Class
// ═══════════════════════════════════════════════════════════════

[Collection("OrderProcessing")]
public class OrderPlacedConsumerTests(OrderProcessingFixture fixture)
    : DiagrammedComponentTest
{
    [Fact]
    public async Task Processes_payment_and_sends_confirmation()
    {
        // Arrange
        var orderEvent = new OrderPlacedEvent
        {
            OrderId = Guid.NewGuid(),
            CustomerId = "cust-123",
            Amount = 49.99m,
        };

        // Act — publish triggers the SUT's background consumer
        await fixture.Bus.Publish(orderEvent);

        // Assert — wait for observable side-effect
        var payment = await Poll.Until(
            () => GetPayment(orderEvent.OrderId),
            p => p?.Status == PaymentStatus.Completed,
            timeout: TimeSpan.FromSeconds(5));

        payment.Should().NotBeNull();
    }

    private async Task<Payment?> GetPayment(Guid orderId) { /* ... */ }
}

Related Pages

Home


Demo


Getting Started

Common Tasks

Integration Guides

Extensions

Configuration

Features

Reference

Clone this wiki locally