Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fixed issue #11. also fixed a lot of other possible race conditions. …
…also changed member variable naming style.

Signed-off-by: Tolga HOŞGÖR <fasdfasdas@gmail.com>
  • Loading branch information
tghosgor committed Jun 8, 2015
1 parent 9d47e49 commit 46c9afc
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 72 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Expand Up @@ -22,9 +22,14 @@ set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Release" "Debug" "Perf")

if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" OR "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_EXE_LINK_FLAGS_RELEASE} -O2")

set(CMAKE_CXX_FLAGS_PERF "${CMAKE_CXX_FLAGS_RELEASE} -Wno-inline -pg")

set(CMAKE_STATIC_LINKER_FLAGS_PERF "${CMAKE_LINKER_FLAGS_RELEASE} -pg")

set(CMAKE_EXE_LINK_FLAGS "${CMAKE_EXE_LINK_FLAGS} -pthread")
set(CMAKE_EXE_LINK_FLAGS_RELEASE "${CMAKE_EXE_LINK_FLAGS_RELEASE} -pthread -O2")
elseif(MSVC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_RELEASE} /GL")
set(CMAKE_STATIC_LINKER_FLAGS "${CMAKE_STATIC_LINKER_FLAGS} /LTCG")
Expand Down
59 changes: 36 additions & 23 deletions threadpool11/include/threadpool11/pool.hpp
Expand Up @@ -62,11 +62,11 @@ class Pool {
enum class Method { SYNC, ASYNC };

public:
threadpool11_EXPORT Pool(std::size_t m_workerCount = std::thread::hardware_concurrency());
threadpool11_EXPORT Pool(std::size_t worker_count = std::thread::hardware_concurrency());
~Pool();

/*!
* Posts a work to the pool for getting processed.
* \brief postWork Posts a work to the pool for getting processed.
*
* If there are no threads left (i.e. you called Pool::joinAll(); prior to
* this function) all the works you post get enqueued. If you spawn new threads in
Expand All @@ -79,12 +79,16 @@ class Pool {
// TODO: convert 'type' above to const& when MSVC fixes that bug.

/**
* \brief waitAll Blocks the calling thread until all posted works are finished.
*
* This function suspends the calling thread until all posted works are finished and, therefore, all worker
* threads are free. It guarantees you that before the function returns, all queued works are finished.
*/
threadpool11_EXPORT void waitAll();

/*!
* \brief joinAll Joins the worker threads.
*
* This function joins all the threads in the thread pool as fast as possible.
* All the posted works are NOT GUARANTEED to be finished before the worker threads
* are destroyed and this function returns.
Expand All @@ -97,7 +101,7 @@ class Pool {
threadpool11_EXPORT void joinAll();

/*!
* \brief Pool::getWorkerCount
* \brief getWorkerCount
*
* Properties: thread-safe.
*
Expand Down Expand Up @@ -131,27 +135,36 @@ class Pool {
*/
threadpool11_EXPORT size_t getWorkQueueSize() const;

/**
* This function requires a mutex lock so you should call it wisely if you performance is a life matter to
* you.
*/
/*!
* \brief getActiveWorkerCount Gets the number of active workers when the function is called.
*
* The information this function returns does not really mean much. The worker may be starting to execute a work from queue,
* it may be executing a work or it may have just executed a work.
*
* \return The number of active workers.
*/
threadpool11_EXPORT size_t getActiveWorkerCount() const;

/**
* This function requires a mutex lock so you should call it wisely if you performance is a life matter to
* you.
* \brief getInactiveWorkerCount Gets the number of inactive workers when the function is called.
*
* The information this function returns does not really mean much. The worker may be starting to execute a work from queue,
* it may be executing a work or it may have just executed a work.
*
* \return The number of inactive workers.
*/
threadpool11_EXPORT size_t getInactiveWorkerCount() const;

/*!
* Increases the number of threads in the pool by n.
* \brief incWorkerCountBy Increases the number of threads in the pool by n.
*
* Properties: thread-safe.
*/
threadpool11_EXPORT void incWorkerCountBy(std::size_t n);

/*!
* Tries to decrease the number of threads in the pool by n.
* \brief decWorkerCountBy Tries to decrease the number of threads in the pool by n.
*
* Setting 'n' higher than the number of workers has no effect.
* Calling without arguments asynchronously terminates all workers.
*
Expand Down Expand Up @@ -192,22 +205,22 @@ class Pool {
void push(Work::Callable* workFunc);

private:
std::atomic<size_t> m_workerCount;
std::atomic<size_t> m_activeWorkerCount;
std::atomic<size_t> worker_count_;
std::atomic<size_t> active_worker_count_;

mutable std::mutex notify_all_finished_signal_mtx;
std::condition_variable m_notifyAllFinishedSignal;
bool m_areAllReallyFinished;
mutable std::mutex notify_all_finished_mutex_;
std::condition_variable notify_all_finished_signal_;
std::atomic<bool> are_all_really_finished_;

mutable std::mutex m_workSignalMutex;
mutable std::mutex work_signal_mutex_;
// bool work_signal_prot; //! wake up protection // <- work_queue_size is used instead of this
std::condition_variable m_workSignal;
std::condition_variable work_signal_;

std::unique_ptr<
boost::lockfree::
queue<Work::Callable*, boost::parameter::void_, boost::parameter::void_, boost::parameter::void_>>
m_workQueue;
std::atomic<size_t> m_workQueueSize;
work_queue_;
std::atomic<size_t> work_queue_size_;
};

template <typename T>
Expand All @@ -222,7 +235,7 @@ threadpool11_EXPORT inline std::future<T> Pool::postWork(std::function<T()> call
auto move_promise = make_move_on_copy(std::move(promise));

auto workFunc = new Work::Callable([move_callable, move_promise, type]() mutable {
move_promise.Value().set_value((move_callable.Value())());
move_promise.value().set_value((move_callable.value())());
return type;
});

Expand All @@ -243,8 +256,8 @@ threadpool11_EXPORT inline std::future<void> Pool::postWork(std::function<void()
auto move_promise = make_move_on_copy(std::move(promise));

auto workFunc = new Work::Callable([move_callable, move_promise, type]() mutable {
(move_callable.Value())();
move_promise.Value().set_value();
(move_callable.value())();
move_promise.value().set_value();
return type;
});

Expand Down
10 changes: 5 additions & 5 deletions threadpool11/include/threadpool11/utility.hpp
Expand Up @@ -35,18 +35,18 @@ template <typename T>
class move_on_copy {
public:
move_on_copy(T&& aValue)
: value(std::move(aValue)) {}
: value_(std::move(aValue)) {}
move_on_copy(const move_on_copy& other)
: value(std::move(other.value)) {}
: value_(std::move(other.value_)) {}

move_on_copy& operator=(move_on_copy&& aValue) = delete; // not needed here
move_on_copy& operator=(const move_on_copy& aValue) = delete; // not needed here

T& Value() { return value; }
const T& Value() const { return value; }
T& value() { return value_; }
const T& value() const { return value_; }

private:
mutable T value;
mutable T value_;
};

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion threadpool11/include/threadpool11/worker.hpp
Expand Up @@ -51,6 +51,6 @@ class Worker {
/*!
* This should always stay at bottom so that it is called at the most end.
*/
std::thread thread;
std::thread thread_;
};
}
41 changes: 21 additions & 20 deletions threadpool11/src/pool.cpp
Expand Up @@ -27,26 +27,25 @@ This file is part of threadpool11.
namespace threadpool11 {

Pool::Pool(std::size_t worker_count)
: m_workerCount(0)
, m_activeWorkerCount(0)
, m_workQueue(new boost::lockfree::queue<Work::Callable*>(0))
, m_workQueueSize(0) {
: worker_count_(0)
, active_worker_count_(0)
, are_all_really_finished_{true}
, work_queue_(new boost::lockfree::queue<Work::Callable*>(0))
, work_queue_size_(0) {
spawnWorkers(worker_count);
}

/*!
 * Destructor. All teardown is delegated to joinAll().
 */
Pool::~Pool() {
  this->joinAll();
}

void Pool::waitAll() {
std::unique_lock<std::mutex> lock(notify_all_finished_signal_mtx);
if (m_activeWorkerCount > 0) {
m_notifyAllFinishedSignal.wait(lock, [this]() { return m_areAllReallyFinished; });
m_areAllReallyFinished = false;
}
std::unique_lock<std::mutex> notify_all_finished_lock(notify_all_finished_mutex_);

notify_all_finished_signal_.wait(notify_all_finished_lock, [this]() { return are_all_really_finished_.load(); });
}

/*!
 * Asks decWorkerCountBy() to remove the largest representable number of workers,
 * using Method::SYNC.
 */
void Pool::joinAll() {
  // Largest size_t value: a request that can never be smaller than the current worker count.
  const size_t every_worker = std::numeric_limits<size_t>::max();

  decWorkerCountBy(every_worker, Method::SYNC);
}

size_t Pool::getWorkerCount() const { return m_workerCount.load(); }
size_t Pool::getWorkerCount() const { return worker_count_.load(); }

void Pool::setWorkerCount(std::size_t n, Method method) {
if (getWorkerCount() < n)
Expand All @@ -55,11 +54,11 @@ void Pool::setWorkerCount(std::size_t n, Method method) {
decWorkerCountBy(getWorkerCount() - n, method);
}

size_t Pool::getWorkQueueSize() const { return m_workQueueSize.load(); }
size_t Pool::getWorkQueueSize() const { return work_queue_size_.load(); }

size_t Pool::getActiveWorkerCount() const { return m_activeWorkerCount.load(); }
size_t Pool::getActiveWorkerCount() const { return active_worker_count_.load(); }

size_t Pool::getInactiveWorkerCount() const { return m_workerCount.load() - m_activeWorkerCount.load(); }
size_t Pool::getInactiveWorkerCount() const { return worker_count_.load() - active_worker_count_.load(); }

void Pool::incWorkerCountBy(std::size_t n) { spawnWorkers(n); }

Expand All @@ -80,18 +79,20 @@ void Pool::decWorkerCountBy(size_t n, Method method) {

void Pool::spawnWorkers(std::size_t n) {
//'OR' makes sure the case where one of the expressions is zero, is valid.
assert(static_cast<size_t>(m_workerCount + n) > n ||
static_cast<size_t>(m_workerCount + n) > m_workerCount);
assert(static_cast<size_t>(worker_count_ + n) > n ||
static_cast<size_t>(worker_count_ + n) > worker_count_);
while (n-- > 0) {
new Worker(*this); //! Worker class takes care of its de-allocation itself after here
++m_workerCount;
++worker_count_;
}
}

void Pool::push(Work::Callable* workFunc) {
std::unique_lock<std::mutex> workSignalLock(m_workSignalMutex);
++m_workQueueSize;
m_workQueue->push(workFunc);
m_workSignal.notify_one();
are_all_really_finished_ = false;

std::unique_lock<std::mutex> work_signal_lock(work_signal_mutex_);
++work_queue_size_;
work_queue_->push(workFunc);
work_signal_.notify_one();
}
}
58 changes: 35 additions & 23 deletions threadpool11/src/worker.cpp
Expand Up @@ -26,40 +26,52 @@ This file is part of threadpool11.
namespace threadpool11 {

Worker::Worker(Pool& pool)
: thread(std::bind(&Worker::execute, this, std::ref(pool))) {
: thread_(std::bind(&Worker::execute, this, std::ref(pool))) {
// std::cout << std::this_thread::get_id() << " Worker created" << std::endl;
thread.detach();
thread_.detach();
}

void Worker::execute(Pool& pool) {
const std::unique_ptr<Worker> self(this); //! auto de-allocation when thread is terminated

while (true) {
Work::Callable* work_;
// std::cout << "\tThread " << std::this_thread::get_id() << " awaken." << std::endl;
while (pool.m_workQueue->pop(work_)) {
const std::unique_ptr<Work::Callable> work(work_);
--pool.m_workQueueSize;
++pool.m_activeWorkerCount;
// std::cout << "\tThread " << std::this_thread::get_id() << " worked." << std::endl;
if ((*work)() == Work::Type::TERMINAL) {
--pool.m_activeWorkerCount;
--pool.m_workerCount;
return;
}

--pool.m_activeWorkerCount;
Work::Callable* work_ptr;
Work::Type work_type = Work::Type::STD;

++pool.active_worker_count_;

while (work_type != Work::Type::TERMINAL
&& pool.work_queue_->pop(work_ptr)) {
--pool.work_queue_size_;

const std::unique_ptr<Work::Callable> work(work_ptr);

work_type = (*work)();
}

if (pool.m_activeWorkerCount == 0) {
std::unique_lock<std::mutex> notifyAllFinishedLock(pool.notify_all_finished_signal_mtx);
pool.m_areAllReallyFinished = true;
pool.m_notifyAllFinishedSignal.notify_all();
std::unique_lock<std::mutex> work_signal_lock(pool.work_signal_mutex_);

// work_queue_size is increased when work_signal_mutex locked
// active_worker_count_ is only decreased here so it is also when mutex is locked

// as far as I can see when there is a work, it is certain that either work_queue_size or active_worker counter
// is greater than zero so I don't see any race condition
if (pool.work_queue_size_ == 0 && --pool.active_worker_count_ == 0) {
std::unique_lock<std::mutex> notify_all_finished_lock(pool.notify_all_finished_mutex_);

pool.are_all_really_finished_ = true;
pool.notify_all_finished_signal_.notify_all();
}

// std::cout << "\tThread " << std::this_thread::get_id() << " will sleep." << std::endl;
std::unique_lock<std::mutex> workSignalLock(pool.m_workSignalMutex);
pool.m_workSignal.wait(workSignalLock, [&pool]() { return (pool.m_workQueueSize.load()); });
if (work_type == Work::Type::TERMINAL) {
--pool.worker_count_;

return;
}

// block here until signalled
pool.work_signal_.wait(work_signal_lock, [&pool]() { return (pool.work_queue_size_ > 0); });
}
}

}
41 changes: 41 additions & 0 deletions threadpool11_demo/src/main.cpp
Expand Up @@ -49,6 +49,18 @@ void test3Func() {
++test3Var;
}

/*!
 * \brief factorial Computes i! iteratively.
 *
 * Inputs 0 and 1 both yield 1. The product is accumulated in std::size_t without any overflow
 * check, so the result wraps around (unsigned arithmetic) once the true value no longer fits.
 *
 * \param i The number whose factorial is requested.
 * \return i!, reduced to the range of std::size_t.
 */
std::size_t factorial(std::size_t i) {
  // Spell out the template argument: std::max(1ul, i) only deduces when std::size_t happens to be
  // exactly unsigned long, which is not the case on every supported platform (e.g. 64-bit MSVC).
  i = std::max<std::size_t>(1, i);

  std::size_t res = i;

  // Multiply by i-1, i-2, ..., 2; the factor 1 is skipped because it does not change the product.
  while (i > 2) {
    res *= --i;
  }

  return res;
}

} // NS

int main(int argc, char* argv[]) {
Expand Down Expand Up @@ -195,6 +207,35 @@ int main(int argc, char* argv[]) {
std::cout << "Demo 4 took " << std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count()
<< " milliseconds." << std::endl << std::endl;
}

/**
* Demo #5
* For performance test purposes.
*/

const constexpr auto iter = 100ul;
std::array<std::size_t, iter> a;

{
pool.setWorkerCount(std::thread::hardware_concurrency());

std::array<std::future<std::size_t>, iter> futures;

const auto begin = std::chrono::high_resolution_clock::now();

for (auto i = 0u; i < iter; ++i) {
futures[i] = pool.postWork<std::size_t>([i]() { return factorial(i); });
}

for (auto i = 0u; i < iter; ++i) {
a[i] = futures[i].get();
}

const auto end = std::chrono::high_resolution_clock::now();
std::cout << "threadpool11 execution took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count()
<< " milliseconds." << std::endl << std::endl;
}

/**
* Test case for Issue #1 (fixed): Pool::postWork waiting forever, due to posting work before all threads in
Expand Down

0 comments on commit 46c9afc

Please sign in to comment.