Heap Corruption in Concurrent XTaskQueue Termination #936

@jhugard

Description

Summary

Concurrent termination operations on XTaskQueue cause heap corruption and access violations due to unsynchronized access to internal termination tracking lists.

Environment

  • Platform: Windows x64
  • Build: Debug with Page Heap enabled
  • Test: VerifyCompositeTerminationRaceRepro (64 threads, 100 iterations)

Symptoms

  • Crash Type: Access violation (0xc0000005)
  • Location: AtomicVector<ITaskQueuePortContext*>::Remove → corrupted vector iterator
  • Failure Bucket: INVALID_POINTER_WRITE_c0000005_ntdll.dll!RtlpWaitOnCriticalSection
  • Trigger: High-concurrency composite queue termination stress test

Root Cause Analysis

Race Condition #1: Nested Terminate During SignalTerminations

Scenario: Thread A iterates m_terminationList in SignalTerminations(), invokes a termination callback, which calls XTaskQueueTerminate() on another queue sharing the same port, causing concurrent modification of the list being iterated.

Code Path:

SignalTerminations()
  → m_terminationList->remove_if([...])
    → entry->callback(callbackContext)
      → User callback calls XTaskQueueTerminate()
        → ScheduleTermination()
          → m_terminationList->push_back()  // RACE: concurrent modification

Race Condition #2: Concurrent ScheduleTermination

Scenario: Multiple threads call ScheduleTermination() simultaneously (e.g., multiple delegate queues terminating concurrently), causing heap corruption in the lockless queue's push_back operation.

Code Path:

Thread A: ScheduleTermination() → m_terminationList->push_back(...)
Thread B: ScheduleTermination() → m_terminationList->push_back(...)
// RACE: concurrent push_back corrupts internal vector state

Reproduction

Minimal Reproduction

// Create root queue and get ports
XTaskQueueHandle root;
XTaskQueueCreate(XTaskQueueDispatchMode::ThreadPool,
                 XTaskQueueDispatchMode::ThreadPool, &root);
XTaskQueuePortHandle workPort, completionPort;
XTaskQueueGetPort(root, XTaskQueuePort::Work, &workPort);
XTaskQueueGetPort(root, XTaskQueuePort::Completion, &completionPort);

// 64 threads, each creating 100 delegate queues; terminating all of them
// concurrently with the root triggers the heap corruption
XTaskQueueHandle delegate = nullptr;
XTaskQueueCreateComposite(workPort, completionPort, &delegate);
XTaskQueueTerminate(delegate, false, context, callback);
XTaskQueueTerminate(root, true, nullptr, nullptr);

Full Test Case (Trimmed)

DEFINE_TEST_CASE(VerifyCompositeTerminationRaceRepro)
{
    // Stress test for two race conditions in XTaskQueue termination:
    // Race #1: Nested Terminate during SignalTerminations iteration
    // Race #2: Concurrent ScheduleTermination heap corruption
    //
    // CRITICAL: Run with page heap enabled for improved Race #2 detection!
    // gflags /p /enable <test_exe> /full
    
    // Test parameters: Aggressive concurrency to trigger race conditions
    constexpr size_t k_thread_count = 64;              // High thread count
    constexpr size_t k_iterations_per_thread = 100;    // Iterations per thread
    constexpr size_t k_work_items_per_queue = 10;      // Work items per queue

    // Create root queue with thread pool dispatch
    AutoQueueHandle root;
    XTaskQueueCreate(XTaskQueueDispatchMode::ThreadPool,
                     XTaskQueueDispatchMode::ThreadPool, &root);

    // Get ports for creating composite delegates
    XTaskQueuePortHandle workPort = nullptr;
    XTaskQueuePortHandle completionPort = nullptr;
    XTaskQueueGetPort(root, XTaskQueuePort::Work, &workPort);
    XTaskQueueGetPort(root, XTaskQueuePort::Completion, &completionPort);

    // Synchronization and counters
    std::atomic<int> ready{0}, done{0}, createErrors{0}, terminateErrors{0};
    std::atomic<int> workCallbackCount{0};
    std::atomic<int> delegateTerminationsRemaining{0};  // Track terminations
    std::atomic<bool> go{false};

    std::mutex cvMutex;
    std::condition_variable cv;
    std::vector<std::thread> threads;

    // Spawn worker threads
    for (size_t t = 0; t < k_thread_count; ++t)
    {
        threads.emplace_back([&]
        {
            // Coordinated start (ready/go synchronization)...
            
            // Rapidly create, populate, and terminate composite queues
            for (size_t iter = 0; iter < k_iterations_per_thread; ++iter)
            {
                XTaskQueueHandle delegateQueue = nullptr;
                XTaskQueueCreateComposite(workPort, completionPort, &delegateQueue);

                // Submit work items executing during termination
                for (size_t w = 0; w < k_work_items_per_queue; ++w)
                {
                    XTaskQueueSubmitCallback(delegateQueue, XTaskQueuePort::Work,
                        &workCallbackCount,
                        [](void* context, bool canceled) {
                            if (!canceled) {
                                std::atomic<int>* counter = 
                                    static_cast<std::atomic<int>*>(context);
                                counter->fetch_add(1, std::memory_order_relaxed);
                                std::this_thread::sleep_for(
                                    std::chrono::microseconds(50));
                            }
                        });
                }

                // Track this delegate's termination
                delegateTerminationsRemaining.fetch_add(1, std::memory_order_acq_rel);

                // Terminate with callback to track completion
                XTaskQueueTerminate(delegateQueue, false,  // wait=false
                    &delegateTerminationsRemaining,
                    [](void* context) {
                        std::atomic<int>* counter = 
                            static_cast<std::atomic<int>*>(context);
                        counter->fetch_sub(1, std::memory_order_acq_rel);
                    });
                
                XTaskQueueCloseHandle(delegateQueue);
            }
            
            done.fetch_add(1, std::memory_order_acq_rel);
        });
    }

    // Wait for all threads ready, start simultaneously, wait for completion...

    // Terminate root BEFORE waiting for delegates (proves independence)
    XTaskQueueTerminate(root, true, nullptr, nullptr);
    XTaskQueueCloseHandle(root.Release());

    // Wait for all delegate terminations with timeout
    UINT64 waitStartTicks = GetTickCount64();
    while (delegateTerminationsRemaining.load(std::memory_order_acquire) > 0
        && (GetTickCount64() - waitStartTicks) < 60000)
    {
        std::this_thread::yield();
    }
    
    VERIFY_ARE_EQUAL(0, delegateTerminationsRemaining.load());
    VERIFY_ARE_EQUAL(0, terminateErrors.load());
}

Page Heap Detection:

gflags /p /enable <test_exe> /full

Stack Trace

ntdll!RtlpWaitOnCriticalSection+0xb3
  → std::_Iterator_base12::_Adopt_locked+0x44
  → std::vector::end+0x4a
  → AtomicVector<ITaskQueuePortContext*>::Remove
  → TaskQueuePortImpl::Detach
  → TaskQueueImpl::RundownObject
  → Api<2,ITaskQueue>::Release

Impact

  • Severity: High; causes crashes in production when queues are heavily used
  • Frequency: Very rare under normal load; a heisenbug even under stress (at 256 threads, reproduction may take 30-60 runs)
  • Scope: Any code using composite queues with concurrent termination

Proposed Fix

  1. Add mutex (m_terminationLock) to serialize termination list access
  2. Refactor SignalTerminations() to collect entries under lock, process callbacks outside lock
  3. Add AddRef/Release around callbacks to prevent UAF
  4. Protect all list operations: PrepareTermination, CancelTermination, Terminate, ScheduleTermination, ResumeTermination, ResumePort
  5. Add TerminationListEmpty() helper for safe empty checks

Validation

  • Stress test reproduces the crash within 100 runs before the fix; 0 failures in 100 runs after the fix
  • Page heap clean after fix (no heap corruption detected)
  • Full test suite passes (81/81 tests)
