Skip to content

Commit

Permalink
[MRG+1] Remove warnings in nested Parallel when the inner Parallel ha…
Browse files Browse the repository at this point in the history
…s n_jobs=1 (#406)

* Update _parallel_backends.py

suppress a warning if a daemonic process creates parallel jobs with n_jobs=1

* Add test for warnings in nested Parallel calls

Enhance check_subprocess_call to be able to check stderr as well as
stdout.
  • Loading branch information
harishmk authored and aabadie committed Oct 5, 2016
1 parent a82addc commit 5f169bb
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 12 deletions.
9 changes: 5 additions & 4 deletions joblib/_parallel_backends.py
Expand Up @@ -267,10 +267,11 @@ def effective_n_jobs(self, n_jobs):
"""
if mp.current_process().daemon:
# Daemonic processes cannot have children
warnings.warn(
'Multiprocessing-backed parallel loops cannot be nested,'
' setting n_jobs=1',
stacklevel=3)
if n_jobs != 1:
warnings.warn(
'Multiprocessing-backed parallel loops cannot be nested,'
' setting n_jobs=1',
stacklevel=3)
return 1

elif threading.current_thread().name != 'MainThread':
Expand Down
47 changes: 43 additions & 4 deletions joblib/test/test_parallel.py
Expand Up @@ -44,6 +44,10 @@
# Backward compat
from Queue import Queue

try:
import posix
except ImportError:
posix = None

from joblib._parallel_backends import SequentialBackend
from joblib._parallel_backends import ThreadingBackend
Expand Down Expand Up @@ -724,10 +728,7 @@ def test_no_blas_crash_or_freeze_with_multiprocessing():
def test_parallel_with_interactively_defined_functions():
# When functions are defined interactively in a python/IPython
# session, we want to be able to use them with joblib.Parallel

try:
import posix
except ImportError:
if posix is None:
# This test pass only when fork is the process start method
raise nose.SkipTest('Not a POSIX platform')

Expand Down Expand Up @@ -768,3 +769,41 @@ def generate_arrays(n):
delayed(check_memmap)(a) for a in generate_arrays(100))
for result, expected in zip(results, generate_arrays(len(results))):
np.testing.assert_array_equal(expected, result)


@with_multiprocessing
def test_nested_parallel_warnings():
# The warnings happen in child processes so
# warnings.catch_warnings can not be used for this tests that's
# why we use check_subprocess_call instead
if posix is None:
# This test pass only when fork is the process start method
raise nose.SkipTest('Not a POSIX platform')

template_code = """
import sys
from joblib import Parallel, delayed
def func():
return 42
def parallel_func():
res = Parallel(n_jobs={inner_n_jobs})(delayed(func)() for _ in range(3))
return res
Parallel(n_jobs={outer_n_jobs})(delayed(parallel_func)() for _ in range(5))
"""
# no warnings if inner_n_jobs=1
code = template_code.format(inner_n_jobs=1, outer_n_jobs=2)
check_subprocess_call([sys.executable, '-c', code],
stderr_regex='^$')

# warnings if inner_n_jobs != 1
regex = ('Multiprocessing-backed parallel loops cannot '
'be nested')
code = template_code.format(inner_n_jobs=2, outer_n_jobs=2)
check_subprocess_call([sys.executable, '-c', code],
stderr_regex=regex)
2 changes: 1 addition & 1 deletion joblib/test/test_testing.py
Expand Up @@ -24,7 +24,7 @@ def test_check_subprocess_call_non_matching_regex():
code = '42'
non_matching_pattern = '_no_way_this_matches_anything_'
assert_raises_regex(ValueError,
'Unexpected output.+{0}'.format(non_matching_pattern),
'Unexpected stdout.+{0}'.format(non_matching_pattern),
check_subprocess_call,
[sys.executable, '-c', code],
stdout_regex=non_matching_pattern)
Expand Down
14 changes: 11 additions & 3 deletions joblib/testing.py
Expand Up @@ -51,10 +51,12 @@ def assert_raises_regex(expected_exception, expected_regexp,
expected_exception(expected_regexp))


def check_subprocess_call(cmd, timeout=1, stdout_regex=None):
def check_subprocess_call(cmd, timeout=1, stdout_regex=None,
stderr_regex=None):
"""Runs a command in a subprocess with timeout in seconds.
Also checks returncode is zero and stdout if stdout_regex is set.
Also checks returncode is zero, stdout if stdout_regex is set, and
stderr if stderr_regex is set.
"""
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
Expand All @@ -79,7 +81,13 @@ def kill_process():
if (stdout_regex is not None and
not re.search(stdout_regex, stdout)):
raise ValueError(
"Unexpected output: '{0!r}' does not match:\n{1!r}".format(
"Unexpected stdout: {0!r} does not match:\n{1!r}".format(
stdout_regex, stdout))
if (stderr_regex is not None and
not re.search(stderr_regex, stderr)):
raise ValueError(
"Unexpected stderr: {0!r} does not match:\n{1!r}".format(
stderr_regex, stderr))

finally:
timer.cancel()

0 comments on commit 5f169bb

Please sign in to comment.