From 27629e01a0d1c7ab77cc60c9010a96fe0b666378 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 22 Jun 2023 11:48:36 +0200 Subject: [PATCH 1/8] `return_generator={True,False}` -> `return_as={'list','submitted'}` --- CHANGES.rst | 4 ++-- doc/parallel.rst | 13 ++++++++++--- examples/parallel_generator.py | 34 ++++++++++++++++++++-------------- joblib/parallel.py | 34 ++++++++++++++++++++++++---------- joblib/test/test_parallel.py | 20 ++++++++++---------- 5 files changed, 66 insertions(+), 39 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index b6c361035..04723ac3d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -32,7 +32,7 @@ In development previous versions of Joblib. https://github.com/joblib/joblib/pull/1374 -- Add a ``return_generator`` parameter for ``Parallel``, that allows +- Add a ``return_as`` parameter for ``Parallel``, that allows to consume results asynchronously. https://github.com/joblib/joblib/pull/1393 @@ -1339,4 +1339,4 @@ Gael Varoquaux BUG: Make sure that joblib still works with Python2.5. => release 0.4.2 Release 0.4.1 ----------------- +---------------- \ No newline at end of file diff --git a/doc/parallel.rst b/doc/parallel.rst index b63e0ae54..8a011ceb7 100644 --- a/doc/parallel.rst +++ b/doc/parallel.rst @@ -25,11 +25,11 @@ can be spread over 2 CPUs using the following:: The output can be a generator that yields the results as soon as they're available, even if the subsequent tasks aren't completed yet. The order -of the outputs always matches the order of the inputs:: +of the outputs always matches the order the inputs have been submitted with:: >>> from math import sqrt >>> from joblib import Parallel, delayed - >>> parallel = Parallel(n_jobs=2, return_generator=True) + >>> parallel = Parallel(n_jobs=2, return_as="submitted") >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) @@ -44,6 +44,13 @@ This generator allows to reduce the memory footprint of :class:`joblib.Parallel` calls in case the results can benefit from on-the-fly aggregation, as illustrated in :ref:`sphx_glr_auto_examples_parallel_generator.py`. +Future releases are planned to also support returning a generator that yields the +results in the order of completion rather than the order of submission, by using +`return_as="completed"` instead of `return_as="submitted"`. In this case the order the +results are returned with will depend on the concurrency of workers and will not +be guaranteed to be deterministic, meaning the results can be yielded with a different +order every time the code is executed. + Thread-based parallelism vs process-based parallelism ===================================================== @@ -415,4 +422,4 @@ does not exist (but multiprocessing has more overhead). .. autoclass:: joblib.parallel.ParallelBackendBase -.. autoclass:: joblib.parallel.AutoBatchingMixin +.. autoclass:: joblib.parallel.AutoBatchingMixin \ No newline at end of file diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index 43b84ea07..9d90ecfa6 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -10,8 +10,14 @@ observe a high memory usage, as all the results are held in RAM before being processed -Using the ``return_generator=True`` option allows to progressively consumes -the outputs as they arrive and keeps the memory at an acceptable level. 
+Using the `return_as=` parameter with non-default value allows to progressively +consume the outputs as they arrive and keeps the memory at an acceptable level. + +Using this feature requires passing `return_as="submitted"`, in which case the +generator yields the results in the order the tasks have been submitted with. +Future releases are also planned to support the `return_as="completed"` +parameter to have the generator yield results as soon as available. + """ ############################################################################## @@ -93,16 +99,16 @@ def accumulator_sum(generator): ############################################################################## -# We process many of the tasks in parallel. If `return_generator=False` -# (default), we should expect a usage of more than 2GB in RAM. Indeed, all the -# results are computed and stored in ``res`` before being processed by +# We process many of the tasks in parallel. If `return_as="list"` (default), +# we should expect a usage of more than 2GB in RAM. Indeed, all the results +# are computed and stored in ``res`` before being processed by # `accumulator_sum` and collected by the gc. from joblib import Parallel, delayed monitor = MemoryMonitor() -print("Running tasks with return_generator=False...") -res = Parallel(n_jobs=2, return_generator=False)( +print('Running tasks with return_as="list"...') +res = Parallel(n_jobs=2, return_as="list")( delayed(return_big_object)(i) for i in range(150) ) print("Accumulate results:", end='') @@ -117,14 +123,14 @@ def accumulator_sum(generator): ############################################################################## -# If we use ``return_generator=True``, ``res`` is simply a generator with the +# If we use ``return_as="submitted"``, ``res`` is simply a generator with the # results that are ready. Here we consume the results as soon as they arrive # with the ``accumulator_sum`` and once they have been used, they are collected # by the gc. The memory footprint is thus reduced, typically around 300MB. monitor_gen = MemoryMonitor() -print("Create result generator with return_generator=True...") -res = Parallel(n_jobs=2, return_generator=True)( +print('Create result generator with return_as="submitted"...') +res = Parallel(n_jobs=2, return_as="submitted")( delayed(return_big_object)(i) for i in range(150) ) print("Accumulate results:", end='') @@ -152,11 +158,11 @@ def accumulator_sum(generator): import matplotlib.pyplot as plt plt.semilogy( np.maximum.accumulate(monitor.memory_buffer), - label='return_generator=False' + label='return_as="list"' ) plt.semilogy( np.maximum.accumulate(monitor_gen.memory_buffer), - label='return_generator=True' + label='return_as="submitted"' ) plt.xlabel("Time") plt.xticks([], []) @@ -166,7 +172,7 @@ def accumulator_sum(generator): plt.show() ############################################################################## -# It is important to note that with ``return_generator``, the results are +# It is important to note that with ``return_as="submitted"``, the results are # still accumulated in RAM after computation. But as we asynchronously process # them, they can be freed sooner. However, if the generator is not consumed -# the memory still grows linearly. +# the memory still grows linearly. 
\ No newline at end of file diff --git a/joblib/parallel.py b/joblib/parallel.py index cf528dcab..92f1d67fe 100644 --- a/joblib/parallel.py +++ b/joblib/parallel.py @@ -967,11 +967,15 @@ class Parallel(Logger): soft hints (prefer) or hard constraints (require) so as to make it possible for library users to change the backend from the outside using the :func:`~parallel_backend` context manager. - return_generator: bool - If True, calls to this instance will return a generator, yielding - the results as soon as they are available, in the original order. - Note that the intended usage is to run one call at a time. Multiple - calls to the same Parallel object will result in a ``RuntimeError`` + return_as: str in {'list', 'submitted'}, default: 'list' + If 'list', calls to this instance will return a list, only when + all results have been processed and are ready to return. + Else it will return a generator that yields the results as soon as + they are available, in the order the tasks have been submitted + with. + Future releases are planned to also support 'completed', in which + case the generator immediately yields available results + independently of the submission order. prefer: str in {'processes', 'threads'} or None, default: None Soft hint to choose the default backend if no specific backend was selected with the :func:`~parallel_backend` context manager. @@ -1066,6 +1070,9 @@ class Parallel(Logger): * Ability to use shared memory efficiently with worker processes for large numpy-based datastructures. + Note that the intended usage is to run one call at a time. Multiple + calls to the same Parallel object will result in a ``RuntimeError`` + Examples -------- @@ -1161,7 +1168,7 @@ def __init__( self, n_jobs=default_parallel_config["n_jobs"], backend=None, - return_generator=False, + return_as="list", verbose=default_parallel_config["verbose"], timeout=None, pre_dispatch='2 * n_jobs', @@ -1191,7 +1198,14 @@ def __init__( self.verbose = _get_config_param(verbose, context_config, "verbose") self.timeout = timeout self.pre_dispatch = pre_dispatch - self.return_generator = return_generator + + if return_as not in {"list", "submitted"}: + raise ValueError( + 'Expected `return_as` parameter to be a string equal to "list"' + f' or "submitted", but got {return_as} instead' + ) + self.return_as = return_as + self.return_generator = return_as != "list" # Check if we are under a parallel_config or parallel_backend # context manager and use the config from the context manager @@ -1267,10 +1281,10 @@ def __init__( % batch_size) if not isinstance(backend, SequentialBackend): - if return_generator and not backend.supports_return_generator: + if self.return_generator and not backend.supports_return_generator: raise ValueError( "Backend {} does not support " - "return_generator=True".format(backend) + "return_as={}".format(backend, return_as) ) # This lock is used to coordinate the main thread of this process # with the async callback thread of our the pool. 
@@ -1925,4 +1939,4 @@ def _batched_calls_reducer_callback(): return output if self.return_generator else list(output) def __repr__(self): - return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs) + return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs) \ No newline at end of file diff --git a/joblib/test/test_parallel.py b/joblib/test/test_parallel.py index 38a8d9b38..66acce1bf 100644 --- a/joblib/test/test_parallel.py +++ b/joblib/test/test_parallel.py @@ -1212,12 +1212,12 @@ def set_list_value(input_list, index, value): @pytest.mark.parametrize('n_jobs', [1, 2, 4]) -def test_parallel_return_generator_order(n_jobs): +def test_parallel_return_order_with_return_as_submitted_parameter(n_jobs): # This test inserts values in a list in some expected order # in sequential computing, and then checks that this order has been # respected by Parallel output generator. input_list = [0] * 5 - result = Parallel(n_jobs=n_jobs, return_generator=True, + result = Parallel(n_jobs=n_jobs, return_as="submitted", backend='threading')( delayed(set_list_value)(input_list, i, i) for i in range(5)) @@ -1252,7 +1252,7 @@ def test_deadlock_with_generator(backend, n_jobs): # Non-regression test for a race condition in the backends when the pickler # is delayed by a large object. with Parallel(n_jobs=n_jobs, backend=backend, - return_generator=True) as parallel: + return_as="submitted") as parallel: result = parallel(delayed(get_large_object)(i) for i in range(10)) next(result) next(result) @@ -1270,7 +1270,7 @@ def test_multiple_generator_call(backend, n_jobs): # assumption that only one generator can be submitted at a time. with raises(RuntimeError, match="This Parallel instance is already running"): - parallel = Parallel(n_jobs, backend=backend, return_generator=True) + parallel = Parallel(n_jobs, backend=backend, return_as="submitted") g = parallel(delayed(sleep)(1) for _ in range(10)) # noqa: F841 t_start = time.time() gen2 = parallel(delayed(id)(i) for i in range(100)) # noqa: F841 @@ -1294,7 +1294,7 @@ def test_multiple_generator_call_managed(backend, n_jobs): # immediately when Parallel.__call__ is called. This test relies on the # assumption that only one generator can be submitted at a time. with Parallel(n_jobs, backend=backend, - return_generator=True) as parallel: + return_as="submitted") as parallel: g = parallel(delayed(sleep)(10) for _ in range(10)) # noqa: F841 t_start = time.time() with raises(RuntimeError, @@ -1317,10 +1317,10 @@ def test_multiple_generator_call_managed(backend, n_jobs): @parametrize('n_jobs', [1, 2, -2, -1]) def test_multiple_generator_call_separated(backend, n_jobs): # Check that for separated Parallel, both tasks are correctly returned. - g = Parallel(n_jobs, backend=backend, return_generator=True)( + g = Parallel(n_jobs, backend=backend, return_as="submitted")( delayed(sqrt)(i ** 2) for i in range(10) ) - g2 = Parallel(n_jobs, backend=backend, return_generator=True)( + g2 = Parallel(n_jobs, backend=backend, return_as="submitted")( delayed(sqrt)(i ** 2) for i in range(10, 20) ) @@ -1340,7 +1340,7 @@ def test_multiple_generator_call_separated_gc(backend, error): # Check that in loky, only one call can be run at a time with # a single executor. 
- parallel = Parallel(2, backend=backend, return_generator=True) + parallel = Parallel(2, backend=backend, return_as="submitted") g = parallel(delayed(sleep)(10) for i in range(10)) g_wr = weakref.finalize(g, lambda: print("Generator collected")) ctx = ( @@ -1353,7 +1353,7 @@ def test_multiple_generator_call_separated_gc(backend, error): # For the other backends, as the worker pools are not shared between # the two calls, this should proceed correctly. t_start = time.time() - g = Parallel(2, backend=backend, return_generator=True)( + g = Parallel(2, backend=backend, return_as="submitted")( delayed(sqrt)(i ** 2) for i in range(10, 20) ) @@ -1977,4 +1977,4 @@ def test_parallel_config_constructor_params(): # but backend constructor params are given with raises(ValueError, match="only supported when backend is not None"): with parallel_config(inner_max_num_threads=1): - pass + pass \ No newline at end of file From e98a91fec076a0e8e37146e5a7b73ede25e4b91a Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 22 Jun 2023 11:55:06 +0200 Subject: [PATCH 2/8] Add the link to the PR to CHANGES.rst --- CHANGES.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.rst b/CHANGES.rst index 04723ac3d..0103612ba 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -35,6 +35,7 @@ In development - Add a ``return_as`` parameter for ``Parallel``, that allows to consume results asynchronously. https://github.com/joblib/joblib/pull/1393 + https://github.com/joblib/joblib/pull/1458 - Improve the behavior of ``joblib`` for ``n_jobs=1``, with simplified tracebacks and more efficient running time. From debd3527659cb681448ed33c34a18793a2202a2f Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 22 Jun 2023 12:12:03 +0200 Subject: [PATCH 3/8] backquote formatting --- doc/parallel.rst | 21 +++++++++++---------- examples/parallel_generator.py | 13 +++++++------ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/doc/parallel.rst b/doc/parallel.rst index 8a011ceb7..e7a7d4a74 100644 --- a/doc/parallel.rst +++ b/doc/parallel.rst @@ -40,16 +40,17 @@ of the outputs always matches the order the inputs have been submitted with:: >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] -This generator allows to reduce the memory footprint of :class:`joblib.Parallel` -calls in case the results can benefit from on-the-fly aggregation, as illustrated -in :ref:`sphx_glr_auto_examples_parallel_generator.py`. - -Future releases are planned to also support returning a generator that yields the -results in the order of completion rather than the order of submission, by using -`return_as="completed"` instead of `return_as="submitted"`. In this case the order the -results are returned with will depend on the concurrency of workers and will not -be guaranteed to be deterministic, meaning the results can be yielded with a different -order every time the code is executed. +This generator allows to reduce the memory footprint of +:class:`joblib.Parallel` calls in case the results can benefit from on-the-fly +aggregation, as illustrated in +:ref:`sphx_glr_auto_examples_parallel_generator.py`. + +Future releases are planned to also support returning a generator that yields +the results in the order of completion rather than the order of submission, by +using ``return_as="completed"`` instead of ``return_as="submitted"``. 
In this +case the order of the outputs will depend on the concurrency of workers and +will not be guaranteed to be deterministic, meaning the results can be yielded +with a different order every time the code is executed. Thread-based parallelism vs process-based parallelism ===================================================== diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index 9d90ecfa6..b33b8f7ca 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -10,12 +10,13 @@ observe a high memory usage, as all the results are held in RAM before being processed -Using the `return_as=` parameter with non-default value allows to progressively -consume the outputs as they arrive and keeps the memory at an acceptable level. +Using the ``return_as=`` parameter with non-default value allows to +progressively consume the outputs as they arrive and keeps the memory at an +acceptable level. -Using this feature requires passing `return_as="submitted"`, in which case the -generator yields the results in the order the tasks have been submitted with. -Future releases are also planned to support the `return_as="completed"` +Using this feature requires passing ``return_as="submitted"``, in which case +the generator yields the results in the order the tasks have been submitted +with. Future releases are also planned to support the ``return_as="completed"`` parameter to have the generator yield results as soon as available. """ @@ -99,7 +100,7 @@ def accumulator_sum(generator): ############################################################################## -# We process many of the tasks in parallel. If `return_as="list"` (default), +# We process many of the tasks in parallel. If ``return_as="list"`` (default), # we should expect a usage of more than 2GB in RAM. Indeed, all the results # are computed and stored in ``res`` before being processed by # `accumulator_sum` and collected by the gc. From 542d0819640bbfdbc5a9e148ce844938eb485ee4 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Thu, 22 Jun 2023 14:23:22 +0200 Subject: [PATCH 4/8] linting --- examples/parallel_generator.py | 2 +- joblib/parallel.py | 4 ++-- joblib/test/test_parallel.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index b33b8f7ca..87c27b5ed 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -176,4 +176,4 @@ def accumulator_sum(generator): # It is important to note that with ``return_as="submitted"``, the results are # still accumulated in RAM after computation. But as we asynchronously process # them, they can be freed sooner. However, if the generator is not consumed -# the memory still grows linearly. \ No newline at end of file +# the memory still grows linearly. diff --git a/joblib/parallel.py b/joblib/parallel.py index 92f1d67fe..145e70c4b 100644 --- a/joblib/parallel.py +++ b/joblib/parallel.py @@ -1679,7 +1679,7 @@ def _retrieve(self): while self._wait_retrieval(): # If the callback thread of a worker has signaled that its task - # triggerd an exception, or if the retrieval loop has raised an + # triggered an exception, or if the retrieval loop has raised an # exception (e.g. `GeneratorExit`), exit the loop and surface the # worker traceback. 
if self._aborting: @@ -1939,4 +1939,4 @@ def _batched_calls_reducer_callback(): return output if self.return_generator else list(output) def __repr__(self): - return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs) \ No newline at end of file + return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs) diff --git a/joblib/test/test_parallel.py b/joblib/test/test_parallel.py index 66acce1bf..d32a1d01c 100644 --- a/joblib/test/test_parallel.py +++ b/joblib/test/test_parallel.py @@ -1977,4 +1977,4 @@ def test_parallel_config_constructor_params(): # but backend constructor params are given with raises(ValueError, match="only supported when backend is not None"): with parallel_config(inner_max_num_threads=1): - pass \ No newline at end of file + pass From 7fad99d51335a05c4995dc7780f3566c52efe69d Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Fri, 23 Jun 2023 11:16:07 +0200 Subject: [PATCH 5/8] apply review suggestions --- examples/parallel_generator.py | 17 ++++++++--------- joblib/parallel.py | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index 87c27b5ed..36bd67528 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -10,14 +10,13 @@ observe a high memory usage, as all the results are held in RAM before being processed -Using the ``return_as=`` parameter with non-default value allows to -progressively consume the outputs as they arrive and keeps the memory at an -acceptable level. +Using ``return_as='submitted'`` allows to progressively consume the outputs +as they arrive and keeps the memory at an acceptable level. -Using this feature requires passing ``return_as="submitted"``, in which case -the generator yields the results in the order the tasks have been submitted -with. Future releases are also planned to support the ``return_as="completed"`` -parameter to have the generator yield results as soon as available. +In this case, the output of the `Parallel` call is a generator that yields the +results in the order the tasks have been submitted with. Future releases are +also planned to support the ``return_as="completed"`` parameter to have the +generator yield results as soon as available. """ @@ -108,7 +107,7 @@ def accumulator_sum(generator): from joblib import Parallel, delayed monitor = MemoryMonitor() -print('Running tasks with return_as="list"...') +print("Running tasks with return_as='list'...") res = Parallel(n_jobs=2, return_as="list")( delayed(return_big_object)(i) for i in range(150) ) @@ -130,7 +129,7 @@ def accumulator_sum(generator): # by the gc. The memory footprint is thus reduced, typically around 300MB. monitor_gen = MemoryMonitor() -print('Create result generator with return_as="submitted"...') +print("Create result generator with return_as='submitted'...") res = Parallel(n_jobs=2, return_as="submitted")( delayed(return_big_object)(i) for i in range(150) ) diff --git a/joblib/parallel.py b/joblib/parallel.py index 145e70c4b..f944f59e2 100644 --- a/joblib/parallel.py +++ b/joblib/parallel.py @@ -969,7 +969,7 @@ class Parallel(Logger): using the :func:`~parallel_backend` context manager. return_as: str in {'list', 'submitted'}, default: 'list' If 'list', calls to this instance will return a list, only when - all results have been processed and are ready to return. + all results have been processed and retrieved. 
Else it will return a generator that yields the results as soon as they are available, in the order the tasks have been submitted with. From efcdb905286a786a52996e7d348f2d9fd770eae1 Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Fri, 23 Jun 2023 15:03:42 +0200 Subject: [PATCH 6/8] "submitted/completed" replaced with "generator/generator_unordered" --- CHANGES.rst | 4 ++-- doc/parallel.rst | 12 ++++++------ examples/parallel_generator.py | 14 +++++++------- joblib/_parallel_backends.py | 2 +- joblib/parallel.py | 22 +++++++++++----------- joblib/test/test_parallel.py | 18 +++++++++--------- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index e48bdfa82..150fa23a2 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -36,8 +36,8 @@ In development custom cache invalidation based on the metadata of the function call. https://github.com/joblib/joblib/pull/1149 -- Add a ``return_generator`` parameter for ``Parallel``, that allows - to consume results asynchronously. +- Add a ``return_as`` parameter for ``Parallel``, that enables consuming + results asynchronously. https://github.com/joblib/joblib/pull/1393 https://github.com/joblib/joblib/pull/1458 diff --git a/doc/parallel.rst b/doc/parallel.rst index 2b18722b3..08f767966 100644 --- a/doc/parallel.rst +++ b/doc/parallel.rst @@ -29,7 +29,7 @@ of the outputs always matches the order the inputs have been submitted with:: >>> from math import sqrt >>> from joblib import Parallel, delayed - >>> parallel = Parallel(n_jobs=2, return_as="submitted") + >>> parallel = Parallel(n_jobs=2, return_as="generator") >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) @@ -40,17 +40,17 @@ of the outputs always matches the order the inputs have been submitted with:: >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] -This generator allows to reduce the memory footprint of +This generator enables reducing the memory footprint of :class:`joblib.Parallel` calls in case the results can benefit from on-the-fly aggregation, as illustrated in :ref:`sphx_glr_auto_examples_parallel_generator.py`. Future releases are planned to also support returning a generator that yields the results in the order of completion rather than the order of submission, by -using ``return_as="completed"`` instead of ``return_as="submitted"``. In this -case the order of the outputs will depend on the concurrency of workers and -will not be guaranteed to be deterministic, meaning the results can be yielded -with a different order every time the code is executed. +using ``return_as="unordered_generator"`` instead of ``return_as="generator"``. +In this case the order of the outputs will depend on the concurrency of workers +and will not be guaranteed to be deterministic, meaning the results can be +yielded with a different order every time the code is executed. 
Thread-based parallelism vs process-based parallelism ===================================================== diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index 36bd67528..9c4ec30e5 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -10,13 +10,13 @@ observe a high memory usage, as all the results are held in RAM before being processed -Using ``return_as='submitted'`` allows to progressively consume the outputs +Using ``return_as='generator'`` allows to progressively consume the outputs as they arrive and keeps the memory at an acceptable level. In this case, the output of the `Parallel` call is a generator that yields the results in the order the tasks have been submitted with. Future releases are -also planned to support the ``return_as="completed"`` parameter to have the -generator yield results as soon as available. +also planned to support the ``return_as="unordered_generator"`` parameter to +have the generator yield results as soon as available. """ @@ -123,14 +123,14 @@ def accumulator_sum(generator): ############################################################################## -# If we use ``return_as="submitted"``, ``res`` is simply a generator with the +# If we use ``return_as="generator"``, ``res`` is simply a generator on the # results that are ready. Here we consume the results as soon as they arrive # with the ``accumulator_sum`` and once they have been used, they are collected # by the gc. The memory footprint is thus reduced, typically around 300MB. monitor_gen = MemoryMonitor() print("Create result generator with return_as='submitted'...") -res = Parallel(n_jobs=2, return_as="submitted")( +res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object)(i) for i in range(150) ) print("Accumulate results:", end='') @@ -162,7 +162,7 @@ def accumulator_sum(generator): ) plt.semilogy( np.maximum.accumulate(monitor_gen.memory_buffer), - label='return_as="submitted"' + label='return_as="generator"' ) plt.xlabel("Time") plt.xticks([], []) @@ -172,7 +172,7 @@ def accumulator_sum(generator): plt.show() ############################################################################## -# It is important to note that with ``return_as="submitted"``, the results are +# It is important to note that with ``return_as="generator"``, the results are # still accumulated in RAM after computation. But as we asynchronously process # them, they can be freed sooner. However, if the generator is not consumed # the memory still grows linearly. diff --git a/joblib/_parallel_backends.py b/joblib/_parallel_backends.py index 53a71572a..b715f04e3 100644 --- a/joblib/_parallel_backends.py +++ b/joblib/_parallel_backends.py @@ -116,7 +116,7 @@ def get_exceptions(self): def abort_everything(self, ensure_ready=True): """Abort any running tasks - This is called when an exception has been raised when executing a tasks + This is called when an exception has been raised when executing a task and all the remaining tasks will be ignored and can therefore be aborted to spare computation resources. 
diff --git a/joblib/parallel.py b/joblib/parallel.py index f944f59e2..d2ef770a6 100644 --- a/joblib/parallel.py +++ b/joblib/parallel.py @@ -574,8 +574,8 @@ def __init__(self, iterator_slice, backend_and_jobs, reducer_callback=None, self._pickle_cache = pickle_cache if pickle_cache is not None else {} def __call__(self): - # Set the default nested backend to self._backend but do not set the - # change the default number of processes to -1 + # Set the default nested backend to self._backend but do not change the + # default number of processes to -1 with parallel_backend(self._backend, n_jobs=self._n_jobs): return [func(*args, **kwargs) for func, args, kwargs in self.items] @@ -967,14 +967,14 @@ class Parallel(Logger): soft hints (prefer) or hard constraints (require) so as to make it possible for library users to change the backend from the outside using the :func:`~parallel_backend` context manager. - return_as: str in {'list', 'submitted'}, default: 'list' + return_as: str in {'list', 'generator'}, default: 'list' If 'list', calls to this instance will return a list, only when all results have been processed and retrieved. - Else it will return a generator that yields the results as soon as - they are available, in the order the tasks have been submitted - with. - Future releases are planned to also support 'completed', in which - case the generator immediately yields available results + If 'generator', it will return a generator that yields the results + as soon as they are available, in the order the tasks have been + submitted with. + Future releases are planned to also support 'generator_unordered', + in which case the generator immediately yields available results independently of the submission order. prefer: str in {'processes', 'threads'} or None, default: None Soft hint to choose the default backend if no specific backend @@ -1199,10 +1199,10 @@ def __init__( self.timeout = timeout self.pre_dispatch = pre_dispatch - if return_as not in {"list", "submitted"}: + if return_as not in {"list", "generator"}: raise ValueError( 'Expected `return_as` parameter to be a string equal to "list"' - f' or "submitted", but got {return_as} instead' + f' or "generator", but got {return_as} instead' ) self.return_as = return_as self.return_generator = return_as != "list" @@ -1400,7 +1400,7 @@ def dispatch_one_batch(self, iterator): batch_size = self._get_batch_size() with self._lock: - # to ensure an even distribution of the workolad between workers, + # to ensure an even distribution of the workload between workers, # we look ahead in the original iterators more than batch_size # tasks - However, we keep consuming only one batch at each # dispatch_one_batch call. The extra tasks are stored in a local diff --git a/joblib/test/test_parallel.py b/joblib/test/test_parallel.py index d32a1d01c..6217e7341 100644 --- a/joblib/test/test_parallel.py +++ b/joblib/test/test_parallel.py @@ -1212,12 +1212,12 @@ def set_list_value(input_list, index, value): @pytest.mark.parametrize('n_jobs', [1, 2, 4]) -def test_parallel_return_order_with_return_as_submitted_parameter(n_jobs): +def test_parallel_return_order_with_return_as_generator_parameter(n_jobs): # This test inserts values in a list in some expected order # in sequential computing, and then checks that this order has been # respected by Parallel output generator. 
input_list = [0] * 5 - result = Parallel(n_jobs=n_jobs, return_as="submitted", + result = Parallel(n_jobs=n_jobs, return_as="generator", backend='threading')( delayed(set_list_value)(input_list, i, i) for i in range(5)) @@ -1252,7 +1252,7 @@ def test_deadlock_with_generator(backend, n_jobs): # Non-regression test for a race condition in the backends when the pickler # is delayed by a large object. with Parallel(n_jobs=n_jobs, backend=backend, - return_as="submitted") as parallel: + return_as="generator") as parallel: result = parallel(delayed(get_large_object)(i) for i in range(10)) next(result) next(result) @@ -1270,7 +1270,7 @@ def test_multiple_generator_call(backend, n_jobs): # assumption that only one generator can be submitted at a time. with raises(RuntimeError, match="This Parallel instance is already running"): - parallel = Parallel(n_jobs, backend=backend, return_as="submitted") + parallel = Parallel(n_jobs, backend=backend, return_as="generator") g = parallel(delayed(sleep)(1) for _ in range(10)) # noqa: F841 t_start = time.time() gen2 = parallel(delayed(id)(i) for i in range(100)) # noqa: F841 @@ -1294,7 +1294,7 @@ def test_multiple_generator_call_managed(backend, n_jobs): # immediately when Parallel.__call__ is called. This test relies on the # assumption that only one generator can be submitted at a time. with Parallel(n_jobs, backend=backend, - return_as="submitted") as parallel: + return_as="generator") as parallel: g = parallel(delayed(sleep)(10) for _ in range(10)) # noqa: F841 t_start = time.time() with raises(RuntimeError, @@ -1317,10 +1317,10 @@ def test_multiple_generator_call_managed(backend, n_jobs): @parametrize('n_jobs', [1, 2, -2, -1]) def test_multiple_generator_call_separated(backend, n_jobs): # Check that for separated Parallel, both tasks are correctly returned. - g = Parallel(n_jobs, backend=backend, return_as="submitted")( + g = Parallel(n_jobs, backend=backend, return_as="generator")( delayed(sqrt)(i ** 2) for i in range(10) ) - g2 = Parallel(n_jobs, backend=backend, return_as="submitted")( + g2 = Parallel(n_jobs, backend=backend, return_as="generator")( delayed(sqrt)(i ** 2) for i in range(10, 20) ) @@ -1340,7 +1340,7 @@ def test_multiple_generator_call_separated_gc(backend, error): # Check that in loky, only one call can be run at a time with # a single executor. - parallel = Parallel(2, backend=backend, return_as="submitted") + parallel = Parallel(2, backend=backend, return_as="generator") g = parallel(delayed(sleep)(10) for i in range(10)) g_wr = weakref.finalize(g, lambda: print("Generator collected")) ctx = ( @@ -1353,7 +1353,7 @@ def test_multiple_generator_call_separated_gc(backend, error): # For the other backends, as the worker pools are not shared between # the two calls, this should proceed correctly. 
t_start = time.time() - g = Parallel(2, backend=backend, return_as="submitted")( + g = Parallel(2, backend=backend, return_as="generator")( delayed(sqrt)(i ** 2) for i in range(10, 20) ) From 1e717d29157ecb29e36ffe02b8148baf148350ee Mon Sep 17 00:00:00 2001 From: Franck Charras <29153872+fcharras@users.noreply.github.com> Date: Fri, 23 Jun 2023 15:34:39 +0200 Subject: [PATCH 7/8] minor fixups --- examples/parallel_generator.py | 2 +- joblib/_parallel_backends.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/parallel_generator.py b/examples/parallel_generator.py index 9c4ec30e5..286518fa9 100644 --- a/examples/parallel_generator.py +++ b/examples/parallel_generator.py @@ -129,7 +129,7 @@ def accumulator_sum(generator): # by the gc. The memory footprint is thus reduced, typically around 300MB. monitor_gen = MemoryMonitor() -print("Create result generator with return_as='submitted'...") +print("Create result generator with return_as='generator'...") res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object)(i) for i in range(150) ) diff --git a/joblib/_parallel_backends.py b/joblib/_parallel_backends.py index b715f04e3..4f4ff8abf 100644 --- a/joblib/_parallel_backends.py +++ b/joblib/_parallel_backends.py @@ -608,7 +608,7 @@ def retrieve_result_callback(self, out): raise RuntimeError( "The executor underlying Parallel has been shutdown. " "This is likely due to the garbage collection of a previous " - "generator from a call to Parallel with return_generator=True." + "generator from a call to Parallel with return_as=generator." " Make sure the generator is not garbage collected when " "submitting a new job or that it is first properly exhausted." ) From 7c2043ff3311aa658a363d4c517f5632a7b20381 Mon Sep 17 00:00:00 2001 From: Olivier Grisel Date: Fri, 23 Jun 2023 16:41:04 +0200 Subject: [PATCH 8/8] Typo Co-authored-by: Thomas Moreau --- joblib/_parallel_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/joblib/_parallel_backends.py b/joblib/_parallel_backends.py index 4f4ff8abf..a3806007b 100644 --- a/joblib/_parallel_backends.py +++ b/joblib/_parallel_backends.py @@ -608,7 +608,7 @@ def retrieve_result_callback(self, out): raise RuntimeError( "The executor underlying Parallel has been shutdown. " "This is likely due to the garbage collection of a previous " - "generator from a call to Parallel with return_as=generator." + "generator from a call to Parallel with return_as='generator'." " Make sure the generator is not garbage collected when " "submitting a new job or that it is first properly exhausted." )
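
Taken together, the series replaces the boolean ``return_generator`` flag of
``Parallel`` with a string ``return_as`` parameter accepting ``"list"`` (the
default) or ``"generator"``. The following is a minimal sketch of the
resulting API, not part of the patches themselves: it assumes a joblib build
that includes this series, and the unordered-generator variant mentioned in
the docs is only planned here, not implemented::

    from math import sqrt
    from joblib import Parallel, delayed

    # With return_as="generator", results are yielded lazily, in the
    # order the tasks were submitted, so they can be aggregated on the
    # fly instead of first being accumulated in a list.
    output_generator = Parallel(n_jobs=2, return_as="generator")(
        delayed(sqrt)(i ** 2) for i in range(10)
    )
    total = sum(output_generator)  # consumes results as they arrive

    # Values outside {"list", "generator"} are rejected in __init__;
    # "completed" is used here only as an example of an invalid value.
    try:
        Parallel(n_jobs=2, return_as="completed")
    except ValueError:
        pass

    # A Parallel instance runs one call at a time: starting a second
    # call while a previous generator is still pending raises
    # RuntimeError("This Parallel instance is already running").

As the tests in the series illustrate, two separate ``Parallel`` instances
can each hold a pending generator, but reusing a single instance requires
first exhausting (or discarding) the earlier generator before submitting
new work.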