-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathparallel.py
763 lines (610 loc) · 25.7 KB
/
parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
"""
This file implements the code-generator for parallel-vectorize.
ParallelUFunc is the platform independent base class for generating
the thread dispatcher. This thread dispatcher launches threads
that execute the generated function of UFuncCore.
UFuncCore is subclassed to specialize for the input/output types.
The actual workload is invoked inside the function generated by UFuncCore.
UFuncCore also defines a work-stealing mechanism that allows idle threads
to steal works from other threads.
"""
import os
import sys
import warnings
from threading import RLock as threadRLock
from ctypes import CFUNCTYPE, c_int, CDLL, POINTER, c_uint
import numpy as np
import llvmlite.binding as ll
from llvmlite import ir
from numba.np.numpy_support import as_dtype
from numba.core import types, cgutils, config, errors
from numba.core.typing import signature
from numba.np.ufunc.wrappers import _wrapper_info
from numba.np.ufunc import ufuncbuilder
from numba.extending import overload, intrinsic
_IS_OSX = sys.platform.startswith('darwin')
_IS_LINUX = sys.platform.startswith('linux')
_IS_WINDOWS = sys.platform.startswith('win32')
def get_thread_count():
"""
Gets the available thread count.
"""
t = config.NUMBA_NUM_THREADS
if t < 1:
raise ValueError("Number of threads specified must be > 0.")
return t
NUM_THREADS = get_thread_count()
def build_gufunc_kernel(library, ctx, info, sig, inner_ndim):
"""Wrap the original CPU ufunc/gufunc with a parallel dispatcher.
This function will wrap gufuncs and ufuncs something like.
Args
----
ctx
numba's codegen context
info: (library, env, name)
inner function info
sig
type signature of the gufunc
inner_ndim
inner dimension of the gufunc (this is len(sig.args) in the case of a
ufunc)
Returns
-------
wrapper_info : (library, env, name)
The info for the gufunc wrapper.
Details
-------
The kernel signature looks like this:
void kernel(char **args, npy_intp *dimensions, npy_intp* steps, void* data)
args - the input arrays + output arrays
dimensions - the dimensions of the arrays
steps - the step size for the array (this is like sizeof(type))
data - any additional data
The parallel backend then stages multiple calls to this kernel concurrently
across a number of threads. Practically, for each item of work, the backend
duplicates `dimensions` and adjusts the first entry to reflect the size of
the item of work, it also forms up an array of pointers into the args for
offsets to read/write from/to with respect to its position in the items of
work. This allows the same kernel to be used for each item of work, with
simply adjusted reads/writes/domain sizes and is safe by virtue of the
domain partitioning.
NOTE: The execution backend is passed the requested thread count, but it can
choose to ignore it (TBB)!
"""
assert isinstance(info, tuple) # guard against old usage
# Declare types and function
byte_t = ir.IntType(8)
byte_ptr_t = ir.PointerType(byte_t)
byte_ptr_ptr_t = ir.PointerType(byte_ptr_t)
intp_t = ctx.get_value_type(types.intp)
intp_ptr_t = ir.PointerType(intp_t)
fnty = ir.FunctionType(ir.VoidType(), [ir.PointerType(byte_ptr_t),
ir.PointerType(intp_t),
ir.PointerType(intp_t),
byte_ptr_t])
wrapperlib = ctx.codegen().create_library('parallelgufuncwrapper')
mod = wrapperlib.create_ir_module('parallel.gufunc.wrapper')
kernel_name = ".kernel.{}_{}".format(id(info.env), info.name)
lfunc = ir.Function(mod, fnty, name=kernel_name)
bb_entry = lfunc.append_basic_block('')
# Function body starts
builder = ir.IRBuilder(bb_entry)
args, dimensions, steps, data = lfunc.args
# Release the GIL (and ensure we have the GIL)
# Note: numpy ufunc may not always release the GIL; thus,
# we need to ensure we have the GIL.
pyapi = ctx.get_python_api(builder)
gil_state = pyapi.gil_ensure()
thread_state = pyapi.save_thread()
def as_void_ptr(arg):
return builder.bitcast(arg, byte_ptr_t)
# Array count depends on whether an "output" array is needed. In the case
# of a void return type cf. gufunc it is the number of args, in the case of
# a non-void return type cf. ufunc it is the number of args + 1 so as to
# account for the output array.
array_count = len(sig.args)
if not isinstance(sig.return_type, types.NoneType):
array_count += 1
parallel_for_ty = ir.FunctionType(ir.VoidType(),
[byte_ptr_t] * 5 + [intp_t, ] * 3)
parallel_for = cgutils.get_or_insert_function(mod, parallel_for_ty,
'numba_parallel_for')
# Reference inner-function and link
innerfunc_fnty = ir.FunctionType(
ir.VoidType(),
[byte_ptr_ptr_t, intp_ptr_t, intp_ptr_t, byte_ptr_t],
)
tmp_voidptr = cgutils.get_or_insert_function(mod, innerfunc_fnty,
info.name,)
wrapperlib.add_linking_library(info.library)
get_num_threads = cgutils.get_or_insert_function(
builder.module,
ir.FunctionType(ir.IntType(types.intp.bitwidth), []),
"get_num_threads")
num_threads = builder.call(get_num_threads, [])
# Prepare call
fnptr = builder.bitcast(tmp_voidptr, byte_ptr_t)
innerargs = [as_void_ptr(x) for x
in [args, dimensions, steps, data]]
builder.call(parallel_for, [fnptr] + innerargs +
[intp_t(x) for x in (inner_ndim, array_count)] + [num_threads])
# Release the GIL
pyapi.restore_thread(thread_state)
pyapi.gil_release(gil_state)
builder.ret_void()
wrapperlib.add_ir_module(mod)
wrapperlib.add_linking_library(library)
return _wrapper_info(library=wrapperlib, name=lfunc.name, env=info.env)
# ------------------------------------------------------------------------------
class ParallelUFuncBuilder(ufuncbuilder.UFuncBuilder):
def build(self, cres, sig):
_launch_threads()
# Builder wrapper for ufunc entry point
ctx = cres.target_context
signature = cres.signature
library = cres.library
fname = cres.fndesc.llvm_func_name
info = build_ufunc_wrapper(library, ctx, fname, signature, cres)
ptr = info.library.get_pointer_to_function(info.name)
# Get dtypes
dtypenums = [np.dtype(a.name).num for a in signature.args]
dtypenums.append(np.dtype(signature.return_type.name).num)
keepalive = ()
return dtypenums, ptr, keepalive
def build_ufunc_wrapper(library, ctx, fname, signature, cres):
innerfunc = ufuncbuilder.build_ufunc_wrapper(library, ctx, fname,
signature, objmode=False,
cres=cres)
info = build_gufunc_kernel(library, ctx, innerfunc, signature,
len(signature.args))
return info
# ---------------------------------------------------------------------------
class ParallelGUFuncBuilder(ufuncbuilder.GUFuncBuilder):
def __init__(self, py_func, signature, identity=None, cache=False,
targetoptions=None, writable_args=()):
# Force nopython mode
if targetoptions is None:
targetoptions = {}
targetoptions.update(dict(nopython=True))
super(
ParallelGUFuncBuilder,
self).__init__(
py_func=py_func,
signature=signature,
identity=identity,
cache=cache,
targetoptions=targetoptions,
writable_args=writable_args)
def build(self, cres):
"""
Returns (dtype numbers, function ptr, EnvironmentObject)
"""
_launch_threads()
# Build wrapper for ufunc entry point
info = build_gufunc_wrapper(
self.py_func, cres, self.sin, self.sout, cache=self.cache,
is_parfors=False,
)
ptr = info.library.get_pointer_to_function(info.name)
env = info.env
# Get dtypes
dtypenums = []
for a in cres.signature.args:
if isinstance(a, types.Array):
ty = a.dtype
else:
ty = a
dtypenums.append(as_dtype(ty).num)
return dtypenums, ptr, env
# This is not a member of the ParallelGUFuncBuilder function because it is
# called without an enclosing instance from parfors
def build_gufunc_wrapper(py_func, cres, sin, sout, cache, is_parfors):
"""Build gufunc wrapper for the given arguments.
The *is_parfors* is a boolean indicating whether the gufunc is being
built for use as a ParFors kernel. This changes codegen and caching
behavior.
"""
library = cres.library
ctx = cres.target_context
signature = cres.signature
innerinfo = ufuncbuilder.build_gufunc_wrapper(
py_func, cres, sin, sout, cache=cache, is_parfors=is_parfors,
)
sym_in = set(sym for term in sin for sym in term)
sym_out = set(sym for term in sout for sym in term)
inner_ndim = len(sym_in | sym_out)
info = build_gufunc_kernel(
library, ctx, innerinfo, signature, inner_ndim,
)
return info
# ---------------------------------------------------------------------------
_backend_init_thread_lock = threadRLock()
_windows = sys.platform.startswith('win32')
class _nop(object):
"""A no-op contextmanager
"""
def __enter__(self):
pass
def __exit__(self, *args):
pass
_backend_init_process_lock = None
def _set_init_process_lock():
global _backend_init_process_lock
try:
# Force the use of an RLock in the case a fork was used to start the
# process and thereby the init sequence, some of the threading backend
# init sequences are not fork safe. Also, windows global mp locks seem
# to be fine.
with _backend_init_thread_lock: # protect part-initialized module access
import multiprocessing
if "fork" in multiprocessing.get_start_method() or _windows:
ctx = multiprocessing.get_context()
_backend_init_process_lock = ctx.RLock()
else:
_backend_init_process_lock = _nop()
except OSError as e:
# probably lack of /dev/shm for semaphore writes, warn the user
msg = (
"Could not obtain multiprocessing lock due to OS level error: %s\n"
"A likely cause of this problem is '/dev/shm' is missing or "
"read-only such that necessary semaphores cannot be written.\n"
"*** The responsibility of ensuring multiprocessing safe access to "
"this initialization sequence/module import is deferred to the "
"user! ***\n"
)
warnings.warn(msg % str(e), errors.NumbaSystemWarning)
_backend_init_process_lock = _nop()
_is_initialized = False
# this is set by _launch_threads
_threading_layer = None
def threading_layer():
"""
Get the name of the threading layer in use for parallel CPU targets
"""
if _threading_layer is None:
raise ValueError("Threading layer is not initialized.")
else:
return _threading_layer
def _check_tbb_version_compatible():
"""
Checks that if TBB is present it is of a compatible version.
"""
try:
# first check that the TBB version is new enough
if _IS_WINDOWS:
libtbb_name = 'tbb12.dll'
elif _IS_OSX:
libtbb_name = 'libtbb.12.dylib'
elif _IS_LINUX:
libtbb_name = 'libtbb.so.12'
else:
raise ValueError("Unknown operating system")
libtbb = CDLL(libtbb_name)
version_func = libtbb.TBB_runtime_interface_version
version_func.argtypes = []
version_func.restype = c_int
tbb_iface_ver = version_func()
if tbb_iface_ver < 12060: # magic number from TBB
msg = ("The TBB threading layer requires TBB "
"version 2021 update 6 or later i.e., "
"TBB_INTERFACE_VERSION >= 12060. Found "
"TBB_INTERFACE_VERSION = %s. The TBB "
"threading layer is disabled.") % tbb_iface_ver
problem = errors.NumbaWarning(msg)
warnings.warn(problem)
raise ImportError("Problem with TBB. Reason: %s" % msg)
except (ValueError, OSError) as e:
# Translate as an ImportError for consistent error class use, this error
# will never materialise
raise ImportError("Problem with TBB. Reason: %s" % e)
def _launch_threads():
if not _backend_init_process_lock:
_set_init_process_lock()
with _backend_init_process_lock:
with _backend_init_thread_lock:
global _is_initialized
if _is_initialized:
return
def select_known_backend(backend):
"""
Loads a specific threading layer backend based on string
"""
lib = None
if backend.startswith("tbb"):
try:
# check if TBB is present and compatible
_check_tbb_version_compatible()
# now try and load the backend
from numba.np.ufunc import tbbpool as lib
except ImportError:
pass
elif backend.startswith("omp"):
# TODO: Check that if MKL is present that it is a version
# that understands GNU OMP might be present
try:
from numba.np.ufunc import omppool as lib
except ImportError:
pass
elif backend.startswith("workqueue"):
from numba.np.ufunc import workqueue as lib
else:
msg = "Unknown value specified for threading layer: %s"
raise ValueError(msg % backend)
return lib
def select_from_backends(backends):
"""
Selects from presented backends and returns the first working
"""
lib = None
for backend in backends:
lib = select_known_backend(backend)
if lib is not None:
break
else:
backend = ''
return lib, backend
t = str(config.THREADING_LAYER).lower()
namedbackends = config.THREADING_LAYER_PRIORITY
if not (len(namedbackends) == 3 and
set(namedbackends) == {'tbb', 'omp', 'workqueue'}):
raise ValueError(
"THREADING_LAYER_PRIORITY invalid: %s. "
"It must be a permutation of "
"{'tbb', 'omp', 'workqueue'}"
% namedbackends
)
lib = None
err_helpers = dict()
err_helpers['TBB'] = ("Intel TBB is required, try:\n"
"$ conda/pip install tbb")
err_helpers['OSX_OMP'] = ("Intel OpenMP is required, try:\n"
"$ conda/pip install intel-openmp")
requirements = []
def raise_with_hint(required):
errmsg = "No threading layer could be loaded.\n%s"
hintmsg = "HINT:\n%s"
if len(required) == 0:
hint = ''
if len(required) == 1:
hint = hintmsg % err_helpers[required[0]]
if len(required) > 1:
options = '\nOR\n'.join([err_helpers[x] for x in required])
hint = hintmsg % ("One of:\n%s" % options)
raise ValueError(errmsg % hint)
if t in namedbackends:
# Try and load the specific named backend
lib = select_known_backend(t)
if not lib:
# something is missing preventing a valid backend from
# loading, set requirements for hinting
if t == 'tbb':
requirements.append('TBB')
elif t == 'omp' and _IS_OSX:
requirements.append('OSX_OMP')
libname = t
elif t in ['threadsafe', 'forksafe', 'safe']:
# User wants a specific behaviour...
available = ['tbb']
requirements.append('TBB')
if t == "safe":
# "safe" is TBB, which is fork and threadsafe everywhere
pass
elif t == "threadsafe":
if _IS_OSX:
requirements.append('OSX_OMP')
# omp is threadsafe everywhere
available.append('omp')
elif t == "forksafe":
# everywhere apart from linux (GNU OpenMP) has a guaranteed
# forksafe OpenMP, as OpenMP has better performance, prefer
# this to workqueue
if not _IS_LINUX:
available.append('omp')
if _IS_OSX:
requirements.append('OSX_OMP')
# workqueue is forksafe everywhere
available.append('workqueue')
else: # unreachable
msg = "No threading layer available for purpose %s"
raise ValueError(msg % t)
# select amongst available
lib, libname = select_from_backends(available)
elif t == 'default':
# If default is supplied, try them in order, tbb, omp,
# workqueue
lib, libname = select_from_backends(namedbackends)
if not lib:
# set requirements for hinting
requirements.append('TBB')
if _IS_OSX:
requirements.append('OSX_OMP')
else:
msg = "The threading layer requested '%s' is unknown to Numba."
raise ValueError(msg % t)
# No lib found, raise and hint
if not lib:
raise_with_hint(requirements)
ll.add_symbol('numba_parallel_for', lib.parallel_for)
ll.add_symbol('do_scheduling_signed', lib.do_scheduling_signed)
ll.add_symbol('do_scheduling_unsigned', lib.do_scheduling_unsigned)
ll.add_symbol('allocate_sched', lib.allocate_sched)
ll.add_symbol('deallocate_sched', lib.deallocate_sched)
launch_threads = CFUNCTYPE(None, c_int)(lib.launch_threads)
launch_threads(NUM_THREADS)
_load_threading_functions(lib) # load late
# set library name so it can be queried
global _threading_layer
_threading_layer = libname
_is_initialized = True
def _load_threading_functions(lib):
ll.add_symbol('get_num_threads', lib.get_num_threads)
ll.add_symbol('set_num_threads', lib.set_num_threads)
ll.add_symbol('get_thread_id', lib.get_thread_id)
global _set_num_threads
_set_num_threads = CFUNCTYPE(None, c_int)(lib.set_num_threads)
_set_num_threads(NUM_THREADS)
global _get_num_threads
_get_num_threads = CFUNCTYPE(c_int)(lib.get_num_threads)
global _get_thread_id
_get_thread_id = CFUNCTYPE(c_int)(lib.get_thread_id)
ll.add_symbol('set_parallel_chunksize', lib.set_parallel_chunksize)
ll.add_symbol('get_parallel_chunksize', lib.get_parallel_chunksize)
ll.add_symbol('get_sched_size', lib.get_sched_size)
global _set_parallel_chunksize
_set_parallel_chunksize = CFUNCTYPE(c_uint,
c_uint)(lib.set_parallel_chunksize)
global _get_parallel_chunksize
_get_parallel_chunksize = CFUNCTYPE(c_uint)(lib.get_parallel_chunksize)
global _get_sched_size
_get_sched_size = CFUNCTYPE(c_uint,
c_uint,
c_uint,
POINTER(c_int),
POINTER(c_int))(lib.get_sched_size)
# Some helpers to make set_num_threads jittable
def gen_snt_check():
from numba.core.config import NUMBA_NUM_THREADS
msg = "The number of threads must be between 1 and %s" % NUMBA_NUM_THREADS
def snt_check(n):
if n > NUMBA_NUM_THREADS or n < 1:
raise ValueError(msg)
return snt_check
snt_check = gen_snt_check()
@overload(snt_check)
def ol_snt_check(n):
return snt_check
def set_num_threads(n):
"""
Set the number of threads to use for parallel execution.
By default, all :obj:`numba.config.NUMBA_NUM_THREADS` threads are used.
This functionality works by masking out threads that are not used.
Therefore, the number of threads *n* must be less than or equal to
:obj:`~.NUMBA_NUM_THREADS`, the total number of threads that are launched.
See its documentation for more details.
This function can be used inside of a jitted function.
Parameters
----------
n: The number of threads. Must be between 1 and NUMBA_NUM_THREADS.
See Also
--------
get_num_threads, numba.config.NUMBA_NUM_THREADS,
numba.config.NUMBA_DEFAULT_NUM_THREADS, :envvar:`NUMBA_NUM_THREADS`
"""
_launch_threads()
if not isinstance(n, (int, np.integer)):
raise TypeError("The number of threads specified must be an integer")
snt_check(n)
_set_num_threads(n)
@overload(set_num_threads)
def ol_set_num_threads(n):
_launch_threads()
if not isinstance(n, types.Integer):
msg = "The number of threads specified must be an integer"
raise errors.TypingError(msg)
def impl(n):
snt_check(n)
_set_num_threads(n)
return impl
def get_num_threads():
"""
Get the number of threads used for parallel execution.
By default (if :func:`~.set_num_threads` is never called), all
:obj:`numba.config.NUMBA_NUM_THREADS` threads are used.
This number is less than or equal to the total number of threads that are
launched, :obj:`numba.config.NUMBA_NUM_THREADS`.
This function can be used inside of a jitted function.
Returns
-------
The number of threads.
See Also
--------
set_num_threads, numba.config.NUMBA_NUM_THREADS,
numba.config.NUMBA_DEFAULT_NUM_THREADS, :envvar:`NUMBA_NUM_THREADS`
"""
_launch_threads()
num_threads = _get_num_threads()
if num_threads <= 0:
raise RuntimeError("Invalid number of threads. "
"This likely indicates a bug in Numba. "
"(thread_id=%s, num_threads=%s)" %
(get_thread_id(), num_threads))
return num_threads
@overload(get_num_threads)
def ol_get_num_threads():
_launch_threads()
def impl():
num_threads = _get_num_threads()
if num_threads <= 0:
print("Broken thread_id: ", get_thread_id())
print("num_threads: ", num_threads)
raise RuntimeError("Invalid number of threads. "
"This likely indicates a bug in Numba.")
return num_threads
return impl
@intrinsic
def _iget_num_threads(typingctx):
_launch_threads()
def codegen(context, builder, signature, args):
mod = builder.module
fnty = ir.FunctionType(cgutils.intp_t, [])
fn = cgutils.get_or_insert_function(mod, fnty, "get_num_threads")
return builder.call(fn, [])
return signature(types.intp), codegen
def get_thread_id():
"""
Returns a unique ID for each thread in the range 0 (inclusive)
to :func:`~.get_num_threads` (exclusive).
"""
# Called from the interpreter directly, this should return 0
# Called from a sequential JIT region, this should return 0
# Called from a parallel JIT region, this should return 0..N
# Called from objmode in a parallel JIT region, this should return 0..N
_launch_threads()
return _get_thread_id()
@overload(get_thread_id)
def ol_get_thread_id():
_launch_threads()
def impl():
return _iget_thread_id()
return impl
@intrinsic
def _iget_thread_id(typingctx):
def codegen(context, builder, signature, args):
mod = builder.module
fnty = ir.FunctionType(cgutils.intp_t, [])
fn = cgutils.get_or_insert_function(mod, fnty, "get_thread_id")
return builder.call(fn, [])
return signature(types.intp), codegen
_DYLD_WORKAROUND_SET = 'NUMBA_DYLD_WORKAROUND' in os.environ
_DYLD_WORKAROUND_VAL = int(os.environ.get('NUMBA_DYLD_WORKAROUND', 0))
if _DYLD_WORKAROUND_SET and _DYLD_WORKAROUND_VAL:
_launch_threads()
def set_parallel_chunksize(n):
_launch_threads()
if not isinstance(n, (int, np.integer)):
raise TypeError("The parallel chunksize must be an integer")
global _set_parallel_chunksize
if n < 0:
raise ValueError("chunksize must be greater than or equal to zero")
return _set_parallel_chunksize(n)
def get_parallel_chunksize():
_launch_threads()
global _get_parallel_chunksize
return _get_parallel_chunksize()
@overload(set_parallel_chunksize)
def ol_set_parallel_chunksize(n):
_launch_threads()
if not isinstance(n, types.Integer):
msg = "The parallel chunksize must be an integer"
raise errors.TypingError(msg)
def impl(n):
if n < 0:
raise ValueError("chunksize must be greater than or equal to zero")
return _set_parallel_chunksize(n)
return impl
@overload(get_parallel_chunksize)
def ol_get_parallel_chunksize():
_launch_threads()
def impl():
return _get_parallel_chunksize()
return impl