Skip to content

Integration Kafka Extension

Aryeh Citron edited this page Apr 28, 2026 · 16 revisions

The TestTrackingDiagrams.Extensions.Kafka package adds Apache Kafka produce and consume tracking to your test diagrams. Every produce and consume operation appears as a classified event in your sequence diagrams — showing the complete message flow through Kafka topics.

Using a shared library or abstraction layer? If your code doesn't use the Confluent.Kafka SDK directly — e.g. it goes through MassTransit, a shared messaging library, or a custom abstraction — see the MassTransit Extension or Tracking Custom Dependencies for alternative approaches including MessageTracker and TrackingProxy<T>.


How It Works

The extension provides wrapper classes around Confluent.Kafka's IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces. The wrappers intercept produce and consume calls, classify the operation, and log it to RequestResponseLogger with MetaType.Event.

A central KafkaTracker (implementing ITrackingComponent) handles all logging. The two wrapper classes extract topic, partition, and offset details and delegate to the tracker.


Install

dotnet add package TestTrackingDiagrams.Extensions.Kafka

Verbosity Levels

Level Method shown URI shown Message content
Raw Produce → orders-topic partition[2]@42 kafka:///orders-topic/2@42 Key + Value
Detailed Produce → orders-topic kafka:///orders-topic Key + Value
Summarised Produce kafka:/// None

The default is Detailed.

Diagram Label Examples

Operation Raw Detailed Summarised
Produce (async) Produce → orders-topic partition[2]@42 Produce → orders-topic Produce
Produce (sync) Produce → orders-topic partition[0]@10 Produce → orders-topic Produce
Consume Consume ← orders-topic partition[0]@42 Consume ← orders-topic Consume
Subscribe Subscribe orders-topic Subscribe orders-topic Subscribe
Unsubscribe Unsubscribe Unsubscribe Unsubscribe
Commit Commit Commit Commit
Flush Flush Flush Flush

Classified Operations

Operation Trigger Default
ProduceAsync IProducer.ProduceAsync Tracked
Produce IProducer.Produce (with delivery handler) Tracked
Consume IConsumer.Consume (non-null, non-EOF result) Tracked
Subscribe IConsumer.Subscribe Opt-in (TrackSubscribe)
Unsubscribe IConsumer.Unsubscribe Opt-in (TrackUnsubscribe)
Commit IConsumer.Commit (all 3 overloads) Opt-in (TrackCommit)
Flush IProducer.Flush (both overloads) Opt-in (TrackFlush)

Setup

Option A: Wrap the Producer

var producerBuilder = new ProducerBuilder<string, string>(config);
var innerProducer = producerBuilder.Build();

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallingServiceName = "My API",
    Verbosity = KafkaTrackingVerbosity.Detailed,
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher
};

var tracker = new KafkaTracker(options);
var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);

Option B: Wrap the Consumer

var consumerBuilder = new ConsumerBuilder<string, string>(config);
var innerConsumer = consumerBuilder.Build();

var tracker = new KafkaTracker(options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

Option C: Share a Tracker Across Producer and Consumer

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallingServiceName = "My API",
    CurrentTestInfoFetcher = () => /* ... */
};

var tracker = new KafkaTracker(options);

var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

A single KafkaTracker instance can be shared across multiple producers and consumers. It handles concurrent access safely.

Option D: DI Registration

v2.23.11+ Recommended: Use the built-in DI extension methods shown below. They automatically resolve IHttpContextAccessor from DI and handle the dual-resolution test identity pattern.

// In your test's ConfigureTestServices:

// Decorate existing IProducer<TKey,TValue> registrations with tracking:
services.AddKafkaProducerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallingServiceName = "My API";
    options.Verbosity = KafkaTrackingVerbosity.Detailed;
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

// Decorate existing IConsumer<TKey,TValue> registrations with tracking:
services.AddKafkaConsumerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallingServiceName = "My API";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

These methods use DecorateAll<T> internally — they find all existing registrations of IProducer<TKey,TValue> (or IConsumer<TKey,TValue>), replace each with a tracking wrapper, and preserve the original service lifetime. No-op if no matching registrations exist.

Tip: If your service code runs inside the SUT's request pipeline (via WebApplicationFactory), the tracker automatically resolves test identity from HTTP request headers first, then falls back to CurrentTestInfoFetcher. You can simplify this with TestInfoResolver.CreateHttpFallbackFetcher():

services.AddKafkaProducerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallingServiceName = "My API";
    // No need for manual httpContext dance — the tracker handles this automatically
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

Manual DI Registration (Advanced)

If you need more control (e.g. shared trackers, custom wiring), you can register manually:

// In your test's ConfigureTestServices:
services.AddSingleton(sp =>
{
    var options = new KafkaTrackingOptions
    {
        ServiceName = "Kafka",
        CallingServiceName = "My API",
        CurrentTestInfoFetcher = () => /* ... */
    };
    return new KafkaTracker(options, sp.GetService<IHttpContextAccessor>());
});

services.AddSingleton<IProducer<string, string>>(sp =>
{
    var tracker = sp.GetRequiredService<KafkaTracker>();
    var inner = new ProducerBuilder<string, string>(config).Build();
    return new TrackingKafkaProducer<string, string>(inner, tracker, new KafkaTrackingOptions());
});

Configuration Reference

KafkaTrackingOptions

Property Type Default Description
ServiceName string "Kafka" The participant name shown in the diagram for the Kafka broker
CallingServiceName string "Caller" The participant name shown for the service producing/consuming messages
Verbosity KafkaTrackingVerbosity Detailed Controls how much detail appears in the diagram (Raw, Detailed, Summarised)
CurrentTestInfoFetcher Func<(string Name, string Id)>? null Returns the current test's name and ID. Required — if null, messages are not logged
CurrentStepTypeFetcher Func<string?>? null Optional — returns the current BDD step type (Given/When/Then)

v2.23.0+ Dual-Resolution: KafkaTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter. When provided, the tracker resolves test identity from HTTP request headers first (propagated by TestTrackingMessageHandler through the SUT pipeline), then falls back to CurrentTestInfoFetcher. This is essential when Kafka produce/consume operations are triggered inside the SUT's request pipeline via WebApplicationFactory, where the test framework's AsyncLocal<T> context is not available. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for full details and examples. | TrackProduce | bool | true | Whether to track produce operations | | TrackConsume | bool | true | Whether to track consume operations | | TrackSubscribe | bool | false | Whether to track subscribe operations | | TrackUnsubscribe | bool | false | Whether to track unsubscribe operations | | TrackCommit | bool | false | Whether to track commit operations | | TrackFlush | bool | false | Whether to track flush operations | | LogMessageKey | bool | true | Whether to include the message key in diagram content | | LogMessageValue | bool | true | Whether to include the message value in diagram content | | SetupVerbosity | KafkaTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking | | ActionVerbosity | KafkaTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking | | TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking | | TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |

Selective Tracking

You can enable/disable individual operation types:

new KafkaTrackingOptions
{
    TrackProduce = true,
    TrackConsume = true,
    TrackSubscribe = true,     // Also show subscribe events
    TrackUnsubscribe = true,   // Also show unsubscribe events
    TrackCommit = true,        // Show offset commits
    TrackFlush = false,        // Don't track producer flushes
    LogMessageKey = false      // Only show values, not keys
}

Message Direction

The extension correctly models message direction in diagrams:

  • Produce — Shown as outgoing from CallingServiceName to ServiceName
  • Consume — Shown as incoming: caller and service names are swapped so the arrow points from the Kafka broker to the consuming service

This means a produce followed by a consume will show the correct bidirectional flow in the sequence diagram.


Consumer Content

Both the producer and consumer wrappers build diagram content consistently:

  • Key — Included when LogMessageKey = true (default). Shown as Key: <value>
  • Value — Included when LogMessageValue = true (default). Shown as Value: <value>
  • Summarised mode — Content is suppressed entirely regardless of key/value settings

Example content at Detailed verbosity with both enabled: Key: order-123, Value: {"amount":99}


Consumer Patterns

The extension supports all common Kafka consumer patterns:

Simple Poll Loop

The most common pattern — call Consume() in a loop:

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);
    // process result...
}

All three Consume() overloads (int, TimeSpan, CancellationToken) are tracked. Null results and partition EOF markers are automatically skipped.

Consumer Group Lifecycle

Enable TrackSubscribe, TrackUnsubscribe, and TrackCommit to see the full consumer group lifecycle in your diagrams:

new KafkaTrackingOptions
{
    TrackSubscribe = true,
    TrackUnsubscribe = true,
    TrackCommit = true,
    // ...
}

This will show:

  1. Subscribe orders-topic — when the consumer joins the group
  2. Consume ← orders-topic — for each message consumed
  3. Commit — when offsets are committed
  4. Unsubscribe — when the consumer leaves the group

Manual Partition Assignment

If your code uses Assign() / Seek() instead of Subscribe(), the consume tracking still works. Assign(), Seek(), Pause(), and Resume() are delegated directly to the inner consumer without tracking (they are partition management, not message operations).


Wrapper Transparency

Both TrackingKafkaProducer<TKey,TValue> and TrackingKafkaConsumer<TKey,TValue> fully implement the IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces respectively. All non-tracked methods are delegated directly to the inner instance, making them fully transparent drop-in replacements.

The consumer automatically skips logging for:

  • null consume results
  • Partition EOF results (result.IsPartitionEOF)

Invocation Validation

KafkaTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry on construction. At report generation time, unused components are automatically detected and surfaced as console warnings and in the diagnostic report (when DiagnosticMode=true). This never throws or fails tests.

See Diagnostics and Debugging for full details on the TrackingComponentRegistry API.

Home


Demo


Getting Started

Common Tasks

Integration Guides

Extensions

Configuration

Features

Reference

Clone this wiki locally