Skip to content

Commit

Permalink
Merge pull request #6324 from stuartarchibald/pr_6208_continued
Browse files Browse the repository at this point in the history
PR 6208 continued
  • Loading branch information
sklam committed Oct 8, 2020
2 parents 378027d + a2cc68a commit 9381206
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 10 deletions.
7 changes: 7 additions & 0 deletions docs/source/user/threading-layer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ system level libraries, some additional things to note:
program that is using the ``omp`` threading layer, a detection mechanism is
present that will try and gracefully terminate the forked child and print an
error message to ``STDERR``.
* On systems with the ``fork(2)`` system call available, if the TBB backed
threading layer is in use and a ``fork`` call is made from a thread other than
the thread that launched TBB (typically the main thread) then this results in
undefined behaviour and a warning will be displayed on ``STDERR``. As
``spawn`` is essentially ``fork`` followed by ``exec`` it is safe to ``spawn``
from a non-main thread, but as this cannot be differentiated from just a
``fork`` call the warning message will still be displayed.
* On OSX, the ``intel-openmp`` package is required to enable the OpenMP based
threading layer.

Expand Down
49 changes: 39 additions & 10 deletions numba/np/ufunc/tbbpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Implement parallel vectorize workqueue on top of Intel TBB.
#include <tbb/tbb.h>
#include <string.h>
#include <stdio.h>
#include <thread>
#include "workqueue.h"

#include "gufunc_scheduler.h"
Expand All @@ -30,9 +31,6 @@ Implement parallel vectorize workqueue on top of Intel TBB.
#error "TBB version is too old, 2019 update 5, i.e. TBB_INTERFACE_VERSION >= 11005 required"
#endif

#define TSI_INIT(count) tbb::task_scheduler_init(count)
#define TSI_TERMINATE(tsi) tsi->blocking_terminate(std::nothrow)

#define _DEBUG 0
#define _TRACE_SPLIT 0

Expand Down Expand Up @@ -202,12 +200,25 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat
});
}

void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * )
static std::thread::id init_thread_id;
static THREAD_LOCAL(bool) need_reinit_after_fork = false;

static void set_main_thread()
{
init_thread_id = std::this_thread::get_id();
}

static bool is_main_thread()
{
return std::this_thread::get_id() == init_thread_id;
}

static void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * )
{
tbb::internal::runtime_warning("Unable to wait for threads to shut down before fork(). It can break multithreading in child process\n");
}

void ignore_assertion( const char*, int, const char*, const char * ) {}
static void ignore_assertion( const char*, int, const char*, const char * ) {}

static void prepare_fork(void)
{
Expand All @@ -217,9 +228,20 @@ static void prepare_fork(void)
}
if(tsi)
{
assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion);
TSI_TERMINATE(tsi);
tbb::set_assertion_handler(orig);
if(is_main_thread())
{
// TBB thread termination must always be called from same thread that called initialize
assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion);
tsi->blocking_terminate(std::nothrow);
tbb::set_assertion_handler(orig);
need_reinit_after_fork = true;
}
else
{
fprintf(stderr, "Numba: Attempted to fork from a non-main thread, "
"the TBB library may be in an invalid state in the "
"child process.\n");
}
}
}

Expand All @@ -229,8 +251,12 @@ static void reset_after_fork(void)
{
puts("Resuming TBB: after fork");
}
if(tsi)
if(tsi && need_reinit_after_fork)
{
tsi->initialize(tsi_count);
set_main_thread();
need_reinit_after_fork = false;
}
}

#if PY_MAJOR_VERSION >= 3
Expand Down Expand Up @@ -262,12 +288,15 @@ static void launch_threads(int count)
puts("Using TBB");
if(count < 1)
count = tbb::task_scheduler_init::automatic;
tsi = new TSI_INIT(tsi_count = count);
tsi_count = count;
tsi = new tbb::task_scheduler_init(count);
tg = new tbb::task_group;
tg->run([] {}); // start creating threads asynchronously

_INIT_NUM_THREADS = count;

set_main_thread();

#ifndef _MSC_VER
pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork);
#endif
Expand Down
68 changes: 68 additions & 0 deletions numba/tests/test_parallel_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,74 @@ def test_serial_parent_explicit_mp_fork_par_child_then_par_parent(self):
print(out, err)


@skip_parfors_unsupported
@skip_no_tbb
class TestTBBSpecificIssues(ThreadLayerTestHelper):

_DEBUG = False

@linux_only # os.fork required.
def test_fork_from_non_main_thread(self):
# See issue #5973 and PR #6208 for context.
runme = """if 1:
import threading
import numba
numba.config.THREADING_LAYER='tbb'
from numba import njit, prange, objmode
import os
e_running = threading.Event()
e_proceed = threading.Event()
def indirect():
e_running.set()
# wait for forker() to have forked
while not e_proceed.isSet():
pass
def runner():
@njit(parallel=True, nogil=True)
def work():
acc = 0
for x in prange(10):
acc += x
with objmode():
indirect()
return acc
work()
def forker():
# wait for the jit function to say it's running
while not e_running.isSet():
pass
# then fork
os.fork()
# now fork is done signal the runner to proceed to exit
e_proceed.set()
numba_runner = threading.Thread(target=runner,)
fork_runner = threading.Thread(target=forker,)
threads = (numba_runner, fork_runner)
for t in threads:
t.start()
for t in threads:
t.join()
"""

cmdline = [sys.executable, '-c', runme]
out, err = self.run_cmd(cmdline)
# assert error message printed on stderr
msg_head = "Attempted to fork from a non-main thread, the TBB library"
self.assertIn(msg_head, err)

if self._DEBUG:
print("OUT:", out)
print("ERR:", err)


@skip_parfors_unsupported
class TestInitSafetyIssues(TestCase):

Expand Down

0 comments on commit 9381206

Please sign in to comment.