diff --git a/docs/source/user/threading-layer.rst b/docs/source/user/threading-layer.rst index 0f69b1f5a03..b4de40eb07f 100644 --- a/docs/source/user/threading-layer.rst +++ b/docs/source/user/threading-layer.rst @@ -163,6 +163,13 @@ system level libraries, some additional things to note: program that is using the ``omp`` threading layer, a detection mechanism is present that will try and gracefully terminate the forked child and print an error message to ``STDERR``. +* On systems with the ``fork(2)`` system call available, if the TBB backed + threading layer is in use and a ``fork`` call is made from a thread other than + the thread that launched TBB (typically the main thread) then this results in + undefined behaviour and a warning will be displayed on ``STDERR``. As + ``spawn`` is essentially ``fork`` followed by ``exec`` it is safe to ``spawn`` + from a non-main thread, but as this cannot be differentiated from just a + ``fork`` call the warning message will still be displayed. * On OSX, the ``intel-openmp`` package is required to enable the OpenMP based threading layer. diff --git a/numba/np/ufunc/tbbpool.cpp b/numba/np/ufunc/tbbpool.cpp index faff4790fb7..83389223dfa 100644 --- a/numba/np/ufunc/tbbpool.cpp +++ b/numba/np/ufunc/tbbpool.cpp @@ -15,6 +15,7 @@ Implement parallel vectorize workqueue on top of Intel TBB. #include #include #include +#include #include "workqueue.h" #include "gufunc_scheduler.h" @@ -30,9 +31,6 @@ Implement parallel vectorize workqueue on top of Intel TBB. #error "TBB version is too old, 2019 update 5, i.e. TBB_INTERFACE_VERSION >= 11005 required" #endif -#define TSI_INIT(count) tbb::task_scheduler_init(count) -#define TSI_TERMINATE(tsi) tsi->blocking_terminate(std::nothrow) - #define _DEBUG 0 #define _TRACE_SPLIT 0 @@ -202,12 +200,25 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat }); } -void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * ) +static std::thread::id init_thread_id; +static THREAD_LOCAL(bool) need_reinit_after_fork = false; + +static void set_main_thread() +{ + init_thread_id = std::this_thread::get_id(); +} + +static bool is_main_thread() +{ + return std::this_thread::get_id() == init_thread_id; +} + +static void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * ) { tbb::internal::runtime_warning("Unable to wait for threads to shut down before fork(). It can break multithreading in child process\n"); } -void ignore_assertion( const char*, int, const char*, const char * ) {} +static void ignore_assertion( const char*, int, const char*, const char * ) {} static void prepare_fork(void) { @@ -217,9 +228,20 @@ static void prepare_fork(void) } if(tsi) { - assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion); - TSI_TERMINATE(tsi); - tbb::set_assertion_handler(orig); + if(is_main_thread()) + { + // TBB thread termination must always be called from same thread that called initialize + assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion); + tsi->blocking_terminate(std::nothrow); + tbb::set_assertion_handler(orig); + need_reinit_after_fork = true; + } + else + { + fprintf(stderr, "Numba: Attempted to fork from a non-main thread, " + "the TBB library may be in an invalid state in the " + "child process.\n"); + } } } @@ -229,8 +251,12 @@ static void reset_after_fork(void) { puts("Resuming TBB: after fork"); } - if(tsi) + if(tsi && need_reinit_after_fork) + { tsi->initialize(tsi_count); + set_main_thread(); + need_reinit_after_fork = false; + } } #if PY_MAJOR_VERSION >= 3 @@ -262,12 +288,15 @@ static void launch_threads(int count) puts("Using TBB"); if(count < 1) count = tbb::task_scheduler_init::automatic; - tsi = new TSI_INIT(tsi_count = count); + tsi_count = count; + tsi = new tbb::task_scheduler_init(count); tg = new tbb::task_group; tg->run([] {}); // start creating threads asynchronously _INIT_NUM_THREADS = count; + set_main_thread(); + #ifndef _MSC_VER pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork); #endif diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py index d3e796e2914..5d213c4a83a 100644 --- a/numba/tests/test_parallel_backend.py +++ b/numba/tests/test_parallel_backend.py @@ -857,6 +857,74 @@ def test_serial_parent_explicit_mp_fork_par_child_then_par_parent(self): print(out, err) +@skip_parfors_unsupported +@skip_no_tbb +class TestTBBSpecificIssues(ThreadLayerTestHelper): + + _DEBUG = False + + @linux_only # os.fork required. + def test_fork_from_non_main_thread(self): + # See issue #5973 and PR #6208 for context. + runme = """if 1: + import threading + import numba + numba.config.THREADING_LAYER='tbb' + from numba import njit, prange, objmode + import os + + e_running = threading.Event() + e_proceed = threading.Event() + + def indirect(): + e_running.set() + # wait for forker() to have forked + while not e_proceed.isSet(): + pass + + def runner(): + @njit(parallel=True, nogil=True) + def work(): + acc = 0 + for x in prange(10): + acc += x + with objmode(): + indirect() + + return acc + + work() + + def forker(): + # wait for the jit function to say it's running + while not e_running.isSet(): + pass + # then fork + os.fork() + # now fork is done signal the runner to proceed to exit + e_proceed.set() + + numba_runner = threading.Thread(target=runner,) + fork_runner = threading.Thread(target=forker,) + + threads = (numba_runner, fork_runner) + for t in threads: + t.start() + for t in threads: + t.join() + """ + + cmdline = [sys.executable, '-c', runme] + out, err = self.run_cmd(cmdline) + # assert error message printed on stderr + msg_head = "Attempted to fork from a non-main thread, the TBB library" + self.assertIn(msg_head, err) + + if self._DEBUG: + print("OUT:", out) + print("ERR:", err) + + @skip_parfors_unsupported class TestInitSafetyIssues(TestCase):