/*
Implement parallel vectorize workqueue on top of Intel TBB.
*/
#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
/* tbb.h redefines these */
#include "../../_pymodule.h"
#ifdef _POSIX_C_SOURCE
#undef _POSIX_C_SOURCE
#endif
#ifdef _XOPEN_SOURCE
#undef _XOPEN_SOURCE
#endif
#include <tbb/version.h>
#include <tbb/tbb.h>
#include <string.h>
#include <stdio.h>
#include <thread>
#include "workqueue.h"
#include "gufunc_scheduler.h"
/* TBB 2021.6 is the minimum version */
#if (TBB_INTERFACE_VERSION < 12060)
#error "TBB version is incompatible, 2021.6 or greater required, i.e. TBB_INTERFACE_VERSION >= 12060"
#endif
#define _DEBUG 0
#define _TRACE_SPLIT 0
static tbb::task_group *tg = NULL;
static tbb::task_scheduler_handle tsh;
static bool tsh_was_initialized = false;
#ifdef _MSC_VER
#define THREAD_LOCAL(ty) __declspec(thread) ty
#else
/* Non-standard C99 extension that's understood by gcc and clang */
#define THREAD_LOCAL(ty) __thread ty
#endif
// The default number of threads; it is set when the threading backend is
// initialised via the launch_threads() call.
static int _INIT_NUM_THREADS = -1;
// The per-thread thread mask; each thread can carry its own mask.
static THREAD_LOCAL(int) _TLS_num_threads = 0;
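// Set the thread mask (the number of threads a parallel region may use) for
// the calling thread only.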
static void
set_num_threads(int count)
{
_TLS_num_threads = count;
}
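// Get the thread mask for the calling thread; falls back to the process-wide
// default if this thread's TLS slot has never been set.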
static int
get_num_threads(void)
{
    if (_TLS_num_threads == 0)
    {
        // This thread did not call launch_threads() but is still a "main"
        // thread (e.g. one created via threading.Thread()); its TLS slot is
        // still 0 because launch_threads() was never called on it, so fall
        // back to the default.
        _TLS_num_threads = _INIT_NUM_THREADS;
    }
return _TLS_num_threads;
}
static int
get_thread_id(void)
{
int tid = tbb::this_task_arena::current_thread_index();
    // This function may be called from pure Python or from a sequential JIT
    // region; in either case the task_arena may not be initialised. That
    // condition is intercepted here and a thread ID of 0 is returned.
if (tid == tbb::task_arena::not_initialized)
{
return 0;
} else {
return tid;
}
}
// Watch the arena: if it decides to create more threads / add threads into
// the arena, then make sure they get the right thread count.
class fix_tls_observer: public tbb::task_scheduler_observer {
int mask_val;
void on_scheduler_entry( bool is_worker ) override;
public:
fix_tls_observer(tbb::task_arena &arena, int mask) : tbb::task_scheduler_observer(arena), mask_val(mask)
{
observe(true);
}
};
void fix_tls_observer::on_scheduler_entry(bool worker) {
set_num_threads(mask_val);
}
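// Enqueue a single task on the global task_group; the opaque function pointer
// is cast back to the kernel signature and invoked with the packed arguments.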
static void
add_task(void *fn, void *args, void *dims, void *steps, void *data)
{
tg->run([=]
{
auto func = reinterpret_cast<void (*)(void *args, void *dims, void *steps, void *data)>(fn);
func(args, dims, steps, data);
});
}
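// Execute the kernel `fn` over the dimensions[0] outer iterations, splitting
// the iteration space across a task_arena limited to num_threads threads.
// Each chunk gets a private copy of the counts and adjusted base pointers.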
static void
parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data,
size_t inner_ndim, size_t array_count, int num_threads)
{
static bool printed = false;
if(!printed && _DEBUG)
{
puts("Using parallel_for");
printed = true;
}
// args = <ir.Argument '.1' of type i8**>,
// dimensions = <ir.Argument '.2' of type i64*>
// steps = <ir.Argument '.3' of type i64*>
// data = <ir.Argument '.4' of type i8*>
const size_t arg_len = (inner_ndim + 1);
    if(_DEBUG && _TRACE_SPLIT)
    {
        // %zu is the portable conversion for size_t (%lu is wrong on LLP64
        // platforms such as 64-bit Windows)
        printf("inner_ndim: %zu\n", inner_ndim);
        printf("arg_len: %zu\n", arg_len);
        printf("total: %zu\n", dimensions[0]);
        printf("dimensions: ");
        for(size_t j = 0; j < arg_len; j++)
            printf("%zu, ", dimensions[j]);
        printf("\nsteps: ");
        for(size_t j = 0; j < array_count; j++)
            printf("%zu, ", steps[j]);
        printf("\n*args: ");
        for(size_t j = 0; j < array_count; j++)
            printf("%p, ", (void *)args[j]);
        printf("\n");
    }
    // This makes the assumption that the calling thread knows the truth about
    // num_threads, which should be correct via the following: the program
    // starts/reinits, the threadpool launches, and the num_threads TLS slot is
    // set to the default. Any thread spawned at init that calls this function
    // will therefore have a valid num_threads TLS slot, so the task_arena is
    // sized appropriately and its value is used in the observer that fixes the
    // TLS slots of any threads subsequently joining the task_arena. As a
    // result, all threads in a task_arena have valid num_threads TLS slots
    // before doing any work. Any further query of the TLS slot value made by
    // any thread in the arena is then safe, and were a thread to create a
    // nested parallel region, the same logic applies as at program
    // start/reinit.
tbb::task_arena limited(num_threads);
fix_tls_observer observer(limited, num_threads);
limited.execute([&]{
using range_t = tbb::blocked_range<size_t>;
tbb::parallel_for(range_t(0, dimensions[0]), [=](const range_t &range)
{
size_t * count_space = (size_t *)alloca(sizeof(size_t) * arg_len);
char ** array_arg_space = (char**)alloca(sizeof(char*) * array_count);
memcpy(count_space, dimensions, arg_len * sizeof(size_t));
count_space[0] = range.size();
            if(_DEBUG && _TRACE_SPLIT > 1)
            {
                printf("THREAD %p:", (void *)count_space);
                printf("count_space: ");
                for(size_t j = 0; j < arg_len; j++)
                    printf("%zu, ", count_space[j]);
                printf("\n");
            }
for(size_t j = 0; j < array_count; j++)
{
char * base = args[j];
size_t step = steps[j];
ptrdiff_t offset = step * range.begin();
array_arg_space[j] = base + offset;
                if(_DEBUG && _TRACE_SPLIT > 2)
                {
                    printf("Index %zu\n", j);
                    printf("-->Got base %p\n", (void *)base);
                    printf("-->Got step %zu\n", step);
                    printf("-->Got offset %td\n", offset);
                    printf("-->Got addr %p\n", (void *)array_arg_space[j]);
                }
}
if(_DEBUG && _TRACE_SPLIT > 2)
{
printf("array_arg_space: ");
for(size_t j = 0; j < array_count; j++)
printf("%p, ", (void *)array_arg_space[j]);
printf("\n");
}
auto func = reinterpret_cast<void (*)(char **args, size_t *dims, size_t *steps, void *data)>(fn);
func(array_arg_space, count_space, steps, data);
});
});
}
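// fork() handling: record which thread initialised the backend so that only
// the main thread attempts to suspend TBB before a fork; threads do not
// survive fork() in the child, so the scheduler handle must be re-attached
// afterwards.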
static std::thread::id init_thread_id;
static THREAD_LOCAL(bool) need_reinit_after_fork = false;
static void set_main_thread()
{
init_thread_id = std::this_thread::get_id();
}
static bool is_main_thread()
{
return std::this_thread::get_id() == init_thread_id;
}
static void prepare_fork(void)
{
if(_DEBUG)
{
puts("Suspending TBB: prepare fork");
}
if (tsh_was_initialized)
{
if(is_main_thread())
{
        if (!tbb::finalize(tsh, std::nothrow))
        {
            tsh.release();
            // puts() appends its own newline, so no trailing "\n" is needed
            puts("Unable to join threads to shut down before fork(). "
                 "This can break multithreading in child processes.");
        }
tsh_was_initialized = false;
need_reinit_after_fork = true;
}
else
{
fprintf(stderr, "Numba: Attempted to fork from a non-main thread, "
"the TBB library may be in an invalid state in the "
"child process.\n");
}
}
}
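// Re-attach the scheduler handle after a fork; registered for both the parent
// and the child via pthread_atfork() in launch_threads().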
static void reset_after_fork(void)
{
if(_DEBUG)
{
puts("Resuming TBB: after fork");
}
if(need_reinit_after_fork)
{
tsh = tbb::attach();
set_main_thread();
tsh_was_initialized = true;
need_reinit_after_fork = false;
}
}
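// Module teardown: drain any outstanding tasks, delete the task_group and
// release the scheduler handle.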
static void unload_tbb(void)
{
if (tg)
{
if(_DEBUG)
{
puts("Unloading TBB");
}
tg->wait();
delete tg;
tg = NULL;
}
if (tsh_was_initialized)
{
// blocking terminate is not strictly required here, ignore return value
(void)tbb::finalize(tsh, std::nothrow);
tsh_was_initialized = false;
}
}
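// Initialise the backend: attach a scheduler handle, create the global
// task_group and record the default thread count. A count below 1 requests
// TBB's automatic thread count.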
static void launch_threads(int count)
{
if(tg)
return;
if(_DEBUG)
puts("Using TBB");
if(count < 1)
count = tbb::task_arena::automatic;
tsh = tbb::attach();
tsh_was_initialized = true;
tg = new tbb::task_group;
tg->run([] {}); // start creating threads asynchronously
_INIT_NUM_THREADS = count;
set_main_thread();
#ifndef _MSC_VER
pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork);
#endif
}
static void synchronize(void)
{
tg->wait();
}
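// No-op for the TBB backend.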
static void ready(void)
{
}
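// Python module initialisation: register unload_tbb as the module free hook
// and expose the backend entry points as function-pointer attributes.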
MOD_INIT(tbbpool)
{
PyObject *m;
MOD_DEF(m, "tbbpool", "No docs", NULL)
if (m == NULL)
return MOD_ERROR_VAL;
PyModuleDef *md = PyModule_GetDef(m);
if (md)
{
md->m_free = (freefunc)unload_tbb;
}
SetAttrStringFromVoidPointer(m, launch_threads);
SetAttrStringFromVoidPointer(m, synchronize);
SetAttrStringFromVoidPointer(m, ready);
SetAttrStringFromVoidPointer(m, add_task);
SetAttrStringFromVoidPointer(m, parallel_for);
SetAttrStringFromVoidPointer(m, do_scheduling_signed);
SetAttrStringFromVoidPointer(m, do_scheduling_unsigned);
SetAttrStringFromVoidPointer(m, set_num_threads);
SetAttrStringFromVoidPointer(m, get_num_threads);
SetAttrStringFromVoidPointer(m, get_thread_id);
SetAttrStringFromVoidPointer(m, set_parallel_chunksize);
SetAttrStringFromVoidPointer(m, get_parallel_chunksize);
SetAttrStringFromVoidPointer(m, get_sched_size);
SetAttrStringFromVoidPointer(m, allocate_sched);
SetAttrStringFromVoidPointer(m, deallocate_sched);
return MOD_SUCCESS_VAL(m);
}