Parallel Safe Background Correlation

Aryeh Citron edited this page May 15, 2026 · 1 revision

New in v2.36.0 — This page covers the TestCorrelationStore system that enables parallel-safe test attribution for background processing threads that cannot inherit AsyncLocal values.


Problem Statement

When a test writes data to a database and a background processor (Change Feed Processor, Change Stream, Hangfire worker, etc.) picks up that data on a pre-existing thread, the AsyncLocal value set by TestIdentityScope.Begin() does not propagate: AsyncLocal flows only along the execution context captured when a thread or task is started, and the processor's thread was started before the test set its identity. Previously, the only solution was GlobalFallback, a single process-wide static that breaks under parallel test execution.

Test A writes doc X  ─────────────────────────▶  CosmosDB
Test B writes doc Y  ─────────────────────────▶  CosmosDB

                        Change Feed Processor (pre-existing thread)
                        ├─ Picks up doc X  →  Which test owns this?
                        └─ Picks up doc Y  →  Which test owns this?

With GlobalFallback, whichever test last called SetGlobalFallback() "wins" — causing cross-contamination.
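
The propagation gap can be reproduced without the library. The following is a minimal sketch that uses only the .NET base class library; CurrentTest and both method names are hypothetical and merely stand in for the ambient test identity. A worker thread started before the value is set does not observe it, while a task started afterwards does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalDemo
{
    // Hypothetical stand-in for the ambient test identity.
    private static readonly AsyncLocal<string?> CurrentTest = new();

    // Returns what a pre-existing worker thread sees (null when nothing flowed).
    public static string? ObserveOnPreExistingThread(string testName)
    {
        using var queue = new BlockingCollection<Action>();

        // The worker starts BEFORE the test sets its identity, like a
        // Change Feed Processor that is already running.
        var worker = new Thread(() =>
        {
            foreach (var work in queue.GetConsumingEnumerable())
                work();
        });
        worker.Start();

        CurrentTest.Value = testName;
        string? seen = "unset";
        queue.Add(() => seen = CurrentTest.Value); // runs on the worker thread
        queue.CompleteAdding();
        worker.Join();

        CurrentTest.Value = null; // leave the caller's context clean
        return seen;
    }

    // Returns what a task started AFTER the value was set sees.
    public static string? ObserveOnNewTask(string testName)
    {
        CurrentTest.Value = testName;
        var seen = Task.Run(() => CurrentTest.Value).GetAwaiter().GetResult();
        CurrentTest.Value = null;
        return seen;
    }
}
```

Calling ObserveOnPreExistingThread("Test A") yields null, whereas ObserveOnNewTask("Test A") yields "Test A", which is why ordinary async/await consumers need no extra wiring.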


When You Need This

| Scenario | Solution | Parallel-Safe? |
|---|---|---|
| Standard async/await consumer (MassTransit, Kafka, Service Bus) | Already works: SetFromMessage() flows via AsyncLocal | Yes |
| Decoupled consumer (channel/queue handoff, headers lost) | AutoCorrelateOnConsume + ProcessingCorrelation.Wrap<T>() | Yes |
| Change Feed / database polling | AutoCorrelateWrites + ChangeFeedCorrelation.Wrap<T>() | Yes |
| MongoDB Change Streams | AutoCorrelateWrites + ChangeStreamCorrelation.Wrap<T>() | Yes |
| Periodic background work (no data trigger) | Still needs GlobalFallback | No (serial only) |

How It Works

Architecture

┌──────────────────────┐  write (tracked)  ┌─────────────────────┐
│  Test A writes doc X │──────────────────▶│  CosmosDB / MongoDB │
│  (via WebAppFactory) │                   │                     │
└──────────────────────┘                   └─────────────────────┘
         │                                            │
         │ auto-populate                              │ Change Feed delivers X
         ▼                                            ▼
┌──────────────────────────────┐    ┌──────────────────────────────────────┐
│  TestCorrelationStore        │◀───│  ChangeFeedCorrelation.Wrap<T>()     │
│  { "cosmos:DB:X" → Test A }  │    │  1. Extract document ID from item    │
│                              │    │  2. Resolve key from store            │
└──────────────────────────────┘    │  3. TestIdentityScope.Begin(Test A)  │
                                    │  4. Call real handler                 │
                                    │  5. Dispose scope                    │
                                    └──────────────────────────────────────┘

Auto-Population (Write Side)

Database extensions (CosmosDB, MongoDB) automatically populate TestCorrelationStore on every tracked write operation when AutoCorrelateWrites = true (the default). No extra code needed.

Messaging extensions (Kafka, ServiceBus, EventHubs, PubSub) automatically populate the store on consume when AutoCorrelateOnConsume = true (the default).

Resolution (Processing Side)

Background processing decorators call CorrelatedProcessingScope.Begin(key) which:

  1. Looks up the key in TestCorrelationStore
  2. If found, calls TestIdentityScope.Begin(testName, testId)
  3. Returns an IDisposable that restores the previous identity
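
The three steps above can be sketched as a small in-memory model. This is an illustration under stated assumptions, not the library's implementation: TestIdentity, SketchStore, and SketchScope are hypothetical names, the TTL handling of the real store is left out, and returning null on a miss is an assumption based on the IDisposable? return type listed in the API reference.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed record TestIdentity(string TestName, string TestId);

// Hypothetical stand-in for the correlation store: key -> owning test.
public static class SketchStore
{
    private static readonly ConcurrentDictionary<string, TestIdentity> Entries = new();

    public static void Correlate(string key, string testName, string testId) =>
        Entries[key] = new TestIdentity(testName, testId);

    public static TestIdentity? Resolve(string key) =>
        Entries.TryGetValue(key, out var identity) ? identity : null;
}

// Hypothetical stand-in for the processing scope.
public static class SketchScope
{
    private static readonly AsyncLocal<TestIdentity?> Ambient = new();

    public static TestIdentity? Current => Ambient.Value;

    public static IDisposable? Begin(string key)
    {
        var identity = SketchStore.Resolve(key);   // 1. look up the key
        if (identity is null) return null;         //    miss: nothing to set (assumed)

        var previous = Ambient.Value;
        Ambient.Value = identity;                  // 2. set the ambient identity
        return new Restore(previous);              // 3. restore on Dispose
    }

    private sealed class Restore : IDisposable
    {
        private readonly TestIdentity? _previous;
        public Restore(TestIdentity? previous) => _previous = previous;
        public void Dispose() => Ambient.Value = _previous;
    }
}
```

Because a using statement accepts a null resource, callers can write using (SketchScope.Begin(key)) { ... } without checking for a miss first.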

Setup Guide: Cosmos Change Feed

The CosmosDB tracking extension handles both sides automatically. You only need to wrap your Change Feed delegate:

// Production code (unchanged)
container.GetChangeFeedProcessorBuilder<OrderDocument>("processor",
    async (changes, ct) =>
    {
        foreach (var order in changes)
            await ProcessOrder(order);
    })
    .Build();

// Test DI — wrap the delegate with correlation
container.GetChangeFeedProcessorBuilder<OrderDocument>("processor",
    ChangeFeedCorrelation.Wrap<OrderDocument>(
        async (changes, ct) =>
        {
            foreach (var order in changes)
                await ProcessOrder(order);
        },
        serviceName: "CosmosDB",
        idSelector: doc => doc.Id))
    .Build();

In the test fixture, register Cosmos tracking so that tracked writes populate the store:

builder.ConfigureTestServices(services =>
{
    services.AddCosmosTracking(new CosmosTrackingOptions
    {
        ServiceName = "Orders DB",
        CallerName = "Order Service",
        AutoCorrelateWrites = true,  // Default — auto-populates store
        CurrentTestInfoFetcher = CurrentTestInfo.Fetcher,
    });
});

The tracking handler auto-populates TestCorrelationStore on every Create/Upsert/Replace. The ChangeFeedCorrelation.Wrap<T>() helper resolves the document ID back to its originating test.

Custom Key Extraction

If your documents use composite keys (partition key + ID):

services.AddCosmosTracking(new CosmosTrackingOptions
{
    ServiceName = "Orders DB",
    ChangeFeedKeyExtractor = (serviceName, docId) =>
        CorrelationKeys.Cosmos(serviceName, docId),
});

Setup Guide: MongoDB Change Streams

Same pattern as CosmosDB:

// Wrap your Change Stream processing
var wrapped = ChangeStreamCorrelation.Wrap<OrderDocument>(
    async (item, ct) => await ProcessOrder(item),
    serviceName: "Orders",
    idSelector: doc => doc.Id);

The MongoDB tracking subscriber auto-populates the correlation store on Insert/Update/FindAndModify when AutoCorrelateWrites = true.


Setup Guide: Decoupled Kafka/ServiceBus Consumer

When your system under test (SUT) consumes messages on one thread but hands them to a separate processing thread through a Channel<T> or queue, pick the pattern that matches what reaches the processing thread:

Pattern A: Full message envelope flows through (headers available)

The processing thread has access to message headers. Use ProcessingCorrelation:

var wrapped = ProcessingCorrelation.Wrap<ConsumeResult<string, OrderEvent>>(
    async (result, ct) => await ProcessOrder(result.Message.Value),
    result => CorrelationKeys.Kafka("Order Events", result.Message.Key));

Pattern B: Only deserialized payload flows through (headers lost)

The processing thread only has the payload. The consumer auto-correlates, and the processing thread looks up by business key:

// Kafka options — auto-correlate on consume (enabled by default)
services.AddKafkaTracking(new KafkaTrackingOptions
{
    AutoCorrelateOnConsume = true,  // Stores messageKey → testIdentity
});

// Processing side — wrap with key selector
var wrapped = ProcessingCorrelation.Wrap<OrderEvent>(
    async (order, ct) => await ProcessOrder(order),
    order => CorrelationKeys.Kafka("Kafka", order.OrderId));
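
To make the handoff concrete, here is a self-contained sketch of Pattern B that does not use the library at all. It assumes a plain dictionary in place of TestCorrelationStore and builds the key by hand in the kafka:{service}:{msgKey} format; HandoffSketch and RunAsync are hypothetical names. The processing loop starts before the message is consumed and only ever sees the payload, yet it can still find the owning test through the business key.

```csharp
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed record OrderEvent(string OrderId);

public static class HandoffSketch
{
    public static async Task<string?> RunAsync(string testName, OrderEvent order)
    {
        // Hypothetical stand-in for the correlation store.
        var store = new ConcurrentDictionary<string, string>();
        var channel = Channel.CreateUnbounded<OrderEvent>();

        // Processing side: started first, receives only the payload.
        var processing = Task.Run(async () =>
        {
            var received = await channel.Reader.ReadAsync();
            var key = $"kafka:Kafka:{received.OrderId}";
            return store.TryGetValue(key, out var owner) ? owner : null;
        });

        // Consume side: record messageKey -> test, then hand off the payload.
        store[$"kafka:Kafka:{order.OrderId}"] = testName;
        await channel.Writer.WriteAsync(order);

        return await processing;
    }
}
```

The lookup succeeds only because both sides build the key from the same service name and the same business key.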

Setup Guide: Custom Background Processors

For any processing pattern not covered by built-in extensions (Hangfire, custom hosted services, channel readers):

// 1. Seed correlation in your test setup
TestCorrelationStore.Correlate(
    CorrelationKeys.Custom("hangfire", "email-sender", jobId),
    testName, testId);

// 2. Wrap the processor
var wrapped = ProcessingCorrelation.Wrap<EmailJob>(
    async (job, ct) => await SendEmail(job),
    job => CorrelationKeys.Custom("hangfire", "email-sender", job.Id));
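
Conceptually, a wrapper of this kind is a small decorator around the handler. The sketch below shows the general shape only and is not the library's code: WrapSketch is a hypothetical name, and the beginScope parameter is an assumption that stands in for a call such as CorrelatedProcessingScope.Begin so the example compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WrapSketch
{
    public static Func<T, CancellationToken, Task> Wrap<T>(
        Func<T, CancellationToken, Task> handler,
        Func<T, string> keySelector,
        Func<string, IDisposable?> beginScope) // assumed hook; may return null on a miss
    {
        return async (item, ct) =>
        {
            // Resolve the item's key to a scope before the real handler runs.
            // A null scope is allowed by a using declaration and is simply skipped.
            using var scope = beginScope(keySelector(item));
            await handler(item, ct);
            // The scope is disposed here, after the handler has completed.
        };
    }
}
```

Since the scope is opened inside the async lambda, anything it stores in an AsyncLocal is visible to the handler and to whatever the handler awaits.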

Comparison Table

| Approach | Parallel-Safe | Requires Prod Changes | Best For |
|---|---|---|---|
| AsyncLocal (automatic via messaging extensions) | Yes | | Standard message consumers |
| TestCorrelationStore + decorators | Yes | | Change Feed, background processors |
| GlobalFallback | No | | Serial execution only (legacy) |
| Embedding correlation in documents | | ⚠️ Minimal | Custom scenarios with unusual threading |

API Reference

TestCorrelationStore

| Member | Description |
|---|---|
| Correlate(key, testName, testId) | Stores a correlation entry (auto-called by extensions) |
| Resolve(key) | Looks up test identity for a key; returns null if not found or expired |
| Remove(key) | Removes a specific entry |
| Clear() | Removes all entries (call in fixture teardown) |
| Seed(key, testName, testId) | Seeds correlation for pre-existing data |
| DefaultTtl | Time-to-live for entries (default: 30 minutes) |
| OnResolveMiss | Optional diagnostic callback for debugging |

CorrelatedProcessingScope

| Member | Description |
|---|---|
| Begin(correlationKey) | Resolves key → sets TestIdentityScope; returns IDisposable? |

CorrelationKeys

| Method | Format |
|---|---|
| Cosmos(service, docId) | cosmos:{service}:{docId} |
| Cosmos(service, pk, docId) | cosmos:{service}:{pk}:{docId} |
| Mongo(service, docId) | mongo:{service}:{docId} |
| Kafka(service, msgKey) | kafka:{service}:{msgKey} |
| ServiceBus(service, msgId) | servicebus:{service}:{msgId} |
| EventHubs(service, eventId) | eventhubs:{service}:{eventId} |
| PubSub(service, msgId) | pubsub:{service}:{msgId} |
| Sqs(service, msgId) | sqs:{service}:{msgId} |
| Sns(service, msgId) | sns:{service}:{msgId} |
| StorageQueue(service, msgId) | storagequeue:{service}:{msgId} |
| Custom(prefix, service, itemId) | {prefix}:{service}:{itemId} |
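
Because the service name is part of every key, the write side and the resolve side must agree on it exactly. The following sketch reproduces three of the documented formats with plain string interpolation to show the effect; KeyFormatSketch is a hypothetical helper written for illustration and is not the library's CorrelationKeys class.

```csharp
public static class KeyFormatSketch
{
    // cosmos:{service}:{docId}
    public static string Cosmos(string service, string docId) =>
        $"cosmos:{service}:{docId}";

    // cosmos:{service}:{pk}:{docId}
    public static string Cosmos(string service, string pk, string docId) =>
        $"cosmos:{service}:{pk}:{docId}";

    // {prefix}:{service}:{itemId}
    public static string Custom(string prefix, string service, string itemId) =>
        $"{prefix}:{service}:{itemId}";
}
```

A key written as KeyFormatSketch.Cosmos("Orders DB", "X") is "cosmos:Orders DB:X"; resolving with the service name "CosmosDB" would produce "cosmos:CosmosDB:X" and miss.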

ProcessingCorrelation

| Method | Description |
|---|---|
| Wrap<T>(handler, keySelector) | Wraps async processing delegate |
| WrapSync<T>(handler, keySelector) | Wraps sync processing action |
| WrapBatch<T>(handler, keySelector) | Wraps batch processing delegate |

ChangeFeedCorrelation (CosmosDB)

| Method | Description |
|---|---|
| Wrap<T>(handler, serviceName, idSelector?) | Wraps Change Feed delegate |
| WrapJson(handler, serviceName, idSelector?) | Wraps JSON Change Feed delegate |

ChangeStreamCorrelation (MongoDB)

| Method | Description |
|---|---|
| Wrap<T>(handler, serviceName, idSelector) | Wraps Change Stream processing |
| WrapBatch<T>(handler, serviceName, idSelector) | Wraps batch Change Stream processing |

Troubleshooting

| Symptom | Cause | Fix |
|---|---|---|
| Change Feed operations show as "Unknown" | AutoCorrelateWrites disabled or delegate not wrapped | Enable AutoCorrelateWrites = true and wrap your Change Feed delegate |
| Consumer tracked but downstream HTTP calls "Unknown" | Decoupled pattern: processing thread lacks AsyncLocal | Add ProcessingCorrelation.Wrap<T>() on processing side |
| Correlation returns null for items | TTL expired (default 30 min) or item predates test process | Increase DefaultTtl or use TestCorrelationStore.Seed() |
| Same document owned by wrong test | Multiple tests writing same doc ID | Use unique IDs per test |
| OnResolveMiss fires for known items | Key format mismatch between write and resolve side | Use CorrelationKeys.*() helpers consistently |

Known Limitations

  1. Same document written by multiple tests — correlations overwrite; the last write wins. Use unique document IDs per test.
  2. Periodic background work with no data trigger — still needs GlobalFallback (serial execution only).
  3. Data from before test process started — no correlation entry exists; processing will be untracked. Use TestCorrelationStore.Seed() in test setup.
  4. Extremely slow Change Feed lag — if lag exceeds DefaultTtl (30 min default), correlation expires. Increase the TTL.
  5. Custom Change Feed builders — if production code uses a non-DI builder, the auto-wrapping won't intercept it. Use ChangeFeedCorrelation.Wrap<T>() explicitly.
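
Limitations 1 and 4 both follow from how a keyed store with expiry behaves. The sketch below models that behaviour under stated assumptions: TtlStoreSketch is a hypothetical class, the clock is injected so expiry can be shown without waiting, and the real store's eviction strategy may differ.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class TtlStoreSketch
{
    private readonly ConcurrentDictionary<string, (string TestName, DateTimeOffset StoredAt)> _entries = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTimeOffset> _now;

    public TtlStoreSketch(TimeSpan ttl, Func<DateTimeOffset> now)
    {
        _ttl = ttl;
        _now = now;
    }

    // A second write to the same key replaces the first: last write wins.
    public void Correlate(string key, string testName) =>
        _entries[key] = (testName, _now());

    // Returns null when the key is unknown or the entry is older than the TTL.
    public string? Resolve(string key)
    {
        if (!_entries.TryGetValue(key, out var entry)) return null;
        if (_now() - entry.StoredAt > _ttl)
        {
            _entries.TryRemove(key, out _);
            return null;
        }
        return entry.TestName;
    }
}
```

With a 30-minute TTL, two tests that write the same document ID leave only the second test's name in the store, and a lookup performed 31 minutes after the write returns null.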

Migrating from GlobalFallback

Before (serial execution only)

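public ValueTask InitializeAsync()
{
    // Process-wide static: only safe when tests run serially.
    TestIdentityScope.SetGlobalFallback(testName, testId);
    return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync()
{
    TestIdentityScope.ClearGlobalFallback();
    return ValueTask.CompletedTask;
}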

After (parallel-safe, v2.36.0+)

Remove all SetGlobalFallback/ClearGlobalFallback calls. The database extension's AutoCorrelateWrites handles population, and your Change Feed/Stream wrapper handles resolution:

// No GlobalFallback needed — just wrap the processing delegate
var processor = container.GetChangeFeedProcessorBuilder<Order>("myProcessor",
    ChangeFeedCorrelation.Wrap<Order>(HandleChanges, "CosmosDB", o => o.Id))
    .Build();

Verification

After migration, run your test suite and check the diagnostic report:

  • 0% "Unknown" entries → migration successful
  • If some operations still show "Unknown" → check that all background processing delegates are wrapped

Rollback

If something breaks, GlobalFallback still works. Add it back temporarily while you debug:

// Temporary fallback — remove once wrapped delegates are confirmed working
TestIdentityScope.SetGlobalFallback(testName, testId);
