Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 6208 continued #6324

Merged
merged 7 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/source/user/threading-layer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ system level libraries, some additional things to note:
program that is using the ``omp`` threading layer, a detection mechanism is
present that will try and gracefully terminate the forked child and print an
error message to ``STDERR``.
* On systems with the ``fork(2)`` system call available, if the TBB backed
threading layer is in use and a ``fork`` call is made from a thread other than
the thread that launched TBB (typically the main thread) then this results in
undefined behaviour and a warning will be displayed on ``STDERR``. As
``spawn`` is essentially ``fork`` followed by ``exec`` it is safe to ``spawn``
from a non-main thread, but as this cannot be differentiated from just a
``fork`` call the warning message will still be displayed.
* On OSX, the ``intel-openmp`` package is required to enable the OpenMP based
threading layer.

Expand Down
49 changes: 39 additions & 10 deletions numba/np/ufunc/tbbpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Implement parallel vectorize workqueue on top of Intel TBB.
#include <tbb/tbb.h>
#include <string.h>
#include <stdio.h>
#include <thread>
#include "workqueue.h"

#include "gufunc_scheduler.h"
Expand All @@ -30,9 +31,6 @@ Implement parallel vectorize workqueue on top of Intel TBB.
#error "TBB version is too old, 2019 update 5, i.e. TBB_INTERFACE_VERSION >= 11005 required"
#endif

#define TSI_INIT(count) tbb::task_scheduler_init(count)
#define TSI_TERMINATE(tsi) tsi->blocking_terminate(std::nothrow)

#define _DEBUG 0
#define _TRACE_SPLIT 0

Expand Down Expand Up @@ -202,12 +200,25 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat
});
}

void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * )
static std::thread::id init_thread_id;
static THREAD_LOCAL(bool) need_reinit_after_fork = false;

static void set_main_thread()
{
init_thread_id = std::this_thread::get_id();
}

static bool is_main_thread()
{
return std::this_thread::get_id() == init_thread_id;
}

static void ignore_blocking_terminate_assertion( const char*, int, const char*, const char * )
{
tbb::internal::runtime_warning("Unable to wait for threads to shut down before fork(). It can break multithreading in child process\n");
}

void ignore_assertion( const char*, int, const char*, const char * ) {}
static void ignore_assertion( const char*, int, const char*, const char * ) {}

static void prepare_fork(void)
{
Expand All @@ -217,9 +228,20 @@ static void prepare_fork(void)
}
if(tsi)
{
assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion);
TSI_TERMINATE(tsi);
tbb::set_assertion_handler(orig);
if(is_main_thread())
{
// TBB thread termination must always be called from same thread that called initialize
assertion_handler_type orig = tbb::set_assertion_handler(ignore_blocking_terminate_assertion);
tsi->blocking_terminate(std::nothrow);
tbb::set_assertion_handler(orig);
need_reinit_after_fork = true;
}
else
{
fprintf(stderr, "Numba: Attempted to fork from a non-main thread, "
"the TBB library may be in an invalid state in the "
"child process.\n");
}
}
}

Expand All @@ -229,8 +251,12 @@ static void reset_after_fork(void)
{
puts("Resuming TBB: after fork");
}
if(tsi)
if(tsi && need_reinit_after_fork)
{
tsi->initialize(tsi_count);
set_main_thread();
need_reinit_after_fork = false;
}
}

#if PY_MAJOR_VERSION >= 3
Expand Down Expand Up @@ -262,12 +288,15 @@ static void launch_threads(int count)
puts("Using TBB");
if(count < 1)
count = tbb::task_scheduler_init::automatic;
tsi = new TSI_INIT(tsi_count = count);
tsi_count = count;
tsi = new tbb::task_scheduler_init(count);
tg = new tbb::task_group;
tg->run([] {}); // start creating threads asynchronously

_INIT_NUM_THREADS = count;

set_main_thread();

#ifndef _MSC_VER
pthread_atfork(prepare_fork, reset_after_fork, reset_after_fork);
#endif
Expand Down
68 changes: 68 additions & 0 deletions numba/tests/test_parallel_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,74 @@ def test_serial_parent_explicit_mp_fork_par_child_then_par_parent(self):
print(out, err)


@skip_parfors_unsupported
@skip_no_tbb
class TestTBBSpecificIssues(ThreadLayerTestHelper):

_DEBUG = False

@linux_only # os.fork required.
def test_fork_from_non_main_thread(self):
# See issue #5973 and PR #6208 for context.
runme = """if 1:
import threading
import numba
numba.config.THREADING_LAYER='tbb'
from numba import njit, prange, objmode
import os

e_running = threading.Event()
e_proceed = threading.Event()

def indirect():
e_running.set()
# wait for forker() to have forked
while not e_proceed.isSet():
pass

def runner():
@njit(parallel=True, nogil=True)
def work():
acc = 0
for x in prange(10):
acc += x
with objmode():
indirect()

return acc

work()

def forker():
# wait for the jit function to say it's running
while not e_running.isSet():
pass
# then fork
os.fork()
# now fork is done signal the runner to proceed to exit
e_proceed.set()

numba_runner = threading.Thread(target=runner,)
fork_runner = threading.Thread(target=forker,)

threads = (numba_runner, fork_runner)
for t in threads:
t.start()
for t in threads:
t.join()
"""

cmdline = [sys.executable, '-c', runme]
out, err = self.run_cmd(cmdline)
# assert error message printed on stderr
msg_head = "Attempted to fork from a non-main thread, the TBB library"
self.assertIn(msg_head, err)

if self._DEBUG:
print("OUT:", out)
print("ERR:", err)


@skip_parfors_unsupported
class TestInitSafetyIssues(TestCase):

Expand Down