diff --git a/src/autowiring/CoreJob.cpp b/src/autowiring/CoreJob.cpp index 3737167af..78a07a70b 100644 --- a/src/autowiring/CoreJob.cpp +++ b/src/autowiring/CoreJob.cpp @@ -52,8 +52,10 @@ void CoreJob::OnPended(std::unique_lock&& lk){ // Need to ask the thread pool to handle our events again: m_curEventInTeardown = false; - if (m_curEvent) - delete static_cast*>(m_curEvent); + std::future* future = static_cast*>(std::atomic_exchange(&m_curEvent, nullptr)); + if (future) { + delete future; + } m_curEvent = new std::future( std::async( @@ -97,7 +99,7 @@ bool CoreJob::OnStart(void) { m_running = true; - std::unique_lock lk; + std::unique_lock lk(m_dispatchLock); if(m_pHead) // Simulate a pending event, because we need to set up our async: OnPended(std::move(lk)); @@ -122,21 +124,20 @@ void CoreJob::OnStop(bool graceful) { } void CoreJob::DoAdditionalWait(void) { - if (m_curEvent) { - std::future* ptr = static_cast*>(m_curEvent); - ptr->wait(); - delete ptr; - m_curEvent = nullptr; + std::future* future = static_cast*>(std::atomic_exchange(&m_curEvent, nullptr)); + + if (future) { + future->wait(); + delete future; } } bool CoreJob::DoAdditionalWait(std::chrono::nanoseconds timeout) { - if (!m_curEvent) + std::future* future = static_cast*>(std::atomic_exchange(&m_curEvent, nullptr)); + if (!future) return true; - std::future* ptr = static_cast*>(m_curEvent); - auto status = ptr->wait_for(NanosecondsForFutureWait(timeout)); - delete ptr; - m_curEvent = nullptr; + const auto status = future->wait_for(NanosecondsForFutureWait(timeout)); + delete future; return status == std::future_status::ready; } diff --git a/src/autowiring/CoreJob.h b/src/autowiring/CoreJob.h index c1e0c7b50..700870971 100644 --- a/src/autowiring/CoreJob.h +++ b/src/autowiring/CoreJob.h @@ -18,7 +18,7 @@ class CoreJob: bool m_running = false; // The current outstanding async in the thread pool, if one exists: - void* m_curEvent = nullptr; + std::atomic m_curEvent{ nullptr }; // Flag, indicating whether curEvent is in a teardown pathway. This // flag is highly stateful. diff --git a/src/autowiring/DispatchQueue.h b/src/autowiring/DispatchQueue.h index 54004fb5e..cfc55098f 100644 --- a/src/autowiring/DispatchQueue.h +++ b/src/autowiring/DispatchQueue.h @@ -406,9 +406,10 @@ class DispatchQueue { // Create the thunk first to reduce the amount of time we spend in lock: auto thunk = new autowiring::DispatchThunk<_Fx>(std::forward<_Fx>(fx)); - m_dispatchLock.lock(); + std::unique_lock lk(m_dispatchLock); + if (m_count >= m_dispatchCap) { - m_dispatchLock.unlock(); + lk.unlock(); delete thunk; return false; } @@ -420,16 +421,15 @@ class DispatchQueue { if (m_pHead) { m_pTail->m_pFlink = thunk; m_pTail = thunk; - m_dispatchLock.unlock(); + // Notification as needed: + OnPended(std::move(lk)); } else { m_pHead = m_pTail = thunk; - m_dispatchLock.unlock(); + // Notification as needed: + OnPended(std::move(lk)); m_queueUpdated.notify_all(); } - - // Notification as needed: - OnPended(std::unique_lock{}); return true; } }; diff --git a/src/autowiring/test/CoreJobTest.cpp b/src/autowiring/test/CoreJobTest.cpp index 7cca8d9e2..0a436ad0e 100644 --- a/src/autowiring/test/CoreJobTest.cpp +++ b/src/autowiring/test/CoreJobTest.cpp @@ -2,6 +2,7 @@ #include "stdafx.h" #include #include THREAD_HEADER +#include ARRAY_HEADER class CoreJobTest: public testing::Test @@ -179,3 +180,28 @@ TEST_F(CoreJobTest, RecursiveDeadlock) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); }; } + +TEST_F(CoreJobTest, PendFromMultipleThreads) { + AutoCurrentContext ctxt; + AutoRequired cj; + std::array threads; + const size_t times{ 256 }; + int counter{ 0 }; + + for (size_t i = 0; i < threads.size(); i++) { + threads[i] = std::thread([&] { + ctxt->DelayUntilInitiated(); + for (int j = 0; j < times; j++) { + *cj += [&counter] { + counter++; // Should be updated exclusively in the CoreJob's thread + }; + } + }); + } + ctxt->Initiate(); + for (size_t i = 0; i < threads.size(); i++) { + threads[i].join(); + } + ctxt->SignalShutdown(true); + ASSERT_EQ(times*threads.size(), counter); +}