Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 161 additions & 76 deletions Source/Task/TaskQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,24 +368,7 @@ HRESULT __stdcall TaskQueuePortImpl::QueueItem(
entry.enqueueTime = m_timer.GetDueTime(waitMs);
RETURN_HR_IF(E_OUTOFMEMORY, !m_pendingList->push_back(entry));

// If the entry's enqueue time is < our current time,
// update the timer.
while (true)
{
uint64_t due = m_timerDue;
if (entry.enqueueTime < due)
{
if (m_timerDue.compare_exchange_weak(due, entry.enqueueTime))
{
m_timer.Start(entry.enqueueTime);
break;
}
}
else if (m_timerDue.compare_exchange_weak(due, due))
{
break;
}
}
ArmTimerIfEarlier(entry.enqueueTime);
}

// QueueEntry now owns the ref.
Expand Down Expand Up @@ -1056,16 +1039,154 @@ void TaskQueuePortImpl::EraseQueue(
}
}

// Promotes every delayed entry whose deadline has already arrived and then
// arms the timer for the next future deadline, if one remains.
// Arms the OS timer for dueTime using min-wins CAS with post-Start
// verification. If another thread publishes an earlier deadline between
// our CAS and Start, we detect the overwrite and re-arm. This closes the
// TOCTOU window that could strand a pending entry.
//
// This replaces the older "pop exactly one entry whose enqueueTime matches the
// currently armed due time" flow. That older model made correctness depend on
// timestamps behaving like unique identities. By sweeping everything with
// enqueueTime <= now, equal-deadline siblings and stale timer callbacks both
// collapse into the same simple rule: if a callback is due, move it now; if it
// is still in the future, leave it pending and re-arm for the earliest future
// item.
// Uses <= so callers needing to re-arm for an already-published deadline
// (e.g. SubmitPendingCallbacks on an early timer fire) go through the
// same verified path.
//
// Returns true when the timer is stable (armed at or before dueTime, or
// dueTime is UINT64_MAX). Returns false if m_timerDue moved later (entry
// was promoted), signaling the caller to re-evaluate.
bool TaskQueuePortImpl::ArmTimerIfEarlier(uint64_t dueTime)
{
while (true)
{
uint64_t currentDue = m_timerDue.load();

if (dueTime <= currentDue)
{
if (dueTime == UINT64_MAX)
{
return true; // Nothing to arm.
}

if (m_timerDue.compare_exchange_weak(currentDue, dueTime))
{
m_timer.Start(dueTime);

// Post-Start verification: did m_timerDue change between
// our CAS and Start? If not, the timer is correctly armed.
uint64_t afterDue = m_timerDue.load();
if (afterDue == dueTime)
{
return true; // Unchanged — timer correctly armed.
}

if (afterDue < dueTime)
{
// An earlier deadline was published. Our Start may
// have overwritten a concurrent arm. Fix it.
dueTime = afterDue;
continue;
}

// m_timerDue moved later (e.g. UINT64_MAX from promotion).
// Our entry was already handled. Caller should re-evaluate.
return false;
}
// CAS failed (concurrent modification). Retry with fresh read.
continue;
}

// An earlier deadline is already published; the timer is already
// armed for it or another thread is in the process of arming it
// (with their own post-Start verification).
return true;
}
}

// Replaces the due time that just fired with the next surviving future
// deadline. Unlike ArmTimerIfEarlier, this helper is allowed to move the
// published due time later, but only while the caller's observed due time is
// still current. If another thread already published an earlier/equal
// deadline, leave it alone. Returns false when the published due time moved
// later after Start(), signaling the caller to rescan the pending list.
bool TaskQueuePortImpl::ArmTimerForNextPendingDueTime(
uint64_t previousDueTime,
uint64_t nextDueTime)
{
while (true)
{
if (m_timerDue.compare_exchange_strong(previousDueTime, nextDueTime))
{
m_timer.Start(nextDueTime);

uint64_t afterDue = m_timerDue.load();
if (afterDue == nextDueTime)
{
return true;
}

if (afterDue < nextDueTime)
{
// Another thread published an earlier deadline and is
// responsible for its own Start+verify cycle. The timer
// is already covered.
return true;
}

return false;
}

// CAS failed: compare_exchange loaded the current m_timerDue into
// previousDueTime. If that value is already <= nextDueTime, the
// timer is armed for an earlier-or-equal deadline and we're done.
if (previousDueTime <= nextDueTime)
{
return true;
}
}
}

// Re-arms the exact due time observed by an early/stale timer callback.
// If another thread has already consumed that due time and moved m_timerDue
// later (including to UINT64_MAX), the observed due is stale and the caller
// must re-evaluate instead of resurrecting it.
bool TaskQueuePortImpl::RearmTimerIfDueTimeUnchanged(uint64_t dueTime)
{
while (true)
{
uint64_t currentDue = m_timerDue.load();

if (currentDue < dueTime)
{
return true;
}

if (currentDue > dueTime)
{
return false;
}

if (m_timerDue.compare_exchange_weak(currentDue, dueTime))
{
m_timer.Start(dueTime);

uint64_t afterDue = m_timerDue.load();
if (afterDue == dueTime)
{
return true;
}

if (afterDue < dueTime)
{
dueTime = afterDue;
continue;
}

return false;
}
}
}

// Promote every pending callback whose deadline has arrived, then arm the
// timer for the earliest remaining future deadline. Sweeping all
// enqueueTime <= now avoids treating timestamps as unique identities, so
// equal-deadline siblings and stale timer callbacks follow the same rule.
void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
_In_ uint64_t dueTime,
_In_ uint64_t now)
Expand Down Expand Up @@ -1132,23 +1253,14 @@ void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
{
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
{
while (true)
// Replace the due time that just fired with the earliest
// future deadline that survived the ready sweep.
if (!ArmTimerForNextPendingDueTime(dueTime, nextItem.enqueueTime))
{
// Publish the earliest future deadline that survived the
// ready sweep. If another thread already armed an even
// earlier timer, leave that earlier deadline in place.
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
{
m_timer.Start(nextItem.enqueueTime);
break;
}

nextItem.portContext->Release();
now = m_timer.GetCurrentTime();
dueTime = m_timerDue.load();

if (dueTime <= nextItem.enqueueTime)
{
break;
}
continue;
}
}
else
Expand Down Expand Up @@ -1226,45 +1338,18 @@ void TaskQueuePortImpl::SubmitPendingCallbacks()
{
uint64_t dueTime = m_timerDue.load();

if (dueTime == UINT64_MAX)
{
return;
}

// Threadpool timer callbacks that were already queued can still arrive
// after the timer has been retargeted. Treat the callback as advisory and
// only sweep ready entries once the currently armed monotonic deadline has
// actually arrived.
//
// Important: do not just return on an "early" callback. On Win32 the
// threadpool timer's relative wait source is not the same clock object as
// std::chrono::steady_clock, so a legitimate one-shot fire can arrive a
// little before the stored steady-clock deadline. If we drop that callback
// without re-arming the timer, the pending entry can remain stranded until
// some unrelated later timer fire or termination path happens to flush it.
//
// Also do not blindly re-arm the due time we just read. Another thread can
// publish an earlier pending entry between the load above and Start() below.
// If this stale callback then overwrites the timer with the older deadline,
// the newer earlier entry can stay stranded until the older deadline fires.
// Only re-arm when m_timerDue still matches the due time we observed.
// Timer callbacks are advisory: a threadpool fire can arrive after
// retargeting, or slightly before the steady-clock deadline due to
// clock-source differences on Win32. If the deadline hasn't arrived,
// re-arm the same published due time rather than silently dropping the
// callback (which would strand the pending entry).
const uint64_t now = m_timer.GetCurrentTime();
if (now < dueTime)
{
uint64_t expectedDueTime = dueTime;
if (m_timerDue.compare_exchange_weak(expectedDueTime, dueTime))
if (RearmTimerIfDueTimeUnchanged(dueTime))
{
m_timer.Start(dueTime);

// It's possible someone snuck a change into m_timerDue after the CAS
// but before the start call, so we've just written the wrong value to
// the timer. Verify dueTime again before returning.
if (m_timerDue.load() == dueTime)
{
return;
}
return;
}

continue;
}

Expand Down
8 changes: 8 additions & 0 deletions Source/Task/TaskQueueImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ class TaskQueuePortImpl: public Api<ApiId::TaskQueuePort, ITaskQueuePort>
static void EraseQueue(
_In_opt_ LocklessQueue<QueueEntry>* queue);

bool ArmTimerIfEarlier(_In_ uint64_t dueTime);

bool ArmTimerForNextPendingDueTime(
_In_ uint64_t previousDueTime,
_In_ uint64_t nextDueTime);

bool RearmTimerIfDueTimeUnchanged(_In_ uint64_t dueTime);

void PromoteReadyPendingCallbacks(
_In_ uint64_t dueTime,
_In_ uint64_t now);
Expand Down