Skip to content

Integration Kafka Extension

aryehcitron@gmail.com edited this page May 24, 2026 · 16 revisions

The Kronikol.Extensions.Kafka package adds Apache Kafka produce and consume tracking to your test diagrams. Every produce and consume operation appears as a classified event in your sequence diagrams — showing the complete message flow through Kafka topics.

Using a shared library or abstraction layer? If your code doesn't use the Confluent.Kafka SDK directly — e.g. it goes through MassTransit, a shared messaging library, or a custom abstraction — see the MassTransit Extension or Tracking Custom Dependencies for alternative approaches including MessageTracker and TrackingProxy<T>.


How It Works

The extension provides wrapper classes around Confluent.Kafka's IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces. The wrappers intercept produce and consume calls, classify the operation, and log it to RequestResponseLogger with MetaType.Event.

A central KafkaTracker (implementing ITrackingComponent) handles all logging. The two wrapper classes extract topic, partition, and offset details and delegate to the tracker.


Install

dotnet add package Kronikol.Extensions.Kafka

Verbosity Levels

Level Method shown URI shown Message content
Raw ProduceAsync orders-topic[2]@42 kafka:///orders-topic/2@42 Key + Value
Detailed Produce → orders-topic kafka:///orders-topic Key + Value
Summarised Produce kafka:/// None

The default is Detailed.

Diagram Label Examples

Operation Raw Detailed Summarised
Produce (async) ProduceAsync orders-topic[2]@42 Produce → orders-topic Produce
Produce (sync) Produce orders-topic[0]@10 Produce → orders-topic Produce
Consume Consume orders-topic[0]@42 Consume ← orders-topic Consume
Subscribe Subscribe orders-topic Subscribe orders-topic Subscribe
Unsubscribe Unsubscribe Unsubscribe Unsubscribe
Commit Commit Commit Commit
Flush Flush Flush Flush
InitTransactions InitTransactions InitTransactions Init Txn
BeginTransaction BeginTransaction BeginTransaction Begin Txn
CommitTransaction CommitTransaction CommitTransaction Commit Txn
AbortTransaction AbortTransaction AbortTransaction Abort Txn
SendOffsetsToTransaction SendOffsetsToTransaction SendOffsetsToTransaction Send Offsets

Classified Operations

Operation Trigger Default
ProduceAsync IProducer.ProduceAsync Tracked
Produce IProducer.Produce (with delivery handler) Tracked
Consume IConsumer.Consume (non-null, non-EOF result) Tracked
Subscribe IConsumer.Subscribe Opt-in (TrackSubscribe)
Unsubscribe IConsumer.Unsubscribe Opt-in (TrackUnsubscribe)
Commit IConsumer.Commit (all 3 overloads) Opt-in (TrackCommit)
Flush IProducer.Flush (both overloads) Opt-in (TrackFlush)
InitTransactions IProducer.InitTransactions Opt-in (TrackTransactions)
BeginTransaction IProducer.BeginTransaction Opt-in (TrackTransactions)
CommitTransaction IProducer.CommitTransaction (both overloads) Opt-in (TrackTransactions)
AbortTransaction IProducer.AbortTransaction (both overloads) Opt-in (TrackTransactions)
SendOffsetsToTransaction IProducer.SendOffsetsToTransaction Opt-in (TrackTransactions)

Setup

Option A: Wrap the Producer

var producerBuilder = new ProducerBuilder<string, string>(config);
var innerProducer = producerBuilder.Build();

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallerName = "My API",
    Verbosity = KafkaTrackingVerbosity.Detailed,
    CurrentTestInfoFetcher = CurrentTestInfo.Fetcher
};

var tracker = new KafkaTracker(options);
var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);

Option B: Wrap the Consumer

var consumerBuilder = new ConsumerBuilder<string, string>(config);
var innerConsumer = consumerBuilder.Build();

var tracker = new KafkaTracker(options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

Option C: Share a Tracker Across Producer and Consumer

var options = new KafkaTrackingOptions
{
    ServiceName = "Kafka",
    CallerName = "My API",
    CurrentTestInfoFetcher = () => /* ... */
};

var tracker = new KafkaTracker(options);

var producer = new TrackingKafkaProducer<string, string>(innerProducer, tracker, options);
var consumer = new TrackingKafkaConsumer<string, string>(innerConsumer, tracker, options);

A single KafkaTracker instance can be shared across multiple producers and consumers. It handles concurrent access safely.

Option D: DI Registration

v2.23.11+ Recommended: Use the built-in DI extension methods shown below. They automatically resolve IHttpContextAccessor from DI and handle the dual-resolution test identity pattern.

// In your test's ConfigureTestServices:

// Decorate existing IProducer<TKey,TValue> registrations with tracking:
services.AddKafkaProducerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallerName = "My API";
    options.Verbosity = KafkaTrackingVerbosity.Detailed;
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

// Decorate existing IConsumer<TKey,TValue> registrations with tracking:
services.AddKafkaConsumerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallerName = "My API";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

These methods use DecorateAll<T> internally — they find all existing registrations of IProducer<TKey,TValue> (or IConsumer<TKey,TValue>), replace each with a tracking wrapper, and preserve the original service lifetime. No-op if no matching registrations exist.

Tip: If your service code runs inside the SUT's request pipeline (via WebApplicationFactory), the tracker automatically resolves test identity from HTTP request headers first, then falls back to CurrentTestInfoFetcher. You can simplify this with TestInfoResolver.CreateHttpFallbackFetcher():

services.AddKafkaProducerTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka";
    options.CallerName = "My API";
    // No need for manual httpContext dance — the tracker handles this automatically
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

Manual DI Registration (Advanced)

If you need more control (e.g. shared trackers, custom wiring), you can register manually:

// In your test's ConfigureTestServices:
services.AddSingleton(sp =>
{
    var options = new KafkaTrackingOptions
    {
        ServiceName = "Kafka",
        CallerName = "My API",
        CurrentTestInfoFetcher = () => /* ... */
    };
    return new KafkaTracker(options, sp.GetService<IHttpContextAccessor>());
});

services.AddSingleton<IProducer<string, string>>(sp =>
{
    var tracker = sp.GetRequiredService<KafkaTracker>();
    var inner = new ProducerBuilder<string, string>(config).Build();
    return new TrackingKafkaProducer<string, string>(inner, tracker, new KafkaTrackingOptions());
});

Configuration Reference

KafkaTrackingOptions

Property Type Default Description
ServiceName string "Kafka" The participant name shown in the diagram for the Kafka broker
CallerName string "Caller" The participant name shown for the service producing/consuming messages
Verbosity KafkaTrackingVerbosity Detailed Controls how much detail appears in the diagram (Raw, Detailed, Summarised)
CurrentTestInfoFetcher Func<(string Name, string Id)>? null Returns the current test's name and ID. Required — if null, messages are not logged
CurrentStepTypeFetcher Func<string?>? null Optional — returns the current BDD step type (Given/When/Then)
TrackProduce bool true Whether to track produce operations
TrackConsume bool true Whether to track consume operations
TrackSubscribe bool false Whether to track subscribe operations
TrackUnsubscribe bool false Whether to track unsubscribe operations
TrackCommit bool false Whether to track commit operations
TrackFlush bool false Whether to track flush operations
TrackTransactions bool false Whether to track transactional producer operations (InitTransactions, BeginTransaction, CommitTransaction, AbortTransaction, SendOffsetsToTransaction)
LogMessageKey bool true Whether to include the message key in diagram content
LogMessageValue bool true Whether to include the message value in diagram content
SetupVerbosity KafkaTrackingVerbosity? null Verbosity override for the Setup phase. See Phase-Aware Tracking
ActionVerbosity KafkaTrackingVerbosity? null Verbosity override for the Action phase. See Phase-Aware Tracking
TrackDuringSetup bool true When false, tracking is suppressed during Setup. See Phase-Aware Tracking
TrackDuringAction bool true When false, tracking is suppressed during Action. See Phase-Aware Tracking
PropagateTestIdentity bool true When true, producers inject kronikol-test-name / kronikol-test-id headers and consumers extract them to establish TestIdentityScope. See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+)
AutoCorrelateOnConsume bool true When true, consumed messages auto-populate TestCorrelationStore for parallel-safe background thread correlation
ConsumeKeyExtractor Func<string, string, string>? null Optional custom key extractor for consume correlation

v2.23.0+ Dual-Resolution: KafkaTracker accepts an optional IHttpContextAccessor? httpContextAccessor constructor parameter. When provided, the tracker resolves test identity from HTTP request headers first (propagated by TestTrackingMessageHandler through the SUT pipeline), then falls back to CurrentTestInfoFetcher. This is essential when Kafka produce/consume operations are triggered inside the SUT's request pipeline via WebApplicationFactory, where the test framework's AsyncLocal<T> context is not available. See HTTP Tracking Setup#Dual-Resolution Test Identity (v2.23.0+) for full details and examples.

Selective Tracking

You can enable/disable individual operation types:

new KafkaTrackingOptions
{
    TrackProduce = true,
    TrackConsume = true,
    TrackSubscribe = true,       // Also show subscribe events
    TrackUnsubscribe = true,     // Also show unsubscribe events
    TrackCommit = true,          // Show offset commits
    TrackFlush = false,          // Don't track producer flushes
    TrackTransactions = true,    // Show exactly-once transaction lifecycle
    LogMessageKey = false        // Only show values, not keys
}

Message Direction

The extension correctly models message direction in diagrams:

  • Produce — Shown as outgoing from CallerName to ServiceName
  • Consume — Shown as incoming: caller and service names are swapped so the arrow points from the Kafka broker to the consuming service

This means a produce followed by a consume will show the correct bidirectional flow in the sequence diagram.


Consumer Content

Both the producer and consumer wrappers build diagram content consistently:

  • Key — Included when LogMessageKey = true (default). Shown as Key: <value>
  • Value — Included when LogMessageValue = true (default). Shown as Value: <value>
  • Summarised mode — Content is suppressed entirely regardless of key/value settings

Example content at Detailed verbosity with both enabled: Key: order-123, Value: {"amount":99}


Consumer Patterns

The extension supports all common Kafka consumer patterns:

Simple Poll Loop

The most common pattern — call Consume() in a loop:

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);
    // process result...
}

All three Consume() overloads (int, TimeSpan, CancellationToken) are tracked. Null results and partition EOF markers are automatically skipped.

Consumer Group Lifecycle

Enable TrackSubscribe, TrackUnsubscribe, and TrackCommit to see the full consumer group lifecycle in your diagrams:

new KafkaTrackingOptions
{
    TrackSubscribe = true,
    TrackUnsubscribe = true,
    TrackCommit = true,
    // ...
}

This will show:

  1. Subscribe orders-topic — when the consumer joins the group
  2. Consume ← orders-topic — for each message consumed
  3. Commit — when offsets are committed
  4. Unsubscribe — when the consumer leaves the group

Manual Partition Assignment

If your code uses Assign() / Seek() instead of Subscribe(), the consume tracking still works. Assign(), Seek(), Pause(), and Resume() are delegated directly to the inner consumer without tracking (they are partition management, not message operations).

Transactional Producer (Exactly-Once Semantics)

Enable TrackTransactions to see the full transactional lifecycle in your diagrams:

new KafkaTrackingOptions
{
    TrackTransactions = true,
    // ...
}

This tracks all five transactional producer methods:

  1. InitTransactions — initialises the transactional producer
  2. BeginTransaction — starts a new transaction
  3. Produce → orders-topic — messages within the transaction (always tracked via TrackProduce)
  4. SendOffsetsToTransaction — commits consumer offsets as part of the transaction
  5. CommitTransaction or AbortTransaction — completes the transaction

In Summarised mode, these appear as shortened labels: Init Txn, Begin Txn, Commit Txn, Abort Txn, Send Offsets.


Internally-Built Consumers and Producers

Many Kafka consumers are built inside BackgroundService.ExecuteAsync() using new ConsumerBuilder<TKey,TValue>(...).Build() rather than resolved from DI. The standard AddKafkaConsumerTestTracking cannot decorate these because there is no IConsumer registration in the service collection.

Two additional approaches are provided for this scenario:

Option E: Static Interceptor (BuildTracked() / Tracked())

v2.27.9+ Built-in static interceptor with minimal production code change.

The KafkaTrackingInterceptor provides a global interception mechanism. In your test setup, enable tracking for the relevant type combination. In production code, replace .Build() with .BuildTracked() (one-token change per consumer creation site).

Production code change (one token):

// Before:
using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetKeyDeserializer(Deserializers.Utf8)
    .SetValueDeserializer(Deserializers.Utf8)
    .Build();

// After:
using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetKeyDeserializer(Deserializers.Utf8)
    .SetValueDeserializer(Deserializers.Utf8)
    .BuildTracked();   // ← only change; no-op when interceptor is not active

Test setup:

// In your test fixture or ConfigureTestServices:
KafkaTrackingInterceptor.EnableConsumerTracking<string, string>(options =>
{
    options.ServiceName = "Kafka Broker";
    options.CallerName = "My API";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

The same pattern works for producers:

// Production code:
using var producer = new ProducerBuilder<string, string>(config).BuildTracked();

// Test setup:
KafkaTrackingInterceptor.EnableProducerTracking<string, string>(options => { /* ... */ });

Alternatively, if you cannot change the .Build() call, you can wrap an already-built consumer or producer using the .Tracked() extension method:

var consumer = existingConsumerBuilder.Build().Tracked();

Cleanup: Call KafkaTrackingInterceptor.Reset() in test teardown (e.g. Dispose()) to clear all global state between tests.

Note: BuildTracked() / Tracked() are no-ops when the interceptor is not active — in production there is zero overhead. The interceptor prevents double-wrapping: calling .Tracked() on an already-tracked consumer returns the same instance.

Option F: Consumer / Producer Factory

v2.27.9+ Factory interfaces for DI-based consumer/producer creation.

If you prefer a clean DI-based approach, the extension provides factory interfaces that standardise how consumers and producers are created:

  • IKafkaConsumerFactory<TKey, TValue> — creates IConsumer<TKey, TValue> from a ConsumerConfig configuration action
  • IKafkaProducerFactory<TKey, TValue> — creates IProducer<TKey, TValue> from a ProducerConfig configuration action

Production code change (inject factory):

public class ReportingKafkaConsumerService(
    IKafkaConsumerFactory<string, string> consumerFactory, // ← inject factory
    IOptions<KafkaConfig> kafkaOptions) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // ... build config ...

        using var consumer = consumerFactory.Create(config =>
        {
            config.BootstrapServers = kafkaConfig.BootstrapServers;
            config.GroupId = "my-group";
            // ... other config ...
        });

        consumer.Subscribe(topic);
        // ... consume loop unchanged ...
    }
}

Test setup:

// In your test's ConfigureTestServices:
services.AddKafkaConsumerFactoryTestTracking<string, string>(options =>
{
    options.ServiceName = "Kafka Broker";
    options.CallerName = "My API";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

This method:

  1. Registers a default KafkaConsumerFactory<TKey,TValue> if none exists
  2. Decorates it with TrackingKafkaConsumerFactory<TKey,TValue> (via DecorateAll)
  3. Every consumer created through the factory is automatically a TrackingKafkaConsumer

The equivalent producer factory method is AddKafkaProducerFactoryTestTracking<TKey, TValue>().

Tip: If your code already has its own factory (e.g. IProducerFactory), you can keep using it and DecorateAll to wrap it with tracking — the built-in factory is just a convenience for codebases that don't have one yet.

Choosing an Approach

Approach Production Code Change DI Required Best For
Option D (DI Decoration) None IConsumer/IProducer must be in DI Standard DI-registered consumers
Option E (Static Interceptor) .Build().BuildTracked() Not required BackgroundService consumers, test-side consumers
Option F (Factory) Inject IKafkaConsumerFactory Factory in DI Clean architecture, multiple consumer creation sites
Option G (Build Interception) None Not required Zero-change tracking of internally-built consumers/producers

Option G: Automatic Build() Interception (Zero Production Changes)

v2.27.16+ Requires the addon package Kronikol.Extensions.Kafka.BuildInterception.

This approach intercepts ConsumerBuilder<TKey,TValue>.Build() and ProducerBuilder<TKey,TValue>.Build() at runtime using Harmony. When enabled, any call to .Build() — anywhere in the process — automatically returns a TrackingKafkaConsumer / TrackingKafkaProducer wrapping the real instance. No production code changes, no DI changes, no .BuildTracked() needed.

This is ideal when your SUT builds consumers inside BackgroundService.ExecuteAsync() and you cannot modify the production code:

// Production code — completely untouched
public class ReportingKafkaConsumerService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<string, string>(config)
            .SetKeyDeserializer(Deserializers.Utf8)
            .SetValueDeserializer(Deserializers.Utf8)
            .Build();  // ← Automatically tracked when interceptor is active

        consumer.Subscribe(topic);
        // ... consume loop ...
    }
}

Install:

dotnet add package Kronikol.Extensions.Kafka.BuildInterception

This package is test-only — add it to your test project, never to the production API project. The Harmony dependency is isolated in this addon and does not affect the core Kronikol.Extensions.Kafka package.

Test setup:

using Kronikol.Extensions.Kafka.BuildInterception;

// Enable both consumer and producer tracking + patch both Build() methods:
KafkaBuildInterceptor.EnableTracking<string, string>(options =>
{
    options.ServiceName = "Kafka Broker";
    options.CallerName = "My API";
    options.CurrentTestInfoFetcher = CurrentTestInfo.Fetcher;
});

Or enable them individually:

// Consumer only:
KafkaBuildInterceptor.EnableConsumerTracking<string, string>(options => { /* ... */ });

// Producer only:
KafkaBuildInterceptor.EnableProducerTracking<string, string>(options => { /* ... */ });

Disable / cleanup:

// Disable a specific type pair (patch remains but becomes a no-op):
KafkaBuildInterceptor.DisableConsumerTracking<string, string>();
KafkaBuildInterceptor.DisableProducerTracking<string, string>();

// Full cleanup — clears all tracking state and removes all Harmony patches:
KafkaBuildInterceptor.Reset();

Important: Always call KafkaBuildInterceptor.Reset() in test teardown (e.g. Dispose()) to clean up Harmony patches between tests.

How it works internally:

  1. EnableConsumerTracking<TKey,TValue>() registers tracking state in the base KafkaTrackingInterceptor (same as Option E)
  2. It then applies a Harmony postfix patch on ConsumerBuilder<TKey,TValue>.Build() specifically for that generic type combination
  3. The postfix calls KafkaTrackingInterceptor.WrapConsumer() on the build result — the same wrapping logic used by .BuildTracked()
  4. WrapConsumer() checks for double-wrapping, so using KafkaBuildInterceptor together with .BuildTracked() is harmless

Compatibility:

  • .BuildTracked() and .Tracked() continue to work as before — they are redundant when the Harmony patch is active but cause no harm
  • Different <TKey, TValue> combinations are independent — patching <string, string> does not affect <Guid, string>
  • DisableConsumerTracking/DisableProducerTracking are independent — disabling consumer tracking does not affect producer tracking

Wrapper Transparency

Both TrackingKafkaProducer<TKey,TValue> and TrackingKafkaConsumer<TKey,TValue> fully implement the IProducer<TKey,TValue> and IConsumer<TKey,TValue> interfaces respectively. All non-tracked methods are delegated directly to the inner instance, making them fully transparent drop-in replacements.

The consumer automatically skips logging for:

  • null consume results
  • Partition EOF results (result.IsPartitionEOF)

Invocation Validation

KafkaTracker implements ITrackingComponent and auto-registers with TrackingComponentRegistry on construction. At report generation time, unused components are automatically detected and surfaced as console warnings and in the diagnostic report (when DiagnosticMode=true). This never throws or fails tests.

See Diagnostics and Debugging for full details on the TrackingComponentRegistry API.


Test Identity Propagation (v2.34.0+)

When your application consumes Kafka messages on background threads (e.g. inside a BackgroundService), the test framework's TestContext is unavailable — making it impossible to determine which test triggered the message. The Kafka extension solves this automatically via message header propagation.

How It Works

  1. Producer side: TrackingKafkaProducer injects two headers into every outgoing message:

    • kronikol-test-name — the human-readable test name
    • kronikol-test-id — the unique test identifier
  2. Consumer side: TrackingKafkaConsumer reads these headers from each consumed message and calls TestIdentityScope.SetFromMessage(), establishing the test identity on the consumer's thread via AsyncLocal.

  3. All subsequent tracking (HTTP calls, database operations, further message produces) within that consumer's processing scope will correctly resolve to the originating test.

Example

// Test produces a message (headers injected automatically):
await producer.ProduceAsync("orders-topic", new Message<string, string>
{
    Key = "order-123",
    Value = "{\"amount\": 99}"
});

// Application's BackgroundService consumes the message:
// → Headers extracted → TestIdentityScope established
// → Any HTTP/DB/messaging tracking inside the handler resolves to this test
var result = consumer.Consume(cancellationToken);
// TestIdentityScope.Current is now set from the message headers
await ProcessOrder(result.Message.Value);

Disabling Propagation

var options = new KafkaTrackingOptions
{
    PropagateTestIdentity = false,  // Headers not injected or extracted
    // ...
};

Header Format

Headers are stored as UTF-8 byte arrays in Confluent.Kafka's Headers collection:

Header Key Value
kronikol-test-name UTF-8 bytes of the test name
kronikol-test-id UTF-8 bytes of the test ID

These headers are lightweight (typically <200 bytes combined) and have negligible impact on message size.

See Background Thread Correlation#Solution 1b: Automatic Message Header Propagation (v2.34.0+) for the full architecture overview.

Home


Demo


Getting Started

Common Tasks

Integration Guides

Extensions

Configuration

Features

Reference

Clone this wiki locally