Event Driven Architecture Testing
This page covers how to use Kronikol in event-driven architectures — applications where:
- There are no direct HTTP calls from the test project to the SUT
- Tests trigger behaviour by publishing an event that the SUT consumes
- The SUT processes events in a background service (hosted service, consumer, subscriber)
- The SUT then makes outbound calls to dependencies and databases
This is the most complex tracking scenario because the test thread never directly communicates with the SUT via HTTP — instead, a message broker sits between the test and the SUT's entry point.
┌──────────┐    ┌──────────────┐    ┌──────────────────────┐    ┌────────────────┐
│ Test     │───▶│ Message Bus  │───▶│ SUT Background Svc   │───▶│ Dependencies   │
│ Thread   │    │ (Kafka, SB,  │    │ (Consumer/Handler)   │    │ (HTTP, DB,     │
│          │    │ MassTransit) │    │                      │    │ Cosmos, etc)   │
└──────────┘    └──────────────┘    └──────────────────────┘    └────────────────┘
     │                                   │                      │
     │  kronikol-test-name               │  AsyncLocal flows    │
     │  kronikol-test-id ─── headers ───▶│  to all subsequent   │
     │  (injected into                   │  tracking calls      │
     │  message metadata)                │                      │
| Step | What Happens | Mechanism |
|---|---|---|
| 1. Test publishes event | Test identity injected into message metadata | Messaging extension producer (e.g. TrackingKafkaProducer, TrackingSendObserver) |
| 2. Message delivered | Headers travel with the message through the broker | Native transport headers (Kafka Headers, SB ApplicationProperties, etc.) |
| 3. SUT consumer receives | Headers extracted, TestIdentityScope.SetFromMessage() called | Messaging extension consumer (e.g. TrackingConsumeObserver, TrackingKafkaConsumer) |
| 4. SUT makes HTTP calls | TestTrackingMessageHandler reads TestIdentityScope.Current | See Background Thread Correlation (scenario resolution reference) |
| 5. SUT writes to database | Database extension reads TestIdentityScope.Current | Same resolution chain as HTTP |
Because SetFromMessage() sets an AsyncLocal<T> value, all subsequent async operations on the same execution context inherit the test identity. This includes outbound HTTP calls, database writes, further message publishes, and any other tracked interaction.
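To make the steps in the table concrete, the following is a minimal sketch of the mechanism using only `AsyncLocal<T>` and a dictionary standing in for message headers. `FakeIdentityScope`, `PropagationSketch`, `Inject`, and `HandleAsync` are hypothetical names chosen for illustration and are not Kronikol APIs; only the header name `kronikol-test-name` is taken from this page.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for TestIdentityScope; not the Kronikol implementation.
public static class FakeIdentityScope
{
    private static readonly AsyncLocal<string?> CurrentTest = new();

    public static string? Current => CurrentTest.Value;

    // Mirrors the idea of SetFromMessage(): read the header, store it in an AsyncLocal.
    public static void SetFromHeaders(IReadOnlyDictionary<string, string> headers)
    {
        CurrentTest.Value =
            headers.TryGetValue("kronikol-test-name", out var name) ? name : null;
    }
}

public static class PropagationSketch
{
    // Producer side: attach the test identity to the outgoing message metadata.
    public static Dictionary<string, string> Inject(string testName) =>
        new() { ["kronikol-test-name"] = testName };

    // Consumer side: establish the scope, then call a "dependency" asynchronously.
    public static async Task<string?> HandleAsync(IReadOnlyDictionary<string, string> headers)
    {
        FakeIdentityScope.SetFromHeaders(headers);
        await Task.Yield();                  // the continuation may resume on another thread
        return await CallDependencyAsync();  // nested async call still sees the identity
    }

    private static async Task<string?> CallDependencyAsync()
    {
        await Task.Delay(10);
        return FakeIdentityScope.Current;    // what a tracking handler would read at this point
    }
}
```

Calling `PropagationSketch.HandleAsync(PropagationSketch.Inject("Test A"))` returns the same test name that was injected, even though the read happens two awaits after the value was set.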
You need the messaging extension registered on both sides:
| Side | What to Register | Purpose |
|---|---|---|
| Test project (producer) | Messaging extension with PropagateTestIdentity = true | Injects kronikol-test-name / kronikol-test-id headers into published messages |
| SUT (consumer) | Same messaging extension with PropagateTestIdentity = true | Extracts headers and establishes TestIdentityScope on the consumer thread |
| SUT (outbound HTTP) | TestTrackingMessageHandler on all HttpClient instances | Intercepts outbound HTTP calls and resolves test identity from TestIdentityScope.Current |
| SUT (databases) | Database tracking extensions (CosmosDB, SQL, MongoDB, etc.) | Tracks database interactions and resolves test identity from TestIdentityScope.Current |
// Test project: WebApplicationFactory ConfigureTestServices
builder.ConfigureTestServices(services =>
{
    // Track outbound HTTP calls from the SUT
    services.TrackDependenciesForDiagrams(new XUnitTestTrackingMessageHandlerOptions
    {
        PortsToServiceNames = new Dictionary<int, string>
        {
            [5001] = "Payment Service",
            [5002] = "Inventory Service",
        },
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // Track messages with identity propagation
    services.TrackMessagesForDiagrams(new MessageTrackerOptions
    {
        CallerName = "Order Service",
        ServiceName = "RabbitMQ",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // MassTransit tracking: propagates test identity through message headers
    services.AddMassTransitTracking(new MassTransitTrackingOptions
    {
        PropagateTestIdentity = true, // Enabled by default
        CallerName = "Order Service",
        ServiceName = "RabbitMQ",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });

    // Database tracking (e.g. CosmosDB)
    services.AddCosmosTracking(new CosmosTrackingOptions
    {
        ServiceName = "CosmosDB",
        CallerName = "Order Service",
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });
});

For Kafka:

services.AddKafkaTracking(new KafkaTrackingOptions
{
    PropagateTestIdentity = true,
    CallerName = "My Service",
    ServiceName = "Kafka",
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
});

For Azure Service Bus:

services.AddServiceBusTracking(new ServiceBusTrackingOptions
{
    PropagateTestIdentity = true,
    CallerName = "My Service",
    ServiceName = "Service Bus",
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
});

All messaging extensions support PropagateTestIdentity. See the specific extension page for full configuration: Integration Kafka Extension, Integration MassTransit Extension, Integration ServiceBus Extension, Integration EventHubs Extension, Integration PubSub Extension, Integration SQS Extension, Integration SNS Extension, Integration EventBridge Extension, Integration StorageQueues Extension.
In an event-driven test, the test body typically:
- Publishes an event (triggering the SUT)
- Waits for the SUT to complete processing (polling, waiting for a side-effect, or using a completion signal)
- Asserts on the outcome
[Fact]
public async Task When_order_placed_event_received_Then_payment_is_processed()
{
    // Arrange: build the event that triggers the SUT
    var orderEvent = new OrderPlacedEvent { OrderId = Guid.NewGuid(), Amount = 99.99m };

    // Act: publish the event; the SUT consumes it in the background
    await _bus.Publish(orderEvent);

    // Assert: wait for the SUT to process the event and call its dependencies
    var payment = await WaitForPaymentProcessed(orderEvent.OrderId);
    payment.Status.Should().Be(PaymentStatus.Completed);
}

The generated sequence diagram will show:
Test ──▶ RabbitMQ : Publish (OrderPlacedEvent)
RabbitMQ ──▶ Order Service : Consume (OrderPlacedEvent)
Order Service ──▶ Payment Service : POST /payments
Order Service ◀── Payment Service : 200 OK
Order Service ──▶ CosmosDB : Upsert (PaymentRecord)
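The wait step in the test above is delegated to a helper (`WaitForPaymentProcessed`) that this page does not define. One way such a helper could be built is on top of a generic polling loop; the sketch below is an assumption about its shape, and `PollSketch.UntilAsync` is a hypothetical name, not part of Kronikol or any test framework.

```csharp
#nullable enable
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PollSketch
{
    // Repeatedly evaluates `probe` until `isDone` accepts the result or `timeout` elapses.
    public static async Task<T> UntilAsync<T>(
        Func<Task<T>> probe,
        Func<T, bool> isDone,
        TimeSpan timeout,
        TimeSpan? interval = null)
    {
        var delay = interval ?? TimeSpan.FromMilliseconds(100);
        var stopwatch = Stopwatch.StartNew();

        while (true)
        {
            var value = await probe();
            if (isDone(value))
                return value;

            // Fail loudly instead of hanging the test run when the SUT never finishes.
            if (stopwatch.Elapsed >= timeout)
                throw new TimeoutException($"Condition not met within {timeout}.");

            await Task.Delay(delay);
        }
    }
}
```

Polling for an observable side effect keeps the test independent of how the SUT schedules its consumer; the timeout bounds the wait so a missing or failed consumer surfaces as a test failure rather than a hang.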
Event-driven tracking is inherently parallel-safe because each message carries its own test identity:
- Test A publishes Event A → SUT consumer thread 1 gets TestIdentityScope = Test A
- Test B publishes Event B → SUT consumer thread 2 gets TestIdentityScope = Test B
Each consumer invocation sets its own AsyncLocal value independently. There is no shared state between parallel test executions — each message processing context is isolated.
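The isolation described above is a property of `AsyncLocal<T>` itself and can be checked without any broker. The sketch below simulates several consumer invocations running concurrently; `ParallelIsolationSketch` and its members are hypothetical names used only for this illustration.

```csharp
#nullable enable
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelIsolationSketch
{
    private static readonly AsyncLocal<string?> CurrentTest = new();

    // Simulates one consumer invocation: set the identity, do async work, read it back.
    private static async Task<(string Expected, string? Observed)> ConsumeAsync(
        string testName, int delayMs)
    {
        CurrentTest.Value = testName;
        await Task.Delay(delayMs);
        return (testName, CurrentTest.Value);
    }

    // Runs many invocations concurrently; true means no invocation saw another's identity.
    public static async Task<bool> RunAsync(int messageCount)
    {
        var tasks = Enumerable.Range(0, messageCount)
            .Select(i => Task.Run(() => ConsumeAsync($"Test {i}", delayMs: 5 + (i % 3) * 5)));

        var results = await Task.WhenAll(tasks);
        return results.All(r => r.Expected == r.Observed);
    }
}
```

Each call to `ConsumeAsync` writes to its own copy of the execution context, so the value read after the delay matches the value that invocation wrote, regardless of how the invocations interleave.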
Important: This parallel safety only applies when the SUT processes messages using standard async/await patterns. If the consumer dispatches work to pre-existing threads that don't participate in the consumer's async context, see Solution 3 (GlobalFallback) in Background Thread Correlation and the Pre-Existing Thread Correlation section below.
If the SUT's consumer dispatches work to threads that were started before the message arrived (e.g., a long-lived Task.Run loop, a Change Feed Processor polling thread, or a Hangfire worker), those threads cannot inherit the AsyncLocal value set by SetFromMessage().
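The limitation can be reproduced in a few lines. In the sketch below, a worker thread is started before the identity is set and then receives work through a queue; it reads an empty identity even though the code that enqueued the work has one. All names here are hypothetical and the queue handoff is only a stand-in for whatever dispatch mechanism the SUT uses.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PreExistingThreadSketch
{
    private static readonly AsyncLocal<string?> CurrentTest = new();

    // Returns (identity seen inside the consumer, identity seen by the long-lived worker).
    public static async Task<(string? InConsumer, string? InWorker)> RunAsync()
    {
        var queue = new BlockingCollection<TaskCompletionSource<string?>>();

        // Long-lived worker started BEFORE any message arrives. It runs with the
        // execution context captured at Start(), where no identity has been set.
        var worker = new Thread(() =>
        {
            foreach (var request in queue.GetConsumingEnumerable())
                request.SetResult(CurrentTest.Value);
        })
        { IsBackground = true };
        worker.Start();

        // "Consumer" sets the identity for this message, then hands work to the worker.
        CurrentTest.Value = "Test A";
        var handoff = new TaskCompletionSource<string?>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        queue.Add(handoff);

        var seenByWorker = await handoff.Task;
        queue.CompleteAdding();

        return (CurrentTest.Value, seenByWorker);
    }
}
```

The consumer still sees "Test A" after the await, while the worker sees null, which is why the approaches listed next carry the identity with the data rather than relying on ambient context.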
| Approach | Parallel-Safe? | Use When |
|---|---|---|
| Automatic message header propagation (Background Thread Correlation, Solution 1b) | ✅ Yes | The pre-existing thread processes messages that carry Kronikol headers |
| Work-Item Correlation (v2.36.0+) | ✅ Yes | The thread processes data items that can carry a correlation ID |
| Instance-scoped test tracker (Background Thread Correlation, Solution 2) | ✅ Yes (per-collection) | Tests within a collection run sequentially |
| GlobalFallback (Background Thread Correlation, Solution 3) | ❌ No | Last resort: serial execution only |
For scenarios where a pre-existing background thread processes data items (e.g., Change Feed documents, queue messages from a custom poller), the TestCorrelationStore system provides parallel-safe correlation without any production code changes:
// The CosmosDB tracking extension auto-populates the correlation store
// on every Create/Upsert/Replace when AutoCorrelateWrites = true (default).
// Just wrap your Change Feed delegate:
var processor = container.GetChangeFeedProcessorBuilder<Order>("processor",
        ChangeFeedCorrelation.Wrap<Order>(
            async (changes, ct) =>
            {
                foreach (var order in changes)
                    await ProcessOrder(order);
            },
            serviceName: "CosmosDB",
            idSelector: order => order.Id))
    .Build();

For decoupled consumers (channel/queue handoff):

// Processing side: wrap with key selector
var wrapped = ProcessingCorrelation.Wrap<OrderEvent>(
    async (order, ct) => await ProcessOrder(order),
    order => CorrelationKeys.Kafka("Kafka", order.OrderId));

See Parallel-Safe Background Correlation for the full implementation guide, migration instructions, and all supported patterns.
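The general idea behind work-item correlation can be illustrated with a keyed lookup: the write side records which test produced an item, and the processing side recovers the identity from the item's key instead of from ambient context. The sketch below is an assumption about the concept only; `CorrelationStoreSketch`, `RecordWrite`, and `ProcessItem` are hypothetical and do not describe how TestCorrelationStore is actually implemented.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Threading;

public static class CorrelationStoreSketch
{
    private static readonly ConcurrentDictionary<string, string> TestByItemKey = new();
    private static readonly AsyncLocal<string?> CurrentTest = new();

    // Write side (runs where the test identity is known): remember who wrote the item.
    public static void RecordWrite(string itemKey, string testName) =>
        TestByItemKey[itemKey] = testName;

    // Processing side (runs on a pre-existing thread): recover identity from the item key.
    public static string? ProcessItem(string itemKey)
    {
        CurrentTest.Value =
            TestByItemKey.TryGetValue(itemKey, out var testName) ? testName : null;
        try
        {
            // Tracked calls made while handling this item would read this value.
            return CurrentTest.Value;
        }
        finally
        {
            // Reset so the next item on the same thread starts without a stale identity.
            CurrentTest.Value = null;
        }
    }
}
```

Because the lookup is keyed per item, two tests writing different items at the same time resolve to their own identities, which is what makes this style of correlation safe under parallel execution.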
Use the Diagnostics and Debugging report to verify tracking is working:
- Enable diagnostic mode: DiagnosticMode = true in report options
- Run your event-driven test
- Check the diagnostic report for:
  - Zero "Unknown" entries for operations triggered by the event
  - Correct test attribution: all operations appear under the correct test
  - Complete sequence: publish → consume → dependencies all visible
| Symptom | Cause | Fix |
|---|---|---|
| No diagram generated | PropagateTestIdentity not enabled on producer side | Enable PropagateTestIdentity = true on the messaging extension options |
| Consume appears but no dependency calls | TestTrackingMessageHandler not registered on SUT's HttpClients | Register via TrackDependenciesForDiagrams() or Pattern 5/8 from Tracking Dependencies |
| All operations show as "Unknown" | Consumer extension not extracting headers | Verify the consumer-side messaging extension is registered with PropagateTestIdentity = true |
| Diagram shows dependencies but under wrong test | Multiple messages processed on same thread without identity reset | Ensure consumer extension calls SetFromMessage() per message (this is automatic with Kronikol extensions) |
| Database operations not tracked | Database extension not registered | Add the appropriate database extension (e.g. Integration CosmosDB Extension, Integration SqlClient Extension) |
A full working example of event-driven architecture testing:
// ═══════════════════════════════════════════════════════════════
// Collection Fixture: shared infrastructure
// ═══════════════════════════════════════════════════════════════
[CollectionDefinition("OrderProcessing")]
public class OrderProcessingCollection : ICollectionFixture<OrderProcessingFixture>;

public class OrderProcessingFixture : IAsyncLifetime
{
    public WebApplicationFactory<Program> Factory { get; private set; } = null!;
    public IBus Bus => Factory.Services.GetRequiredService<IBus>();

    public async ValueTask InitializeAsync()
    {
        Factory = new WebApplicationFactory<Program>()
            .WithWebHostBuilder(builder =>
            {
                builder.ConfigureTestServices(services =>
                {
                    // HTTP dependency tracking
                    services.TrackDependenciesForDiagrams(
                        new XUnitTestTrackingMessageHandlerOptions
                        {
                            CallerName = "Order Service",
                            CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                            PortsToServiceNames = new Dictionary<int, string>
                            {
                                [5001] = "Payment Gateway",
                                [5002] = "Notification Service",
                            },
                        });

                    // MassTransit tracking with identity propagation
                    services.AddMassTransitTracking(new MassTransitTrackingOptions
                    {
                        PropagateTestIdentity = true,
                        CallerName = "Order Service",
                        ServiceName = "RabbitMQ",
                        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                    });

                    // Database tracking
                    services.AddCosmosTracking(new CosmosTrackingOptions
                    {
                        ServiceName = "Orders DB",
                        CallerName = "Order Service",
                        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
                    });
                });
            });

        // Ensure the host is started (background consumers begin listening)
        _ = Factory.Server;
    }

    public async ValueTask DisposeAsync() => await Factory.DisposeAsync();
}

// ═══════════════════════════════════════════════════════════════
// Test Class
// ═══════════════════════════════════════════════════════════════
[Collection("OrderProcessing")]
public class OrderPlacedConsumerTests(OrderProcessingFixture fixture)
    : DiagrammedComponentTest
{
    [Fact]
    public async Task Processes_payment_and_sends_confirmation()
    {
        // Arrange
        var orderEvent = new OrderPlacedEvent
        {
            OrderId = Guid.NewGuid(),
            CustomerId = "cust-123",
            Amount = 49.99m,
        };

        // Act: publish triggers the SUT's background consumer
        await fixture.Bus.Publish(orderEvent);

        // Assert: wait for observable side-effect
        var payment = await Poll.Until(
            () => GetPayment(orderEvent.OrderId),
            p => p?.Status == PaymentStatus.Completed,
            timeout: TimeSpan.FromSeconds(5));
        payment.Should().NotBeNull();
    }

    private async Task<Payment?> GetPayment(Guid orderId) { /* ... */ }
}

Related pages:
- Background Thread Correlation: detailed coverage of all background thread identity resolution strategies
- Event & Message Tracking: MessageTracker API reference and event annotation styling
- Service Bus Tracking Patterns: patterns for Service Bus specifically
- Multi-Host Test Architectures: dual-host (API + Function) testing patterns
- Tracking Dependencies: HTTP tracking fundamentals
- Tracking Custom Dependencies: custom tracker implementations
- Integration MassTransit Extension: MassTransit-specific configuration
- Integration Kafka Extension: Kafka-specific configuration
- Integration ServiceBus Extension: Azure Service Bus-specific configuration
- Phase-Aware Tracking: separating setup from action in diagrams