Skip to content

Commit

Permalink
Merge pull request #478 from leapmotion/ref-waitforevent
Browse files Browse the repository at this point in the history
Pull WaitForEvent to DispatchQueue
  • Loading branch information
yeswalrus committed Mar 31, 2015
2 parents 58b5884 + aa5f87f commit b9b479b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 104 deletions.
33 changes: 0 additions & 33 deletions autowiring/CoreThread.h
Expand Up @@ -45,39 +45,6 @@ class CoreThread:
virtual void DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shared_ptr<CoreObject>&& refTracker) override;

public:
/// \internal
/// <summary>
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
/// dispatches the function, and then returns.
/// </summary>
void WaitForEvent(void);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time period elapses, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::milliseconds milliseconds);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time is reached, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// An unsafe variant of WaitForEvent
/// </summary>
bool WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// Called automatically to begin core thread execution.
Expand Down
33 changes: 33 additions & 0 deletions autowiring/DispatchQueue.h
Expand Up @@ -147,6 +147,39 @@ class DispatchQueue {
/// <returns>The total number of events dispatched</returns>
int DispatchAllEvents(void);

/// \internal
/// <summary>
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
/// dispatches the function, and then returns.
/// </summary>
void WaitForEvent(void);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time period elapses, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::milliseconds milliseconds);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time is reached, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// An unsafe variant of WaitForEvent
/// </summary>
bool WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime);

public:
/// <summary>
/// Explicit overload for already-constructed dispatch thunk types
Expand Down
71 changes: 0 additions & 71 deletions src/autowiring/CoreThread.cpp
Expand Up @@ -26,77 +26,6 @@ void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shar
BasicThread::DoRunLoopCleanup(std::move(ctxt), std::move(refTracker));
}

void CoreThread::WaitForEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);
if(m_aborted)
throw dispatch_aborted_exception();

// Unconditional delay:
m_queueUpdated.wait(lk, [this] () -> bool {
if(m_aborted)
throw dispatch_aborted_exception();

return
// We will need to transition out if the delay queue receives any items:
!this->m_delayedQueue.empty() ||

// We also transition out if the dispatch queue has any events:
!this->m_dispatchQueue.empty();
});

if(m_dispatchQueue.empty()) {
// The delay queue has items but the dispatch queue does not, we need to switch
// to the suggested sleep timeout variant:
WaitForEventUnsafe(lk, m_delayedQueue.top().GetReadyTime());
} else {
// We have an event, we can just hop over to this variant:
DispatchEventUnsafe(lk);
}
}

bool CoreThread::WaitForEvent(std::chrono::milliseconds milliseconds) {
return WaitForEvent(std::chrono::steady_clock::now() + milliseconds);
}

bool CoreThread::WaitForEvent(std::chrono::steady_clock::time_point wakeTime) {
if(wakeTime == std::chrono::steady_clock::time_point::max()) {
// Maximal wait--we can optimize by using the zero-arguments version
return WaitForEvent(), true;
}

std::unique_lock<std::mutex> lk(m_dispatchLock);
return WaitForEventUnsafe(lk, wakeTime);
}

bool CoreThread::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
if(m_aborted)
throw dispatch_aborted_exception();

while(m_dispatchQueue.empty()) {
// Derive a wakeup time using the high precision timer:
wakeTime = SuggestSoonestWakeupTimeUnsafe(wakeTime);

// Now we wait, either for the timeout to elapse or for the dispatch queue itself to
// transition to the "aborted" state.
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

// Short-circuit if the queue was aborted
if(m_aborted)
throw dispatch_aborted_exception();

if (PromoteReadyDispatchersUnsafe())
// Dispatcher is ready to run! Exit our loop and dispatch an event
break;

if(status == std::cv_status::timeout)
// Can't proceed, queue is empty and nobody is ready to be run
return false;
}

DispatchEventUnsafe(lk);
return true;
}

void CoreThread::Run() {
while(!ShouldStop())
WaitForEvent();
Expand Down
72 changes: 72 additions & 0 deletions src/autowiring/DispatchQueue.cpp
Expand Up @@ -76,6 +76,78 @@ void DispatchQueue::Abort(void) {
m_queueUpdated.notify_all();
}

void DispatchQueue::WaitForEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);
if (m_aborted)
throw dispatch_aborted_exception();

// Unconditional delay:
m_queueUpdated.wait(lk, [this]() -> bool {
if (m_aborted)
throw dispatch_aborted_exception();

return
// We will need to transition out if the delay queue receives any items:
!this->m_delayedQueue.empty() ||

// We also transition out if the dispatch queue has any events:
!this->m_dispatchQueue.empty();
});

if (m_dispatchQueue.empty()) {
// The delay queue has items but the dispatch queue does not, we need to switch
// to the suggested sleep timeout variant:
WaitForEventUnsafe(lk, m_delayedQueue.top().GetReadyTime());
}
else {
// We have an event, we can just hop over to this variant:
DispatchEventUnsafe(lk);
}
}

bool DispatchQueue::WaitForEvent(std::chrono::milliseconds milliseconds) {
return WaitForEvent(std::chrono::steady_clock::now() + milliseconds);
}

bool DispatchQueue::WaitForEvent(std::chrono::steady_clock::time_point wakeTime) {
if (wakeTime == std::chrono::steady_clock::time_point::max()) {
// Maximal wait--we can optimize by using the zero-arguments version
return WaitForEvent(), true;
}

std::unique_lock<std::mutex> lk(m_dispatchLock);
return WaitForEventUnsafe(lk, wakeTime);
}

bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
if (m_aborted)
throw dispatch_aborted_exception();

while (m_dispatchQueue.empty()) {
// Derive a wakeup time using the high precision timer:
wakeTime = SuggestSoonestWakeupTimeUnsafe(wakeTime);

// Now we wait, either for the timeout to elapse or for the dispatch queue itself to
// transition to the "aborted" state.
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

// Short-circuit if the queue was aborted
if (m_aborted)
throw dispatch_aborted_exception();

if (PromoteReadyDispatchersUnsafe())
// Dispatcher is ready to run! Exit our loop and dispatch an event
break;

if (status == std::cv_status::timeout)
// Can't proceed, queue is empty and nobody is ready to be run
return false;
}

DispatchEventUnsafe(lk);
return true;
}

bool DispatchQueue::DispatchEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);

Expand Down

0 comments on commit b9b479b

Please sign in to comment.