
Integration Kafka Extension

Aryeh Citron edited this page Apr 24, 2026 · 16 revisions

The TestTrackingDiagrams.Extensions.Kafka package adds Apache Kafka produce and consume tracking to your test diagrams. Every produce and consume operation appears as a classified event in your sequence diagrams — showing the complete message flow through Kafka topics.

Using a shared library or abstraction layer? If your code doesn't use the Confluent.Kafka SDK directly — e.g. it goes through MassTransit, a shared messaging library, or a custom abstraction — see the MassTransit Extension or Tracking Custom Dependencies for alternative approaches including MessageTracker and TrackingProxy<T>.


How It Works

The extension provides wrapper classes around Confluent.Kafka's IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces. The wrappers intercept produce and consume calls, classify the operation, and log it to RequestResponseLogger with MetaType.Event.

A central KafkaTracker (implementing ITrackingComponent) handles all logging. The two wrapper classes extract topic, partition, and offset details and delegate to the tracker.
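The wrapper idea can be illustrated with a much smaller interface than IProducer<TKey,TValue>. The sketch below is not the extension's source. ISimpleProducer, InMemoryProducer, TrackingProducer, and the List<string> log are hypothetical stand-ins for IProducer, a real producer, TrackingKafkaProducer, and RequestResponseLogger. It assumes the wrapper delegates first and records afterwards, since partition and offset are only known once the inner call returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, cut-down stand-in for IProducer<TKey,TValue>.
public interface ISimpleProducer
{
    void Produce(string topic, string key, string value);
    void Flush();
}

// Hypothetical inner producer; a real one would talk to a broker.
public sealed class InMemoryProducer : ISimpleProducer
{
    public List<(string Topic, string Key, string Value)> Sent { get; } = new();

    public void Produce(string topic, string key, string value) =>
        Sent.Add((topic, key, value));

    public void Flush() { }
}

// Decorator: delegates to the inner instance, then records the operation.
public sealed class TrackingProducer : ISimpleProducer
{
    private readonly ISimpleProducer _inner;
    private readonly List<string> _log;

    public TrackingProducer(ISimpleProducer inner, List<string> log)
    {
        _inner = inner;
        _log = log;
    }

    public void Produce(string topic, string key, string value)
    {
        _inner.Produce(topic, key, value);
        _log.Add($"Produce → {topic}");
    }

    public void Flush()
    {
        _inner.Flush();
        _log.Add("Flush");
    }
}
```

Because TrackingProducer implements the same interface as the instance it wraps, calling code does not need to know whether it holds the tracked or the untracked producer.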


Install

dotnet add package TestTrackingDiagrams.Extensions.Kafka

Verbosity Levels

| Level | Method shown | URI shown | Message content |
|---|---|---|---|
| Raw | Produce → orders-topic partition[2]@42 | kafka:///orders-topic/2@42 | Key + Value |
| Detailed | Produce → orders-topic | kafka:///orders-topic | Key + Value |
| Summarised | Produce | kafka:/// | None |

The default is Detailed.
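To make the three levels concrete, here is a minimal sketch that reproduces the label and URI shown in the table for a produce operation. It only mirrors the table. SketchVerbosity and ProduceLabelSketch are hypothetical names, not part of the package, and the real formatting logic may differ.

```csharp
using System;

// Hypothetical mirror of the three levels in the table; not the package's enum.
public enum SketchVerbosity { Raw, Detailed, Summarised }

public static class ProduceLabelSketch
{
    // Returns the (label, URI) pair the table lists for a produce at each level.
    public static (string Label, string Uri) Describe(
        SketchVerbosity verbosity, string topic, int partition, long offset) =>
        verbosity switch
        {
            // Raw: topic plus partition and offset.
            SketchVerbosity.Raw =>
                ($"Produce → {topic} partition[{partition}]@{offset}",
                 $"kafka:///{topic}/{partition}@{offset}"),
            // Detailed: topic only.
            SketchVerbosity.Detailed =>
                ($"Produce → {topic}", $"kafka:///{topic}"),
            // Summarised: operation name only.
            SketchVerbosity.Summarised =>
                ("Produce", "kafka:///"),
            _ => throw new ArgumentOutOfRangeException(nameof(verbosity))
        };
}
```

For example, ProduceLabelSketch.Describe(SketchVerbosity.Raw, "orders-topic", 2, 42) yields the Raw row of the table.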

Diagram Label Examples

| Operation | Raw | Detailed | Summarised |
|---|---|---|---|
| Produce (async) | Produce → orders-topic partition[2]@42 | Produce → orders-topic | Produce |
| Produce (sync) | Produce → orders-topic partition[0]@10 | Produce → orders-topic | Produce |
| Consume | Consume ← orders-topic partition[0]@42 | Consume ← orders-topic | Consume |
| Subscribe | Subscribe orders-topic | Subscribe orders-topic | Subscribe |
| Flush | Flush | Flush | Flush |

Classified Operations

| Operation | Trigger |
|---|---|
| ProduceAsync | IProducer.ProduceAsync |
| Produce | IProducer.Produce (with delivery handler) |
| Consume | IConsumer.Consume (non-null, non-EOF result) |
| Subscribe | IConsumer.Subscribe |
| Unsubscribe | IConsumer.Unsubscribe |
| Commit | IConsumer.Commit |
| Flush | IProducer.Flush |

Setup

Option A: Wrap the Producer

var producerBuilder = new ProducerBuilder<string, string>(config);
var innerProducer = producerBuilder.Build();

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallingServiceName = "My API",
    Verbosity = KafkaTrackingVerbosity.Detailed,
    CurrentTestInfoFetcher = () =>
    {
        var test = TestContext.Current.Test;
        return test is not null
            ? (test.TestDisplayName, test.UniqueID)
            : ("Unknown", "unknown");
    }
};

var tracker = new KafkaTracker(options);
var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);

Option B: Wrap the Consumer

// config is assumed to be your ConsumerConfig; options is the
// KafkaTrackingOptions instance built as in Option A.
var consumerBuilder = new ConsumerBuilder<string, string>(config);
var innerConsumer = consumerBuilder.Build();

var tracker = new KafkaTracker(options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

Option C: Share a Tracker Across Producer and Consumer

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallingServiceName = "My API",
    CurrentTestInfoFetcher = () => /* ... */
};

var tracker = new KafkaTracker(options);

var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

A single KafkaTracker instance can be shared across multiple producers and consumers. It handles concurrent access safely.

Option D: DI Registration

// In your test's ConfigureTestServices:
services.AddSingleton(sp =>
{
    var options = new KafkaTrackingOptions
    {
        ServiceName = "Kafka",
        CallingServiceName = "My API",
        CurrentTestInfoFetcher = () => /* ... */
    };
    return new KafkaTracker(options);
});

services.AddSingleton<IProducer<string, string>>(sp =>
{
    var tracker = sp.GetRequiredService<KafkaTracker>();
    var inner = new ProducerBuilder<string, string>(config).Build();
    return new TrackingKafkaProducer<string, string>(inner, tracker, tracker.Options);
});

Configuration Reference

KafkaTrackingOptions

| Property | Type | Default | Description |
|---|---|---|---|
| ServiceName | string | "Kafka" | The participant name shown in the diagram for the Kafka broker |
| CallingServiceName | string | "Caller" | The participant name shown for the service producing/consuming messages |
| Verbosity | KafkaTrackingVerbosity | Detailed | Controls how much detail appears in the diagram (Raw, Detailed, Summarised) |
| CurrentTestInfoFetcher | Func<(string Name, string Id)>? | null | Returns the current test's name and ID. Required: if null, messages are not logged |
| CurrentStepTypeFetcher | Func<string?>? | null | Optional. Returns the current BDD step type (Given/When/Then) |
| TrackProduce | bool | true | Whether to track produce operations |
| TrackConsume | bool | true | Whether to track consume operations |
| TrackSubscribe | bool | false | Whether to track subscribe/unsubscribe operations |
| TrackCommit | bool | false | Whether to track commit operations |
| LogMessageKey | bool | true | Whether to include the message key in diagram content |
| LogMessageValue | bool | true | Whether to include the message value in diagram content |
| SetupVerbosity | KafkaTrackingVerbosity? | null | Verbosity override for the Setup phase. See Phase-Aware Tracking |
| ActionVerbosity | KafkaTrackingVerbosity? | null | Verbosity override for the Action phase. See Phase-Aware Tracking |
| TrackDuringSetup | bool | true | When false, tracking is suppressed during Setup. See Phase-Aware Tracking |
| TrackDuringAction | bool | true | When false, tracking is suppressed during Action. See Phase-Aware Tracking |

Selective Tracking

You can enable or disable tracking per operation type, and choose whether message keys and values appear in the diagram:

var options = new KafkaTrackingOptions
{
    TrackProduce = true,
    TrackConsume = true,
    TrackSubscribe = true,   // Also show subscribe/unsubscribe events (off by default)
    TrackCommit = false,     // Don't track commits (the default)
    LogMessageKey = false    // Only show values, not keys
};

Message Direction

The extension correctly models message direction in diagrams:

  • Produce — Shown as outgoing from CallingServiceName to ServiceName
  • Consume — Shown as incoming: caller and service names are swapped so the arrow points from the Kafka broker to the consuming service

This means a produce followed by a consume will show the correct bidirectional flow in the sequence diagram.
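The name swap described above can be sketched in a few lines. This is an illustration of the rule only, not the extension's code. SketchOperation and DirectionSketch are hypothetical names.

```csharp
using System;

// Hypothetical operation kinds for this illustration.
public enum SketchOperation { Produce, Consume }

public static class DirectionSketch
{
    // Returns the (From, To) participants for the arrow drawn in the diagram.
    public static (string From, string To) Arrow(
        SketchOperation operation, string callingServiceName, string serviceName) =>
        operation == SketchOperation.Consume
            ? (serviceName, callingServiceName)   // swapped: broker to consumer
            : (callingServiceName, serviceName);  // default: caller to broker
}
```

With ServiceName = "Kafka" and CallingServiceName = "My API", a produce gives ("My API", "Kafka") and a consume gives ("Kafka", "My API").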


Wrapper Transparency

TrackingKafkaProducer<TKey,TValue> and TrackingKafkaConsumer<TKey,TValue> implement the full IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces respectively. All non-tracked methods are delegated directly to the inner instance, so the wrappers work as drop-in replacements.

The consumer automatically skips logging for:

  • null consume results
  • Partition EOF results (result.IsPartitionEOF)
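The two skip conditions amount to a simple predicate. The sketch below uses a hypothetical SketchConsumeResult record in place of Confluent.Kafka's ConsumeResult<TKey,TValue> so that it stays self-contained. The IsPartitionEOF property name matches the real type, but ShouldLog is an illustrative helper, not part of the package.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for Confluent.Kafka's ConsumeResult<TKey,TValue>.
public sealed record SketchConsumeResult(string Topic, bool IsPartitionEOF);

public static class ConsumeFilterSketch
{
    // True only for results that carry an actual message:
    // null results (timeouts) and partition EOF markers are skipped.
    public static bool ShouldLog(SketchConsumeResult? result) =>
        result is not null && !result.IsPartitionEOF;
}
```

A polling loop that calls Consume with a timeout can therefore run many empty iterations without adding noise to the diagram.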

Invocation Validation

KafkaTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry on construction. At report generation time, unused components are automatically detected and surfaced as console warnings and in the diagnostic report (when DiagnosticMode=true). This never throws or fails tests.

See Diagnostics and Debugging for full details on the TrackingComponentRegistry API.
