-
Notifications
You must be signed in to change notification settings - Fork 1
Integration Kafka Extension
The TestTrackingDiagrams.Extensions.Kafka package adds Apache Kafka produce and consume tracking to your test diagrams. Every produce and consume operation appears as a classified event in your sequence diagrams — showing the complete message flow through Kafka topics.
Using a shared library or abstraction layer? If your code doesn't use the Confluent.Kafka SDK directly — e.g. it goes through MassTransit, a shared messaging library, or a custom abstraction — see the MassTransit Extension or Tracking Custom Dependencies for alternative approaches including
MessageTrackerandTrackingProxy<T>.
The extension provides wrapper classes around Confluent.Kafka's IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces. The wrappers intercept produce and consume calls, classify the operation, and log it to RequestResponseLogger with MetaType.Event.
A central KafkaTracker (implementing ITrackingComponent) handles all logging. The two wrapper classes extract topic, partition, and offset details and delegate to the tracker.
dotnet add package TestTrackingDiagrams.Extensions.Kafka| Level | Method shown | URI shown | Message content |
|---|---|---|---|
| Raw | Produce → orders-topic partition[2]@42 |
kafka:///orders-topic/2@42 |
Key + Value |
| Detailed | Produce → orders-topic |
kafka:///orders-topic |
Key + Value |
| Summarised | Produce |
kafka:/// |
None |
The default is Detailed.
| Operation | Raw | Detailed | Summarised |
|---|---|---|---|
| Produce (async) | Produce → orders-topic partition[2]@42 |
Produce → orders-topic |
Produce |
| Produce (sync) | Produce → orders-topic partition[0]@10 |
Produce → orders-topic |
Produce |
| Consume | Consume ← orders-topic partition[0]@42 |
Consume ← orders-topic |
Consume |
| Subscribe | Subscribe orders-topic |
Subscribe orders-topic |
Subscribe |
| Flush | Flush |
Flush |
Flush |
| Operation | Trigger |
|---|---|
ProduceAsync |
IProducer.ProduceAsync |
Produce |
IProducer.Produce (with delivery handler) |
Consume |
IConsumer.Consume (non-null, non-EOF result) |
Subscribe |
IConsumer.Subscribe |
Unsubscribe |
IConsumer.Unsubscribe |
Commit |
IConsumer.Commit |
Flush |
IProducer.Flush |
var producerBuilder = new ProducerBuilder<string, string>(config);
var innerProducer = producerBuilder.Build();
var options = new KafkaTrackingOptions
{
ServiceName = "Kafka",
CallingServiceName = "My API",
Verbosity = KafkaTrackingVerbosity.Detailed,
CurrentTestInfoFetcher = () =>
{
var test = TestContext.Current.Test;
return test is not null
? (test.TestDisplayName, test.UniqueID)
: ("Unknown", "unknown");
}
};
var tracker = new KafkaTracker(options);
var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);var consumerBuilder = new ConsumerBuilder<string, string>(config);
var innerConsumer = consumerBuilder.Build();
var tracker = new KafkaTracker(options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);var options = new KafkaTrackingOptions
{
ServiceName = "Kafka",
CallingServiceName = "My API",
CurrentTestInfoFetcher = () => /* ... */
};
var tracker = new KafkaTracker(options);
var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);A single KafkaTracker instance can be shared across multiple producers and consumers. It handles concurrent access safely.
// In your test's ConfigureTestServices:
services.AddSingleton(sp =>
{
var options = new KafkaTrackingOptions
{
ServiceName = "Kafka",
CallingServiceName = "My API",
CurrentTestInfoFetcher = () => /* ... */
};
return new KafkaTracker(options);
});
services.AddSingleton<IProducer<string, string>>(sp =>
{
var tracker = sp.GetRequiredService<KafkaTracker>();
var inner = new ProducerBuilder<string, string>(config).Build();
return new TrackingKafkaProducer<string, string>(inner, tracker, tracker.Options);
});| Property | Type | Default | Description |
|---|---|---|---|
ServiceName |
string |
"Kafka" |
The participant name shown in the diagram for the Kafka broker |
CallingServiceName |
string |
"Caller" |
The participant name shown for the service producing/consuming messages |
Verbosity |
KafkaTrackingVerbosity |
Detailed |
Controls how much detail appears in the diagram (Raw, Detailed, Summarised) |
CurrentTestInfoFetcher |
Func<(string Name, string Id)>? |
null |
Returns the current test's name and ID. Required — if null, messages are not logged |
CurrentStepTypeFetcher |
Func<string?>? |
null |
Optional — returns the current BDD step type (Given/When/Then) |
TrackProduce |
bool |
true |
Whether to track produce operations |
TrackConsume |
bool |
true |
Whether to track consume operations |
TrackSubscribe |
bool |
false |
Whether to track subscribe/unsubscribe operations |
TrackCommit |
bool |
false |
Whether to track commit operations |
LogMessageKey |
bool |
true |
Whether to include the message key in diagram content |
LogMessageValue |
bool |
true |
Whether to include the message value in diagram content |
You can enable/disable individual operation types:
new KafkaTrackingOptions
{
TrackProduce = true,
TrackConsume = true,
TrackSubscribe = true, // Also show subscribe events
TrackCommit = false, // Don't track commits
LogMessageKey = false // Only show values, not keys
}The extension correctly models message direction in diagrams:
-
Produce — Shown as outgoing from
CallingServiceNametoServiceName - Consume — Shown as incoming: caller and service names are swapped so the arrow points from the Kafka broker to the consuming service
This means a produce followed by a consume will show the correct bidirectional flow in the sequence diagram.
Both TrackingKafkaProducer<TKey,TValue> and TrackingKafkaConsumer<TKey,TValue> fully implement the IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces respectively. All non-tracked methods are delegated directly to the inner instance, making them fully transparent drop-in replacements.
The consumer automatically skips logging for:
-
nullconsume results - Partition EOF results (
result.IsPartitionEOF)
KafkaTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry on construction. At report generation time, unused components are automatically detected and surfaced as console warnings and in the diagnostic report (when DiagnosticMode=true). This never throws or fails tests.
See Diagnostics and Debugging for full details on the TrackingComponentRegistry API.
Getting Started
Common Tasks
Integration Guides
- Integration xUnit3
- Integration xUnit2
- Integration NUnit
- Integration MSTest
- Integration TUnit
- Integration BDDfy xUnit3
- Integration LightBDD xUnit2
- Integration LightBDD xUnit3
- Integration LightBDD TUnit
- Integration ReqNRoll xUnit2
- Integration ReqNRoll xUnit3
- Integration ReqNRoll TUnit
Extensions
- Integration AtlasDataApi Extension
- Integration BigQuery Extension
- Integration Bigtable Extension
- Integration BlobStorage Extension
- Integration ClickHouse Extension
- Integration CloudStorage Extension
- Integration CosmosDB Extension
- Integration Dapper Extension
- Integration DynamoDB Extension
- Integration EF Core Relational Extension
- Integration Elasticsearch Extension
- Integration EventBridge Extension
- Integration EventHubs Extension
- Integration Grpc Extension
- Integration Kafka Extension
- Integration MassTransit Extension
- Integration MongoDB Extension
- Integration MySqlConnector Extension
- Integration Npgsql Extension
- Integration Oracle Extension
- Integration PubSub Extension
- Integration Redis Extension
- Integration S3 Extension
- Integration ServiceBus Extension
- Integration SNS Extension
- Integration Spanner Extension
- Integration SqlClient Extension
- Integration Sqlite Extension
- Integration SQS Extension
- Integration StorageQueues Extension
- Integration OpenTelemetry Extension
- Integration DispatchProxy Extension
- Integration MediatR Extension
- Integration PlantUML IKVM
Configuration
- Tracking Dependencies
- Tracking Custom Dependencies
- HTTP Tracking Setup
- Report Configuration
- Diagram Customisation
- Phase-Aware Tracking
- Content Formatting
- PlantUML Server Configuration
Features
- Generated Reports
- Search Syntax
- Component Diagrams
- PlantUML Browser Rendering
- Inline SVG Rendering
- Internal Flow Tracking
- Tags and Attributes
- Excluding Requests
- Excluded Headers
- Multi-Host Test Architectures
- Event-Driven Architecture Testing
- Service Bus Tracking Patterns
- Background Thread Correlation
- Parallel-Safe Background Correlation
- Event & Message Tracking
- Assertion Tracking
- Step Tracking
- Tabular Attributes
- Large Response and Diagram Handling
- Diagnostics and Debugging
- CI Summary Integration
- CI Artifact Upload
- Merging Parallel Reports
Reference