diff --git a/buildscripts/azure/azure-windows.yml b/buildscripts/azure/azure-windows.yml index 51860a379b3..d9b82d8c0ab 100644 --- a/buildscripts/azure/azure-windows.yml +++ b/buildscripts/azure/azure-windows.yml @@ -37,32 +37,20 @@ jobs: buildscripts\\incremental\\setup_conda_environment.cmd displayName: 'Before Install' - # VC 9.0 cannot build tbbpool.cpp in Numba, so we need to remove - # tbb from the environment before the build stage. - script: | + # use TBB call activate %CONDA_ENV% - conda remove -y tbb tbb-devel - displayName: 'Remove TBB' - condition: eq(variables['PYTHON'], '2.7') + conda install -y tbb tbb-devel + displayName: 'Add in TBB' - script: | buildscripts\\incremental\\build.cmd displayName: 'Build' - script: | - # One of the tbb tests is failing on Azure. Removing tbb before - # testing until we can figure out why. Only do this for Python 3 - # because we already removed TBB before build on Python 2. call activate %CONDA_ENV% - conda remove -y tbb tbb-devel - displayName: 'Remove TBB' - condition: ne(variables['PYTHON'], '2.7') - - # not working on windows? - #- script: | - # call activate %CONDA_ENV% - # numba.exe -s - # displayName: 'Display numba system information' + python -m numba -s + displayName: 'Display numba system information' - script: | call activate %CONDA_ENV% diff --git a/buildscripts/condarecipe.local/meta.yaml b/buildscripts/condarecipe.local/meta.yaml index cfa1d5e6a79..53a51f5961d 100644 --- a/buildscripts/condarecipe.local/meta.yaml +++ b/buildscripts/condarecipe.local/meta.yaml @@ -36,7 +36,7 @@ requirements: - funcsigs # [py27] - singledispatch # [py27] # TBB devel version is to match TBB libs - - tbb-devel >=2018.0.5 # [not ((armv6l or armv7l or aarch64) or (win and py27))] + - tbb-devel >=2019.5 # [not (armv6l or armv7l or aarch64 or linux32)] run: - python >=3.6 - numpy >=1.15 @@ -48,7 +48,7 @@ requirements: run_constrained: # If TBB is present it must be at least this version from Anaconda due to # build flag issues triggering UB - - tbb >=2018.0.5 # [not ((armv6l or armv7l or aarch64) or (win and py27))] + - tbb >=2019.5 # [not (armv6l or armv7l or aarch64 or linux32)] # avoid confusion from openblas bugs - libopenblas !=0.3.6 # [x86_64] # CUDA 8.0 or later is required for CUDA support @@ -65,7 +65,7 @@ test: - ipython # [not (armv6l or armv7l or aarch64)] - setuptools - faulthandler # [py27 and (not (armv6l or armv7l or aarch64))] - - tbb >=2018.0.5 # [not ((armv6l or armv7l or aarch64) or (win and py27))] + - tbb >=2019.5 # [not (armv6l or armv7l or aarch64 or linux32)] - intel-openmp # [osx] # Need these for AOT. 
Do not init msvc as it may not be present
    - {{ compiler('c') }}      # [not (win or armv6l or armv7l or aarch64)]
diff --git a/docs/source/developer/index.rst b/docs/source/developer/index.rst
index 89c48be7815..c804896ab02 100644
--- a/docs/source/developer/index.rst
+++ b/docs/source/developer/index.rst
@@ -22,6 +22,7 @@ Developer Manual
    environment.rst
    hashing.rst
    caching.rst
+   threading_implementation.rst
    literal.rst
    debugging.rst
    roadmap.rst
diff --git a/docs/source/developer/threading_implementation.rst b/docs/source/developer/threading_implementation.rst
new file mode 100644
index 00000000000..bc6bec58235
--- /dev/null
+++ b/docs/source/developer/threading_implementation.rst
@@ -0,0 +1,227 @@
+=========================================
+Notes on Numba's threading implementation
+=========================================
+
+The execution of the work presented by the Numba ``parallel`` targets is
+undertaken by the Numba threading layer. Practically, the "threading layer"
+is a Numba built-in library that can perform the required concurrent
+execution. At the time of writing there are three threading layers available,
+each implemented via a different lower level native threading library. More
+information on the threading layers and appropriate selection of a threading
+layer for a given application/system can be found in the
+:ref:`threading layer documentation <numba-threading-layer>`.
+
+The pertinent information to note for the following sections is that the
+function in the threading library that performs the parallel execution is the
+``parallel_for`` function. The job of this function is to both orchestrate and
+execute the parallel tasks.
+
+The relevant source files referenced in this document are:
+
+- ``numba/np/ufunc/tbbpool.cpp``
+- ``numba/np/ufunc/omppool.cpp``
+- ``numba/np/ufunc/workqueue.c``
+
+  These files contain the TBB, OpenMP, and workqueue threadpool
+  implementations, respectively. Each includes the functions
+  ``set_num_threads()``, ``get_num_threads()``, and ``get_thread_id()``, as
+  well as the relevant logic for thread masking in their respective
+  schedulers. Note that the basic thread local variable logic is duplicated
+  in each of these files, and not shared between them.
+
+- ``numba/np/ufunc/parallel.py``
+
+  This file contains the Python and JIT compatible wrappers for
+  ``set_num_threads()``, ``get_num_threads()``, and ``get_thread_id()``, as
+  well as the code that loads the above libraries into Python and launches
+  the threadpool.
+
+- ``numba/parfors/parfor_lowering.py``
+
+  This file contains the main logic for generating code for the parallel
+  backend. The thread mask is accessed in this file in the code that
+  generates scheduler code, and passed to the relevant backend scheduler
+  function (see below).
+
+Thread masking
+--------------
+
+As part of its design, Numba never launches new threads beyond the threads
+that are launched initially with
+``numba.np.ufunc.parallel._launch_threads()`` when the first parallel
+execution is run. This is a consequence of how threading was implemented in
+Numba before thread masking was added; the restriction keeps the design
+simple, although it could be removed in the future. Consequently, it's
+possible to programmatically set the number of threads, but only to a value
+less than or equal to the total number that have already been launched. This
+is done by "masking" out unused threads, causing them to do no work. For
+example, on a 16 core machine, if the user were to call
+``set_num_threads(4)``, Numba would always have 16 threads present, but 12 of
+them would sit idle for parallel computations. A further call to
+``set_num_threads(16)`` would cause those same threads to do work in later
+computations.
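+A minimal sketch of this masking behavior (assuming at least four threads
+were launched, i.e. ``NUMBA_NUM_THREADS >= 4``; timings and scheduling are
+machine dependent):
+
+.. code:: python
+
+    import numpy as np
+    from numba import njit, prange, set_num_threads
+
+    @njit(parallel=True)
+    def total(x):
+        acc = 0.0
+        for i in prange(x.shape[0]):  # scheduled over the masked thread count
+            acc += x[i]
+        return acc
+
+    set_num_threads(4)     # mask: use at most 4 of the launched threads
+    total(np.arange(1e6))  # runs on up to 4 threads; the rest sit idle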
+:ref:`Thread masking <numba-threading-layer-thread-masking>` was added to
+make it possible for a user to programmatically alter the number of threads
+performing work in the threading layer. Thread masking proved challenging to
+implement as it required the development of a programming model that is
+suitable for users, is easy to reason about, and can be implemented safely,
+with consistent behavior across the various threading layers.
+
+Programming model
+~~~~~~~~~~~~~~~~~
+
+The programming model chosen is similar to that found in OpenMP. It was
+chosen because it is familiar to many users, restricted in scope, and simple.
+The number of threads in use is specified by calling ``set_num_threads`` and
+the number of threads in use can be queried by calling ``get_num_threads``.
+These two functions are analogous to their OpenMP counterparts (with the
+above restriction that the mask must be less than or equal to the number of
+launched threads). The execution semantics are also similar to OpenMP in that
+once a parallel region is launched, altering the thread mask has no impact on
+the currently executing region, but will have an impact on parallel regions
+executed subsequently.
+
+The Implementation
+~~~~~~~~~~~~~~~~~~
+
+So as to place no further restrictions on user code other than those that
+already existed in the threading layer libraries, careful consideration of
+the design of thread masking was required. The "thread mask" cannot be stored
+in a global value as concurrent use of the threading layer may result in
+classic forms of race conditions on the value itself. Numerous designs were
+discussed involving various types of mutex on such a global value, all of
+which were eventually broken through thought experiment alone. It eventually
+transpired that, following some OpenMP implementations, the "thread mask" is
+best implemented as a ``thread local``. This means each thread that executes
+a Numba parallel function will have a thread local storage (TLS) slot that
+contains the value of the thread mask to use when scheduling threads in the
+``parallel_for`` function.
+
+The above notion of TLS use for a thread mask is relatively easy to
+implement: ``get_num_threads`` and ``set_num_threads`` simply need to address
+the TLS slot in a given threading layer. This also means that the execution
+schedule for a parallel region can be derived from a run time call to
+``get_num_threads``. This is achieved via a well known and relatively easy to
+implement pattern: a ``C`` library function is registered and then wrapped in
+the internal Numba implementation.
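+A small sketch of the per-thread TLS behavior (not part of the Numba test
+suite; it assumes ``NUMBA_NUM_THREADS >= 2``) shows two Python threads each
+holding a different mask at the same time:
+
+.. code:: python
+
+    from threading import Thread
+    from numba import get_num_threads, set_num_threads
+
+    results = [None, None]
+
+    def worker(mask, i):
+        set_num_threads(mask)           # writes this thread's TLS slot
+        results[i] = get_num_threads()  # reads the same slot back
+
+    threads = [Thread(target=worker, args=(m, i))
+               for i, m in enumerate((1, 2))]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    print(results)  # [1, 2] -- each thread observes only its own mask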
+In addition to satisfying the original upfront thread masking requirements, a
+few more complicated scenarios needed consideration, as follows.
+
+Nested parallelism
+******************
+
+In all threading layers a "main thread" will invoke the ``parallel_for``
+function and then, in the parallel region, depending on the threading layer,
+some number of additional threads will assist in doing the actual work.
+If the work contains a call to another parallel function (i.e. nested
+parallelism) it is necessary for the thread making the call to know what the
+"thread mask" of the main thread is so that it can propagate it into the
+``parallel_for`` call it makes when executing the nested parallel function.
+The implementation of this behavior is threading layer specific but the
+general principle is for the "main thread" to always "send" the value of the
+thread mask from its TLS slot to all threads in the threading layer that are
+active in the parallel region. These active threads then update their TLS
+slots with this value prior to performing any work. The net result of this
+implementation detail is that:
+
+* thread masks correctly propagate into nested functions
+* it's still possible for each thread in a parallel region to safely have a
+  different mask with which to call nested functions; if a mask is not set
+  explicitly, the mask inherited from the "main thread" is used
+* threading layers which have dynamic scheduling, with threads potentially
+  joining and leaving the active pool during a ``parallel_for`` execution,
+  are successfully accommodated
+* any "main thread" thread mask is entirely decoupled from the in-flux nature
+  of the thread masks of the threads in the active thread pool
+
+Python threads independently invoking parallel functions
+********************************************************
+
+The threading layer launch sequence is heavily guarded to ensure that the
+launch is both thread and process safe and run once per process. In a system
+with numerous Python ``threading`` module threads all using Numba, the first
+thread through the launch sequence will get its thread mask set
+appropriately, but no further threads can run the launch sequence. This means
+that other threads will need their initial thread mask set some other way.
+This is achieved when ``get_num_threads`` is called and no thread mask is
+present: in this case the thread mask will be set to the default. In the
+implementation, "no thread mask is present" is represented by the value
+``-1`` and the "default thread mask" (unset) is represented by the value
+``0``. The implementation also immediately calls
+``set_num_threads(NUMBA_NUM_THREADS)`` after doing this, so if either ``-1``
+or ``0`` is encountered as a result from ``get_num_threads()`` it indicates a
+bug in the above processes.
+
+OS ``fork()`` calls
+*******************
+
+The use of TLS was also in part driven by Linux (by far the most popular
+platform for Numba use) having a ``fork(2, 3P)`` call that will do TLS
+propagation into child processes; see ``clone(2)``\ 's ``CLONE_SETTLS``.
+
+Thread ID
+*********
+
+A private ``get_thread_id()`` function was added to each threading backend,
+which returns a unique ID for each thread. This can be accessed from Python
+by ``numba.np.ufunc.parallel._get_thread_id()`` (it can also be used inside a
+JIT compiled function). The thread ID function is useful for testing that the
+thread masking behavior is correct, but it should not be used outside of the
+tests. For example, one can call ``set_num_threads(4)`` and then collect all
+unique ``_get_thread_id()``\ s in a parallel region to verify that only 4
+threads are run.
+
+Caveats
+~~~~~~~
+
+Some caveats to be aware of when testing thread masking:
+
+- The TBB backend may choose to schedule fewer than the given mask number of
+  threads. Thus a test such as the one described above may return fewer than
+  4 unique threads.
+
+- The workqueue backend is not threadsafe, so attempts to do multithreaded
+  nested parallelism with it may result in deadlocks or other undefined
+  behavior. The workqueue backend will raise a SIGABRT signal if it detects
+  nested parallelism.
+
+- Certain backends may reuse the main thread for computation, but this
+  behavior shouldn't be relied upon (for instance, if propagating
+  exceptions).
+
+Use in Code Generation
+~~~~~~~~~~~~~~~~~~~~~~
+
+The general pattern for using ``get_num_threads`` in code generation is:
+
+.. code:: python
+
+    import llvmlite.llvmpy.core as lc
+
+    get_num_threads = builder.module.get_or_insert_function(
+        lc.Type.function(lc.Type.int(types.intp.bitwidth), []),
+        name="get_num_threads")
+
+    num_threads = builder.call(get_num_threads, [])
+
+    with cgutils.if_unlikely(builder, builder.icmp_signed('<=', num_threads,
+                                                          num_threads.type(0))):
+        cgutils.printf(builder, "num_threads: %d\n", num_threads)
+        context.call_conv.return_user_exc(builder, RuntimeError,
+                                          ("Invalid number of threads. "
+                                           "This likely indicates a bug in "
+                                           "Numba.",))
+
+    # Pass num_threads through to the appropriate backend function here
+
+See the code in ``numba/parfors/parfor_lowering.py``.
+
+The guard against ``num_threads`` being <= 0 is not strictly necessary, but
+it can protect against accidentally incorrect behavior in case the thread
+masking logic contains a bug.
+
+The ``num_threads`` variable should be passed through to the appropriate
+backend function, such as ``do_scheduling`` or ``parallel_for``. If it's used
+in some way other than passing it through to the backend function, the above
+considerations should be taken into account to ensure the use of the
+``num_threads`` variable is safe. It would probably be better to keep such
+logic in the threading backends, rather than trying to do it in code
+generation.
diff --git a/docs/source/reference/envvars.rst b/docs/source/reference/envvars.rst
index adba1f0fc12..4c22336eed9 100644
--- a/docs/source/reference/envvars.rst
+++ b/docs/source/reference/envvars.rst
@@ -363,7 +363,10 @@ Threading Control
    of ``OMP_NUM_THREADS`` and ``MKL_NUM_THREADS``.
 
    *Default value:* The number of CPU cores on the system as determined at run
-   time, this can be accessed via ``numba.config.NUMBA_DEFAULT_NUM_THREADS``.
+   time. This can be accessed via :obj:`numba.config.NUMBA_DEFAULT_NUM_THREADS`.
+
+   See also the section on :ref:`setting_the_number_of_threads` for
+   information on how to set the number of threads at runtime.
 
 .. envvar:: NUMBA_THREADING_LAYER
diff --git a/docs/source/user/threading-layer.rst b/docs/source/user/threading-layer.rst
index 4aa11aca31e..0f69b1f5a03 100644
--- a/docs/source/user/threading-layer.rst
+++ b/docs/source/user/threading-layer.rst
@@ -166,3 +166,106 @@ system level libraries, some additional things to note:
 
 * On OSX, the ``intel-openmp`` package is required to enable the OpenMP
   based threading layer.
+
+.. _setting_the_number_of_threads:
+
+Setting the Number of Threads
+-----------------------------
+
+The number of threads used by numba is based on the number of CPU cores
+available (see :obj:`numba.config.NUMBA_DEFAULT_NUM_THREADS`), but it can be
+overridden with the :envvar:`NUMBA_NUM_THREADS` environment variable.
+
+The total number of threads that numba launches is in the variable
+:obj:`numba.config.NUMBA_NUM_THREADS`.
+
+For some use cases, it may be desirable to set the number of threads to a
+lower value, so that numba can be used with higher level parallelism.
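+A quick way to inspect both values (a sketch; the output is machine
+dependent):
+
+.. code:: python
+
+    from numba import config
+
+    print(config.NUMBA_DEFAULT_NUM_THREADS)  # CPU count detected at startup
+    print(config.NUMBA_NUM_THREADS)  # threads to launch; honours the
+                                     # NUMBA_NUM_THREADS env variable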
+The number of threads can be set dynamically at runtime using
+:func:`numba.set_num_threads`. Note that :func:`~.set_num_threads` only
+allows setting the number of threads to a smaller value than
+:obj:`~.NUMBA_NUM_THREADS`. Numba always launches
+:obj:`numba.config.NUMBA_NUM_THREADS` threads, but :func:`~.set_num_threads`
+causes it to mask out unused threads so they aren't used in computations.
+
+The current number of threads used by numba can be accessed with
+:func:`numba.get_num_threads`. Both functions work inside of a jitted
+function.
+
+.. _numba-threading-layer-thread-masking:
+
+Example of Limiting the Number of Threads
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In this example, suppose the machine we are running on has 8 cores (so
+:obj:`numba.config.NUMBA_NUM_THREADS` would be ``8``). Suppose we want to run
+some code with ``@njit(parallel=True)``, but we also want to run our code
+concurrently in 4 different processes. With the default number of threads,
+each Python process would run 8 threads, for a total of 4*8 = 32 threads,
+which would oversubscribe our 8 cores. We should instead limit each process
+to 2 threads, so that the total will be 4*2 = 8, which matches our number of
+physical cores.
+
+There are two ways to do this. One is to set the :envvar:`NUMBA_NUM_THREADS`
+environment variable to ``2``.
+
+.. code:: bash
+
+    $ NUMBA_NUM_THREADS=2 python ourcode.py
+
+However, there are two downsides to this approach:
+
+1. :envvar:`NUMBA_NUM_THREADS` must be set before Numba is imported, and
+   ideally before Python is launched. As soon as Numba is imported the
+   environment variable is read and that number of threads is locked in as
+   the number of threads Numba launches.
+
+2. If we want to later increase the number of threads used by the process, we
+   cannot. :envvar:`NUMBA_NUM_THREADS` sets the *maximum* number of threads
+   that are launched for a process. Calling :func:`~.set_num_threads()` with
+   a value greater than :obj:`numba.config.NUMBA_NUM_THREADS` results in an
+   error.
+
+The advantage of this approach is that we can do it from outside of the
+process without changing the code.
+
+Another approach is to use the :func:`numba.set_num_threads` function in our
+code:
+
+.. code:: python
+
+    from numba import njit, set_num_threads
+
+    @njit(parallel=True)
+    def func():
+        ...
+
+    set_num_threads(2)
+    func()
+
+If we call ``set_num_threads(2)`` before executing our parallel code, it has
+the same effect as calling the process with ``NUMBA_NUM_THREADS=2``, in that
+the parallel code will only execute on 2 threads. However, we can later call
+``set_num_threads(8)`` to increase the number of threads back to the default
+size, and we do not have to worry about setting it before Numba gets
+imported; it only needs to be called before the parallel function is run.
+
+API Reference
+~~~~~~~~~~~~~
+
+.. py:data:: numba.config.NUMBA_NUM_THREADS
+
+   The total (maximum) number of threads launched by numba.
+
+   Defaults to :obj:`numba.config.NUMBA_DEFAULT_NUM_THREADS`, but can be
+   overridden with the :envvar:`NUMBA_NUM_THREADS` environment variable.
+
+.. py:data:: numba.config.NUMBA_DEFAULT_NUM_THREADS
+
+   The number of CPU cores on the system (as determined by
+   ``multiprocessing.cpu_count()``). This is the default value for
+   :obj:`numba.config.NUMBA_NUM_THREADS` unless the
+   :envvar:`NUMBA_NUM_THREADS` environment variable is set.
+
+.. autofunction:: numba.set_num_threads
+
+..
autofunction:: numba.get_num_threads diff --git a/numba/__init__.py b/numba/__init__.py index 698867fc0b4..2e9e1770d3c 100644 --- a/numba/__init__.py +++ b/numba/__init__.py @@ -35,7 +35,8 @@ jit_module) # Re-export vectorize decorators and the thread layer querying function -from numba.np.ufunc import vectorize, guvectorize, threading_layer +from numba.np.ufunc import (vectorize, guvectorize, threading_layer, + get_num_threads, set_num_threads) # Re-export Numpy helpers from numba.np.numpy_support import carray, farray, from_dtype @@ -71,6 +72,8 @@ vectorize objmode literal_unroll + get_num_threads + set_num_threads """.split() + types.__all__ + errors.__all__ diff --git a/numba/core/config.py b/numba/core/config.py index aaeaafcba7f..8dada97f6ae 100644 --- a/numba/core/config.py +++ b/numba/core/config.py @@ -322,8 +322,22 @@ def avx_default(): NUMBA_DEFAULT_NUM_THREADS = max(1, multiprocessing.cpu_count()) # Numba thread pool size (defaults to number of CPUs on the system). - NUMBA_NUM_THREADS = _readenv("NUMBA_NUM_THREADS", int, - NUMBA_DEFAULT_NUM_THREADS) + _NUMBA_NUM_THREADS = _readenv("NUMBA_NUM_THREADS", int, + NUMBA_DEFAULT_NUM_THREADS) + if ('NUMBA_NUM_THREADS' in globals() + and globals()['NUMBA_NUM_THREADS'] != _NUMBA_NUM_THREADS): + + from numba.np.ufunc import parallel + if parallel._is_initialized: + raise RuntimeError("Cannot set NUMBA_NUM_THREADS to a " + "different value once the threads have been " + "launched (currently have %s, " + "trying to set %s)" % + (_NUMBA_NUM_THREADS, + globals()['NUMBA_NUM_THREADS'])) + + NUMBA_NUM_THREADS = _NUMBA_NUM_THREADS + del _NUMBA_NUM_THREADS # Profiling support diff --git a/numba/misc/numba_entry.py b/numba/misc/numba_entry.py index cdd06297a44..9adf8f6f719 100644 --- a/numba/misc/numba_entry.py +++ b/numba/misc/numba_entry.py @@ -278,6 +278,7 @@ def parse_error(e, backend): try: from numba.np.ufunc import omppool print(fmt % ("OpenMP Threading layer available", True)) + print(fmt % ("+--> Vendor: ", omppool.openmp_vendor)) except ImportError as e: print(fmt % ("OpenMP Threading layer available", False)) print(fmt % ("+--> Disabled due to", diff --git a/numba/np/ufunc/__init__.py b/numba/np/ufunc/__init__.py index 2f01bc6de0f..c9542e0f875 100644 --- a/numba/np/ufunc/__init__.py +++ b/numba/np/ufunc/__init__.py @@ -3,7 +3,9 @@ from numba.np.ufunc.decorators import Vectorize, GUVectorize, vectorize, guvectorize from numba.np.ufunc._internal import PyUFunc_None, PyUFunc_Zero, PyUFunc_One from numba.np.ufunc import _internal, array_exprs -from numba.np.ufunc.parallel import threading_layer +from numba.np.ufunc.parallel import (threading_layer, get_num_threads, + set_num_threads, _get_thread_id) + if hasattr(_internal, 'PyUFunc_ReorderableNone'): PyUFunc_ReorderableNone = _internal.PyUFunc_ReorderableNone diff --git a/numba/np/ufunc/_num_threads.c b/numba/np/ufunc/_num_threads.c new file mode 100644 index 00000000000..10c9d95ee67 --- /dev/null +++ b/numba/np/ufunc/_num_threads.c @@ -0,0 +1,37 @@ +// Thread local num_threads variable for masking out the total number of +// launched threads. 
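+//
+// The function addresses are exported below as plain integers (via
+// PyLong_FromVoidPtr in MOD_INIT) so the Python layer can wrap them,
+// e.g. with ctypes.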
+ +#include "../../_pymodule.h" + +#ifdef _MSC_VER +#define THREAD_LOCAL(ty) __declspec(thread) ty +#else +/* Non-standard C99 extension that's understood by gcc and clang */ +#define THREAD_LOCAL(ty) __thread ty +#endif + +static THREAD_LOCAL(int) num_threads = 0; + +static void set_num_threads(int count) +{ + num_threads = count; +} + +static int get_num_threads(void) +{ + return num_threads; +} + +MOD_INIT(_num_threads) +{ + PyObject *m; + MOD_DEF(m, "_num_threads", "No docs", NULL) + if (m == NULL) + return MOD_ERROR_VAL; + PyObject_SetAttrString(m, "set_num_threads", + PyLong_FromVoidPtr((void*)&set_num_threads)); + PyObject_SetAttrString(m, "get_num_threads", + PyLong_FromVoidPtr((void*)&get_num_threads)); + + return MOD_SUCCESS_VAL(m); +} diff --git a/numba/np/ufunc/omppool.cpp b/numba/np/ufunc/omppool.cpp index c2c62f62284..2e23da46b27 100644 --- a/numba/np/ufunc/omppool.cpp +++ b/numba/np/ufunc/omppool.cpp @@ -30,16 +30,57 @@ Threading layer on top of OpenMP. // OpenMP vendor strings #if defined(_MSC_VER) #define _OMP_VENDOR "MS" -#elif defined(__GNUC__) -#define _OMP_VENDOR "GNU" #elif defined(__clang__) #define _OMP_VENDOR "Intel" +#elif defined(__GNUC__) // NOTE: clang also defines this, but it's checked above +#define _NOT_FORKSAFE 1 // GNU OpenMP Not forksafe +#define _OMP_VENDOR "GNU" #endif -#if defined(__GNUC__) +#if defined(_NOT_FORKSAFE) static pid_t parent_pid = 0; // 0 is not set, users can't own this anyway #endif + +#ifdef _MSC_VER +#define THREAD_LOCAL(ty) __declspec(thread) ty +#else +/* Non-standard C99 extension that's understood by gcc and clang */ +#define THREAD_LOCAL(ty) __thread ty +#endif + +// This is the number of threads that is default, it is set on initialisation of +// the threading backend via the launch_threads() call +static int _INIT_NUM_THREADS = -1; + +// This is the per-thread thread mask, each thread can carry its own mask. +static THREAD_LOCAL(int) _TLS_num_threads = 0; + +static void +set_num_threads(int count) +{ + _TLS_num_threads = count; +} + +static int +get_num_threads(void) +{ + if (_TLS_num_threads == 0) + { + // This is a thread that did not call launch_threads() but is still a + // "main" thread, probably from e.g. threading.Thread() use, it still + // has a TLS slot which is 0 from the lack of launch_threads() call + _TLS_num_threads = _INIT_NUM_THREADS; + } + return _TLS_num_threads; +} + +static int +get_thread_id(void) +{ + return omp_get_thread_num(); +} + static void add_task(void *fn, void *args, void *dims, void *steps, void *data) { @@ -51,7 +92,7 @@ add_task(void *fn, void *args, void *dims, void *steps, void *data) static void parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data, - size_t inner_ndim, size_t array_count) + size_t inner_ndim, size_t array_count, int num_threads) { typedef void (*func_ptr_t)(char **args, size_t *dims, size_t *steps, void *data); func_ptr_t func = reinterpret_cast(fn); @@ -62,7 +103,7 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat printed = true; } -#if defined(__GNUC__) +#if defined(_NOT_FORKSAFE) // Handle GNU OpenMP not being forksafe... // This checks if the pid set by the process that initialized this library // matches the parent of this pid. 
If they do match this is a fork() from @@ -90,6 +131,10 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat // index variable in OpenMP 'for' statement must have signed integral type for MSVC const ptrdiff_t size = (ptrdiff_t)dimensions[0]; + // holds the shared variable for `num_threads`, this is a bit superfluous + // but present to force thinking about the scope of validity + int agreed_nthreads = num_threads; + if(_DEBUG) { printf("inner_ndim: %lu\n",inner_ndim); @@ -107,10 +152,17 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat printf("\n"); } - #pragma omp parallel + // Set the thread mask on the pragma such that the state is scope limited + // and passed via a register on the OMP region call site, this limiting + // global state and racing + #pragma omp parallel num_threads(num_threads), shared(agreed_nthreads) { size_t * count_space = (size_t *)alloca(sizeof(size_t) * arg_len); char ** array_arg_space = (char**)alloca(sizeof(char*) * array_count); + + // tell the active thread team about the number of threads + set_num_threads(agreed_nthreads); + #pragma omp for for(ptrdiff_t r = 0; r < size; r++) { @@ -158,7 +210,7 @@ static void launch_threads(int count) { // this must be called in a fork+thread safe region from Python static bool initialized = false; -#ifdef __GNUC__ +#ifdef _NOT_FORKSAFE parent_pid = getpid(); // record the parent PID for use later if(_DEBUG_FORK) { @@ -172,6 +224,8 @@ static void launch_threads(int count) if(count < 1) return; omp_set_num_threads(count); + omp_set_nested(0x1); // enable nesting, control depth with OMP env var + _INIT_NUM_THREADS = count; } static void synchronize(void) @@ -205,5 +259,11 @@ MOD_INIT(omppool) PyLong_FromVoidPtr((void*)&do_scheduling_unsigned)); PyObject_SetAttrString(m, "openmp_vendor", PyString_FromString(_OMP_VENDOR)); + PyObject_SetAttrString(m, "set_num_threads", + PyLong_FromVoidPtr((void*)&set_num_threads)); + PyObject_SetAttrString(m, "get_num_threads", + PyLong_FromVoidPtr((void*)&get_num_threads)); + PyObject_SetAttrString(m, "get_thread_id", + PyLong_FromVoidPtr((void*)&get_thread_id)); return MOD_SUCCESS_VAL(m); } diff --git a/numba/np/ufunc/parallel.py b/numba/np/ufunc/parallel.py index 098062dc6a3..b5387c03722 100644 --- a/numba/np/ufunc/parallel.py +++ b/numba/np/ufunc/parallel.py @@ -11,11 +11,11 @@ """ import os -import platform import sys import warnings from threading import RLock as threadRLock import multiprocessing +from ctypes import CFUNCTYPE, c_int, CDLL import numpy as np @@ -23,9 +23,15 @@ import llvmlite.binding as ll from numba.np.numpy_support import as_dtype -from numba.core import types, config +from numba.core import types, config, errors from numba.np.ufunc.wrappers import _wrapper_info from numba.np.ufunc import ufuncbuilder +from numba.extending import overload + + +_IS_OSX = sys.platform.startswith('darwin') +_IS_LINUX = sys.platform.startswith('linux') +_IS_WINDOWS = sys.platform.startswith('win32') def get_thread_count(): @@ -128,7 +134,7 @@ def as_void_ptr(arg): array_count = len(sig.args) + 1 parallel_for_ty = lc.Type.function(lc.Type.void(), - [byte_ptr_t] * 5 + [intp_t, ] * 2) + [byte_ptr_t] * 5 + [intp_t, ] * 3) parallel_for = mod.get_or_insert_function(parallel_for_ty, name='numba_parallel_for') @@ -142,12 +148,18 @@ def as_void_ptr(arg): ) wrapperlib.add_linking_library(info.library) + get_num_threads = builder.module.get_or_insert_function( + lc.Type.function(lc.Type.int(types.intp.bitwidth), []), + 
name="get_num_threads") + + num_threads = builder.call(get_num_threads, []) + # Prepare call fnptr = builder.bitcast(tmp_voidptr, byte_ptr_t) innerargs = [as_void_ptr(x) for x in [args, dimensions, steps, data]] builder.call(parallel_for, [fnptr] + innerargs + - [intp_t(x) for x in (inner_ndim, array_count)]) + [intp_t(x) for x in (inner_ndim, array_count)] + [num_threads]) # Release the GIL pyapi.restore_thread(thread_state) @@ -267,6 +279,7 @@ def build_gufunc_wrapper(py_func, cres, sin, sout, cache, is_parfors): class _nop(object): """A no-op contextmanager """ + def __enter__(self): pass @@ -313,6 +326,40 @@ def threading_layer(): return _threading_layer +def _check_tbb_version_compatible(): + """ + Checks that if TBB is present it is of a compatible version. + """ + try: + # first check that the TBB version is new enough + if _IS_WINDOWS: + libtbb_name = 'tbb' + elif _IS_OSX: + libtbb_name = 'libtbb.dylib' + elif _IS_LINUX: + libtbb_name = 'libtbb.so.2' + else: + raise ValueError("Unknown operating system") + libtbb = CDLL(libtbb_name) + version_func = libtbb.TBB_runtime_interface_version + version_func.argtypes = [] + version_func.restype = c_int + tbb_iface_ver = version_func() + if tbb_iface_ver < 11005: # magic number from TBB + msg = ("The TBB threading layer requires TBB " + "version 2019.5 or later i.e., " + "TBB_INTERFACE_VERSION >= 11005. Found " + "TBB_INTERFACE_VERSION = %s. The TBB " + "threading layer is disabled.") + problem = errors.NumbaWarning(msg % tbb_iface_ver) + warnings.warn(problem) + raise ImportError("Problem with TBB. Reason: %s" % msg) + except (ValueError, OSError) as e: + # Translate as an ImportError for consistent error class use, this error + # will never materialise + raise ImportError("Problem with TBB. Reason: %s" % e) + + def _launch_threads(): with _backend_init_process_lock: with _backend_init_thread_lock: @@ -320,8 +367,6 @@ def _launch_threads(): if _is_initialized: return - from ctypes import CFUNCTYPE, c_int - def select_known_backend(backend): """ Loads a specific threading layer backend based on string @@ -329,6 +374,9 @@ def select_known_backend(backend): lib = None if backend.startswith("tbb"): try: + # check if TBB is present and compatible + _check_tbb_version_compatible() + # now try and load the backend from numba.np.ufunc import tbbpool as lib except ImportError: pass @@ -363,8 +411,6 @@ def select_from_backends(backends): namedbackends = ['tbb', 'omp', 'workqueue'] lib = None - _IS_OSX = platform.system() == "Darwin" - _IS_LINUX = platform.system() == "Linux" err_helpers = dict() err_helpers['TBB'] = ("Intel TBB is required, try:\n" "$ conda/pip install tbb") @@ -446,12 +492,160 @@ def raise_with_hint(required): launch_threads = CFUNCTYPE(None, c_int)(lib.launch_threads) launch_threads(NUM_THREADS) + _load_num_threads_funcs(lib) # load late + # set library name so it can be queried global _threading_layer _threading_layer = libname _is_initialized = True +def _load_num_threads_funcs(lib): + + ll.add_symbol('get_num_threads', lib.get_num_threads) + ll.add_symbol('set_num_threads', lib.set_num_threads) + ll.add_symbol('get_thread_id', lib.get_thread_id) + + global _set_num_threads + _set_num_threads = CFUNCTYPE(None, c_int)(lib.set_num_threads) + _set_num_threads(NUM_THREADS) + + global _get_num_threads + _get_num_threads = CFUNCTYPE(c_int)(lib.get_num_threads) + + global _get_thread_id + _get_thread_id = CFUNCTYPE(c_int)(lib.get_thread_id) + + +# Some helpers to make set_num_threads jittable + +def gen_snt_check(): + from 
numba.core.config import NUMBA_NUM_THREADS + msg = "The number of threads must be between 1 and %s" % NUMBA_NUM_THREADS + + def snt_check(n): + if n > NUMBA_NUM_THREADS or n < 1: + raise ValueError(msg) + return snt_check + + +snt_check = gen_snt_check() + + +@overload(snt_check) +def ol_snt_check(n): + return snt_check + + +def set_num_threads(n): + """ + Set the number of threads to use for parallel execution. + + By default, all :obj:`numba.config.NUMBA_NUM_THREADS` threads are used. + + This functionality works by masking out threads that are not used. + Therefore, the number of threads *n* must be less than or equal to + :obj:`~.NUMBA_NUM_THREADS`, the total number of threads that are launched. + See its documentation for more details. + + This function can be used inside of a jitted function. + + Parameters + ---------- + n: The number of threads. Must be between 1 and NUMBA_NUM_THREADS. + + See Also + -------- + get_num_threads, numba.config.NUMBA_NUM_THREADS, + numba.config.NUMBA_DEFAULT_NUM_THREADS, :envvar:`NUMBA_NUM_THREADS` + + """ + _launch_threads() + if not isinstance(n, (int, np.integer)): + raise TypeError("The number of threads specified must be an integer") + snt_check(n) + _set_num_threads(n) + + +@overload(set_num_threads) +def ol_set_num_threads(n): + _launch_threads() + if not isinstance(n, types.Integer): + msg = "The number of threads specified must be an integer" + raise errors.TypingError(msg) + + def impl(n): + snt_check(n) + _set_num_threads(n) + return impl + + +def get_num_threads(): + """ + Get the number of threads used for parallel execution. + + By default (if :func:`~.set_num_threads` is never called), all + :obj:`numba.config.NUMBA_NUM_THREADS` threads are used. + + This number is less than or equal to the total number of threads that are + launched, :obj:`numba.config.NUMBA_NUM_THREADS`. + + This function can be used inside of a jitted function. + + Returns + ------- + The number of threads. + + See Also + -------- + set_num_threads, numba.config.NUMBA_NUM_THREADS, + numba.config.NUMBA_DEFAULT_NUM_THREADS, :envvar:`NUMBA_NUM_THREADS` + + """ + _launch_threads() + num_threads = _get_num_threads() + if num_threads <= 0: + raise RuntimeError("Invalid number of threads. " + "This likely indicates a bug in Numba. " + "(thread_id=%s, num_threads=%s)" % + (_get_thread_id(), num_threads)) + return num_threads + + +@overload(get_num_threads) +def ol_get_num_threads(): + _launch_threads() + + def impl(): + num_threads = _get_num_threads() + if num_threads <= 0: + print("Broken thread_id: ", _get_thread_id()) + print("num_threads: ", num_threads) + raise RuntimeError("Invalid number of threads. " + "This likely indicates a bug in Numba.") + return num_threads + return impl + + +def _get_thread_id(): + """ + Returns a unique ID for each thread + + This function is private and should only be used for testing purposes. + """ + _launch_threads() + return _get_thread_id() + + +@overload(_get_thread_id) +def ol_get_thread_id(): + _launch_threads() + + def impl(): + return _get_thread_id() + return impl + + _DYLD_WORKAROUND_SET = 'NUMBA_DYLD_WORKAROUND' in os.environ _DYLD_WORKAROUND_VAL = int(os.environ.get('NUMBA_DYLD_WORKAROUND', 0)) diff --git a/numba/np/ufunc/tbbpool.cpp b/numba/np/ufunc/tbbpool.cpp index 640c5739cd4..faff4790fb7 100644 --- a/numba/np/ufunc/tbbpool.cpp +++ b/numba/np/ufunc/tbbpool.cpp @@ -19,17 +19,19 @@ Implement parallel vectorize workqueue on top of Intel TBB. 
 #include "gufunc_scheduler.h"
 
-#if TBB_INTERFACE_VERSION >= 9106
+/* TBB 2019 U5 is the minimum required version as this is needed:
+ * https://github.com/intel/tbb/blob/18070344d755ece04d169e6cc40775cae9288cee/CHANGES#L133-L134
+ * and therefore
+ * https://github.com/intel/tbb/blob/18070344d755ece04d169e6cc40775cae9288cee/CHANGES#L128-L129
+ * from here:
+ * https://github.com/intel/tbb/blob/2019_U5/include/tbb/tbb_stddef.h#L29
+ */
+#if TBB_INTERFACE_VERSION < 11005
+#error "TBB version is too old, 2019 update 5, i.e. TBB_INTERFACE_VERSION >= 11005 required"
+#endif
+
 #define TSI_INIT(count) tbb::task_scheduler_init(count)
 #define TSI_TERMINATE(tsi) tsi->blocking_terminate(std::nothrow)
-#else
-#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
-#define TSI_INIT(count) tbb::task_scheduler_init(count, 0, /*blocking termination*/true)
-#define TSI_TERMINATE(tsi) tsi->terminate()
-#else
-#error This version of TBB does not support blocking terminate
-#endif
-#endif
 
 #define _DEBUG 0
 #define _TRACE_SPLIT 0
@@ -38,6 +40,62 @@ static tbb::task_group *tg = NULL;
 static tbb::task_scheduler_init *tsi = NULL;
 static int tsi_count = 0;
 
+#ifdef _MSC_VER
+#define THREAD_LOCAL(ty) __declspec(thread) ty
+#else
+/* Non-standard C99 extension that's understood by gcc and clang */
+#define THREAD_LOCAL(ty) __thread ty
+#endif
+
+// This is the number of threads that is default, it is set on initialisation of
+// the threading backend via the launch_threads() call
+static int _INIT_NUM_THREADS = -1;
+
+// This is the per-thread thread mask, each thread can carry its own mask.
+static THREAD_LOCAL(int) _TLS_num_threads = 0;
+
+
+static void
+set_num_threads(int count)
+{
+    _TLS_num_threads = count;
+}
+
+static int
+get_num_threads(void)
+{
+    if (_TLS_num_threads == 0)
+    {
+        // This is a thread that did not call launch_threads() but is still a
+        // "main" thread, probably from e.g.
threading.Thread() use, it still + // has a TLS slot which is 0 from the lack of launch_threads() call + _TLS_num_threads = _INIT_NUM_THREADS; + } + return _TLS_num_threads; +} + +static int +get_thread_id(void) +{ + return tbb::task_arena::current_thread_index(); +} + +// watch the arena, if it decides to create more threads/add threads into the +// arena then make sure they get the right thread count +class fix_tls_observer: public tbb::task_scheduler_observer { + int mask_val; + void on_scheduler_entry( bool is_worker ) override; +public: + fix_tls_observer(tbb::task_arena &arena, int mask) : tbb::task_scheduler_observer(arena), mask_val(mask) + { + observe(true); + } +}; + +void fix_tls_observer::on_scheduler_entry(bool worker) { + set_num_threads(mask_val); +} + static void add_task(void *fn, void *args, void *dims, void *steps, void *data) { @@ -50,7 +108,7 @@ add_task(void *fn, void *args, void *dims, void *steps, void *data) static void parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data, - size_t inner_ndim, size_t array_count) + size_t inner_ndim, size_t array_count, int num_threads) { static bool printed = false; if(!printed && _DEBUG) @@ -83,48 +141,64 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat printf("\n"); } - using range_t = tbb::blocked_range; - tbb::parallel_for(range_t(0, dimensions[0]), [=](const range_t &range) - { - size_t * count_space = (size_t *)alloca(sizeof(size_t) * arg_len); - char ** array_arg_space = (char**)alloca(sizeof(char*) * array_count); - memcpy(count_space, dimensions, arg_len * sizeof(size_t)); - count_space[0] = range.size(); + // This is making the assumption that the calling thread knows the truth + // about num_threads, which should be correct via the following: + // program starts/reinits and the threadpool launches, num_threads TLS is + // set as default. Any thread spawned on init making a call to this function + // will have a valid num_threads TLS slot and so the task_arena is sized + // appropriately and it's value is used in the observer that fixes the TLS + // slots of any subsequent threads joining the task_arena. This leads to + // all threads in a task_arena having valid num_threads TLS slots prior to + // doing any work. Any further call to query the TLS slot value made by any + // thread in the arena is then safe and were any thread to create a nested + // parallel region the same logic applies as per program start/reinit. 
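+    // The arena below caps worker participation at num_threads; the observer
+    // registered on it updates the TLS mask of each thread entering the
+    // arena before any work is picked up.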
+ tbb::task_arena limited(num_threads); + fix_tls_observer observer(limited, num_threads); - if(_DEBUG && _TRACE_SPLIT > 1) + limited.execute([&]{ + using range_t = tbb::blocked_range; + tbb::parallel_for(range_t(0, dimensions[0]), [=](const range_t &range) { - printf("THREAD %p:", count_space); - printf("count_space: "); - for(size_t j = 0; j < arg_len; j++) - printf("%lu, ", count_space[j]); - printf("\n"); - } - for(size_t j = 0; j < array_count; j++) - { - char * base = args[j]; - size_t step = steps[j]; - ptrdiff_t offset = step * range.begin(); - array_arg_space[j] = base + offset; + size_t * count_space = (size_t *)alloca(sizeof(size_t) * arg_len); + char ** array_arg_space = (char**)alloca(sizeof(char*) * array_count); + memcpy(count_space, dimensions, arg_len * sizeof(size_t)); + count_space[0] = range.size(); - if(_DEBUG && _TRACE_SPLIT > 2) + if(_DEBUG && _TRACE_SPLIT > 1) { - printf("Index %ld\n", j); - printf("-->Got base %p\n", (void *)base); - printf("-->Got step %lu\n", step); - printf("-->Got offset %ld\n", offset); - printf("-->Got addr %p\n", (void *)array_arg_space[j]); + printf("THREAD %p:", count_space); + printf("count_space: "); + for(size_t j = 0; j < arg_len; j++) + printf("%lu, ", count_space[j]); + printf("\n"); } - } - - if(_DEBUG && _TRACE_SPLIT > 2) - { - printf("array_arg_space: "); for(size_t j = 0; j < array_count; j++) - printf("%p, ", (void *)array_arg_space[j]); - printf("\n"); - } - auto func = reinterpret_cast(fn); - func(array_arg_space, count_space, steps, data); + { + char * base = args[j]; + size_t step = steps[j]; + ptrdiff_t offset = step * range.begin(); + array_arg_space[j] = base + offset; + + if(_DEBUG && _TRACE_SPLIT > 2) + { + printf("Index %ld\n", j); + printf("-->Got base %p\n", (void *)base); + printf("-->Got step %lu\n", step); + printf("-->Got offset %ld\n", offset); + printf("-->Got addr %p\n", (void *)array_arg_space[j]); + } + } + + if(_DEBUG && _TRACE_SPLIT > 2) + { + printf("array_arg_space: "); + for(size_t j = 0; j < array_count; j++) + printf("%p, ", (void *)array_arg_space[j]); + printf("\n"); + } + auto func = reinterpret_cast(fn); + func(array_arg_space, count_space, steps, data); + }); }); } @@ -192,6 +266,8 @@ static void launch_threads(int count) tg = new tbb::task_group; tg->run([] {}); // start creating threads asynchronously + _INIT_NUM_THREADS = count; + #ifndef _MSC_VER pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork); #endif @@ -235,7 +311,12 @@ MOD_INIT(tbbpool) PyLong_FromVoidPtr((void*)&do_scheduling_signed)); PyObject_SetAttrString(m, "do_scheduling_unsigned", PyLong_FromVoidPtr((void*)&do_scheduling_unsigned)); - + PyObject_SetAttrString(m, "set_num_threads", + PyLong_FromVoidPtr((void*)&set_num_threads)); + PyObject_SetAttrString(m, "get_num_threads", + PyLong_FromVoidPtr((void*)&get_num_threads)); + PyObject_SetAttrString(m, "get_thread_id", + PyLong_FromVoidPtr((void*)&get_thread_id)); return MOD_SUCCESS_VAL(m); } diff --git a/numba/np/ufunc/workqueue.c b/numba/np/ufunc/workqueue.c index 82956cc1bf9..87ecefba2d0 100644 --- a/numba/np/ufunc/workqueue.c +++ b/numba/np/ufunc/workqueue.c @@ -21,12 +21,16 @@ race conditions. #include #include #include +#include #define NUMBA_WINTHREAD #else /* PThread */ #include #include #include +#include +#include +#include #define NUMBA_PTHREAD #endif @@ -38,6 +42,16 @@ race conditions. #define _DEBUG 0 +/* workqueue is not threadsafe, so we use DSO globals to flag and update various + * states. 
+ */ +/* This variable is the nesting level, it's incremented at the start of each + * parallel region and decremented at the end, if parallel regions are nested + * on entry the value == 1 and workqueue will abort (this in preference to just + * hanging or segfaulting). + */ +static int _nesting_level = 0; + /* As the thread-pool isn't inherited by children, free the task-queue, too. */ static void reset_after_fork(void); @@ -114,6 +128,12 @@ numba_new_thread(void *worker, void *arg) return (thread_pointer)th; } +static int +get_thread_id(void) +{ + return (int)pthread_self(); +} + #endif /* Win Thread */ @@ -199,6 +219,12 @@ numba_new_thread(void *worker, void *arg) return (thread_pointer)handle; } +static int +get_thread_id(void) +{ + return GetCurrentThreadId(); +} + #endif typedef struct Task @@ -239,14 +265,60 @@ queue_state_wait(Queue *queue, int old, int repl) void debug_marker(void); void debug_marker() {}; + +#ifdef _MSC_VER +#define THREAD_LOCAL(ty) __declspec(thread) ty +#else +/* Non-standard C99 extension that's understood by gcc and clang */ +#define THREAD_LOCAL(ty) __thread ty +#endif + +// This is the number of threads that is default, it is set on initialisation of +// the threading backend via the launch_threads() call +static int _INIT_NUM_THREADS = -1; + +// This is the per-thread thread mask, each thread can carry its own mask. +static THREAD_LOCAL(int) _TLS_num_threads = 0; + +static void +set_num_threads(int count) +{ + _TLS_num_threads = count; +} + +static int +get_num_threads(void) +{ + // This is purely to permit the implementation to survive to the point + // where it can exit cleanly as multiple threads cannot be used with this + // backend + if (_TLS_num_threads == 0) + { + // This is a thread that did not call launch_threads() but is still a + // "main" thread, probably from e.g. threading.Thread() use, it still + // has a TLS slot which is 0 from the lack of launch_threads() call + _TLS_num_threads = _INIT_NUM_THREADS; + } + return _TLS_num_threads; +} + + // this complies to a launchable function from `add_task` like: // add_task(nopfn, NULL, NULL, NULL, NULL) // useful if you want to limit the number of threads locally -void nopfn(void *args, void *dims, void *steps, void *data) {}; +// static void nopfn(void *args, void *dims, void *steps, void *data) {}; + + +// synchronize the TLS num_threads slot to value args[0] +static void sync_tls(void *args, void *dims, void *steps, void *data) { + int nthreads = *((int *)(args)); + _TLS_num_threads = nthreads; +}; + static void parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data, - size_t inner_ndim, size_t array_count) + size_t inner_ndim, size_t array_count, int num_threads) { // args = , @@ -254,20 +326,36 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat // steps = // data = + // check the nesting level, if it's already 1, abort, workqueue cannot + // handle nesting. + if (_nesting_level >= 1){ + fprintf(stderr, "%s", "Terminating: Nested parallel kernel launch " + "detected, the workqueue threading layer does " + "not supported nested parallelism. 
Try the TBB " + "threading layer.\n"); + raise(SIGABRT); + return; + } + + // increment the nest level + _nesting_level += 1; + size_t * count_space = NULL; char ** array_arg_space = NULL; const size_t arg_len = (inner_ndim + 1); - size_t i, j, count, remain, total; + int i; // induction var for chunking, thread count unlikely to overflow int + size_t j, count, remain, total; ptrdiff_t offset; char * base; + int old_queue_count = -1; size_t step; debug_marker(); total = *((size_t *)dimensions); - count = total / NUM_THREADS; + count = total / num_threads; remain = total; if(_DEBUG) @@ -298,12 +386,24 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat } } - + // sync the thread pool TLS slots, sync all slots, we don't know which + // threads will end up running. for (i = 0; i < NUM_THREADS; i++) + { + add_task(sync_tls, (void *)(&num_threads), NULL, NULL, NULL); + } + ready(); + synchronize(); + + // This backend isn't threadsafe so just mutate the global + old_queue_count = queue_count; + queue_count = num_threads; + + for (i = 0; i < num_threads; i++) { count_space = (size_t *)alloca(sizeof(size_t) * arg_len); memcpy(count_space, dimensions, arg_len * sizeof(size_t)); - if(i == NUM_THREADS - 1) + if(i == num_threads - 1) { // Last thread takes all leftover count_space[0] = remain; @@ -316,7 +416,7 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat if(_DEBUG) { - printf("\n=================== THREAD %ld ===================\n", i); + printf("\n=================== THREAD %d ===================\n", i); printf("\ncount_space: "); for(j = 0; j < arg_len; j++) { @@ -357,6 +457,10 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat ready(); synchronize(); + + queue_count = old_queue_count; + // decrement the nest level + _nesting_level -= 1; } static void @@ -422,6 +526,8 @@ static void launch_threads(int count) queue_condition_init(&queues[i].cond); numba_new_thread(thread_worker, &queues[i]); } + + _INIT_NUM_THREADS = count; } } @@ -448,6 +554,8 @@ static void reset_after_fork(void) free(queues); queues = NULL; NUM_THREADS = -1; + _INIT_NUM_THREADS = -1; + _nesting_level = 0; } MOD_INIT(workqueue) @@ -471,6 +579,11 @@ MOD_INIT(workqueue) PyLong_FromVoidPtr(&do_scheduling_signed)); PyObject_SetAttrString(m, "do_scheduling_unsigned", PyLong_FromVoidPtr(&do_scheduling_unsigned)); - + PyObject_SetAttrString(m, "set_num_threads", + PyLong_FromVoidPtr((void*)&set_num_threads)); + PyObject_SetAttrString(m, "get_num_threads", + PyLong_FromVoidPtr((void*)&get_num_threads)); + PyObject_SetAttrString(m, "get_thread_id", + PyLong_FromVoidPtr((void*)&get_thread_id)); return MOD_SUCCESS_VAL(m); } diff --git a/numba/np/ufunc/workqueue.h b/numba/np/ufunc/workqueue.h index cfb805c55fd..80558a0e534 100644 --- a/numba/np/ufunc/workqueue.h +++ b/numba/np/ufunc/workqueue.h @@ -53,4 +53,13 @@ void ready(void); */ static void parallel_for(void *fn, char **args, size_t *dims, size_t *steps, void *data,\ - size_t inner_ndim, size_t array_count); + size_t inner_ndim, size_t array_count, int num_threads); + + +/* Masking API cf. 
OpenMP */ +static void +set_num_threads(int count); +static int +get_num_threads(void); +static int +get_thread_id(void); diff --git a/numba/parfors/parfor_lowering.py b/numba/parfors/parfor_lowering.py index 081b5bfeef1..85789c0184c 100644 --- a/numba/parfors/parfor_lowering.py +++ b/numba/parfors/parfor_lowering.py @@ -4,6 +4,8 @@ import linecache import os import sys +import operator + import numpy as np import types as pytypes import operator @@ -1328,11 +1330,24 @@ def load_range(v): do_scheduling = builder.module.get_or_insert_function(scheduling_fnty, name="do_scheduling_unsigned") + get_num_threads = builder.module.get_or_insert_function( + lc.Type.function(lc.Type.int(types.intp.bitwidth), []), + name="get_num_threads") + + num_threads = builder.call(get_num_threads, []) + + with cgutils.if_unlikely(builder, builder.icmp_signed('<=', num_threads, + num_threads.type(0))): + cgutils.printf(builder, "num_threads: %d\n", num_threads) + context.call_conv.return_user_exc(builder, RuntimeError, + ("Invalid number of threads. " + "This likely indicates a bug in Numba.",)) + builder.call( do_scheduling, [ context.get_constant( - types.uintp, num_dim), dim_starts, dim_stops, context.get_constant( - types.uintp, get_thread_count()), sched, context.get_constant( + types.uintp, num_dim), dim_starts, dim_stops, num_threads, + sched, context.get_constant( types.intp, debug_flag)]) # Get the LLVM vars for the Numba IR reduction array vars. @@ -1459,7 +1474,7 @@ def load_range(v): nshapes = len(sig_dim_dict) + 1 shapes = cgutils.alloca_once(builder, intp_t, size=nshapes, name="pshape") # For now, outer loop size is the same as number of threads - builder.store(context.get_constant(types.intp, get_thread_count()), shapes) + builder.store(num_threads, shapes) # Individual shape variables go next i = 1 for dim_sym in occurances: diff --git a/numba/tests/npyufunc/test_parallel_env_variable.py b/numba/tests/npyufunc/test_parallel_env_variable.py index 8cf58eaae23..7d11692ad34 100644 --- a/numba/tests/npyufunc/test_parallel_env_variable.py +++ b/numba/tests/npyufunc/test_parallel_env_variable.py @@ -17,15 +17,20 @@ def test_num_threads_variable(self): Tests the NUMBA_NUM_THREADS env variable behaves as expected. """ key = 'NUMBA_NUM_THREADS' - current = str(getattr(env, key, config.NUMBA_DEFAULT_NUM_THREADS)) + current = str(getattr(env, key, config.NUMBA_NUM_THREADS)) threads = "3154" env[key] = threads - config.reload_config() try: + config.reload_config() + except RuntimeError as e: + # This test should fail if threads have already been launched + self.assertIn("Cannot set NUMBA_NUM_THREADS", e.args[0]) + else: self.assertEqual(threads, str(get_thread_count())) self.assertEqual(threads, str(config.NUMBA_NUM_THREADS)) finally: - # reset the env variable/set to default + # reset the env variable/set to default. Should not fail even if + # threads are launched because the value is the same. 
env[key] = current config.reload_config() diff --git a/numba/tests/test_num_threads.py b/numba/tests/test_num_threads.py new file mode 100644 index 00000000000..28651a31c28 --- /dev/null +++ b/numba/tests/test_num_threads.py @@ -0,0 +1,600 @@ +# -*- coding: utf-8 -*- +from __future__ import print_function, absolute_import, division + +import sys +import os +import re +import multiprocessing +import unittest + +import numpy as np + +from numba import (njit, set_num_threads, get_num_threads, prange, config, + threading_layer, guvectorize) +from numba.np.ufunc.parallel import _get_thread_id +from numba.core.errors import TypingError +from numba.tests.support import TestCase, skip_parfors_unsupported, tag +from numba.tests.test_parallel_backend import TestInSubprocess + + +class TestNumThreads(TestCase): + _numba_parallel_test_ = False + + def setUp(self): + # Make sure the num_threads is set to the max. This also makes sure + # the threads are launched. + set_num_threads(config.NUMBA_NUM_THREADS) + + def check_mask(self, expected, result): + # There's no guarantee that TBB will use a full mask worth of + # threads if it deems it inefficient to do so + if threading_layer() == 'tbb': + self.assertTrue(np.all(result <= expected)) + elif threading_layer() in ('omp', 'workqueue'): + np.testing.assert_equal(expected, result) + else: + assert 0, 'unreachable' + + @skip_parfors_unsupported + def test_set_num_threads_type(self): + + @njit + def foo(): + set_num_threads('wrong_type') + + expected = "The number of threads specified must be an integer" + for fn, errty in ((foo, TypingError), (foo.py_func, TypeError)): + with self.assertRaises(errty) as raises: + fn() + self.assertIn(expected, str(raises.exception)) + + @skip_parfors_unsupported + @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores") + def _test_set_num_threads_basic(self): + max_threads = config.NUMBA_NUM_THREADS + + self.assertEqual(get_num_threads(), max_threads) + set_num_threads(2) + self.assertEqual(get_num_threads(), 2) + set_num_threads(max_threads) + self.assertEqual(get_num_threads(), max_threads) + + with self.assertRaises(ValueError): + set_num_threads(0) + + with self.assertRaises(ValueError): + set_num_threads(max_threads + 1) + + @skip_parfors_unsupported + @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores") + def _test_set_num_threads_basic_jit(self): + max_threads = config.NUMBA_NUM_THREADS + + @njit + def get_n(): + return get_num_threads() + + self.assertEqual(get_n(), max_threads) + set_num_threads(2) + self.assertEqual(get_n(), 2) + set_num_threads(max_threads) + self.assertEqual(get_n(), max_threads) + + @njit + def set_get_n(n): + set_num_threads(n) + return get_num_threads() + + self.assertEqual(set_get_n(2), 2) + self.assertEqual(set_get_n(max_threads), max_threads) + + @skip_parfors_unsupported + @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores") + def _test_set_num_threads_basic_guvectorize(self): + max_threads = config.NUMBA_NUM_THREADS + + @guvectorize(['void(int64[:])'], + '(n)', + nopython=True, + target='parallel') + def get_n(x): + x[:] = get_num_threads() + + x = np.zeros((5000000,), dtype=np.int64) + get_n(x) + np.testing.assert_equal(x, max_threads) + set_num_threads(2) + x = np.zeros((5000000,), dtype=np.int64) + get_n(x) + np.testing.assert_equal(x, 2) + set_num_threads(max_threads) + x = np.zeros((5000000,), dtype=np.int64) + get_n(x) + np.testing.assert_equal(x, max_threads) + + @guvectorize(['void(int64[:])'], + '(n)', + nopython=True, + 
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_set_num_threads_outside_jit(self):
+
+        # Test set_num_threads outside a jitted function
+        set_num_threads(2)
+
+        @njit(parallel=True)
+        def test_func():
+            x = 5
+            buf = np.empty((x,))
+            for i in prange(x):
+                buf[i] = get_num_threads()
+            return buf
+
+        @guvectorize(['void(int64[:])'],
+                     '(n)',
+                     nopython=True,
+                     target='parallel')
+        def test_gufunc(x):
+            x[:] = get_num_threads()
+
+        out = test_func()
+        np.testing.assert_equal(out, 2)
+
+        x = np.zeros((5000000,), dtype=np.int64)
+        test_gufunc(x)
+        np.testing.assert_equal(x, 2)
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_set_num_threads_inside_jit(self):
+        # Test set_num_threads inside a jitted function
+        @njit(parallel=True)
+        def test_func(nthreads):
+            x = 5
+            buf = np.empty((x,))
+            set_num_threads(nthreads)
+            for i in prange(x):
+                buf[i] = get_num_threads()
+            return buf
+
+        mask = 2
+        out = test_func(mask)
+        np.testing.assert_equal(out, mask)
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_set_num_threads_inside_guvectorize(self):
+        # Test set_num_threads inside a jitted guvectorize function
+        @guvectorize(['void(int64[:])'],
+                     '(n)',
+                     nopython=True,
+                     target='parallel')
+        def test_func(x):
+            set_num_threads(x[0])
+            x[:] = get_num_threads()
+
+        x = np.zeros((5000000,), dtype=np.int64)
+        mask = 2
+        x[0] = mask
+        test_func(x)
+        np.testing.assert_equal(x, mask)
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_get_num_threads_truth_outside_jit(self):
+
+        for mask in range(2, min(6, config.NUMBA_NUM_THREADS + 1)):
+            set_num_threads(mask)
+
+            # a lot of work, hopefully will trigger "mask" count of threads
+            # to join the parallel region (for those backends with dynamic
+            # threads)
+            @njit(parallel=True)
+            def test_func():
+                x = 5000000
+                buf = np.empty((x,))
+                for i in prange(x):
+                    buf[i] = _get_thread_id()
+                return len(np.unique(buf)), get_num_threads()
+
+            out = test_func()
+            self.check_mask((mask, mask), out)
+
+            @guvectorize(['void(int64[:], int64[:])'],
+                         '(n), (m)',
+                         nopython=True,
+                         target='parallel')
+            def test_gufunc(x, out):
+                x[:] = _get_thread_id()
+                out[0] = get_num_threads()
+
+            # Reshape to force parallelism
+            x = np.full((5000000,), -1, dtype=np.int64).reshape((100, 50000))
+            out = np.zeros((1,), dtype=np.int64)
+            test_gufunc(x, out)
+            self.check_mask(mask, out)
+            self.check_mask(mask, len(np.unique(x)))
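+
+    # The "truth" tests count distinct _get_thread_id() values observed in
+    # the parallel region: every worker writes its thread id into the
+    # buffer, so len(np.unique(...)) is the number of threads that actually
+    # participated.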
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_get_num_threads_truth_inside_jit(self):
+
+        for mask in range(2, min(6, config.NUMBA_NUM_THREADS + 1)):
+
+            # a lot of work, hopefully will trigger "mask" count of threads
+            # to join the parallel region (for those backends with dynamic
+            # threads)
+            @njit(parallel=True)
+            def test_func():
+                set_num_threads(mask)
+                x = 5000000
+                buf = np.empty((x,))
+                for i in prange(x):
+                    buf[i] = _get_thread_id()
+                return len(np.unique(buf)), get_num_threads()
+
+            out = test_func()
+            self.check_mask((mask, mask), out)
+
+            @guvectorize(['void(int64[:], int64[:])'],
+                         '(n), (m)',
+                         nopython=True,
+                         target='parallel')
+            def test_gufunc(x, out):
+                set_num_threads(mask)
+                x[:] = _get_thread_id()
+                out[0] = get_num_threads()
+
+            # Reshape to force parallelism
+            x = np.full((5000000,), -1, dtype=np.int64).reshape((100, 50000))
+            out = np.zeros((1,), dtype=np.int64)
+            test_gufunc(x, out)
+            self.check_mask(mask, out)
+            self.check_mask(mask, len(np.unique(x)))
+
+    # this test can only run on OpenMP (providing OMP_MAX_ACTIVE_LEVELS is
+    # not set or >= 2) and TBB backends
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_nested_parallelism_1(self):
+        if threading_layer() == 'workqueue':
+            self.skipTest("workqueue is not threadsafe")
+
+        # check that get_num_threads is ok in nesting
+        mask = config.NUMBA_NUM_THREADS - 1
+
+        N = config.NUMBA_NUM_THREADS
+        M = 2 * config.NUMBA_NUM_THREADS
+
+        @njit(parallel=True)
+        def child_func(buf, fid):
+            M, N = buf.shape
+            for i in prange(N):
+                buf[fid, i] = get_num_threads()
+
+        def get_test(test_type):
+            if test_type == 'njit':
+                def test_func(nthreads, py_func=False):
+                    @njit(parallel=True)
+                    def _test_func(nthreads):
+                        acc = 0
+                        buf = np.zeros((M, N))
+                        set_num_threads(nthreads)
+                        for i in prange(M):
+                            local_mask = 1 + i % mask
+                            # set threads in parent function
+                            set_num_threads(local_mask)
+                            if local_mask < N:
+                                child_func(buf, local_mask)
+                            acc += get_num_threads()
+                        return acc, buf
+                    if py_func:
+                        return _test_func.py_func(nthreads)
+                    else:
+                        return _test_func(nthreads)
+
+            elif test_type == 'guvectorize':
+                def test_func(nthreads, py_func=False):
+                    def _test_func(acc, buf, local_mask):
+                        set_num_threads(nthreads)
+                        # set threads in parent function
+                        set_num_threads(local_mask[0])
+                        if local_mask[0] < N:
+                            child_func(buf, local_mask[0])
+                        acc[0] += get_num_threads()
+
+                    buf = np.zeros((M, N), dtype=np.int64)
+                    acc = np.zeros((M, 1), dtype=np.int64)
+                    local_mask = (1 + np.arange(M) % mask).reshape((M, 1))
+                    sig = ['void(int64[:], int64[:, :], int64[:])']
+                    layout = '(p), (n, m), (p)'
+                    if not py_func:
+                        _test_func = guvectorize(sig, layout, nopython=True,
+                                                 target='parallel')(_test_func)
+                    else:
+                        _test_func = guvectorize(sig, layout,
+                                                 forceobj=True)(_test_func)
+                    _test_func(acc, buf, local_mask)
+                    return acc, buf
+
+            return test_func
+
+        for test_type in ['njit', 'guvectorize']:
+            test_func = get_test(test_type)
+            got_acc, got_arr = test_func(mask)
+            exp_acc, exp_arr = test_func(mask, py_func=True)
+            np.testing.assert_equal(exp_acc, got_acc)
+            np.testing.assert_equal(exp_arr, got_arr)
+
+            # check the maths reconciles; guvectorize does not reduce, njit
+            # does
+            math_acc_exp = 1 + np.arange(M) % mask
+            if test_type == 'guvectorize':
+                math_acc = math_acc_exp.reshape((M, 1))
+            else:
+                math_acc = np.sum(math_acc_exp)
+
+            np.testing.assert_equal(math_acc, got_acc)
+
+            math_arr = np.zeros((M, N))
+            for i in range(1, N):
+                # there are branches on 1, ..., num_threads - 1
+                math_arr[i, :] = i
+            np.testing.assert_equal(math_arr, got_arr)
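+
+    # The thread mask lives in thread local storage (TLS): a mask set by a
+    # worker inside a parallel region only affects that worker, which is
+    # what the nested tests check after a child call returns.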
+
+    # this test can only run on OpenMP (providing OMP_MAX_ACTIVE_LEVELS is
+    # not set or >= 2) and TBB backends
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    def _test_nested_parallelism_2(self):
+        if threading_layer() == 'workqueue':
+            self.skipTest("workqueue is not threadsafe")
+
+        # check that get_num_threads is ok in nesting
+
+        N = config.NUMBA_NUM_THREADS + 1
+        M = 4 * config.NUMBA_NUM_THREADS + 1
+
+        def get_impl(child_type, test_type):
+
+            if child_type == 'parallel':
+                child_dec = njit(parallel=True)
+            elif child_type == 'njit':
+                child_dec = njit(parallel=False)
+            elif child_type == 'none':
+                def child_dec(x):
+                    return x
+
+            @child_dec
+            def child(buf, fid):
+                M, N = buf.shape
+                set_num_threads(fid)  # set threads in child function
+                for i in prange(N):
+                    buf[fid, i] = get_num_threads()
+
+            if test_type in ['parallel', 'njit', 'none']:
+                if test_type == 'parallel':
+                    test_dec = njit(parallel=True)
+                elif test_type == 'njit':
+                    test_dec = njit(parallel=False)
+                elif test_type == 'none':
+                    def test_dec(x):
+                        return x
+
+                @test_dec
+                def test_func(nthreads):
+                    buf = np.zeros((M, N))
+                    set_num_threads(nthreads)
+                    for i in prange(M):
+                        local_mask = 1 + i % mask
+                        # when the threads exit the child functions they
+                        # should have a TLS slot value of the local mask as
+                        # it was set in child
+                        if local_mask < config.NUMBA_NUM_THREADS:
+                            child(buf, local_mask)
+                            assert get_num_threads() == local_mask
+                    return buf
+            else:
+                if test_type == 'guvectorize':
+                    test_dec = guvectorize(['int64[:,:], int64[:]'],
+                                           '(n, m), (k)', nopython=True,
+                                           target='parallel')
+                elif test_type == 'guvectorize-obj':
+                    test_dec = guvectorize(['int64[:,:], int64[:]'],
+                                           '(n, m), (k)', forceobj=True)
+
+                def test_func(nthreads):
+                    @test_dec
+                    def _test_func(buf, local_mask):
+                        set_num_threads(nthreads)
+                        # when the threads exit the child functions they
+                        # should have a TLS slot value of the local mask as
+                        # it was set in child
+                        if local_mask[0] < config.NUMBA_NUM_THREADS:
+                            child(buf, local_mask[0])
+                            assert get_num_threads() == local_mask[0]
+
+                    buf = np.zeros((M, N), dtype=np.int64)
+                    local_mask = (1 + np.arange(M) % mask).reshape((M, 1))
+                    _test_func(buf, local_mask)
+                    return buf
+
+            return test_func
+
+        mask = config.NUMBA_NUM_THREADS - 1
+
+        res_arrays = {}
+        for test_type in ['parallel', 'njit', 'none',
+                          'guvectorize', 'guvectorize-obj']:
+            for child_type in ['parallel', 'njit', 'none']:
+                if child_type == 'none' and test_type != 'none':
+                    continue
+                set_num_threads(mask)
+                res_arrays[test_type, child_type] = get_impl(
+                    child_type, test_type)(mask)
+
+        py_arr = res_arrays['none', 'none']
+        for arr in res_arrays.values():
+            np.testing.assert_equal(arr, py_arr)
+
+        # check the maths reconciles
+        math_arr = np.zeros((M, N))
+        # there are branches on modulo of the mask, but only
+        # NUMBA_NUM_THREADS threads ever write
+        for i in range(1, config.NUMBA_NUM_THREADS):
+            math_arr[i, :] = i
+
+        np.testing.assert_equal(math_arr, py_arr)
+
+    # this test can only run on OpenMP (providing OMP_MAX_ACTIVE_LEVELS is
+    # not set or >= 2) and TBB backends
+    # This test needs at least 3 threads to run, N>=2 for the launch,
+    # M>=N+1 for the nested function
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 3, "Not enough CPU cores")
+    def _test_nested_parallelism_3(self):
+        if threading_layer() == 'workqueue':
+            self.skipTest("workqueue is not threadsafe")
+
+        # check that the right number of threads are present in nesting
+        # this relies on there being a load of cores present
+        BIG = 1000000
+
+        @njit(parallel=True)
+        def work(local_nt):  # arg is value 3
+            tid = np.zeros(BIG)
+            acc = 0
+            set_num_threads(local_nt)  # set to 3 threads
+            for i in prange(BIG):
+                acc += 1
+                tid[i] = _get_thread_id()
+            return acc, np.unique(tid)
+
+        @njit(parallel=True)
+        def test_func_jit(nthreads):
+            set_num_threads(nthreads)  # set to 2 threads
+            lens = np.zeros(nthreads)
+            total = 0
+            for i in prange(nthreads):
+                my_acc, tids = work(nthreads + 1)  # call with value 3
+                lens[i] = len(tids)
+                total += my_acc
+            return total, np.unique(lens)
+
+        NT = 2
+        expected_acc = BIG * NT
+        expected_thread_count = NT + 1
+
+        got_acc, got_tc = test_func_jit(NT)
+        self.assertEqual(expected_acc, got_acc)
+        self.check_mask(expected_thread_count, got_tc)
+
+        def test_guvectorize(nthreads):
+            @guvectorize(['int64[:], int64[:]'],
+                         '(n), (n)',
+                         nopython=True,
+                         target='parallel')
+            def test_func_guvectorize(total, lens):
+                my_acc, tids = work(nthreads + 1)
+                lens[0] = len(tids)
+                total[0] += my_acc
+
+            total = np.zeros((nthreads, 1), dtype=np.int64)
+            lens = np.zeros(nthreads, dtype=np.int64).reshape((nthreads, 1))
+
+            test_func_guvectorize(total, lens)
+            # guvectorize does not reduce, so total is summed
+            return total.sum(), np.unique(lens)
+
+        got_acc, got_tc = test_guvectorize(NT)
+
+        self.assertEqual(expected_acc, got_acc)
+        self.check_mask(expected_thread_count, got_tc)
+
+    @skip_parfors_unsupported
+    @unittest.skipIf(config.NUMBA_NUM_THREADS < 2, "Not enough CPU cores")
+    @unittest.skipIf(not sys.platform.startswith('linux'), "Linux only")
+    def _test_threadmask_across_fork(self):
+        forkctx = multiprocessing.get_context('fork')
+
+        @njit
+        def foo():
+            return get_num_threads()
+
+        def wrap(queue):
+            queue.put(foo())
+
+        mask = 1
+        self.assertEqual(foo(), config.NUMBA_NUM_THREADS)
+        set_num_threads(mask)
+        self.assertEqual(foo(), mask)
+        shared_queue = forkctx.Queue()
+        # check TLS slot inheritance in fork
+        p = forkctx.Process(target=wrap, args=(shared_queue,))
+        p.start()
+        p.join()
+        self.assertEqual(shared_queue.get(), mask)
+
+    def tearDown(self):
+        set_num_threads(config.NUMBA_NUM_THREADS)
+
+
+class TestNumThreadsBackends(TestInSubprocess, TestCase):
+    _class = TestNumThreads
+    _DEBUG = False
+
+    # 1 is mainly here to ensure tests skip correctly
+    num_threads = [i for i in [1, 2, 4, 8, 16]
+                   if i <= config.NUMBA_NUM_THREADS]
+
+    def run_test_in_separate_process(self, test, threading_layer,
+                                     num_threads):
+        env_copy = os.environ.copy()
+        env_copy['NUMBA_THREADING_LAYER'] = str(threading_layer)
+        env_copy['NUMBA_NUM_THREADS'] = str(num_threads)
+        cmdline = [sys.executable, "-m", "numba.runtests", "-v", test]
+        return self.run_cmd(cmdline, env_copy)
+
+    @classmethod
+    def _inject(cls, name, backend, backend_guard, num_threads):
+        themod = cls.__module__
+        thecls = cls._class.__name__
+        injected_method = '%s.%s.%s' % (themod, thecls, name)
+
+        def test_template(self):
+            o, e = self.run_test_in_separate_process(injected_method,
+                                                     backend, num_threads)
+            if self._DEBUG:
+                print('stdout:\n "%s"\n stderr:\n "%s"' % (o, e))
+            self.assertIn('OK', e)
+            self.assertTrue('FAIL' not in e)
+            self.assertTrue('ERROR' not in e)
+            m = re.search(r"\.\.\. skipped '(.*?)'", e)
+            if m:
+                self.skipTest(m.group(1))
+
+        injected_test = "%s_%s_%s_threads" % (name[1:], backend, num_threads)
+        setattr(cls, injected_test,
+                tag('long_running')(backend_guard(test_template)))
+
+    @classmethod
+    def generate(cls):
+        for name in cls._class.__dict__.copy():
+            for backend, backend_guard in cls.backends.items():
+                for num_threads in cls.num_threads:
+                    if not name.startswith('_test_'):
+                        continue
+                    cls._inject(name, backend, backend_guard, num_threads)
+
+
+TestNumThreadsBackends.generate()
+
+if __name__ == '__main__':
+    unittest.main()
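
Each injected variant above runs out of process. Roughly, the launch that
``run_test_in_separate_process`` performs amounts to the following sketch;
the test id and the settings shown are illustrative:

    import os
    import subprocess
    import sys

    env = os.environ.copy()
    env['NUMBA_THREADING_LAYER'] = 'tbb'   # or 'omp' / 'workqueue'
    env['NUMBA_NUM_THREADS'] = '2'
    subprocess.check_call(
        [sys.executable, '-m', 'numba.runtests', '-v',
         'numba.tests.test_num_threads.TestNumThreads'
         '._test_set_num_threads_basic'],
        env=env)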
diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py
index 96c233c1703..34d99c0e933 100644
--- a/numba/tests/test_parallel_backend.py
+++ b/numba/tests/test_parallel_backend.py
@@ -10,18 +10,18 @@
 import subprocess
 import sys
 import threading
+import unittest
 
 import numpy as np
 
-from numba import jit, vectorize, guvectorize
-
+from numba import jit, vectorize, guvectorize, set_num_threads
 from numba.tests.support import (temp_directory, override_config, TestCase,
                                  tag, skip_parfors_unsupported, linux_only)
 import queue as t_queue
 from numba.testing.main import _TIMEOUT as _RUNNER_TIMEOUT
 from numba.core import config
-import unittest
+
 
 _TEST_TIMEOUT = _RUNNER_TIMEOUT - 60.
 
@@ -29,6 +29,9 @@
 # Check which backends are available
 # TODO: Put this in a subprocess so the address space is kept clean
 try:
+    # Check it's a compatible TBB before loading it
+    from numba.np.ufunc.parallel import _check_tbb_version_compatible
+    _check_tbb_version_compatible()
     from numba.np.ufunc import tbbpool  # noqa: F401
     _HAVE_TBB_POOL = True
 except ImportError:
@@ -101,6 +104,19 @@ def __call__(self):
         np.testing.assert_allclose(expected, got)
 
 
+class mask_runner(object):
+    def __init__(self, runner, mask, **options):
+        self.runner = runner
+        self.mask = mask
+
+    def __call__(self):
+        if self.mask:
+            # Tests are all run in isolated subprocesses, so we
+            # don't have to worry about this affecting other tests
+            set_num_threads(self.mask)
+        self.runner()
+
+
 class linalg_runner(runnable):
 
     def __call__(self):
@@ -225,6 +241,17 @@ class TestParallelBackendBase(TestCase):
     ]
     all_impls.extend(parfor_impls)
 
+    if config.NUMBA_NUM_THREADS < 2:
+        # Not enough cores
+        masks = []
+    else:
+        masks = [1, 2]
+
+    mask_impls = []
+    for impl in all_impls:
+        for mask in masks:
+            mask_impls.append(mask_runner(impl, mask))
+
     parallelism = ['threading', 'random']
     parallelism.append('multiprocessing_spawn')
     if _HAVE_OS_FORK:
@@ -242,6 +269,7 @@ class TestParallelBackendBase(TestCase):
             guvectorize_runner(nopython=True, target='parallel'),
         ],
         'concurrent_mix_use': all_impls,
+        'concurrent_mix_use_masks': mask_impls,
     }
 
     safe_backends = {'omp', 'tbb'}
@@ -313,17 +341,7 @@ def test_method(self):
 TestParallelBackend.generate()
 
 
-class TestSpecificBackend(TestParallelBackendBase):
-    """
-    This is quite contrived, for each test in the TestParallelBackend tests it
-    generates a test that will run the TestParallelBackend test in a new python
-    process with an environment modified to ensure a specific threadsafe backend
-    is used. This is with view of testing the backends independently and in an
-    isolated manner such that if they hang/crash/have issues, it doesn't kill
-    the test suite.
-    """
-    _DEBUG = False
-
+class TestInSubprocess(object):
     backends = {'tbb': skip_no_tbb,
                 'omp': skip_no_omp,
                 'workqueue': unittest.skipIf(False, '')}
@@ -353,6 +371,18 @@ def run_test_in_separate_process(self, test, threading_layer):
         cmdline = [sys.executable, "-m", "numba.runtests", test]
         return self.run_cmd(cmdline, env_copy)
 
+
+class TestSpecificBackend(TestInSubprocess, TestParallelBackendBase):
+    """
+    This is quite contrived: for each test in TestParallelBackend it
+    generates a test that runs that test in a new Python process with an
+    environment modified to ensure a specific threadsafe backend is used.
+    This is with a view to testing the backends independently and in an
+    isolated manner, such that if they hang/crash/have issues, it doesn't
+    kill the test suite.
+    """
+    _DEBUG = False
+
     @classmethod
     def _inject(cls, p, name, backend, backend_guard):
         themod = cls.__module__
@@ -552,6 +582,46 @@ def foo(n):
         print(out, err)
         self.assertIn("@tbb@", out)
 
+    def test_workqueue_aborts_on_nested_parallelism(self):
+        """
+        Tests that workqueue raises SIGABRT if a nested parallel call is
+        performed
+        """
+        runme = """if 1:
+            from numba import njit, prange
+            import numpy as np
+
+            @njit(parallel=True)
+            def nested(x):
+                for i in prange(len(x)):
+                    x[i] += 1
+
+
+            @njit(parallel=True)
+            def main():
+                Z = np.zeros((5, 10))
+                for i in prange(Z.shape[0]):
+                    nested(Z[i])
+                return Z
+
+            main()
+            """
+        cmdline = [sys.executable, '-c', runme]
+        env = os.environ.copy()
+        env['NUMBA_THREADING_LAYER'] = "workqueue"
+        env['NUMBA_NUM_THREADS'] = "4"
+
+        try:
+            out, err = self.run_cmd(cmdline, env=env)
+        except AssertionError as e:
+            e_msg = str(e)
+            if self._DEBUG:
+                print(e_msg)
+            self.assertIn("failed with code", e_msg)
+            # raised a SIGABRT, but the value is platform specific so just
+            # check the error message
+            self.assertIn("Terminating: Nested parallel kernel launch "
+                          "detected", e_msg)
+
 
 # 32bit or windows py27 (not that this runs on windows)
 @skip_parfors_unsupported
@@ -828,5 +898,25 @@ def run_cmd(cmdline):
         print("ERR:", err)
 
 
+@skip_parfors_unsupported
+@skip_no_omp
+class TestOpenMPVendors(TestCase):
+
+    def test_vendors(self):
+        """
+        Checks the OpenMP vendor strings are correct
+        """
+        expected = dict()
+        expected['win32'] = "MS"
+        expected['darwin'] = "Intel"
+        expected['linux'] = "GNU"
+
+        # only check OSes that are supported; custom toolchains may well
+        # work, as may other OSes
+        for k in expected.keys():
+            if sys.platform.startswith(k):
+                self.assertEqual(expected[k], omppool.openmp_vendor)
+
+
 if __name__ == '__main__':
     unittest.main()
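
The TBB availability probe in the file above now refuses to load an
incompatible TBB before importing the pool. In isolation, the guarded import
pattern is roughly the following sketch; the surrounding ``try``/``except``
suggests ``_check_tbb_version_compatible()`` raises ``ImportError`` on an
incompatible TBB, which the ``except`` clause absorbs:

    # a standalone sketch of the guarded TBB availability check
    try:
        from numba.np.ufunc.parallel import _check_tbb_version_compatible
        _check_tbb_version_compatible()      # ImportError if TBB unusable
        from numba.np.ufunc import tbbpool   # noqa: F401
        HAVE_TBB_POOL = True
    except ImportError:
        HAVE_TBB_POOL = False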
diff --git a/numba/tests/test_parfors.py b/numba/tests/test_parfors.py
index 58efa955dc9..fcc52e004b7 100644
--- a/numba/tests/test_parfors.py
+++ b/numba/tests/test_parfors.py
@@ -19,7 +19,7 @@
 from collections import defaultdict
 
 import numba.parfors.parfor
-from numba import njit, prange
+from numba import njit, prange, set_num_threads, get_num_threads
 from numba.core import (types, utils, typing, errors, ir, rewrites,
                         typed_passes, inline_closurecall, config, compiler, cpu)
 from numba.extending import (overload_method, register_model,
@@ -2419,20 +2419,18 @@ class TestParforsVectorizer(TestPrangeBase):
 
     def get_gufunc_asm(self, func, schedule_type, *args, **kwargs):
 
         fastmath = kwargs.pop('fastmath', False)
-        nthreads = kwargs.pop('nthreads', 2)
         cpu_name = kwargs.pop('cpu_name', 'skylake-avx512')
         assertions = kwargs.pop('assertions', True)
 
         env_opts = {'NUMBA_CPU_NAME': cpu_name,
                     'NUMBA_CPU_FEATURES': '',
-                    'NUMBA_NUM_THREADS': str(nthreads)
                     }
 
         overrides = []
         for k, v in env_opts.items():
             overrides.append(override_env_config(k, v))
 
-        with overrides[0], overrides[1], overrides[2]:
+        with overrides[0], overrides[1]:
             sig = tuple([numba.typeof(x) for x in args])
             pfunc_vectorizable = self.generate_prange_func(func, None)
             if fastmath == True:
@@ -2450,7 +2448,7 @@ def get_gufunc_asm(self, func, schedule_type, *args, **kwargs):
             self.assertEqual(matches[0], schedule_type)
             self.assertTrue(asm != {})
 
-        return asm
+        return asm
 
     # this is a common match pattern for something like:
     # \n\tvsqrtpd\t-192(%rbx,%rsi,8), %zmm0\n
diff --git a/setup.py b/setup.py
index b9a847fea94..32b3d96fd06 100644
--- a/setup.py
+++ b/setup.py
@@ -145,10 +145,16 @@ def get_ext_modules():
     ext_np_ufunc = Extension(name="numba.np.ufunc._internal",
                              sources=["numba/np/ufunc/_internal.c"],
                              depends=["numba/np/ufunc/_ufunc.c",
-                                      "numba/np/ufunc/_internal.h",
-                                      "numba/_pymodule.h"],
+                                      "numba/np/ufunc/_internal.h",
+                                      "numba/_pymodule.h"],
                              **np_compile_args)
 
+    ext_npyufunc_num_threads = Extension(name="numba.np.ufunc._num_threads",
+                                         sources=[
+                                             "numba/np/ufunc/_num_threads.c"],
+                                         depends=["numba/_pymodule.h"],
+                                         )
+
     ext_np_ufunc_backends = []
 
     def check_file_at_path(path2file):
@@ -290,8 +296,8 @@ def check_file_at_path(path2file):
                               include_dirs=["numba"])
 
     ext_modules = [ext_dynfunc, ext_dispatcher, ext_helperlib, ext_typeconv,
-                   ext_np_ufunc, ext_mviewbuf, ext_nrt_python,
-                   ext_jitclass_box, ext_cuda_extras]
+                   ext_np_ufunc, ext_npyufunc_num_threads, ext_mviewbuf,
+                   ext_nrt_python, ext_jitclass_box, ext_cuda_extras]
 
     ext_modules += ext_np_ufunc_backends