Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use case improvements for autowiring::parallel #847

Merged
merged 2 commits into from Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 104 additions & 45 deletions autowiring/Parallel.h
@@ -1,13 +1,15 @@
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#pragma once
#include "AnySharedPointer.h"
#include "CoreContext.h"

#include "auto_id.h"
#include "DispatchQueue.h"
#include <iterator>
#include <unordered_map>
#include <typeindex>
#include <deque>

class CoreContext;

namespace autowiring {

class parallel;
Expand Down Expand Up @@ -107,8 +109,39 @@ struct parallel_iterator<void>
// are run in the thread pool of the current context
class parallel {
public:
/// <summary>
/// Constructs a parallel instance attached to the current context
/// </summary>
/// <remarks>
/// The thread pool will be sized to the value of std::thread::hardware_concurrency()
/// </remarks>
parallel(void);
parallel(CoreContext& ctxt);

/// <summary>
/// Constructs a parallel instance attached to the current context
/// </summary>
parallel(size_t concurrency);

/// <summary>
/// Constructs a pool with exactly the specified number of threads and owning context
/// </summary>
/// <param name="ctxt">The owning context</param>
/// <param name="concurrency">The number of parallel threads, set to 0 to use the system default</param>
/// <remarks>
/// The context's thread pool is not used for this instance. The context is only used to obtain
/// a stop signal for termination and cleanup behaviors.
/// </remarks>
parallel(CoreContext& ctxt, size_t concurrency);

/// <summary>
/// Non-blocking destructor
/// </summary>
/// <remarks>
/// This destructor will cause running threads to eventually conclude in a safe manner, but will
/// not block for their completion. The caller is responsible for manually invoking stop and then
/// barrier if thread termination must be guaranteed.
/// </remarks>
~parallel(void);

// Add job to be run in the thread pool
template<typename _Fx>
Expand All @@ -119,14 +152,14 @@ class parallel {
using RetType = typename std::remove_cv<decltype(fx())>::type;

// Increment the count of remaining jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_queueMutex, ++m_outstandingCount;
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;

*m_ctxt += [this, fx] {
m_block->dq += [this, fx] {
auto result = std::make_shared<RetType>(fx());

std::lock_guard<std::mutex> lk(m_queueMutex);
m_queue[typeid(RetType)].emplace_back(std::move(result));
m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_queue[auto_id_t<RetType>{}].emplace_back(std::move(result));
m_block->m_queueUpdated.notify_all();
};
}

Expand All @@ -137,47 +170,47 @@ class parallel {
>::type
operator+=(_Fx&& fx) {
// Increment the count of remaining jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_queueMutex, ++m_outstandingCount;
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;

*m_ctxt += [this, fx] {
m_block->dq += [this, fx] {
fx();

std::lock_guard<std::mutex> lk(m_queueMutex);
m_nVoidEntries++;
m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_nVoidEntries++;
m_block->m_queueUpdated.notify_all();
};
}

// Discard the most recent result. Blocks until the next result arrives.
template<typename T>
void Pop(void) {
std::unique_lock<std::mutex> lk(m_queueMutex);
if (!m_outstandingCount)
std::unique_lock<std::mutex> lk(m_block->m_lock);
if (!m_block->m_outstandingCount)
throw std::out_of_range("No outstanding jobs");

if (std::is_same<void, T>::value) {
m_queueUpdated.wait(lk, [this] { return m_nVoidEntries != 0; });
m_nVoidEntries--;
m_block->m_queueUpdated.wait(lk, [this] { return m_block->m_nVoidEntries != 0; });
m_block->m_nVoidEntries--;
} else {
auto& qu = m_queue[typeid(T)];
m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
auto& qu = m_block->m_queue[auto_id_t<T>{}];
m_block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
qu.pop_front();
}

--m_outstandingCount;
--m_block->m_outstandingCount;
}

// Get the result from the most recent job. Blocks until a result arrives
// if there isn't one already available
template<typename T>
T Top(void) {
std::unique_lock<std::mutex> lk(m_queueMutex);
std::unique_lock<std::mutex> lk(m_block->m_lock);

if (m_queue[typeid(T)].empty())
m_queueUpdated.wait(lk, [this]{
return !m_queue[typeid(T)].empty();
if (m_block->m_queue[auto_id_t<T>{}].empty())
m_block->m_queueUpdated.wait(lk, [this]{
return !m_block->m_queue[auto_id_t<T>{}].empty();
});
return *static_cast<T*>(m_queue[typeid(T)].front().ptr());
return *static_cast<T*>(m_block->m_queue[auto_id_t<T>{}].front().ptr());
}

// Get a collection containing all entries of the specified type
Expand All @@ -186,27 +219,26 @@ class parallel {
return parallel_collection<T> { begin<T>(), end<T>() };
}

// Blocks until all outstanding work is done
/// <summary>
/// Blocks until all outstanding work is done
/// </summary>
/// <remarks>
/// If a stop call has been made, this method will also block until all owned threads have quit
/// </remarks>
void barrier(void) {
std::unique_lock<std::mutex> lk(m_queueMutex);
m_queueUpdated.wait(lk, [this] {
size_t totalReady = m_nVoidEntries;
for (auto& entry : m_queue)
std::unique_lock<std::mutex> lk(m_block->m_lock);
m_block->m_queueUpdated.wait(lk, [this] {
size_t totalReady = m_block->m_nVoidEntries;
for (auto& entry : m_block->m_queue)
totalReady += entry.second.size();
return m_outstandingCount == totalReady;
return m_block->m_outstandingCount == totalReady;
});
}

// Get an iterator to the beginning of our queue of job results
template<typename T>
parallel_iterator<T> begin(void) {
if (!m_ctxt->IsRunning()) {
if (m_ctxt->IsShutdown())
throw std::runtime_error("Attempted to enumerate members of a context after the context was shut down");
else
throw std::runtime_error("Start the context before attempting to enumerate members of an autowiring::parallel collection");
}
return{ *this, m_outstandingCount };
return{ *this, m_block->m_outstandingCount };
}

// Iterator representing no job results remaining
Expand All @@ -216,18 +248,45 @@ class parallel {
return { *this, zero };
}

/// <summary>
/// Cleanup behavior, causes all running threads in the pool to quit
/// </summary>
/// <remarks>
/// This method does not block. Use parallel::barrier to wait for all working threads to stop.
/// </remarks>
void stop(void);

protected:
std::mutex m_queueMutex;
std::condition_variable m_queueUpdated;
std::unordered_map<std::type_index, std::deque<AnySharedPointer>> m_queue;
// Internal result maintenance types:
struct StatusBlock {
std::mutex m_lock;
std::condition_variable m_queueUpdated;
std::unordered_map<auto_id, std::deque<AnySharedPointer>> m_queue;

// Holds true as long as the owner exists; false once the owner has been destroyed
bool owned = true;

// For void entries we don't need a queue, we can just keep a general count of "done"
size_t m_nVoidEntries = 0;
// Dispatch queue containing input items:
DispatchQueue dq;

// Total number of entries currently outstanding:
size_t m_outstandingCount = 0;
// For void entries we don't need a queue, we can just keep a general count of "done"
size_t m_nVoidEntries = 0;

// Total number of entries currently outstanding:
size_t m_outstandingCount = 0;
};

// Status block is held outside in order to avoid race conditions
std::shared_ptr<StatusBlock> m_block = std::make_shared<StatusBlock>();

// Shared pointer to the enclosing context
std::shared_ptr<CoreContext> m_ctxt;

// Termination signal registration with our enclosing context:
registration_t onStopReg;

// Unsynchronized version of stop
void stop_unsafe(void);
};

template<typename T>
Expand Down
60 changes: 57 additions & 3 deletions src/autowiring/Parallel.cpp
Expand Up @@ -2,6 +2,7 @@
#include "stdafx.h"
#include "Parallel.h"
#include "autowiring.h"
#include <thread>

using namespace autowiring;

Expand All @@ -10,9 +11,62 @@ void parallel_iterator<void>::operator++(int) {
}

parallel::parallel(void):
m_ctxt(AutoCurrentContext{})
parallel{ *CoreContext::CurrentContext(), std::thread::hardware_concurrency() }
{}

parallel::parallel(CoreContext& ctxt) :
m_ctxt(ctxt.shared_from_this())
parallel::parallel(size_t concurrency) :
parallel{ *CoreContext::CurrentContext(), concurrency }
{}

// Constructs a pool of detached worker threads that service this instance's dispatch queue.
//
// ctxt         The owning context; a shared reference to it is retained and a shutdown
//              handler is registered on it
// concurrency  The number of worker threads to launch.  Zero selects
//              std::thread::hardware_concurrency(); if that also reports zero (which the
//              standard allows when the value is not computable) a single worker is used
//              so that queued jobs can still make progress.
parallel::parallel(CoreContext& ctxt, size_t concurrency):
  m_ctxt(ctxt.shared_from_this())
{
  if (!concurrency)
    concurrency = std::thread::hardware_concurrency();

  // hardware_concurrency is only a hint and may itself be zero; with no workers nothing
  // queued on this instance would ever run
  if (!concurrency)
    concurrency = 1;

  // Workers and the shutdown handler hold their own reference to the status block, so the
  // block stays valid for them even after this object has released m_block
  auto block = m_block;

  // Fire off a bunch of threads to do work:
  while (concurrency--)
    std::thread(
      [block] {
        while(block->owned)
          try { block->dq.WaitForEvent(); }
          catch(dispatch_aborted_exception&) {
            // Expected behavior, things tearing down, end here
            return;
          }
      }
    ).detach();

  // Configure our signal after everything else is done
  onStopReg = ctxt.onShutdown += [this, block] {
    std::lock_guard<std::mutex> lk(block->m_lock);
    if (!block->owned)
      // Status block already disowned, we're in a teardown race
      // Short-circuit here, no reason to double-call stop
      return;

    stop_unsafe();
  };
}

// Non-blocking destructor: requests a stop but does not wait for worker threads.
parallel::~parallel(void) {
  // stop() dereferences m_block, and stop_unsafe() resets m_block once it has run.  If the
  // caller already stopped this instance manually there is nothing left to do, and calling
  // stop() again would dereference a null pointer.
  if (m_block)
    stop();
}

void parallel::stop(void) {
std::lock_guard<std::mutex> lk(m_block->m_lock);
stop_unsafe();
}

// Unsynchronized teardown.  Both visible callers (stop and the shutdown handler) take the
// status block's lock before invoking this.
void parallel::stop_unsafe(void) {
  // Nothing to tear down once the status block has been released
  if (m_block) {
    // Mark the block as no longer owned; the worker loops and the shutdown handler both
    // test this flag
    m_block->owned = false;

    // Remove our shutdown registration, then release the context and the status block
    m_ctxt->onShutdown -= onStopReg;
    m_ctxt.reset();
    m_block.reset();
  }
}
7 changes: 1 addition & 6 deletions src/autowiring/test/ParallelTest.cpp
Expand Up @@ -11,7 +11,6 @@ class ParallelTest:
{};

TEST_F(ParallelTest, Basic) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

std::mt19937_64 mt(time(nullptr));
Expand Down Expand Up @@ -42,7 +41,6 @@ TEST_F(ParallelTest, Basic) {
}

TEST_F(ParallelTest, All) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

for (size_t i = 0; i < 10; i++)
Expand All @@ -57,7 +55,6 @@ TEST_F(ParallelTest, All) {
}

TEST_F(ParallelTest, VoidReturn) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

auto val = std::make_shared<std::atomic<size_t>>(0);
Expand All @@ -71,7 +68,6 @@ TEST_F(ParallelTest, VoidReturn) {
}

TEST_F(ParallelTest, VoidReturnAll) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

auto val = std::make_shared<std::atomic<size_t>>(0);
Expand All @@ -86,9 +82,8 @@ TEST_F(ParallelTest, VoidReturnAll) {
}

TEST_F(ParallelTest, Barrier) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

std::atomic<size_t> x{ 0 };
for (size_t i = 0; i < 1000; i++)
p += [&x] { x++; };
Expand Down