Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull WaitForEvent to DispatchQueue #478

Merged
merged 1 commit into from Mar 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 0 additions & 33 deletions autowiring/CoreThread.h
Expand Up @@ -45,39 +45,6 @@ class CoreThread:
virtual void DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shared_ptr<CoreObject>&& refTracker) override;

public:
/// \internal
/// <summary>
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
/// dispatches the function, and then returns.
/// </summary>
void WaitForEvent(void);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time period elapses, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::milliseconds milliseconds);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time is reached, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// An unsafe variant of WaitForEvent
/// </summary>
bool WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// Called automatically to begin core thread execution.
Expand Down
33 changes: 33 additions & 0 deletions autowiring/DispatchQueue.h
Expand Up @@ -147,6 +147,39 @@ class DispatchQueue {
/// <returns>The total number of events dispatched</returns>
int DispatchAllEvents(void);

/// \internal
/// <summary>
/// Waits until a lambda function is ready to run in this thread's dispatch queue,
/// dispatches the function, and then returns.
/// </summary>
void WaitForEvent(void);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time period elapses, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::milliseconds milliseconds);

/// \internal
/// <summary>
/// Waits until a lambda function in the dispatch queue is ready to run or the specified
/// time is reached, whichever comes first.
/// </summary>
/// <returns>
/// False if the timeout period elapsed before an event could be dispatched, true otherwise
/// </returns>
bool WaitForEvent(std::chrono::steady_clock::time_point wakeTime);

/// \internal
/// <summary>
/// An unsafe variant of WaitForEvent
/// </summary>
bool WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime);

public:
/// <summary>
/// Explicit overload for already-constructed dispatch thunk types
Expand Down
71 changes: 0 additions & 71 deletions src/autowiring/CoreThread.cpp
Expand Up @@ -26,77 +26,6 @@ void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shar
BasicThread::DoRunLoopCleanup(std::move(ctxt), std::move(refTracker));
}

void CoreThread::WaitForEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);
if(m_aborted)
throw dispatch_aborted_exception();

// Unconditional delay:
m_queueUpdated.wait(lk, [this] () -> bool {
if(m_aborted)
throw dispatch_aborted_exception();

return
// We will need to transition out if the delay queue receives any items:
!this->m_delayedQueue.empty() ||

// We also transition out if the dispatch queue has any events:
!this->m_dispatchQueue.empty();
});

if(m_dispatchQueue.empty()) {
// The delay queue has items but the dispatch queue does not, we need to switch
// to the suggested sleep timeout variant:
WaitForEventUnsafe(lk, m_delayedQueue.top().GetReadyTime());
} else {
// We have an event, we can just hop over to this variant:
DispatchEventUnsafe(lk);
}
}

bool CoreThread::WaitForEvent(std::chrono::milliseconds milliseconds) {
return WaitForEvent(std::chrono::steady_clock::now() + milliseconds);
}

bool CoreThread::WaitForEvent(std::chrono::steady_clock::time_point wakeTime) {
if(wakeTime == std::chrono::steady_clock::time_point::max()) {
// Maximal wait--we can optimize by using the zero-arguments version
return WaitForEvent(), true;
}

std::unique_lock<std::mutex> lk(m_dispatchLock);
return WaitForEventUnsafe(lk, wakeTime);
}

bool CoreThread::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
if(m_aborted)
throw dispatch_aborted_exception();

while(m_dispatchQueue.empty()) {
// Derive a wakeup time using the high precision timer:
wakeTime = SuggestSoonestWakeupTimeUnsafe(wakeTime);

// Now we wait, either for the timeout to elapse or for the dispatch queue itself to
// transition to the "aborted" state.
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

// Short-circuit if the queue was aborted
if(m_aborted)
throw dispatch_aborted_exception();

if (PromoteReadyDispatchersUnsafe())
// Dispatcher is ready to run! Exit our loop and dispatch an event
break;

if(status == std::cv_status::timeout)
// Can't proceed, queue is empty and nobody is ready to be run
return false;
}

DispatchEventUnsafe(lk);
return true;
}

void CoreThread::Run() {
while(!ShouldStop())
WaitForEvent();
Expand Down
72 changes: 72 additions & 0 deletions src/autowiring/DispatchQueue.cpp
Expand Up @@ -76,6 +76,78 @@ void DispatchQueue::Abort(void) {
m_queueUpdated.notify_all();
}

void DispatchQueue::WaitForEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);
if (m_aborted)
throw dispatch_aborted_exception();

// Unconditional delay:
m_queueUpdated.wait(lk, [this]() -> bool {
if (m_aborted)
throw dispatch_aborted_exception();

return
// We will need to transition out if the delay queue receives any items:
!this->m_delayedQueue.empty() ||

// We also transition out if the dispatch queue has any events:
!this->m_dispatchQueue.empty();
});

if (m_dispatchQueue.empty()) {
// The delay queue has items but the dispatch queue does not, we need to switch
// to the suggested sleep timeout variant:
WaitForEventUnsafe(lk, m_delayedQueue.top().GetReadyTime());
}
else {
// We have an event, we can just hop over to this variant:
DispatchEventUnsafe(lk);
}
}

bool DispatchQueue::WaitForEvent(std::chrono::milliseconds milliseconds) {
return WaitForEvent(std::chrono::steady_clock::now() + milliseconds);
}

bool DispatchQueue::WaitForEvent(std::chrono::steady_clock::time_point wakeTime) {
if (wakeTime == std::chrono::steady_clock::time_point::max()) {
// Maximal wait--we can optimize by using the zero-arguments version
return WaitForEvent(), true;
}

std::unique_lock<std::mutex> lk(m_dispatchLock);
return WaitForEventUnsafe(lk, wakeTime);
}

bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
if (m_aborted)
throw dispatch_aborted_exception();

while (m_dispatchQueue.empty()) {
// Derive a wakeup time using the high precision timer:
wakeTime = SuggestSoonestWakeupTimeUnsafe(wakeTime);

// Now we wait, either for the timeout to elapse or for the dispatch queue itself to
// transition to the "aborted" state.
std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

// Short-circuit if the queue was aborted
if (m_aborted)
throw dispatch_aborted_exception();

if (PromoteReadyDispatchersUnsafe())
// Dispatcher is ready to run! Exit our loop and dispatch an event
break;

if (status == std::cv_status::timeout)
// Can't proceed, queue is empty and nobody is ready to be run
return false;
}

DispatchEventUnsafe(lk);
return true;
}

bool DispatchQueue::DispatchEvent(void) {
std::unique_lock<std::mutex> lk(m_dispatchLock);

Expand Down