Skip to content

Commit

Permalink
Timeout 2, proper rebase off joblib/master (#340)
Browse files Browse the repository at this point in the history
* adding timeout to get

* adding tests

* adding timeout arg to ImmediateResult

* fixing flake errors

* flake error

* updated docstring and added backend testing for timeout

* dynamic check if future supports timeout arg
  • Loading branch information
Ian Dewancker authored and ogrisel committed Apr 18, 2016
1 parent 537b2ea commit b3255ed
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
13 changes: 11 additions & 2 deletions joblib/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
ThreadingBackend, SequentialBackend)
from ._compat import _basestring
from .func_inspect import getfullargspec


BACKENDS = {
Expand Down Expand Up @@ -297,6 +298,9 @@ class Parallel(Logger):
printed. Above 50, the output is sent to stdout.
The frequency of the messages increases with the verbosity level.
If it is more than 10, all iterations are reported.
timeout: float, optional
    Timeout limit for each task to complete. If any task takes longer
    than this, a TimeoutError will be raised. Only applied when n_jobs != 1
pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
The number of batches (of tasks) to be pre-dispatched.
Default is '2*n_jobs'. When batch_size="auto" this is reasonable
Expand Down Expand Up @@ -454,7 +458,7 @@ class Parallel(Logger):
[Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
'''
def __init__(self, n_jobs=1, backend=None, verbose=0,
def __init__(self, n_jobs=1, backend=None, verbose=0, timeout=None,
pre_dispatch='2 * n_jobs', batch_size='auto',
temp_folder=None, max_nbytes='1M', mmap_mode='r'):
active_backend, default_n_jobs = get_active_backend()
Expand All @@ -464,6 +468,7 @@ def __init__(self, n_jobs=1, backend=None, verbose=0,
n_jobs = default_n_jobs
self.n_jobs = n_jobs
self.verbose = verbose
self.timeout = timeout
self.pre_dispatch = pre_dispatch

if isinstance(max_nbytes, _basestring):
Expand Down Expand Up @@ -668,7 +673,11 @@ def retrieve(self):
with self._lock:
job = self._jobs.pop(0)
try:
self._output.extend(job.get())
# check if timeout supported in backend future implementation
if 'timeout' in getfullargspec(job.get).args:
self._output.extend(job.get(timeout=self.timeout))
else:
self._output.extend(job.get())
except tuple(self.exceptions) as exception:
# Stop dispatching any new job in the async callback thread
self._aborting = True
Expand Down
21 changes: 21 additions & 0 deletions joblib/test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from joblib.test.common import with_multiprocessing
from joblib.testing import check_subprocess_call
from joblib._compat import PY3_OR_LATER
from multiprocessing import TimeoutError
from time import sleep

try:
import cPickle as pickle
Expand Down Expand Up @@ -248,6 +250,25 @@ def g(x):
(delayed(g)(x) for x in range(10)))


def test_parallel_timeout_success():
    # A generous timeout must not trigger for tasks that finish quickly,
    # regardless of which backend executes them.
    for backend in ['multiprocessing', 'threading']:
        results = Parallel(n_jobs=2, backend=backend, timeout=10)(
            delayed(sleep)(0.001) for x in range(10))
        nose.tools.assert_equal(10, len(results))


def test_parallel_timeout_fail():
    # A very short timeout must raise TimeoutError for slow tasks,
    # on every backend that supports the timeout argument.
    for backend in ['multiprocessing', 'threading']:
        slow_tasks = (delayed(sleep)(10) for x in range(10))
        parallel = Parallel(n_jobs=2, backend=backend, timeout=0.01)
        nose.tools.assert_raises(TimeoutError, parallel, slow_tasks)


def test_error_capture():
# Check that error are captured, and that correct exceptions
# are raised.
Expand Down

0 comments on commit b3255ed

Please sign in to comment.