From 9e449bd592841bd60be0167d1062ff087344d6f1 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Wed, 2 Sep 2020 17:24:13 +0300 Subject: [PATCH 1/7] Do not try to terminate TBB if fork was called not from main thread --- numba/np/ufunc/tbbpool.cpp | 47 ++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/numba/np/ufunc/tbbpool.cpp b/numba/np/ufunc/tbbpool.cpp index faff4790fb7..a068aba8de7 100644 --- a/numba/np/ufunc/tbbpool.cpp +++ b/numba/np/ufunc/tbbpool.cpp @@ -15,6 +15,7 @@ Implement parallel vectorize workqueue on top of Intel TBB. #include #include #include +#include #include "workqueue.h" #include "gufunc_scheduler.h" @@ -30,9 +31,6 @@ Implement parallel vectorize workqueue on top of Intel TBB. #error "TBB version is too old, 2019 update 5, i.e. TBB_INTERFACE_VERSION >= 11005 required" #endif -#define TSI_INIT(count) tbb::task_scheduler_init(count) -#define TSI_TERMINATE(tsi) tsi->blocking_terminate(std::nothrow) - #define _DEBUG 0 #define _TRACE_SPLIT 0 @@ -202,12 +200,25 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat }); } -void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * ) +static std::thread::id init_thread_id; +static THREAD_LOCAL(bool) need_reinit_after_fork = false; + +static void set_main_thread() +{ + init_thread_id = std::this_thread::get_id(); +} + +static bool is_main_thread() +{ + return std::this_thread::get_id() == init_thread_id; +} + +static void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * ) { tbb::internal::runtime_warning("Unable to wait for threads to shut down before fork(). 
It can break multithreading in child process\n"); } -void ignore_assertion( const char*, int, const char*, const char * ) {} +static void ignore_assertion( const char*, int, const char*, const char * ) {} static void prepare_fork(void) { @@ -217,9 +228,18 @@ static void prepare_fork(void) } if(tsi) { - assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion); - TSI_TERMINATE(tsi); - tbb::set_assertion_handler(orig); + if(is_main_thread()) + { + // TBB thread termination must always be called from same thread that called initialize + assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion); + tsi->blocking_terminate(std::nothrow); + tbb::set_assertion_handler(orig); + need_reinit_after_fork = true; + } + else if (_DEBUG) + { + puts("NUMBA: Attempt to fork from non-main thread, TBB may be in incorrect state in child processes"); + } } } @@ -229,8 +249,12 @@ static void reset_after_fork(void) { puts("Resuming TBB: after fork"); } - if(tsi) + if(tsi && need_reinit_after_fork) + { tsi->initialize(tsi_count); + set_main_thread(); + need_reinit_after_fork = false; + } } #if PY_MAJOR_VERSION >= 3 @@ -262,12 +286,15 @@ static void launch_threads(int count) puts("Using TBB"); if(count < 1) count = tbb::task_scheduler_init::automatic; - tsi = new TSI_INIT(tsi_count = count); + tsi_count = count; + tsi = new tbb::task_scheduler_init(count); tg = new tbb::task_group; tg->run([] {}); // start creating threads asynchronously _INIT_NUM_THREADS = count; + set_main_thread(); + #ifndef _MSC_VER pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork); #endif From 9748b9684c545f7f9152ca72321c325f52ef70d3 Mon Sep 17 00:00:00 2001 From: Stuart Archibald Date: Fri, 18 Sep 2020 08:53:32 +0100 Subject: [PATCH 2/7] Print error to stderr by default and add test. As title. 
--- numba/np/ufunc/tbbpool.cpp | 6 ++- numba/tests/test_parallel_backend.py | 63 ++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/numba/np/ufunc/tbbpool.cpp b/numba/np/ufunc/tbbpool.cpp index a068aba8de7..9c2cfafc443 100644 --- a/numba/np/ufunc/tbbpool.cpp +++ b/numba/np/ufunc/tbbpool.cpp @@ -236,9 +236,11 @@ static void prepare_fork(void) tbb::set_assertion_handler(orig); need_reinit_after_fork = true; } - else if (_DEBUG) + else { - puts("NUMBA: Attempt to fork from non-main thread, TBB may be in incorrect state in child processes"); + fprintf(stderr, "Numba: Attempted to fork from a non-main thread, " + "the TBB library may be in an invalid state in the " + "child process."); } } } diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py index d3e796e2914..1da9049790b 100644 --- a/numba/tests/test_parallel_backend.py +++ b/numba/tests/test_parallel_backend.py @@ -857,6 +857,69 @@ def test_serial_parent_explicit_mp_fork_par_child_then_par_parent(self): print(out, err) +@skip_parfors_unsupported +@skip_no_tbb +class TestTBBSpecificIssues(ThreadLayerTestHelper): + + _DEBUG = False + + def test_fork_from_non_main_thread(self): + # See issue #5973 and PR #6208 for context. 
+ runme = """if 1: + import threading + import numba + numba.config.THREADING_LAYER='tbb' + from numba import njit, prange, objmode + import os + + def runner(e_running, e_proceed): + @njit(parallel=True, nogil=True) + def work(): + acc = 0 + for x in prange(10): + acc += x + with objmode(): + e_running.set() + # wait for forker() to have forked + while not e_proceed.isSet(): + pass + return acc + work() + + def forker(e_running, e_proceed): + # wait for the jit function to say it's running + while not e_running.isSet(): + pass + # then fork + os.fork() + # now fork is done signal the runner to proceed to exit + e_proceed.set() + + e_running = threading.Event() + e_proceed = threading.Event() + numba_runner = threading.Thread(target=runner, + args=(e_running, e_proceed,)) + fork_runner = threading.Thread(target=forker, + args=(e_running, e_proceed,)) + + threads = (numba_runner, fork_runner) + for t in threads: + t.start() + for t in threads: + t.join() + """ + + cmdline = [sys.executable, '-c', runme] + out, err = self.run_cmd(cmdline) + # assert error message printed on stderr + msg_head = "Attempted to fork from a non-main thread, the TBB library" + self.assertIn(msg_head, err) + + if self._DEBUG: + print("OUT:", out) + print("ERR:", err) + + @skip_parfors_unsupported class TestInitSafetyIssues(TestCase): From 53c6e9393750d13a73d9feadf2a7a4a1f363a238 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Fri, 18 Sep 2020 13:48:26 +0300 Subject: [PATCH 3/7] newline --- numba/np/ufunc/tbbpool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/numba/np/ufunc/tbbpool.cpp b/numba/np/ufunc/tbbpool.cpp index 9c2cfafc443..83389223dfa 100644 --- a/numba/np/ufunc/tbbpool.cpp +++ b/numba/np/ufunc/tbbpool.cpp @@ -240,7 +240,7 @@ static void prepare_fork(void) { fprintf(stderr, "Numba: Attempted to fork from a non-main thread, " "the TBB library may be in an invalid state in the " - "child process."); + "child process.\n"); } } } From 
077ba716391d476f42ac68bba6faa819ad08b2c2 Mon Sep 17 00:00:00 2001 From: Stuart Archibald Date: Fri, 18 Sep 2020 12:39:37 +0100 Subject: [PATCH 4/7] Skip test requiring fork on windows. As title. --- numba/tests/test_parallel_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py index 1da9049790b..19930f3d37e 100644 --- a/numba/tests/test_parallel_backend.py +++ b/numba/tests/test_parallel_backend.py @@ -863,6 +863,7 @@ class TestTBBSpecificIssues(ThreadLayerTestHelper): _DEBUG = False + @linux_only # os.fork required. def test_fork_from_non_main_thread(self): # See issue #5973 and PR #6208 for context. runme = """if 1: From 3acb017c8039dcbe514e4f3be0cf0cde8ae0e976 Mon Sep 17 00:00:00 2001 From: Stuart Archibald Date: Fri, 18 Sep 2020 12:52:03 +0100 Subject: [PATCH 5/7] Add docs on TBB fork restrictions. As title. --- docs/source/user/threading-layer.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/source/user/threading-layer.rst b/docs/source/user/threading-layer.rst index 0f69b1f5a03..b4de40eb07f 100644 --- a/docs/source/user/threading-layer.rst +++ b/docs/source/user/threading-layer.rst @@ -163,6 +163,13 @@ system level libraries, some additional things to note: program that is using the ``omp`` threading layer, a detection mechanism is present that will try and gracefully terminate the forked child and print an error message to ``STDERR``. +* On systems with the ``fork(2)`` system call available, if the TBB-backed + threading layer is in use and a ``fork`` call is made from a thread other than + the thread that launched TBB (typically the main thread), then this results in + undefined behaviour in the child process and a warning will be displayed on ``STDERR``. As + ``spawn`` is essentially ``fork`` followed by ``exec``, it is safe to ``spawn`` + from a non-main thread, but as this cannot be differentiated from just a + ``fork`` call, the warning message will still be displayed. 
* On OSX, the ``intel-openmp`` package is required to enable the OpenMP based threading layer. From 3c997f0dd7a27250e5bb03d0608e182e482ab1de Mon Sep 17 00:00:00 2001 From: Stuart Archibald Date: Wed, 7 Oct 2020 13:03:49 +0100 Subject: [PATCH 6/7] Refer to event objects from globals. As title. --- numba/tests/test_parallel_backend.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py index 19930f3d37e..9760aa331a0 100644 --- a/numba/tests/test_parallel_backend.py +++ b/numba/tests/test_parallel_backend.py @@ -873,7 +873,7 @@ def test_fork_from_non_main_thread(self): from numba import njit, prange, objmode import os - def runner(e_running, e_proceed): + def runner(): @njit(parallel=True, nogil=True) def work(): acc = 0 @@ -887,7 +887,7 @@ def work(): return acc work() - def forker(e_running, e_proceed): + def forker(): # wait for the jit function to say it's running while not e_running.isSet(): pass @@ -898,10 +898,8 @@ def forker(e_running, e_proceed): e_running = threading.Event() e_proceed = threading.Event() - numba_runner = threading.Thread(target=runner, - args=(e_running, e_proceed,)) - fork_runner = threading.Thread(target=forker, - args=(e_running, e_proceed,)) + numba_runner = threading.Thread(target=runner,) + fork_runner = threading.Thread(target=forker,) threads = (numba_runner, fork_runner) for t in threads: From a2cc68a5c9cf5b07ca4c0f8b9eddca8d94cc406b Mon Sep 17 00:00:00 2001 From: Stuart Archibald Date: Wed, 7 Oct 2020 22:01:44 +0100 Subject: [PATCH 7/7] Fix up use of threading events --- numba/tests/test_parallel_backend.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/numba/tests/test_parallel_backend.py b/numba/tests/test_parallel_backend.py index 9760aa331a0..5d213c4a83a 100644 --- a/numba/tests/test_parallel_backend.py +++ b/numba/tests/test_parallel_backend.py @@ -873,6 +873,15 @@ def 
test_fork_from_non_main_thread(self): from numba import njit, prange, objmode import os + e_running = threading.Event() + e_proceed = threading.Event() + + def indirect(): + e_running.set() + # wait for forker() to have forked + while not e_proceed.isSet(): + pass + def runner(): @njit(parallel=True, nogil=True) def work(): @@ -880,11 +889,10 @@ def work(): for x in prange(10): acc += x with objmode(): - e_running.set() - # wait for forker() to have forked - while not e_proceed.isSet(): - pass + indirect() + return acc + work() def forker(): @@ -896,8 +904,6 @@ def forker(): # now fork is done signal the runner to proceed to exit e_proceed.set() - e_running = threading.Event() - e_proceed = threading.Event() numba_runner = threading.Thread(target=runner,) fork_runner = threading.Thread(target=forker,)