From 5b149510643e3c0c85efd8e8f077dac8e863c8d2 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Wed, 30 Sep 2020 22:04:50 -0500 Subject: [PATCH 01/16] bpo-33533: Provide an async iterator version of as_completed * as_completed returns object that is both iterator and async iterator * Existing tests adjusted to test both the old and new style * New test to ensure iterator can be resumed * New test to ensure async iterator yields any passed-in Futures as-is --- Doc/library/asyncio-task.rst | 18 +- Doc/whatsnew/3.10.rst | 10 + Lib/asyncio/tasks.py | 140 ++++--- Lib/test/test_asyncio/test_tasks.py | 354 +++++++++++++----- Misc/ACKS | 1 + .../2020-10-02-17-35-19.bpo-33533.GLIhM5.rst | 2 + 6 files changed, 382 insertions(+), 143 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 99f012540d989b..43c6d24de3fe4a 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -593,9 +593,10 @@ Waiting Primitives .. function:: as_completed(aws, \*, loop=None, timeout=None) Run :ref:`awaitable objects ` in the *aws* - set concurrently. Return an iterator of coroutines. - Each coroutine returned can be awaited to get the earliest next - result from the set of the remaining awaitables. + set concurrently. Returns an asynchronous iterator of the next-completed + Tasks or Futures. If Tasks or Futures are supplied, those same objects + are yielded on completion. Other awaitables are scheduled and their + implicitly created Tasks are yielded instead. Raises :exc:`asyncio.TimeoutError` if the timeout occurs before all Futures are done. @@ -605,10 +606,17 @@ Waiting Primitives Example:: - for coro in as_completed(aws): - earliest_result = await coro + async for task in as_completed(aws): + earliest_result = await task # ... + For backwards compatibility, the object returned by ``as_completed()`` + can be iterated as a plain iterator, yielding new coroutines that return + the results of the passed in awaitables as their tasks finish.:: + + for aw in as_completed(aws): + earliest_result = await aw + # ... Running in Threads ================== diff --git a/Doc/whatsnew/3.10.rst b/Doc/whatsnew/3.10.rst index 957a3e791ecb69..023b1e2b0bc626 100644 --- a/Doc/whatsnew/3.10.rst +++ b/Doc/whatsnew/3.10.rst @@ -102,6 +102,16 @@ New Modules Improved Modules ================ +asyncio +------- + +:func:`asyncio.as_completed` now returns an :term:`asynchronous iterator` of +awaitables. The yielded awaitables include Task or Future objects that were +passed in, making it easier to associate results with the tasks being +completed. For backwards compatibility, the object returned by +:func:`asyncio.as_completed` can still be iterated as a regular +:term:`iterator` yielding new coroutines. +(Contributed by Justin Arthur in :issue:`33533`.) base64 ------ diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 8b05434f273b52..dbbf76057f830e 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -9,6 +9,7 @@ '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) +import asyncio import concurrent.futures import contextvars import functools @@ -553,67 +554,106 @@ async def _cancel_and_wait(fut, loop): fut.remove_done_callback(cb) -# This is *not* a @coroutine! It is just an iterator (yielding Futures). -def as_completed(fs, *, loop=None, timeout=None): - """Return an iterator whose values are coroutines. +class _AsCompletedIterator: + """Doubles as an async iterator of as-completed results of a set of + awaitables and a plain iterator of awaitable as-completed results + (of awaitables). + """ + def __init__(self, aws, loop, timeout): + self._done = asyncio.Queue(loop=loop) + self._timeout_handle = None - When waiting for the yielded coroutines you'll get the results (or - exceptions!) of the original Futures (or coroutines), in the order - in which and as soon as they complete. + if loop is None: + loop = events.get_event_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) - This differs from PEP 3148; the proper way to use this is: + todo = {ensure_future(aw, loop=loop) for aw in frozenset(aws)} + for f in todo: + f.add_done_callback(self._handle_completion) + if todo and timeout is not None: + self._timeout_handle = ( + loop.call_later(timeout, self._handle_timeout) + ) + self._todo = todo + self._todo_left = len(todo) + + def __aiter__(self): + return self + + def __iter__(self): + return self + + async def __anext__(self): + self._todo_left -= 1 + if self._todo_left < 0: + raise StopAsyncIteration + return await self._wait_for_one() + + def __next__(self): + self._todo_left -= 1 + if self._todo_left < 0: + raise StopIteration + return self._wait_for_one(resolve=True) + + def _handle_timeout(self): + for f in self._todo: + f.remove_done_callback(self._handle_completion) + self._done.put_nowait(None) # Sentinel for _wait_for_one(). + self._todo.clear() # Can't do todo.remove(f) in the loop. + + def _handle_completion(self, f): + if not self._todo: + return # _handle_timeout() was here first. + self._todo.remove(f) + self._done.put_nowait(f) + if not self._todo and self._timeout_handle is not None: + self._timeout_handle.cancel() + + async def _wait_for_one(self, resolve=False): + """Waits for the next future to be done and returns it unless resolve + is set, in which case it returns either the result of the future or + raises an exception.""" + f = await self._done.get() + if f is None: + # Dummy value from _handle_timeout(). + raise exceptions.TimeoutError + return f.result() if resolve else f - for f in as_completed(fs): - result = await f # The 'await' may raise. - # Use result. - If a timeout is specified, the 'await' will raise - TimeoutError when the timeout occurs before all Futures are done. +def as_completed(fs, *, loop=None, timeout=None): + """Return an async iterator that yields results from the given awaitables + in the order they finish as they finish. - Note: The futures 'f' are not necessarily members of fs. - """ - if futures.isfuture(fs) or coroutines.iscoroutine(fs): - raise TypeError(f"expect a list of futures, not {type(fs).__name__}") + async for result in as_completed(fs): + # Use result. - from .queues import Queue # Import here to avoid circular import problem. - done = Queue(loop=loop) + The first of any exceptions from the given awaitables will be raised by + the async for statement, as will a TimeoutError if the timeout is reached + before the iterator is exhausted. - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - todo = {ensure_future(f, loop=loop) for f in set(fs)} - timeout_handle = None + For backwards compatibility, the returned object can also be used as a + plain iterator that yields new awaitables representing each next awaitable + to be completed: - def _on_timeout(): - for f in todo: - f.remove_done_callback(_on_completion) - done.put_nowait(None) # Queue a dummy value for _wait_for_one(). - todo.clear() # Can't do todo.remove(f) in the loop. + for next_result in as_completed(fs): + result = await next_result # The 'await' may raise. + # Use result. - def _on_completion(f): - if not todo: - return # _on_timeout() was here first. - todo.remove(f) - done.put_nowait(f) - if not todo and timeout_handle is not None: - timeout_handle.cancel() + When waiting for the yielded awaitables, you'll get the results (or + exceptions!) of the original awaitables in the order in which and as soon + as they complete. - async def _wait_for_one(): - f = await done.get() - if f is None: - # Dummy value from _on_timeout(). - raise exceptions.TimeoutError - return f.result() # May raise f.exception(). + Note: The futures 'f' are not necessarily members of fs. + """ + if inspect.isawaitable(fs): + raise TypeError( + f"expects an iterable of awaitables, not {type(fs).__name__}" + ) - for f in todo: - f.add_done_callback(_on_completion) - if todo and timeout is not None: - timeout_handle = loop.call_later(timeout, _on_timeout) - for _ in range(len(todo)): - yield _wait_for_one() + return _AsCompletedIterator(fs, loop, timeout) @types.coroutine diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 74fc1e4a42133c..d5c95d7d131f83 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1556,43 +1556,110 @@ def gen(): yield 0.01 yield 0 - loop = self.new_test_loop(gen) - # disable "slow callback" warning - loop.slow_callback_duration = 1.0 completed = set() time_shifted = False - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def sleeper(dt, x): - nonlocal time_shifted - yield from asyncio.sleep(dt) - completed.add(x) - if not time_shifted and 'a' in completed and 'b' in completed: - time_shifted = True - loop.advance_time(0.14) - return x - - a = sleeper(0.01, 'a') - b = sleeper(0.01, 'b') - c = sleeper(0.15, 'c') + async def sleeper(dt, x): + nonlocal time_shifted + await asyncio.sleep(dt) + completed.add(x) + if not time_shifted and 'a' in completed and 'b' in completed: + time_shifted = True + loop.advance_time(0.14) + return x - async def foo(): + async def do_nothing(): + pass + + async def try_iterator(awaitables, loop): values = [] - for f in asyncio.as_completed([b, c, a], loop=loop): + for f in asyncio.as_completed(awaitables, loop=loop): values.append(await f) return values - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - self.assertTrue('a' in res[:2]) - self.assertTrue('b' in res[:2]) - self.assertEqual(res[2], 'c') - # Doing it again should take no time and exercise a different path. - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) + async def try_async_iterator(awaitables, loop): + values = [] + async for f in asyncio.as_completed(awaitables, loop=loop): + values.append(await f) + return values + + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + loop = self.new_test_loop(gen) + # disable "slow callback" warning + loop.slow_callback_duration = 1.0 + awaitables = ( + sleeper(0.01, 'b'), + sleeper(0.15, 'c'), + sleeper(0.01, 'a') + ) + with self.assertWarns(DeprecationWarning): + res = loop.run_until_complete( + self.new_task(loop, foo(awaitables, loop)) + ) + self.assertAlmostEqual(0.15, loop.time()) + self.assertTrue('a' in res[:2]) + self.assertTrue('b' in res[:2]) + self.assertEqual(res[2], 'c') + + # Doing nothing should take no time. + awaitables = ( + do_nothing(), + do_nothing(), + do_nothing() + ) + with self.assertWarns(DeprecationWarning): + loop.run_until_complete( + self.new_task(loop, foo(awaitables, loop)) + ) + self.assertAlmostEqual(0.15, loop.time()) + completed.clear() + time_shifted = False + + def test_as_completed_same_tasks_in_as_out(self): + """Ensures that asynchronously iterating as_completed's iterator + yields awaitables are the same awaitables that were passed in when + those awaitables are futures.""" + async def try_async_iterator(awaitables, loop): + awaitables_out = set() + with self.assertWarns(DeprecationWarning): + async for out_aw in ( + asyncio.as_completed(awaitables, loop=loop) + ): + awaitables_out.add(out_aw) + return awaitables_out + + async def coro(i): + return i + + with contextlib.closing(asyncio.new_event_loop()) as loop: + # Coroutines shouldn't be yielded back as finished coroutines + # can't be re-used. + awaitables_in = frozenset( + (coro(0), coro(1), coro(2), coro(3)) + ) + awaitables_out = loop.run_until_complete( + try_async_iterator(awaitables_in, loop) + ) + if awaitables_in - awaitables_out != awaitables_in: + raise self.failureException('Got original coroutines ' + 'out of as_completed iterator.') + + # Tasks should be yielded back. + coro_obj_a = coro('a') + task_b = loop.create_task(coro('b')) + coro_obj_c = coro('c') + task_d = loop.create_task(coro('d')) + awaitables_in = frozenset( + (coro_obj_a, task_b, coro_obj_c, task_d) + ) + awaitables_out = loop.run_until_complete( + try_async_iterator(awaitables_in, loop) + ) + if awaitables_in & awaitables_out != {task_b, task_d}: + raise self.failureException('Only tasks should be yielded ' + 'from as_completed iterator ' + 'as-is.') def test_as_completed_with_timeout(self): @@ -1602,12 +1669,7 @@ def gen(): yield 0 yield 0.1 - loop = self.new_test_loop(gen) - - a = loop.create_task(asyncio.sleep(0.1, 'a')) - b = loop.create_task(asyncio.sleep(0.15, 'b')) - - async def foo(): + async def try_iterator(): values = [] for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): if values: @@ -1619,17 +1681,38 @@ async def foo(): values.append((2, exc)) return values - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertEqual(len(res), 2, res) - self.assertEqual(res[0], (1, 'a')) - self.assertEqual(res[1][0], 2) - self.assertIsInstance(res[1][1], asyncio.TimeoutError) - self.assertAlmostEqual(0.12, loop.time()) + async def try_async_iterator(): + values = [] + try: + async for f in ( + asyncio.as_completed([a, b], timeout=0.12, loop=loop) + ): + v = await f + values.append((1, v)) + loop.advance_time(0.02) + except asyncio.TimeoutError as exc: + values.append((2, exc)) + return values - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + loop = self.new_test_loop(gen) + a = loop.create_task(asyncio.sleep(0.1, 'a')) + b = loop.create_task(asyncio.sleep(0.15, 'b')) + + with self.assertWarns(DeprecationWarning): + res = loop.run_until_complete( + self.new_task(loop, foo()) + ) + self.assertEqual(len(res), 2, res) + self.assertEqual(res[0], (1, 'a')) + self.assertEqual(res[1][0], 2) + self.assertIsInstance(res[1][1], asyncio.TimeoutError) + self.assertAlmostEqual(0.12, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(asyncio.wait([a, b])) def test_as_completed_with_unused_timeout(self): @@ -1638,20 +1721,76 @@ def gen(): yield 0 yield 0.01 - loop = self.new_test_loop(gen) - - a = asyncio.sleep(0.01, 'a') - - async def foo(): + async def try_iterator(): for f in asyncio.as_completed([a], timeout=1, loop=loop): v = await f self.assertEqual(v, 'a') - with self.assertWarns(DeprecationWarning): - loop.run_until_complete(self.new_task(loop, foo())) + async def try_async_iterator(): + async for f in asyncio.as_completed([a], timeout=1, loop=loop): + v = await f + self.assertEqual(v, 'a') - def test_as_completed_reverse_wait(self): + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + loop = self.new_test_loop(gen) + a = asyncio.sleep(0.01, 'a') + + with self.assertWarns(DeprecationWarning): + loop.run_until_complete(self.new_task(loop, foo())) + + def test_as_completed_resume_iterator(self): + """Test that as_completed returns an iterator that can be resumed + the next time iteration is performed (i.e. if __iter__ is called + again)""" + async def try_iterator(awaitables): + iterations = 0 + iterator = asyncio.as_completed(awaitables) + collected = [] + for f in iterator: + collected.append(await f) + iterations += 1 + if iterations == 2: + break + self.assertEqual(len(collected), 2) + + # Resume same iterator: + for f in iterator: + collected.append(await f) + return collected + + async def try_async_iterator(awaitables): + iterations = 0 + iterator = asyncio.as_completed(awaitables) + collected = [] + async for f in iterator: + collected.append(await f) + iterations += 1 + if iterations == 2: + break + self.assertEqual(len(collected), 2) + + # Resume same iterator: + async for f in iterator: + collected.append(await f) + return collected + + async def coro(i): + return i + + with contextlib.closing(asyncio.new_event_loop()) as loop: + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + results = loop.run_until_complete( + foo((coro(0), coro(1), coro(2), coro(3))) + ) + self.assertCountEqual(results, (0, 1, 2, 3)) + def test_as_completed_reverse_wait(self): + """Tests the plain iterator style of as_completed iteration to ensure + that the first future awaited resolves to the first completed + awaitable from the set we passed in, even if it wasn't the first + future generated by as_completed.""" def gen(): yield 0 yield 0.05 @@ -1676,7 +1815,9 @@ def gen(): self.assertAlmostEqual(0.10, loop.time()) def test_as_completed_concurrent(self): - + """Tests the plain iterator style of as_completed iteration to ensure + that more than one generated future from as_completed can be awaited + concurrently""" def gen(): when = yield self.assertAlmostEqual(0.05, when) @@ -1684,43 +1825,62 @@ def gen(): self.assertAlmostEqual(0.05, when) yield 0.05 - loop = self.new_test_loop(gen) + async def try_iterator(fs, loop): + return list(asyncio.as_completed(fs, loop=loop)) - a = asyncio.sleep(0.05, 'a') - b = asyncio.sleep(0.05, 'b') - fs = {a, b} - with self.assertWarns(DeprecationWarning): - futs = list(asyncio.as_completed(fs, loop=loop)) - self.assertEqual(len(futs), 2) - waiter = asyncio.wait(futs) - # Deprecation from passing coros in futs to asyncio.wait() - with self.assertWarns(DeprecationWarning): - done, pending = loop.run_until_complete(waiter) - self.assertEqual(set(f.result() for f in done), {'a', 'b'}) + async def try_async_iterator(fs, loop): + return [f async for f in asyncio.as_completed(fs, loop=loop)] + + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + loop = self.new_test_loop(gen) + + a = asyncio.sleep(0.05, 'a') + b = asyncio.sleep(0.05, 'b') + fs = {a, b} + with self.assertWarns(DeprecationWarning): + futs = loop.run_until_complete(foo(fs, loop)) + self.assertEqual(len(futs), 2) + waiter = asyncio.wait(futs) + if any(asyncio.iscoroutine(f) for f in futs): + # Deprecation from passing coros in futs to asyncio.wait() + warns = self.assertWarns(DeprecationWarning) + else: + warns = contextlib.nullcontext() + with warns: + done, pending = loop.run_until_complete(waiter) + self.assertEqual({f.result() for f in done}, {'a', 'b'}) def test_as_completed_duplicate_coroutines(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s + async def coro(s): + return s - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def runner(): - result = [] - c = coro('ham') + async def try_iterator(): + result = [] + c = coro('ham') + with self.assertWarns(DeprecationWarning): for f in asyncio.as_completed([c, c, coro('spam')], loop=self.loop): - result.append((yield from f)) - return result + result.append(await f) + return result - with self.assertWarns(DeprecationWarning): - fut = self.new_task(self.loop, runner()) - self.loop.run_until_complete(fut) - result = fut.result() - self.assertEqual(set(result), {'ham', 'spam'}) - self.assertEqual(len(result), 2) + async def try_async_iterator(): + result = [] + c = coro('ham') + with self.assertWarns(DeprecationWarning): + async for f in asyncio.as_completed([c, c, coro('spam')], + loop=self.loop): + result.append(await f) + return result + + for foo in try_iterator, try_async_iterator: + with self.subTest(method=foo.__name__): + fut = self.new_task(self.loop, foo()) + self.loop.run_until_complete(fut) + result = fut.result() + self.assertEqual(set(result), {'ham', 'spam'}) + self.assertEqual(len(result), 2) def test_sleep(self): @@ -2182,14 +2342,32 @@ def test_gather_shield(self): test_utils.run_briefly(self.loop) def test_as_completed_invalid_args(self): + """as_completed() expects a list of futures, not a future instance + TypeError should be raised either on iterator construction or first + iteration + """ + # Plain iterator fut = self.new_future(self.loop) + with self.assertRaises(TypeError): + iterator = asyncio.as_completed(fut, loop=self.loop) + next(iterator) + coro = coroutine_function() + with self.assertRaises(TypeError): + iterator = asyncio.as_completed(coro, loop=self.loop) + next(iterator) + coro.close() - # as_completed() expects a list of futures, not a future instance - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.as_completed(fut, loop=self.loop)) + # Async iterator + async def try_async_iterator(aw, loop): + async for f in asyncio.as_completed(aw, loop=loop): + break + + fut = self.new_future(self.loop) + with self.assertRaises(TypeError): + self.loop.run_until_complete(try_async_iterator(fut, self.loop)) coro = coroutine_function() - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.as_completed(coro, loop=self.loop)) + with self.assertRaises(TypeError): + self.loop.run_until_complete(try_async_iterator(coro, self.loop)) coro.close() def test_wait_invalid_args(self): diff --git a/Misc/ACKS b/Misc/ACKS index 9be0e777ca2942..ba66a818954233 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -70,6 +70,7 @@ Alexandru Ardelean Emmanuel Arias Alicia Arlen Jeffrey Armstrong +Justin Turner Arthur Jason Asbahr David Ascher Ammar Askar diff --git a/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst new file mode 100644 index 00000000000000..23577096fdf75e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst @@ -0,0 +1,2 @@ +:func:`asyncio.as_completed` now returns an asynchronous iterator. Patch by +Justin Arthur. From f0f22f06f209232fc240b9a1f67b0d12f5a4f26f Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Sun, 4 Oct 2020 00:42:58 -0500 Subject: [PATCH 02/16] New docstring for revised async iteration pattern. Doc polish. Follows the revisions made based on @hniksic's feedback. --- Doc/library/asyncio-task.rst | 21 +++++++++++---------- Lib/asyncio/tasks.py | 28 +++++++++++++++------------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 43c6d24de3fe4a..8dc9a4f7daaff3 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -592,17 +592,14 @@ Waiting Primitives .. function:: as_completed(aws, \*, loop=None, timeout=None) - Run :ref:`awaitable objects ` in the *aws* - set concurrently. Returns an asynchronous iterator of the next-completed - Tasks or Futures. If Tasks or Futures are supplied, those same objects - are yielded on completion. Other awaitables are scheduled and their - implicitly created Tasks are yielded instead. - - Raises :exc:`asyncio.TimeoutError` if the timeout occurs before - all Futures are done. + Run :ref:`awaitable objects ` in the *aws* set + concurrently. Returns an :term:`asynchronous iterator` of the next-completed + Tasks or Futures. If Tasks or Futures are supplied, those same objects are + yielded on completion. Other awaitables are scheduled and their implicitly + created Tasks are yielded instead. - .. deprecated-removed:: 3.8 3.10 - The *loop* parameter. + Raises :exc:`asyncio.TimeoutError` if the timeout occurs before all Futures + are done. Example:: @@ -618,6 +615,10 @@ Waiting Primitives earliest_result = await aw # ... + .. deprecated-removed:: 3.8 3.10 + The *loop* parameter. + + Running in Threads ================== diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index dbbf76057f830e..ae9e6a0ec4f164 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -624,29 +624,31 @@ async def _wait_for_one(self, resolve=False): def as_completed(fs, *, loop=None, timeout=None): - """Return an async iterator that yields results from the given awaitables + """Return an async iterator that yields tasks from the given awaitables in the order they finish as they finish. - async for result in as_completed(fs): + async for completed_task in as_completed(fs): + result = await completed_task # Use result. - The first of any exceptions from the given awaitables will be raised by - the async for statement, as will a TimeoutError if the timeout is reached - before the iterator is exhausted. + Supplied Tasks and Futures are yielded as-is once they've completed. + Coroutines will be scheduled and their implicitly created tasks will be + yielded instead. + + Any exception from the given awaitables will be raised by the async for + statement, as will a TimeoutError if the timeout is reached before + the iterator is exhausted. For backwards compatibility, the returned object can also be used as a - plain iterator that yields new awaitables representing each next awaitable + plain iterator that yields new coroutines representing each next awaitable to be completed: - for next_result in as_completed(fs): - result = await next_result # The 'await' may raise. + for next_completed in as_completed(fs): + result = await next_completed # The 'await' may raise. # Use result. - When waiting for the yielded awaitables, you'll get the results (or - exceptions!) of the original awaitables in the order in which and as soon - as they complete. - - Note: The futures 'f' are not necessarily members of fs. + The coroutines yielded by the plain iterator are not the original + awaitables passed in. """ if inspect.isawaitable(fs): raise TypeError( From 27d546ac72ee7c3a59b36795bcc8e053d912b62e Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Sun, 4 Oct 2020 11:23:41 -0500 Subject: [PATCH 03/16] Remove internal callbacks on as_completed iterator deletion. Suggested by @hniksic. --- Lib/asyncio/tasks.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index ae9e6a0ec4f164..5142269360fc2a 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -555,9 +555,10 @@ async def _cancel_and_wait(fut, loop): class _AsCompletedIterator: - """Doubles as an async iterator of as-completed results of a set of - awaitables and a plain iterator of awaitable as-completed results - (of awaitables). + """Doubles as an async iterator of as-completed tasks and futures + from a supplied set of awaitables and a plain iterator of + coroutines that resolve to results from the supplied awaitables as + their underlying tasks or futures complete. """ def __init__(self, aws, loop, timeout): self._done = asyncio.Queue(loop=loop) @@ -598,6 +599,10 @@ def __next__(self): raise StopIteration return self._wait_for_one(resolve=True) + def __del__(self): + for f in self._todo: + f.remove_done_callback(self._handle_completion) + def _handle_timeout(self): for f in self._todo: f.remove_done_callback(self._handle_completion) From c049b3a8ba17a60329a4fcfa483755af6112b005 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Sun, 4 Oct 2020 12:01:18 -0500 Subject: [PATCH 04/16] Update docstring of concurrent test for revised async iterator shape --- Lib/test/test_asyncio/test_tasks.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index d5c95d7d131f83..b8abef22b8145f 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1787,10 +1787,10 @@ async def coro(i): self.assertCountEqual(results, (0, 1, 2, 3)) def test_as_completed_reverse_wait(self): - """Tests the plain iterator style of as_completed iteration to ensure - that the first future awaited resolves to the first completed - awaitable from the set we passed in, even if it wasn't the first - future generated by as_completed.""" + """Tests the plain iterator style of as_completed iteration to + ensure that the first future awaited resolves to the first + completed awaitable from the set we passed in, even if it wasn't + the first future generated by as_completed.""" def gen(): yield 0 yield 0.05 @@ -1815,9 +1815,8 @@ def gen(): self.assertAlmostEqual(0.10, loop.time()) def test_as_completed_concurrent(self): - """Tests the plain iterator style of as_completed iteration to ensure - that more than one generated future from as_completed can be awaited - concurrently""" + """Ensure that more than one future or coroutine yielded from + as_completed can be awaited concurrently.""" def gen(): when = yield self.assertAlmostEqual(0.05, when) From 891214931c1be17a37cd5a684b3f6b1fe16dfef3 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Sun, 4 Oct 2020 15:35:59 -0500 Subject: [PATCH 05/16] Clarify TimeoutError source in as_completed docstring. --- Doc/library/asyncio-task.rst | 3 ++- Lib/asyncio/tasks.py | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 8dc9a4f7daaff3..ee677c28ea34e6 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -609,7 +609,8 @@ Waiting Primitives For backwards compatibility, the object returned by ``as_completed()`` can be iterated as a plain iterator, yielding new coroutines that return - the results of the passed in awaitables as their tasks finish.:: + the results or raise the errors of the passed in awaitables as their tasks + finish.:: for aw in as_completed(aws): earliest_result = await aw diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 5142269360fc2a..ad919dfb007863 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -640,20 +640,20 @@ def as_completed(fs, *, loop=None, timeout=None): Coroutines will be scheduled and their implicitly created tasks will be yielded instead. - Any exception from the given awaitables will be raised by the async for - statement, as will a TimeoutError if the timeout is reached before - the iterator is exhausted. + A TimeoutError is raised by the async for statement if the supplied + timeout is reached before the iterator is exhausted. For backwards compatibility, the returned object can also be used as a plain iterator that yields new coroutines representing each next awaitable to be completed: for next_completed in as_completed(fs): - result = await next_completed # The 'await' may raise. + result = await next_completed # Use result. The coroutines yielded by the plain iterator are not the original - awaitables passed in. + awaitables passed in and are the source of TimeoutErrors instead of the + for loop. """ if inspect.isawaitable(fs): raise TypeError( From f759fb4c0e91023ea16114b47caaafdfbee1299f Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Wed, 20 Dec 2023 16:07:34 +0200 Subject: [PATCH 06/16] Fix tests. --- Lib/asyncio/tasks.py | 1 + Lib/test/test_asyncio/test_tasks.py | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index ab4a93b465b754..7b73f7b6ac55ee 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -574,6 +574,7 @@ class _AsCompletedIterator: def __init__(self, aws, timeout): self._done = asyncio.Queue() self._timeout_handle = None + self._todo = set() loop = events.get_event_loop() todo = {ensure_future(aw, loop=loop) for aw in frozenset(aws)} diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 5ceca637c8ce21..bc6d88e65a4966 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1419,7 +1419,7 @@ async def sleeper(dt, x): loop.advance_time(0.14) return x - async def try_iterator(): + async def try_iterator(awaitables): values = [] for f in asyncio.as_completed(awaitables): values.append(await f) @@ -1547,8 +1547,6 @@ def gen(): yield 0 yield 0.01 - a = asyncio.sleep(0.01, 'a') - async def try_iterator(): for f in asyncio.as_completed([a], timeout=1): v = await f @@ -1561,6 +1559,7 @@ async def try_async_iterator(): for foo in try_iterator, try_async_iterator: with self.subTest(method=foo.__name__): + a = asyncio.sleep(0.01, 'a') loop = self.new_test_loop(gen) loop.run_until_complete(self.new_task(loop, foo())) loop.close() @@ -1709,8 +1708,8 @@ async def coro(): a = coro() self.addCleanup(a.close) - futs = asyncio.as_completed([a]) with self.assertRaisesRegex(RuntimeError, 'no current event loop'): + futs = asyncio.as_completed([a]) list(futs) def test_as_completed_coroutine_use_running_loop(self): From c74a16900780a138f9ccf00c504654016d62e3d0 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Wed, 20 Dec 2023 18:22:52 +0200 Subject: [PATCH 07/16] Update docs. --- Doc/library/asyncio-task.rst | 4 ++++ Doc/whatsnew/3.10.rst | 10 ---------- Doc/whatsnew/3.13.rst | 9 +++++++++ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index a48b492afaac80..2016345e63c393 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -895,6 +895,10 @@ Waiting Primitives .. versionchanged:: 3.12 Added support for generators yielding tasks. + .. versionchanged:: 3.13 + The result is now an :term:`asynchronous iterator` as well as + an :term:`iterator`. + Running in Threads ================== diff --git a/Doc/whatsnew/3.10.rst b/Doc/whatsnew/3.10.rst index 21f51ab38d9529..2da90b7ed55744 100644 --- a/Doc/whatsnew/3.10.rst +++ b/Doc/whatsnew/3.10.rst @@ -894,16 +894,6 @@ New Modules Improved Modules ================ -asyncio -------- - -:func:`asyncio.as_completed` now returns an :term:`asynchronous iterator` of -awaitables. The yielded awaitables include Task or Future objects that were -passed in, making it easier to associate results with the tasks being -completed. For backwards compatibility, the object returned by -:func:`asyncio.as_completed` can still be iterated as a regular -:term:`iterator` yielding new coroutines. -(Contributed by Justin Arthur in :issue:`33533`.) asyncio ------- diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index 2c869cbe11396b..e3dc90efce4e6a 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -163,6 +163,15 @@ asyncio the Unix socket when the server is closed. (Contributed by Pierre Ossman in :gh:`111246`.) +* :func:`asyncio.as_completed` now returns an :term:`asynchronous iterator` of + awaitables. + The yielded awaitables include Task or Future objects that were passed in, + making it easier to associate results with the tasks being completed. + For backwards compatibility, the object returned by + :func:`asyncio.as_completed` can still be iterated as a regular + :term:`iterator` yielding new coroutines. + (Contributed by Justin Arthur in :issue:`77714`.) + copy ---- From a2b3b77e9b02049f8668ffb1ebc2139707b8e0ff Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 21 Dec 2023 15:37:32 +0200 Subject: [PATCH 08/16] Avoid circular import problem. --- Lib/asyncio/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 7b73f7b6ac55ee..1bdd77adf2ba31 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -10,7 +10,6 @@ '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) -import asyncio import concurrent.futures import contextvars import functools @@ -572,7 +571,8 @@ class _AsCompletedIterator: their underlying tasks or futures complete. """ def __init__(self, aws, timeout): - self._done = asyncio.Queue() + from .queues import Queue # Import here to avoid circular import problem. + self._done = Queue() self._timeout_handle = None self._todo = set() From 0915257c4589279cbf258d6beb119ced398ccf02 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 21 Dec 2023 15:54:40 +0200 Subject: [PATCH 09/16] Small refactoring. --- Lib/asyncio/tasks.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 1bdd77adf2ba31..59d3ca02697e1c 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -577,7 +577,7 @@ def __init__(self, aws, timeout): self._todo = set() loop = events.get_event_loop() - todo = {ensure_future(aw, loop=loop) for aw in frozenset(aws)} + todo = {ensure_future(aw, loop=loop) for aw in set(aws)} for f in todo: f.add_done_callback(self._handle_completion) if todo and timeout is not None: @@ -594,15 +594,17 @@ def __iter__(self): return self async def __anext__(self): - self._todo_left -= 1 - if self._todo_left < 0: + if not self._todo_left: raise StopAsyncIteration + assert self._todo_left > 0 + self._todo_left -= 1 return await self._wait_for_one() def __next__(self): - self._todo_left -= 1 - if self._todo_left < 0: + if not self._todo_left: raise StopIteration + assert self._todo_left > 0 + self._todo_left -= 1 return self._wait_for_one(resolve=True) def __del__(self): From 9e6658a3eaff4885a82b67afdcd1b7e71f0cf7c8 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 21 Dec 2023 18:31:29 +0200 Subject: [PATCH 10/16] Remove __del__ method which does not have effect. --- Lib/asyncio/tasks.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 59d3ca02697e1c..b941496e8c3fc8 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -574,7 +574,6 @@ def __init__(self, aws, timeout): from .queues import Queue # Import here to avoid circular import problem. self._done = Queue() self._timeout_handle = None - self._todo = set() loop = events.get_event_loop() todo = {ensure_future(aw, loop=loop) for aw in set(aws)} @@ -607,10 +606,6 @@ def __next__(self): self._todo_left -= 1 return self._wait_for_one(resolve=True) - def __del__(self): - for f in self._todo: - f.remove_done_callback(self._handle_completion) - def _handle_timeout(self): for f in self._todo: f.remove_done_callback(self._handle_completion) From 32ac40214e44aa8ef54bce24f9333a84da89a899 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 21 Dec 2023 18:47:13 +0200 Subject: [PATCH 11/16] Fix What's New/ --- Doc/whatsnew/3.13.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index e3dc90efce4e6a..4b1e99b6a7d57a 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -170,7 +170,7 @@ asyncio For backwards compatibility, the object returned by :func:`asyncio.as_completed` can still be iterated as a regular :term:`iterator` yielding new coroutines. - (Contributed by Justin Arthur in :issue:`77714`.) + (Contributed by Justin Arthur in :gh:`77714`.) copy ---- From a6cdcf7de65b7aea44b0e505a528b95a44a61e17 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Sat, 10 Feb 2024 14:01:13 -0600 Subject: [PATCH 12/16] Doc and docstring changes following @gvanrossum review. --- Doc/library/asyncio-task.rst | 53 ++++++++----- Doc/whatsnew/3.13.rst | 12 ++- Lib/asyncio/tasks.py | 78 ++++++++++++------- .../2020-10-02-17-35-19.bpo-33533.GLIhM5.rst | 7 +- 4 files changed, 93 insertions(+), 57 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 2016345e63c393..ec325301a273e3 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -862,28 +862,45 @@ Waiting Primitives .. function:: as_completed(aws, *, timeout=None) Run :ref:`awaitable objects ` in the *aws* iterable - concurrently. Returns an :term:`asynchronous iterator` of the next-completed - Tasks or Futures. If Tasks or Futures are supplied, those same objects are - yielded on completion. Other awaitables are scheduled and their implicitly - created Tasks are yielded instead. - - Raises :exc:`TimeoutError` if the timeout occurs before - all Futures are done. + concurrently. The returned object can be iterated to obtain earliest next + results of the awaitables as they finish. + + The object returned by ``as_completed()`` can be iterated as an + :term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous + iteration is used, the originally-supplied awaitables are yielded if they + are tasks or futures. This makes it easy to correlate previously-scheduled + tasks with their results. Example:: + + ipv4_connect = create_task( + open_connection("127.0.0.1", 80) + ) + ipv6_connect = create_task( + open_connection("::1", 80) + ) + tasks = [ipv4_connect, ipv6_connect] + + async for earliest_connect in as_completed(tasks): + reader, writer = await earliest_connect + if earliest_connect is ipv6_connect: + print('IPv6 connection established.') + else: + print('IPv4 connection established.') + # ... - Example:: + During asynchronous iteration, implicitly-created tasks will be yielded for + supplied awaitables that aren't tasks or futures. - async for task in as_completed(aws): - earliest_result = await task - # ... + When used as a plain iterator, each iteration yields a new coroutine that + returns the result or raises the exception of the next completed awaitable. + This pattern is compatible with Python versions older than 3.13:: - For backwards compatibility, the object returned by ``as_completed()`` - can be iterated as a plain iterator, yielding new coroutines that return - the results or raise the errors of the passed in awaitables as their tasks - finish.:: + for coro in as_completed(aws): + earliest_result = await coro + # ... - for aw in as_completed(aws): - earliest_result = await aw - # ... + A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables + are done. This is raised by the ``async for`` loop during asynchronous + iteration or by the coroutines yielded during plain iteration. .. versionchanged:: 3.10 Removed the *loop* parameter. diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index 4b1e99b6a7d57a..ecbb3cc4b0b483 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -163,13 +163,11 @@ asyncio the Unix socket when the server is closed. (Contributed by Pierre Ossman in :gh:`111246`.) -* :func:`asyncio.as_completed` now returns an :term:`asynchronous iterator` of - awaitables. - The yielded awaitables include Task or Future objects that were passed in, - making it easier to associate results with the tasks being completed. - For backwards compatibility, the object returned by - :func:`asyncio.as_completed` can still be iterated as a regular - :term:`iterator` yielding new coroutines. +* :func:`asyncio.as_completed` now returns an object that is both an + :term:`asynchronous iterator` and a plain :term:`iterator` of awaitables. + The awaitables yielded by asynchronous iteration include original task or + future objects that were passed in, making it easier to associate results + with the tasks being completed. (Contributed by Justin Arthur in :gh:`77714`.) copy diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index b941496e8c3fc8..a0710428cc967b 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -565,8 +565,11 @@ async def _cancel_and_wait(fut): class _AsCompletedIterator: - """Doubles as an async iterator of as-completed tasks and futures - from a supplied set of awaitables and a plain iterator of + """Iterator of awaitables representing tasks of asyncio.as_completed + + Doubles as an async iterator and plain iterator of as-completed results. + Original tasks and futures from the supplied set of awaitables are yielded + during asynchronous iteration. plain iterator of coroutines that resolve to results from the supplied awaitables as their underlying tasks or futures complete. """ @@ -621,9 +624,9 @@ def _handle_completion(self, f): self._timeout_handle.cancel() async def _wait_for_one(self, resolve=False): - """Waits for the next future to be done and returns it unless resolve - is set, in which case it returns either the result of the future or - raises an exception.""" + # Wait for the next future to be done and return it unless resolve is + # set, in which case return either the result of the future or raise + # an exception. f = await self._done.get() if f is None: # Dummy value from _handle_timeout(). @@ -632,31 +635,46 @@ async def _wait_for_one(self, resolve=False): def as_completed(fs, *, timeout=None): - """Return an async iterator that yields tasks from the given awaitables - in the order they finish as they finish. - - async for completed_task in as_completed(fs): - result = await completed_task - # Use result. - - Supplied Tasks and Futures are yielded as-is once they've completed. - Coroutines will be scheduled and their implicitly created tasks will be - yielded instead. - - A TimeoutError is raised by the async for statement if the supplied - timeout is reached before the iterator is exhausted. - - For backwards compatibility, the returned object can also be used as a - plain iterator that yields new coroutines representing each next awaitable - to be completed: - - for next_completed in as_completed(fs): - result = await next_completed - # Use result. - - The coroutines yielded by the plain iterator are not the original - awaitables passed in and are the source of TimeoutErrors instead of the - for loop. + """Create an iterator of awaitables or their results in completion order. + + Run the supplied awaitables concurrently. The returned object can be + iterated to obtain earliest next results of the awaitables as they finish. + + The object returned can be iterated as an asynchronous iterator or a plain + iterator. When asynchronous iteration is used, the originally-supplied + awaitables are yielded if they are tasks or futures. This makes it easy to + correlate previously-scheduled tasks with their results: + + ipv4_connect = create_task( + open_connection("127.0.0.1", 80) + ) + ipv6_connect = create_task( + open_connection("::1", 80) + ) + tasks = [ipv4_connect, ipv6_connect] + + async for earliest_connect in as_completed(tasks): + reader, writer = await earliest_connect + if earliest_connect is ipv6_connect: + print('IPv6 connection established.') + else: + print('IPv4 connection established.') + # ... + + During asynchronous iteration, implicitly-created tasks will be yielded for + supplied awaitables that aren't tasks or futures. + + When used as a plain iterator, each iteration yields a new coroutine that + returns the result or raises the exception of the next completed awaitable. + This pattern is compatible with Python versions older than 3.13: + + for coro in as_completed(aws): + earliest_result = await coro + # ... + + A TimeoutError is raised if the timeout occurs before all awaitables + are done. This is raised by the async for loop during asynchronous + iteration or by the coroutines yielded during plain iteration. """ if inspect.isawaitable(fs): raise TypeError( diff --git a/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst index 23577096fdf75e..3ffd723cf1082a 100644 --- a/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst +++ b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst @@ -1,2 +1,5 @@ -:func:`asyncio.as_completed` now returns an asynchronous iterator. Patch by -Justin Arthur. +:func:`asyncio.as_completed` now returns an object that is both an asynchronous +iterator and plain iterator. The new asynchronous iteration pattern allows for +easier correlation between prior tasks and their completed results. This is +a closer match to :func:`concurrent.futures.as_completed`'s iteration pattern. +Patch by Justin Arthur. From 40c13ad37761a1a1f770e60f01fa25b6383c8358 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Mon, 26 Feb 2024 23:22:49 -0600 Subject: [PATCH 13/16] Consistent quotation marks in examples. --- Doc/library/asyncio-task.rst | 4 ++-- Lib/asyncio/tasks.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index ec325301a273e3..6b83c7f8c39ab1 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -882,9 +882,9 @@ Waiting Primitives async for earliest_connect in as_completed(tasks): reader, writer = await earliest_connect if earliest_connect is ipv6_connect: - print('IPv6 connection established.') + print("IPv6 connection established.") else: - print('IPv4 connection established.') + print("IPv4 connection established.") # ... During asynchronous iteration, implicitly-created tasks will be yielded for diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index a0710428cc967b..d4d79263a63d80 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -656,9 +656,9 @@ def as_completed(fs, *, timeout=None): async for earliest_connect in as_completed(tasks): reader, writer = await earliest_connect if earliest_connect is ipv6_connect: - print('IPv6 connection established.') + print("IPv6 connection established.") else: - print('IPv4 connection established.') + print("IPv4 connection established.") # ... During asynchronous iteration, implicitly-created tasks will be yielded for From 0e56f0b326c97d3b5c55d71c9097af844f75b397 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Tue, 27 Feb 2024 21:25:47 -0600 Subject: [PATCH 14/16] Apply suggestions from code review. Co-authored-by: Guido van Rossum Co-authored-by: Serhiy Storchaka --- Doc/library/asyncio-task.rst | 16 ++++++---------- Lib/asyncio/tasks.py | 2 +- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 6b83c7f8c39ab1..c5a037e569c311 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -862,8 +862,8 @@ Waiting Primitives .. function:: as_completed(aws, *, timeout=None) Run :ref:`awaitable objects ` in the *aws* iterable - concurrently. The returned object can be iterated to obtain earliest next - results of the awaitables as they finish. + concurrently. The returned object can be iterated to obtain the results + of the awaitables as they finish. The object returned by ``as_completed()`` can be iterated as an :term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous @@ -871,12 +871,8 @@ Waiting Primitives are tasks or futures. This makes it easy to correlate previously-scheduled tasks with their results. Example:: - ipv4_connect = create_task( - open_connection("127.0.0.1", 80) - ) - ipv6_connect = create_task( - open_connection("::1", 80) - ) + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) + ipv6_connect = create_task(open_connection("::1", 80)) tasks = [ipv4_connect, ipv6_connect] async for earliest_connect in as_completed(tasks): @@ -913,8 +909,8 @@ Waiting Primitives Added support for generators yielding tasks. .. versionchanged:: 3.13 - The result is now an :term:`asynchronous iterator` as well as - an :term:`iterator`. + The result can now be used as either an :term:`asynchronous iterator` + or as a plain :term:`iterator` (previously it was only a plain iterator). Running in Threads diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index d4d79263a63d80..49810508941250 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -565,7 +565,7 @@ async def _cancel_and_wait(fut): class _AsCompletedIterator: - """Iterator of awaitables representing tasks of asyncio.as_completed + """Iterator of awaitables representing tasks of asyncio.as_completed. Doubles as an async iterator and plain iterator of as-completed results. Original tasks and futures from the supplied set of awaitables are yielded From c4d84abe48555fdb77b70dbe97b43986c4befee0 Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Wed, 28 Feb 2024 21:43:36 -0600 Subject: [PATCH 15/16] Expand as_completed examples, clean _AsCompleted docstring salad. --- Doc/library/asyncio-task.rst | 16 ++++++++--- Lib/asyncio/tasks.py | 56 +++++++++++++++++++----------------- 2 files changed, 41 insertions(+), 31 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index c5a037e569c311..6e2df2982e7a41 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -876,12 +876,14 @@ Waiting Primitives tasks = [ipv4_connect, ipv6_connect] async for earliest_connect in as_completed(tasks): + # earliest_connect is done. The result can be obtained by + # awaiting it or calling earliest_connect.result() reader, writer = await earliest_connect + if earliest_connect is ipv6_connect: print("IPv6 connection established.") else: print("IPv4 connection established.") - # ... During asynchronous iteration, implicitly-created tasks will be yielded for supplied awaitables that aren't tasks or futures. @@ -890,9 +892,15 @@ Waiting Primitives returns the result or raises the exception of the next completed awaitable. This pattern is compatible with Python versions older than 3.13:: - for coro in as_completed(aws): - earliest_result = await coro - # ... + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) + ipv6_connect = create_task(open_connection("::1", 80)) + tasks = [ipv4_connect, ipv6_connect] + + for next_connect in as_completed(tasks): + # next_connect is not one of the original task objects. It must be + # awaited to obtain the result value or raise the exception of the + # awaitable that finishes next. + reader, writer = await next_connect A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables are done. This is raised by the ``async for`` loop during asynchronous diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 49810508941250..89a186c924efb9 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -567,11 +567,9 @@ async def _cancel_and_wait(fut): class _AsCompletedIterator: """Iterator of awaitables representing tasks of asyncio.as_completed. - Doubles as an async iterator and plain iterator of as-completed results. - Original tasks and futures from the supplied set of awaitables are yielded - during asynchronous iteration. plain iterator of - coroutines that resolve to results from the supplied awaitables as - their underlying tasks or futures complete. + As an asynchronous iterator, iteration yields futures as they finish. As a + plain iterator, new coroutines are yielded that will return or raise the + result of the next underlying future to complete. """ def __init__(self, aws, timeout): from .queues import Queue # Import here to avoid circular import problem. @@ -638,28 +636,26 @@ def as_completed(fs, *, timeout=None): """Create an iterator of awaitables or their results in completion order. Run the supplied awaitables concurrently. The returned object can be - iterated to obtain earliest next results of the awaitables as they finish. + iterated to obtain the results of the awaitables as they finish. The object returned can be iterated as an asynchronous iterator or a plain iterator. When asynchronous iteration is used, the originally-supplied awaitables are yielded if they are tasks or futures. This makes it easy to correlate previously-scheduled tasks with their results: - ipv4_connect = create_task( - open_connection("127.0.0.1", 80) - ) - ipv6_connect = create_task( - open_connection("::1", 80) - ) - tasks = [ipv4_connect, ipv6_connect] - - async for earliest_connect in as_completed(tasks): - reader, writer = await earliest_connect - if earliest_connect is ipv6_connect: - print("IPv6 connection established.") - else: - print("IPv4 connection established.") - # ... + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) + ipv6_connect = create_task(open_connection("::1", 80)) + tasks = [ipv4_connect, ipv6_connect] + + async for earliest_connect in as_completed(tasks): + # earliest_connect is done. The result can be obtained by + # awaiting it or calling earliest_connect.result() + reader, writer = await earliest_connect + + if earliest_connect is ipv6_connect: + print("IPv6 connection established.") + else: + print("IPv4 connection established.") During asynchronous iteration, implicitly-created tasks will be yielded for supplied awaitables that aren't tasks or futures. @@ -668,13 +664,19 @@ def as_completed(fs, *, timeout=None): returns the result or raises the exception of the next completed awaitable. This pattern is compatible with Python versions older than 3.13: - for coro in as_completed(aws): - earliest_result = await coro - # ... + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) + ipv6_connect = create_task(open_connection("::1", 80)) + tasks = [ipv4_connect, ipv6_connect] + + for next_connect in as_completed(tasks): + # next_connect is not one of the original task objects. It must be + # awaited to obtain the result value or raise the exception of the + # awaitable that finishes next. + reader, writer = await next_connect - A TimeoutError is raised if the timeout occurs before all awaitables - are done. This is raised by the async for loop during asynchronous - iteration or by the coroutines yielded during plain iteration. + A TimeoutError is raised if the timeout occurs before all awaitables are + done. This is raised by the async for loop during asynchronous iteration or + by the coroutines yielded during plain iteration. """ if inspect.isawaitable(fs): raise TypeError( From 378b69cc136aa55efe2d2ba329ced706c1369b4c Mon Sep 17 00:00:00 2001 From: Justin Turner Arthur Date: Thu, 29 Feb 2024 21:31:51 -0600 Subject: [PATCH 16/16] Import queues module at module-level of asyncio.tasks. --- Lib/asyncio/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 89a186c924efb9..0f1b6616ec625b 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -25,6 +25,7 @@ from . import events from . import exceptions from . import futures +from . import queues from . import timeouts # Helper to generate new task names @@ -572,8 +573,7 @@ class _AsCompletedIterator: result of the next underlying future to complete. """ def __init__(self, aws, timeout): - from .queues import Queue # Import here to avoid circular import problem. - self._done = Queue() + self._done = queues.Queue() self._timeout_handle = None loop = events.get_event_loop()