Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MRG] Protect against oversubscription with numba prange / or TBB linked native code #951

Merged
merged 8 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
Latest changes
===============
==============

Release 0.14.1
--------------

- Configure the loky workers' environment to mitigate oversubsription with
nested multi-threaded code in the following case:

- allow for a suitable number of threads for numba (``NUMBA_NUM_THREADS``);

- enable Interprocess Communication for scheduler coordination when the
nested code uses Threading Building Blocks (TBB) (``ENABLE_IPC=1``)

https://github.com/joblib/joblib/pull/951


Release 0.14.0
--------------
Expand Down
9 changes: 4 additions & 5 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ environment:
# platforms (universal wheel).
# We run the tests on 2 different target platforms for testing purpose only.
matrix:
- PYTHON: "C:\\Python27"
PYTHON_VERSION: "2.7.x"
PYTHON_ARCH: "32"

- PYTHON: "C:\\Python35-x64"
PYTHON_VERSION: "3.5.x"
PYTHON_ARCH: "64"

- PYTHON: "C:\\Python27"
PYTHON_VERSION: "2.7.x"
PYTHON_ARCH: "32"

matrix:
fast_finish: true


install:
# Install Python (from the official .msi of http://python.org) and pip when
# not already installed.
Expand Down
14 changes: 11 additions & 3 deletions joblib/_parallel_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ def __init__(self, nesting_level=None, inner_max_num_threads=None):

MAX_NUM_THREADS_VARS = [
'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMEXPR_NUM_THREADS'
'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'TBB_NUM_THREADS',
Copy link

@anton-malakhov anton-malakhov Oct 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is TBB_NUM_THREADS kind of dummy variable? Because there is no such control variable in TBB itself. @ogrisel
I'd suggest adding a comment which documents its internal usage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops indeed, this is a left over I wanted to remove. thanks for the catch.

'NUMBA_NUM_THREADS', 'NUMEXPR_NUM_THREADS',
]

TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

@abstractmethod
def effective_n_jobs(self, n_jobs):
"""Determine the number of jobs that can actually run in parallel
Expand Down Expand Up @@ -149,7 +152,7 @@ def retrieval_context(self):
"""
yield

def _get_max_num_threads_vars(self, n_jobs):
def _prepare_worker_env(self, n_jobs):
"""Return environment variables limiting threadpools in external libs.

This function return a dict containing environment variables to pass
Expand All @@ -173,6 +176,11 @@ def _get_max_num_threads_vars(self, n_jobs):
var_value = str(explicit_n_threads)

env[var] = var_value

if self.TBB_ENABLE_IPC_VAR not in os.environ:
# To avoid over-subscription when using TBB, let the TBB schedulers
# use Inter Process Communication to coordinate:
env[self.TBB_ENABLE_IPC_VAR] = "1"
return env

@staticmethod
Expand Down Expand Up @@ -505,7 +513,7 @@ def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,

self._workers = get_memmapping_executor(
n_jobs, timeout=idle_worker_timeout,
env=self._get_max_num_threads_vars(n_jobs=n_jobs),
env=self._prepare_worker_env(n_jobs=n_jobs),
**memmappingexecutor_args)
self.parallel = parallel
return n_jobs
Expand Down
17 changes: 12 additions & 5 deletions joblib/test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,20 +1537,27 @@ def test_thread_bomb_mitigation(backend):
def _run_parallel_sum():
env_vars = {}
for var in ['OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
'VECLIB_MAXIMUM_THREADS', 'NUMEXPR_NUM_THREADS']:
'VECLIB_MAXIMUM_THREADS', 'NUMEXPR_NUM_THREADS',
'NUMBA_NUM_THREADS', 'ENABLE_IPC']:
env_vars[var] = os.environ.get(var)
return env_vars, parallel_sum(100)


@parametrize("backend", [None, 'loky'])
@skipif(parallel_sum is None, reason="Need OpenMP helper compiled")
def test_parallel_thread_limit(backend):
res = Parallel(n_jobs=2, backend=backend)(
results = Parallel(n_jobs=2, backend=backend)(
delayed(_run_parallel_sum)() for _ in range(2)
)
for value in res[0][0].values():
assert value == '1'
assert all([r[1] == 1 for r in res])
expected_num_threads = max(cpu_count() // 2, 1)
for worker_env_vars, omp_num_threads in results:
assert omp_num_threads == expected_num_threads
for name, value in worker_env_vars.items():
if name.endswith("_THREADS"):
assert value == str(expected_num_threads)
else:
assert name == "ENABLE_IPC"
assert value == "1"


@skipif(distributed is not None,
Expand Down