1
+ // Reference : https://13abyknight.wordpress.com/2013/03/20/a-simple-thread-pool-c-implementation-on-linux/
2
+ #include < iostream>
3
+ #include < queue>
4
+ #include < pthread.h>
5
+ #include " ThreadPool.h"
6
+
7
+ using namespace std ;
8
+
9
+
10
+ // Compute a pseudorandom integer.
11
+ // Output value in range [0, 32767]
12
+ inline int fast_rand (void ) {
13
+ g_seed = (214013 *g_seed+2531011 );
14
+ return (g_seed>>16 )&0x43 ;
15
+ }
16
+
17
+
18
+ #include < random>
19
+ int thread_pool::getRandomNumber (){
20
+
21
+ return fast_rand ();
22
+
23
+ std::random_device rd; // obtain a random number from hardware
24
+ std::mt19937 eng (rd ()); // seed the generator
25
+ std::uniform_int_distribution<> distr (0 , numOfThreads-1 ); // define the range
26
+ return distr (eng);
27
+ }
28
+
29
+
30
+ // centralized job queue
31
+ deque<job*> cDequeue;
32
+ pthread_mutex_t cDequeue_lock;
33
+
34
+
35
+ bool stealon = false ;
36
+
37
+ void thread_pool::shareon (){
38
+ stealon = true ;
39
+ }
40
+
41
+ thread_pool::thread_pool (){
42
+ numOfThreads = 2 ;
43
+ for (int i = 0 ; i < numOfThreads; ++i)
44
+ {
45
+ threads.push_back (new worker_thread (i, this ));
46
+ }
47
+ }
48
+
49
+ thread_pool::thread_pool (int num){
50
+ numOfThreads = num;
51
+ for (int i = 0 ; i < numOfThreads; ++i)
52
+ {
53
+ threads.push_back (new worker_thread (i, this ));
54
+ }
55
+ }
56
+
57
+ void thread_pool::terminate (){
58
+ for (int i = 0 ; i < numOfThreads; ++i)
59
+ {
60
+ threads[i]->terminate ();
61
+ }
62
+ }
63
+
64
+ void worker_thread::terminate (){
65
+ pthread_cancel (thread);
66
+ }
67
+
68
+ void thread_pool::start ()
69
+ {
70
+ for (int i = 0 ; i < numOfThreads; i++)
71
+ {
72
+ threads[i]->start ();
73
+ }
74
+ }
75
+
76
+
77
+
78
+ void thread_pool::assignJob (job* _job_, int threadid)
79
+ {
80
+ // static int threadid = 0;
81
+ // some huerestic now trying round robin
82
+
83
+ // cout << "Assigning to " << thid << " from " << threadid << endl;
84
+ pthread_mutex_lock (&cDequeue_lock);
85
+ cDequeue.push_back (_job_);
86
+ pthread_mutex_unlock (&cDequeue_lock);
87
+
88
+ }
89
+
90
+ void thread_pool::assignJobMe (job* _job_, int tid){
91
+ threads[tid]->assignJob (_job_);
92
+ }
93
+
94
+
95
+ bool thread_pool::empty (){
96
+ for (int i = 0 ; i < numOfThreads; ++i)
97
+ {
98
+ cout << " Checking for " << i<< endl;
99
+ pthread_mutex_lock (&threads[i]->jobDequeue_lock );
100
+ if (!threads[i]->jobDequeue .empty ())
101
+ {
102
+ // pthread_mutex_unlock(&threads[i]->jobDequeue_lock);
103
+ cout << " Queue " << i << " is not empty\n " ;
104
+ // return false;
105
+ }
106
+ pthread_mutex_unlock (&threads[i]->jobDequeue_lock );
107
+ }
108
+ return true ;
109
+ }
110
+
111
+ job* thread_pool::StealTask (worker_thread* p, int mytid){
112
+
113
+ if (!stealon)
114
+ {
115
+ return NULL ;
116
+ }
117
+ // Implement your steal algo
118
+ // dont look for own queue here else u will get a deadlock
119
+
120
+ int i = getRandomNumber ();
121
+
122
+
123
+
124
+ // for (int i = 0; i < numOfThreads; ++i)
125
+ // {
126
+ if (i == mytid)
127
+ {
128
+ // cout << "Do check for " << i << endl;
129
+ return NULL ;
130
+ }
131
+
132
+ if (!threads[i]->jobDequeue .empty ())
133
+ {
134
+ // cout << "--------------Thread " << p->tid << " trying to steal Task from------" << i << endl;
135
+ pthread_mutex_lock (&threads[i]->jobDequeue_lock );
136
+ job* j = NULL ;
137
+ if (!threads[i]->jobDequeue .empty ())
138
+ {
139
+ j = threads[i]->jobDequeue .back ();
140
+ threads[i]->jobDequeue .pop_back ();
141
+ }
142
+ pthread_mutex_unlock (&threads[i]->jobDequeue_lock );
143
+ // cout << "--------------Thread " << p->tid << " steal Task from " << i << " success------ jobid " << j->jobID << endl;
144
+ return j;
145
+ }
146
+ // pthread_mutex_unlock(&threads[i]->jobDequeue_lock);
147
+ // }
148
+ return NULL ;
149
+ }
150
+
151
+ thread_pool::thread_pool (const thread_pool &tp){
152
+
153
+ cout << " Copy constr" ;
154
+ for (int i = 0 ; i < tp.threads .size (); ++i)
155
+ {
156
+ threads.push_back (tp.threads [i]);
157
+ }
158
+ numOfThreads = tp.numOfThreads ;
159
+ }
160
+
161
+
162
+ worker_thread::worker_thread (int _tid, thread_pool* _parentPool){
163
+ // :tid(_tid), parentPool(_parentPool)
164
+
165
+ tid = _tid;
166
+ parentPool = _parentPool;
167
+
168
+ // thread = new pthread_t[1];
169
+ // jobDequeue_lock = PTHREAD_MUTEX_INITIALIZER;
170
+ // jobDequeue_cond = PTHREAD_COND_INITIALIZER;
171
+ }
172
+
173
+ worker_thread::~worker_thread (){
174
+ // delete thread;
175
+ // thread = NULL;
176
+ }
177
+
178
+ void worker_thread::assignJob (job *_job_){
179
+ pthread_mutex_lock (&jobDequeue_lock);
180
+ jobDequeue.push_back (_job_);
181
+ // cout << "Assigned to " << tid << endl;
182
+ pthread_mutex_unlock (&jobDequeue_lock);
183
+ // pthread_cond_signal(&jobDequeue_cond);
184
+ }
185
+
186
+ bool worker_thread::loadJob (job*& _job_, worker_thread* p)
187
+ {
188
+ pthread_mutex_lock (&cDequeue_lock);
189
+
190
+ // while(jobDequeue.empty() && (_job_ = StealTask(p)) && !_job_){
191
+ // pthread_cond_wait(&jobDequeue_cond, &jobDequeue_lock);
192
+ // }
193
+
194
+ // while(jobDequeue.empty())
195
+ // pthread_cond_wait(&jobDequeue_cond, &jobDequeue_lock);
196
+
197
+ // first look for own job queue
198
+ if (!cDequeue.empty ())
199
+ {
200
+ _job_ = cDequeue.front ();
201
+ cDequeue.pop_front ();
202
+ pthread_mutex_unlock (&cDequeue_lock);
203
+ return true ;
204
+ }
205
+
206
+ // if((_job_ = StealTask(p)) && _job_){
207
+ // pthread_mutex_unlock(&jobDequeue_lock);
208
+ // return true;
209
+ // }
210
+
211
+ // if own job queue is empty look for other threads job queue and extract from back
212
+ // _job_ = StealTask(p);
213
+ pthread_mutex_unlock (&cDequeue_lock);
214
+ return true ;
215
+ }
216
+
217
+ void worker_thread::start (){
218
+
219
+ pthread_create (&thread, NULL , &worker_thread::threadExecute, (void *)this );
220
+ // cout << "Thread:" << tid << " is alive now!\n";
221
+
222
+ }
223
+
224
+ void *worker_thread::threadExecute (void *param)
225
+ {
226
+ worker_thread *p = (worker_thread *)param;
227
+ job *oneJob = NULL ;
228
+ while (p->loadJob (oneJob, p))
229
+ {
230
+ if (oneJob)
231
+ oneJob->working ((void *)&p->tid );
232
+ // delete oneJob;
233
+ oneJob = NULL ;
234
+ }
235
+ return NULL ;
236
+ }
237
+
238
+ job* worker_thread::StealTask (worker_thread* p){
239
+ return parentPool->StealTask (p, tid);
240
+ }
0 commit comments