-
Notifications
You must be signed in to change notification settings - Fork 4
/
ThreadPool.cc
84 lines (69 loc) · 1.66 KB
/
ThreadPool.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include "ThreadPool.h"
#include <stdexcept>
#include <signal.h>
#include <unistd.h>
// Start a pool of worker threads.
//
// threads: number of workers to start; 0 means "use systemCPUs()".
//
// Throws std::runtime_error if not a single worker thread could be started.
ThreadPool::ThreadPool(size_t threads) : mCancelling(false) {
    // nextJob() takes the same lock, so freshly started workers wait there
    // until construction has finished.
    Lock lock(mCond);
    if (threads == 0)
        threads = systemCPUs();

    // Reserve (not resize): resize() would add `threads` default-constructed
    // entries with no thread behind them (which the destructor would then
    // try to join), and the following push_back calls could reallocate and
    // invalidate the ThreadInfo pointers already handed to running workers.
    // NOTE(review): assumes ThreadList is a std::vector -- confirm in ThreadPool.h.
    mThreads.reserve(threads);

    for (size_t i = 0; i < threads; ++i) {
        mThreads.push_back(ThreadInfo(this, i));
        ThreadInfo& info = mThreads.back();
        if (pthread_create(&info.pthread, 0, &threadFunc, &info) != 0) {
            // No thread exists for this entry; drop it so the destructor
            // never joins an uninitialised pthread_t, and stop trying.
            mThreads.pop_back();
            break;
        }
    }

    // A pool without workers would accept jobs and never run them.
    if (mThreads.empty())
        throw std::runtime_error("Can't start any worker threads");
}
void *ThreadPool::threadFunc(void *val) {
// Don't run signal handlers on worker threads
sigset_t allsig;
sigfillset(&allsig);
pthread_sigmask(SIG_BLOCK, &allsig, NULL);
ThreadInfo *info = reinterpret_cast<ThreadInfo*>(val);
while (true) {
Job *job = info->pool->nextJob();
try {
if (!job)
pthread_exit(0); // we're being cancelled
(*job)();
} catch (...) {
if (job)
delete job;
throw;
}
delete job;
}
}
// Shut the pool down: discard jobs still waiting in the queue, ask every
// worker to stop, then wait for all of them to finish.
ThreadPool::~ThreadPool() {
    {
        Lock lock(mCond);
        mCancelling = true;

        // Jobs that never got to run are deleted without being executed.
        for (; !mJobs.empty(); mJobs.pop())
            delete mJobs.front();

        // One null job per worker: a worker that receives it exits.
        const size_t workerCount = mThreads.size();
        for (size_t n = 0; n != workerCount; ++n)
            mJobs.push(0);

        mCond.broadcast();
    } // lock released here, before joining, so workers can take their null job

    ThreadList::iterator worker = mThreads.begin();
    while (worker < mThreads.end()) {
        pthread_join(worker->pthread, 0);
        ++worker;
    }
}
// Number of processors currently online, as reported by sysconf().
// Always returns at least 1: sysconf() signals failure with -1, which would
// otherwise be converted to an enormous size_t thread count.
size_t ThreadPool::systemCPUs() const {
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    return online > 0 ? static_cast<size_t>(online) : 1;
}
// Remove and return the job at the front of the queue, waiting on mCond for
// as long as the queue is empty. The returned pointer may be null, which the
// worker loop treats as a cancellation request.
ThreadPool::Job* ThreadPool::nextJob() {
    Lock lock(mCond);
    for (;;) {
        if (mJobs.empty()) {
            // Nothing to do yet; re-check the queue after every wake-up.
            mCond.wait();
            continue;
        }
        Job *head = mJobs.front();
        mJobs.pop();
        return head;
    }
}
// Append a job to the queue and wake one waiting worker.
//
// job: heap-allocated job; must not be null. Once queued, the pool deletes
//      it (after running it, or on shutdown if it never ran). If this
//      function throws, the job is not queued and is not deleted here.
//
// Throws std::runtime_error if `job` is null or the pool is shutting down.
void ThreadPool::enqueue(Job *job) {
    // A null entry in the queue is the internal "stop this worker" marker,
    // so accepting one from callers would silently kill a worker thread.
    if (!job)
        throw std::runtime_error("Can't enqueue a null job");

    Lock lock(mCond);
    if (mCancelling)
        throw std::runtime_error("Can't add jobs while cancelling");
    mJobs.push(job);
    mCond.signal();
}