
return_generator={True,False} -> return_as={'list','generator'} #1458

Merged
merged 10 commits into from Jun 28, 2023
3 changes: 2 additions & 1 deletion CHANGES.rst
@@ -39,6 +39,7 @@ In development
- Add a ``return_generator`` parameter for ``Parallel`` that allows one
  to consume results asynchronously.
https://github.com/joblib/joblib/pull/1393
https://github.com/joblib/joblib/pull/1458

- Improve the behavior of ``joblib`` for ``n_jobs=1``, with simplified
tracebacks and more efficient running time.
@@ -1343,4 +1344,4 @@ Gael Varoquaux
BUG: Make sure that joblib still works with Python2.5. => release 0.4.2

Release 0.4.1
----------------
----------------
20 changes: 14 additions & 6 deletions doc/parallel.rst
@@ -25,11 +25,11 @@ can be spread over 2 CPUs using the following::

The output can be a generator that yields the results as soon as they're
available, even if the subsequent tasks aren't completed yet. The order
of the outputs always matches the order of the inputs::
of the outputs always matches the order in which the inputs were submitted::

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> parallel = Parallel(n_jobs=2, return_generator=True)
>>> parallel = Parallel(n_jobs=2, return_as="submitted")
>>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
>>> print(type(output_generator))
<class 'generator'>
@@ -40,9 +40,17 @@ of the outputs always matches the order of the inputs::
>>> print(list(output_generator))
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

This generator allows to reduce the memory footprint of :class:`joblib.Parallel`
calls in case the results can benefit from on-the-fly aggregation, as illustrated
in :ref:`sphx_glr_auto_examples_parallel_generator.py`.
This generator makes it possible to reduce the memory footprint of
:class:`joblib.Parallel` calls when the results can benefit from on-the-fly
aggregation, as illustrated in
:ref:`sphx_glr_auto_examples_parallel_generator.py`.
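
For instance, a running sum can be computed on the fly, so that the full list
of intermediate results is never materialized. This is a minimal sketch that
reuses the imports and toy task from the example above; the choice of
aggregation function is up to the caller::

    >>> parallel = Parallel(n_jobs=2, return_as="submitted")
    >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
    >>> sum(output_generator)  # consumes each result as soon as it arrives
    45.0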

Future releases are planned to also support returning a generator that yields
the results in the order of completion rather than the order of submission,
by using ``return_as="completed"`` instead of ``return_as="submitted"``. In
this case the order of the outputs depends on the concurrency of the workers
and is not guaranteed to be deterministic, meaning the results can be yielded
in a different order each time the code is executed.
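
``return_as="completed"`` is not available yet; as an interim illustration of
completion-order consumption, a similar pattern can be sketched with the
standard library's :mod:`concurrent.futures` (this is not joblib API, and the
artificial task durations only serve to make later submissions finish
first)::

    >>> from concurrent.futures import ThreadPoolExecutor, as_completed
    >>> from time import sleep
    >>> def slow_task(i):
    ...     sleep(0.01 * (4 - i))  # later submissions finish sooner
    ...     return i
    >>> with ThreadPoolExecutor(max_workers=2) as executor:
    ...     futures = [executor.submit(slow_task, i) for i in range(5)]
    ...     results = [f.result() for f in as_completed(futures)]
    >>> sorted(results)  # the completion order itself is not deterministic
    [0, 1, 2, 3, 4]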

Thread-based parallelism vs process-based parallelism
=====================================================
@@ -415,4 +423,4 @@ does not exist (but multiprocessing has more overhead).

.. autoclass:: joblib.parallel.ParallelBackendBase

.. autoclass:: joblib.parallel.AutoBatchingMixin
.. autoclass:: joblib.parallel.AutoBatchingMixin
33 changes: 20 additions & 13 deletions examples/parallel_generator.py
@@ -10,8 +10,15 @@
observe a high memory usage, as all the results are held in RAM before being
processed

Using the ``return_generator=True`` option allows to progressively consumes
the outputs as they arrive and keeps the memory at an acceptable level.
Using the ``return_as=`` parameter with a non-default value makes it possible
to progressively consume the outputs as they arrive, which keeps the memory
at an acceptable level.

Using this feature requires passing ``return_as="submitted"``, in which case
the generator yields the results in the order in which the tasks were
submitted. Future releases are also planned to support ``return_as="completed"``
to have the generator yield results as soon as they are available.

"""

##############################################################################
@@ -93,16 +100,16 @@ def accumulator_sum(generator):


##############################################################################
# We process many of the tasks in parallel. If `return_generator=False`
# (default), we should expect a usage of more than 2GB in RAM. Indeed, all the
# results are computed and stored in ``res`` before being processed by
# We process many of the tasks in parallel. If ``return_as="list"`` (default),
# we should expect a memory usage of more than 2GB. Indeed, all the results
# are computed and stored in ``res`` before being processed by
# `accumulator_sum` and collected by the gc.

from joblib import Parallel, delayed

monitor = MemoryMonitor()
print("Running tasks with return_generator=False...")
res = Parallel(n_jobs=2, return_generator=False)(
print('Running tasks with return_as="list"...')
res = Parallel(n_jobs=2, return_as="list")(
delayed(return_big_object)(i) for i in range(150)
)
print("Accumulate results:", end='')
@@ -117,14 +124,14 @@ def accumulator_sum(generator):


##############################################################################
# If we use ``return_generator=True``, ``res`` is simply a generator with the
# If we use ``return_as="submitted"``, ``res`` is simply a generator with the
# results that are ready. Here we consume the results with ``accumulator_sum``
# as soon as they arrive, and once they have been used they are collected by
# the gc. The memory footprint is thus reduced, typically around 300MB.

monitor_gen = MemoryMonitor()
print("Create result generator with return_generator=True...")
res = Parallel(n_jobs=2, return_generator=True)(
print('Create result generator with return_as="submitted"...')
res = Parallel(n_jobs=2, return_as="submitted")(
delayed(return_big_object)(i) for i in range(150)
)
print("Accumulate results:", end='')
@@ -152,11 +159,11 @@ def accumulator_sum(generator):
import matplotlib.pyplot as plt
plt.semilogy(
np.maximum.accumulate(monitor.memory_buffer),
label='return_generator=False'
label='return_as="list"'
)
plt.semilogy(
np.maximum.accumulate(monitor_gen.memory_buffer),
label='return_generator=True'
label='return_as="submitted"'
)
plt.xlabel("Time")
plt.xticks([], [])
@@ -166,7 +173,7 @@ def accumulator_sum(generator):
plt.show()

##############################################################################
# It is important to note that with ``return_generator``, the results are
# It is important to note that with ``return_as="submitted"``, the results are
# still accumulated in RAM after computation. But as we process them
# asynchronously, they can be freed sooner. However, if the generator is not
# consumed, the memory still grows linearly.
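
##############################################################################
# As a minimal sketch of that last point (the snippet below is illustrative
# and its memory effect is not measured here), a generator that is created
# but never iterated lets completed results pile up in RAM, much like
# ``return_as="list"`` would:

res_unconsumed = Parallel(n_jobs=2, return_as="submitted")(
    delayed(return_big_object)(i) for i in range(10)
)
# No iteration happens here, so each finished task parks its output in
# memory until the generator is consumed or dropped.
del res_unconsumed  # dropping the generator releases the pending results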
34 changes: 24 additions & 10 deletions joblib/parallel.py
@@ -967,11 +967,15 @@
soft hints (prefer) or hard constraints (require) so as to make it
possible for library users to change the backend from the outside
using the :func:`~parallel_backend` context manager.
return_generator: bool
If True, calls to this instance will return a generator, yielding
the results as soon as they are available, in the original order.
Note that the intended usage is to run one call at a time. Multiple
calls to the same Parallel object will result in a ``RuntimeError``
return_as: str in {'list', 'submitted'}, default: 'list'
If 'list', calls to this instance will return a list, only once
all results have been processed and are ready to return.
Otherwise, a generator is returned that yields the results as soon
as they are available, in the order in which the tasks were
submitted.
Future releases are planned to also support 'completed', in which
case the generator immediately yields available results
independently of the submission order.
prefer: str in {'processes', 'threads'} or None, default: None
Soft hint to choose the default backend if no specific backend
was selected with the :func:`~parallel_backend` context manager.
@@ -1066,6 +1070,9 @@
* Ability to use shared memory efficiently with worker
processes for large numpy-based datastructures.

Note that the intended usage is to run one call at a time. Multiple
calls to the same Parallel object will result in a ``RuntimeError``.
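
A minimal sketch of that constraint, mirroring the test suite (the
exact error message may vary)::

    >>> from time import sleep
    >>> from joblib import Parallel, delayed
    >>> parallel = Parallel(n_jobs=2, return_as="submitted")
    >>> gen = parallel(delayed(sleep)(1) for _ in range(10))
    >>> parallel(delayed(sleep)(1) for _ in range(10))
    Traceback (most recent call last):
        ...
    RuntimeError: This Parallel instance is already running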

Examples
--------

@@ -1161,7 +1168,7 @@
self,
n_jobs=default_parallel_config["n_jobs"],
backend=None,
return_generator=False,
return_as="list",
verbose=default_parallel_config["verbose"],
timeout=None,
pre_dispatch='2 * n_jobs',
@@ -1191,7 +1198,7 @@
self.verbose = _get_config_param(verbose, context_config, "verbose")
self.timeout = timeout
self.pre_dispatch = pre_dispatch
self.return_generator = return_generator

if return_as not in {"list", "submitted"}:
raise ValueError(
'Expected `return_as` parameter to be a string equal to "list"'
f' or "submitted", but got {return_as} instead'
)
self.return_as = return_as
self.return_generator = return_as != "list"

# Check if we are under a parallel_config or parallel_backend
# context manager and use the config from the context manager
@@ -1267,10 +1281,10 @@
% batch_size)

if not isinstance(backend, SequentialBackend):
if return_generator and not backend.supports_return_generator:
if self.return_generator and not backend.supports_return_generator:
raise ValueError(
"Backend {} does not support "
"return_generator=True".format(backend)
"return_as={}".format(backend, return_as)
)
# This lock is used to coordinate the main thread of this process
# with the async callback thread of our pool.
@@ -1665,7 +1679,7 @@
while self._wait_retrieval():

# If the callback thread of a worker has signaled that its task
# triggerd an exception, or if the retrieval loop has raised an
# triggered an exception, or if the retrieval loop has raised an
# exception (e.g. `GeneratorExit`), exit the loop and surface the
# worker traceback.
if self._aborting:
18 changes: 9 additions & 9 deletions joblib/test/test_parallel.py
@@ -1212,12 +1212,12 @@ def set_list_value(input_list, index, value):


@pytest.mark.parametrize('n_jobs', [1, 2, 4])
def test_parallel_return_generator_order(n_jobs):
def test_parallel_return_order_with_return_as_submitted_parameter(n_jobs):
# This test inserts values in a list in some expected order
# in sequential computing, and then checks that this order has been
# respected by Parallel output generator.
input_list = [0] * 5
result = Parallel(n_jobs=n_jobs, return_generator=True,
result = Parallel(n_jobs=n_jobs, return_as="submitted",
backend='threading')(
delayed(set_list_value)(input_list, i, i) for i in range(5))

@@ -1252,7 +1252,7 @@ def test_deadlock_with_generator(backend, n_jobs):
# Non-regression test for a race condition in the backends when the pickler
# is delayed by a large object.
with Parallel(n_jobs=n_jobs, backend=backend,
return_generator=True) as parallel:
return_as="submitted") as parallel:
result = parallel(delayed(get_large_object)(i) for i in range(10))
next(result)
next(result)
@@ -1270,7 +1270,7 @@ def test_multiple_generator_call(backend, n_jobs):
# assumption that only one generator can be submitted at a time.
with raises(RuntimeError,
match="This Parallel instance is already running"):
parallel = Parallel(n_jobs, backend=backend, return_generator=True)
parallel = Parallel(n_jobs, backend=backend, return_as="submitted")
g = parallel(delayed(sleep)(1) for _ in range(10)) # noqa: F841
t_start = time.time()
gen2 = parallel(delayed(id)(i) for i in range(100)) # noqa: F841
@@ -1294,7 +1294,7 @@ def test_multiple_generator_call_managed(backend, n_jobs):
# immediately when Parallel.__call__ is called. This test relies on the
# assumption that only one generator can be submitted at a time.
with Parallel(n_jobs, backend=backend,
return_generator=True) as parallel:
return_as="submitted") as parallel:
g = parallel(delayed(sleep)(10) for _ in range(10)) # noqa: F841
t_start = time.time()
with raises(RuntimeError,
@@ -1317,10 +1317,10 @@ def test_multiple_generator_call_managed(backend, n_jobs):
@parametrize('n_jobs', [1, 2, -2, -1])
def test_multiple_generator_call_separated(backend, n_jobs):
# Check that for separated Parallel, both tasks are correctly returned.
g = Parallel(n_jobs, backend=backend, return_generator=True)(
g = Parallel(n_jobs, backend=backend, return_as="submitted")(
delayed(sqrt)(i ** 2) for i in range(10)
)
g2 = Parallel(n_jobs, backend=backend, return_generator=True)(
g2 = Parallel(n_jobs, backend=backend, return_as="submitted")(
delayed(sqrt)(i ** 2) for i in range(10, 20)
)

@@ -1340,7 +1340,7 @@ def test_multiple_generator_call_separated_gc(backend, error):

# Check that in loky, only one call can be run at a time with
# a single executor.
parallel = Parallel(2, backend=backend, return_generator=True)
parallel = Parallel(2, backend=backend, return_as="submitted")
g = parallel(delayed(sleep)(10) for i in range(10))
g_wr = weakref.finalize(g, lambda: print("Generator collected"))
ctx = (
@@ -1353,7 +1353,7 @@
# For the other backends, as the worker pools are not shared between
# the two calls, this should proceed correctly.
t_start = time.time()
g = Parallel(2, backend=backend, return_generator=True)(
g = Parallel(2, backend=backend, return_as="submitted")(
delayed(sqrt)(i ** 2) for i in range(10, 20)
)
