From ff4e28009b1c3c11e7c9dd678ae5582fa3c95de5 Mon Sep 17 00:00:00 2001 From: Jason Lokerson Date: Mon, 8 Feb 2016 16:30:03 -0800 Subject: [PATCH] Implement a try-to-dispatch concept This is useful for cases where consumers want to be able to retry lambdas from the dispatch queue while preserving order in single-threaded use cases. --- autowiring/DispatchQueue.h | 28 ++++++++++++++++ src/autowiring/DispatchQueue.cpp | 34 ++++++++++++++++++++ src/autowiring/test/DispatchQueueTest.cpp | 39 +++++++++++++++++++++++ 3 files changed, 101 insertions(+) diff --git a/autowiring/DispatchQueue.h b/autowiring/DispatchQueue.h index 481c7a4ea..2fd2a4bf8 100644 --- a/autowiring/DispatchQueue.h +++ b/autowiring/DispatchQueue.h @@ -77,6 +77,16 @@ class DispatchQueue { /// void DispatchEventUnsafe(std::unique_lock& lk); + /// + /// Similar to TryDispatchEvent, except assumes that the dispatch lock is currently held + /// + /// A lock on m_dispatchLock + /// + /// This method assumes that the dispatch lock is held and that m_aborted is false. It + /// is an error to call this method without those preconditions met. + /// + void TryDispatchEventUnsafe(std::unique_lock& lk); + /// /// Utility virtual, called whenever a new event is deferred /// @@ -154,6 +164,24 @@ class DispatchQueue { /// bool DispatchEvent(void); + /// + /// Similar to WaitForEvent, but does not block + /// + /// True if an event was dispatched, false if the queue was empty when checked + /// + /// Implements a retry capability for the dispatch queue + /// + /// If the dispatch queue is empty, this method will check the delayed dispatch queue. Unlike + /// DispatchEvent, if the pended lambda throws an exception, the lambda is put back at the front + /// of the queue rather than being deleted. + /// + /// This method may break the strict sequentiality guarantee of DispatchQueue if it is used in a + /// concurrent or reentrant use case. Consider a queue consisting of two lambdas, [A, B]. If A + /// throws an exception the first time it is invoked, and B does not throw, and A calls + /// DispatchEvent, then the call order will be [A(throws), B, A]. + /// + bool TryDispatchEvent(void); + /// /// Similar to DispatchEvent, but will attempt to dispatch all events currently queued /// diff --git a/src/autowiring/DispatchQueue.cpp b/src/autowiring/DispatchQueue.cpp index 9bc960d8e..7ddda970c 100644 --- a/src/autowiring/DispatchQueue.cpp +++ b/src/autowiring/DispatchQueue.cpp @@ -67,6 +67,31 @@ void DispatchQueue::DispatchEventUnsafe(std::unique_lock& lk) { (*thunk)(); } +void DispatchQueue::TryDispatchEventUnsafe(std::unique_lock& lk) { + // Pull the ready thunk off of the front of the queue and pop it while we hold the lock. + // Then, we will excecute the call while the lock has been released so we do not create + // deadlocks. + DispatchThunkBase* pThunk = m_pHead; + m_pHead = pThunk->m_pFlink; + lk.unlock(); + + try { (*pThunk)(); } + catch (...) { + // Failed to execute thunk, put it back + lk.lock(); + pThunk->m_pFlink = m_pHead; + m_pHead = pThunk; + throw; + } + + if (!--m_count) { + // Notify that we have hit zero: + std::lock_guard{ *lk.mutex() }; + m_queueUpdated.notify_all(); + } + delete pThunk; +} + void DispatchQueue::Abort(void) { // Do not permit any more lambdas to be pended to our queue DispatchThunkBase* pHead; @@ -194,6 +219,15 @@ bool DispatchQueue::DispatchEvent(void) { return true; } +bool DispatchQueue::TryDispatchEvent(void) { + std::unique_lock lk(m_dispatchLock); + if (!m_pHead && !PromoteReadyDispatchersUnsafe()) + return false; + + TryDispatchEventUnsafe(lk); + return true; +} + int DispatchQueue::DispatchAllEvents(void) { int retVal = 0; while(DispatchEvent()) diff --git a/src/autowiring/test/DispatchQueueTest.cpp b/src/autowiring/test/DispatchQueueTest.cpp index 875500aca..e96da0375 100644 --- a/src/autowiring/test/DispatchQueueTest.cpp +++ b/src/autowiring/test/DispatchQueueTest.cpp @@ -249,3 +249,42 @@ TEST_F(DispatchQueueTest, AbortObserver) { dq.Abort(); ASSERT_TRUE(onAbortedCalled) << "Abort signal handler not asserted as expected"; } + +TEST_F(DispatchQueueTest, TryDispatchTest) { + DispatchQueue dq; + dq += [] { + throw std::runtime_error{"Error!"}; + }; + ASSERT_THROW(dq.TryDispatchEvent(), std::runtime_error); + ASSERT_EQ(1UL, dq.GetDispatchQueueLength()); +} + +TEST_F(DispatchQueueTest, VerifyRetry) { + DispatchQueue dq; + size_t nCalled = 0; + dq += [&nCalled] { + if(!nCalled++) + throw std::runtime_error{ "Error!" }; + }; + ASSERT_THROW(dq.TryDispatchEvent(), std::runtime_error); + ASSERT_EQ(1U, dq.GetDispatchQueueLength()); + ASSERT_TRUE(dq.TryDispatchEvent()); + ASSERT_EQ(2U, nCalled); +} + +TEST_F(DispatchQueueTest, TwoDispatchRetry) { + DispatchQueue dq; + size_t nCalled = 0; + dq += [&nCalled] { + if (!nCalled++) + throw std::runtime_error{ "Error!" }; + }; + dq += [] {}; + ASSERT_THROW(dq.TryDispatchEvent(), std::runtime_error); + ASSERT_EQ(2U, dq.GetDispatchQueueLength()); + ASSERT_TRUE(dq.TryDispatchEvent()); + ASSERT_EQ(1U, dq.GetDispatchQueueLength()); + ASSERT_TRUE(dq.TryDispatchEvent()); + ASSERT_EQ(2U, nCalled); + ASSERT_EQ(0U, dq.GetDispatchQueueLength()); +}