From 7edee0b2ae2bc8fcddf677961f279b59d4bf720d Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 15:19:04 -0800 Subject: [PATCH 01/23] Integrate task groups from EdgeDb My plan is roughly the following: - [x] Copy the files from EdgeDb without modifications (named following asyncio conventions) - [ ] Bang on the tests until they will run - [ ] Bang on the tests until they pass - [ ] Remove pre-3.11 compatibility code - [ ] Switch from MultiError to ExceptionGroup - [ ] Other cleanup - [ ] Add a public API to tasks.py to replace `__cancel_requested__` --- Lib/asyncio/taskgroups.py | 315 ++++++++++++ Lib/test/test_asyncio/test_taskgroups.py | 602 +++++++++++++++++++++++ 2 files changed, 917 insertions(+) create mode 100644 Lib/asyncio/taskgroups.py create mode 100644 Lib/test/test_asyncio/test_taskgroups.py diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py new file mode 100644 index 00000000000000..731ce7238e0519 --- /dev/null +++ b/Lib/asyncio/taskgroups.py @@ -0,0 +1,315 @@ +# +# This source file is part of the EdgeDB open source project. +# +# Copyright 2016-present MagicStack Inc. and the EdgeDB authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from __future__ import annotations + +import asyncio +import itertools +import sys +import textwrap +import traceback +import types +import weakref + + +class TaskGroup: + + def __init__(self, *, name=None): + if name is None: + self._name = f'tg-{_name_counter()}' + else: + self._name = str(name) + + self._entered = False + self._exiting = False + self._aborting = False + self._loop = None + self._parent_task = None + self._parent_cancel_requested = False + self._tasks = weakref.WeakSet() + self._unfinished_tasks = 0 + self._errors = [] + self._base_error = None + self._on_completed_fut = None + + def get_name(self): + return self._name + + def __repr__(self): + msg = f'= (3, 8): + + # In Python 3.8 Tasks propagate all exceptions correctly, + # except for KeybaordInterrupt and SystemExit which are + # still considered special. + + def _is_base_error(self, exc: BaseException) -> bool: + assert isinstance(exc, BaseException) + return isinstance(exc, (SystemExit, KeyboardInterrupt)) + + else: + + # In Python prior to 3.8 all BaseExceptions are special and + # are bypassing the proper propagation through async/await + # code, essentially aborting the execution. + + def _is_base_error(self, exc: BaseException) -> bool: + assert isinstance(exc, BaseException) + return not isinstance(exc, Exception) + + def _patch_task(self, task): + # In Python 3.8 we'll need proper API on asyncio.Task to + # make TaskGroups possible. We need to be able to access + # information about task cancellation, more specifically, + # we need a flag to say if a task was cancelled or not. + # We also need to be able to flip that flag. + + if sys.version_info >= (3, 9): + def _task_cancel(self, msg=None): + self.__cancel_requested__ = True + return asyncio.Task.cancel(self, msg) + else: + def _task_cancel(self): + self.__cancel_requested__ = True + return asyncio.Task.cancel(self) + + if hasattr(task, '__cancel_requested__'): + return + + task.__cancel_requested__ = False + # confirm that we were successful at adding the new attribute: + assert not task.__cancel_requested__ + + task.cancel = types.MethodType(_task_cancel, task) + + def _abort(self): + self._aborting = True + + for t in self._tasks: + if not t.done(): + t.cancel() + + def _on_task_done(self, task): + self._unfinished_tasks -= 1 + assert self._unfinished_tasks >= 0 + + if self._exiting and not self._unfinished_tasks: + if not self._on_completed_fut.done(): + self._on_completed_fut.set_result(True) + + if task.cancelled(): + return + + exc = task.exception() + if exc is None: + return + + self._errors.append(exc) + if self._is_base_error(exc) and self._base_error is None: + self._base_error = exc + + if self._parent_task.done(): + # Not sure if this case is possible, but we want to handle + # it anyways. + self._loop.call_exception_handler({ + 'message': f'Task {task!r} has errored out but its parent ' + f'task {self._parent_task} is already completed', + 'exception': exc, + 'task': task, + }) + return + + self._abort() + if not self._parent_task.__cancel_requested__: + # If parent task *is not* being cancelled, it means that we want + # to manually cancel it to abort whatever is being run right now + # in the TaskGroup. But we want to mark parent task as + # "not cancelled" later in __aexit__. Example situation that + # we need to handle: + # + # async def foo(): + # try: + # async with TaskGroup() as g: + # g.create_task(crash_soon()) + # await something # <- this needs to be canceled + # # by the TaskGroup, e.g. + # # foo() needs to be cancelled + # except Exception: + # # Ignore any exceptions raised in the TaskGroup + # pass + # await something_else # this line has to be called + # # after TaskGroup is finished. + self._parent_cancel_requested = True + self._parent_task.cancel() + + +class MultiError(Exception): + + def __init__(self, msg, *args, errors=()): + if errors: + types = set(type(e).__name__ for e in errors) + msg = f'{msg}; {len(errors)} sub errors: ({", ".join(types)})' + for er in errors: + msg += f'\n + {type(er).__name__}: {er}' + if er.__traceback__: + er_tb = ''.join(traceback.format_tb(er.__traceback__)) + er_tb = textwrap.indent(er_tb, ' | ') + msg += f'\n{er_tb}\n' + super().__init__(msg, *args) + self.__errors__ = tuple(errors) + + def get_error_types(self): + return {type(e) for e in self.__errors__} + + def __reduce__(self): + return (type(self), (self.args,), {'__errors__': self.__errors__}) + + +class TaskGroupError(MultiError): + pass + + +_name_counter = itertools.count(1).__next__ diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py new file mode 100644 index 00000000000000..712050ac974086 --- /dev/null +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -0,0 +1,602 @@ +# +# This source file is part of the EdgeDB open source project. +# +# Copyright 2016-present MagicStack Inc. and the EdgeDB authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import asyncio + +from edb.common import taskgroup +from edb.testbase import server as tb + + +class MyExc(Exception): + pass + + +class TestTaskGroup(tb.TestCase): + + async def test_taskgroup_01(self): + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + async with taskgroup.TaskGroup() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(foo2()) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + + async def test_taskgroup_02(self): + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + async with taskgroup.TaskGroup() as g: + t1 = g.create_task(foo1()) + await asyncio.sleep(0.15) + t2 = g.create_task(foo2()) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + + async def test_taskgroup_03(self): + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + async with taskgroup.TaskGroup() as g: + t1 = g.create_task(foo1()) + await asyncio.sleep(0.15) + # cancel t1 explicitly, i.e. everything should continue + # working as expected. + t1.cancel() + + t2 = g.create_task(foo2()) + + self.assertTrue(t1.cancelled()) + self.assertEqual(t2.result(), 11) + + async def test_taskgroup_04(self): + + NUM = 0 + t2_cancel = False + t2 = None + + async def foo1(): + await asyncio.sleep(0.1) + 1 / 0 + + async def foo2(): + nonlocal NUM, t2_cancel + try: + await asyncio.sleep(1) + except asyncio.CancelledError: + t2_cancel = True + raise + NUM += 1 + + async def runner(): + nonlocal NUM, t2 + + async with taskgroup.TaskGroup() as g: + g.create_task(foo1()) + t2 = g.create_task(foo2()) + + NUM += 10 + + with self.assertRaisesRegex(taskgroup.TaskGroupError, + r'1 sub errors: \(ZeroDivisionError\)'): + await self.loop.create_task(runner()) + + self.assertEqual(NUM, 0) + self.assertTrue(t2_cancel) + self.assertTrue(t2.cancelled()) + + async def test_taskgroup_05(self): + + NUM = 0 + t2_cancel = False + runner_cancel = False + + async def foo1(): + await asyncio.sleep(0.1) + 1 / 0 + + async def foo2(): + nonlocal NUM, t2_cancel + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + t2_cancel = True + raise + NUM += 1 + + async def runner(): + nonlocal NUM, runner_cancel + + async with taskgroup.TaskGroup() as g: + g.create_task(foo1()) + g.create_task(foo1()) + g.create_task(foo1()) + g.create_task(foo2()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + runner_cancel = True + raise + + NUM += 10 + + # The 3 foo1 sub tasks can be racy when the host is busy - if the + # cancellation happens in the middle, we'll see partial sub errors here + with self.assertRaisesRegex( + taskgroup.TaskGroupError, + r'(1|2|3) sub errors: \(ZeroDivisionError\)', + ): + await self.loop.create_task(runner()) + + self.assertEqual(NUM, 0) + self.assertTrue(t2_cancel) + self.assertTrue(runner_cancel) + + async def test_taskgroup_06(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + async with taskgroup.TaskGroup() as g: + for _ in range(5): + g.create_task(foo()) + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 5) + + async def test_taskgroup_07(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + nonlocal NUM + async with taskgroup.TaskGroup() as g: + for _ in range(5): + g.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 15) + + async def test_taskgroup_08(self): + + async def foo(): + await asyncio.sleep(0.1) + 1 / 0 + + async def runner(): + async with taskgroup.TaskGroup() as g: + for _ in range(5): + g.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_09(self): + + t1 = t2 = None + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(2) + return 11 + + async def runner(): + nonlocal t1, t2 + async with taskgroup.TaskGroup() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(foo2()) + await asyncio.sleep(0.1) + 1 / 0 + + try: + await runner() + except taskgroup.TaskGroupError as t: + self.assertEqual(t.get_error_types(), {ZeroDivisionError}) + else: + self.fail('TaskGroupError was not raised') + + self.assertTrue(t1.cancelled()) + self.assertTrue(t2.cancelled()) + + async def test_taskgroup_10(self): + + t1 = t2 = None + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(2) + return 11 + + async def runner(): + nonlocal t1, t2 + async with taskgroup.TaskGroup() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(foo2()) + 1 / 0 + + try: + await runner() + except taskgroup.TaskGroupError as t: + self.assertEqual(t.get_error_types(), {ZeroDivisionError}) + else: + self.fail('TaskGroupError was not raised') + + self.assertTrue(t1.cancelled()) + self.assertTrue(t2.cancelled()) + + async def test_taskgroup_11(self): + + async def foo(): + await asyncio.sleep(0.1) + 1 / 0 + + async def runner(): + async with taskgroup.TaskGroup(): + async with taskgroup.TaskGroup() as g2: + for _ in range(5): + g2.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_12(self): + + async def foo(): + await asyncio.sleep(0.1) + 1 / 0 + + async def runner(): + async with taskgroup.TaskGroup() as g1: + g1.create_task(asyncio.sleep(10)) + + async with taskgroup.TaskGroup() as g2: + for _ in range(5): + g2.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_13(self): + + async def crash_after(t): + await asyncio.sleep(t) + raise ValueError(t) + + async def runner(): + async with taskgroup.TaskGroup(name='g1') as g1: + g1.create_task(crash_after(0.1)) + + async with taskgroup.TaskGroup(name='g2') as g2: + g2.create_task(crash_after(0.2)) + + r = self.loop.create_task(runner()) + with self.assertRaisesRegex(taskgroup.TaskGroupError, r'1 sub errors'): + await r + + async def test_taskgroup_14(self): + + async def crash_after(t): + await asyncio.sleep(t) + raise ValueError(t) + + async def runner(): + async with taskgroup.TaskGroup(name='g1') as g1: + g1.create_task(crash_after(0.2)) + + async with taskgroup.TaskGroup(name='g2') as g2: + g2.create_task(crash_after(0.1)) + + r = self.loop.create_task(runner()) + with self.assertRaisesRegex(taskgroup.TaskGroupError, r'1 sub errors'): + await r + + async def test_taskgroup_15(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def runner(): + async with taskgroup.TaskGroup(name='g1') as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_16(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def nested_runner(): + async with taskgroup.TaskGroup(name='g1') as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + async def runner(): + t = self.loop.create_task(nested_runner()) + await t + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_17(self): + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskgroup.TaskGroup(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 10) + + async def test_taskgroup_18(self): + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskgroup.TaskGroup(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + # This isn't a good idea, but we have to support + # this weird case. + raise MyExc + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + + try: + await r + except taskgroup.TaskGroupError as t: + self.assertEqual(t.get_error_types(), {MyExc}) + else: + self.fail('TaskGroupError was not raised') + + self.assertEqual(NUM, 10) + + async def test_taskgroup_19(self): + async def crash_soon(): + await asyncio.sleep(0.1) + 1 / 0 + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise MyExc + + async def runner(): + async with taskgroup.TaskGroup() as g: + g.create_task(crash_soon()) + await nested() + + r = self.loop.create_task(runner()) + try: + await r + except taskgroup.TaskGroupError as t: + self.assertEqual(t.get_error_types(), {MyExc, ZeroDivisionError}) + else: + self.fail('TasgGroupError was not raised') + + async def test_taskgroup_20(self): + async def crash_soon(): + await asyncio.sleep(0.1) + 1 / 0 + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise KeyboardInterrupt + + async def runner(): + async with taskgroup.TaskGroup() as g: + g.create_task(crash_soon()) + await nested() + + with self.assertRaises(KeyboardInterrupt): + await runner() + + async def _test_taskgroup_21(self): + # This test doesn't work as asyncio, currently, doesn't + # know how to handle BaseExceptions. + + async def crash_soon(): + await asyncio.sleep(0.1) + raise KeyboardInterrupt + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise TypeError + + async def runner(): + async with taskgroup.TaskGroup() as g: + g.create_task(crash_soon()) + await nested() + + with self.assertRaises(KeyboardInterrupt): + await runner() + + async def test_taskgroup_22(self): + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(2) + return 11 + + async def runner(): + async with taskgroup.TaskGroup() as g: + g.create_task(foo1()) + g.create_task(foo2()) + + r = self.loop.create_task(runner()) + await asyncio.sleep(0.05) + r.cancel() + + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_23(self): + + async def do_job(delay): + await asyncio.sleep(delay) + + async with taskgroup.TaskGroup() as g: + for count in range(10): + await asyncio.sleep(0.1) + g.create_task(do_job(0.3)) + if count == 5: + self.assertLess(len(g._tasks), 5) + await asyncio.sleep(1.35) + self.assertEqual(len(g._tasks), 0) From f4953754aa507b92c4dfdf1306d57ceee25365ae Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 15:40:36 -0800 Subject: [PATCH 02/23] Make test_taskgroups.py run and pass Two remaining issues: - [ ] _test_taskgroup_21 doesn't work (it didn't in EdgeDb either) - [ ] the test framework complains about a change to the event loop policy --- Lib/test/test_asyncio/test_taskgroups.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 712050ac974086..8c11784e18f05f 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -19,15 +19,18 @@ import asyncio -from edb.common import taskgroup -from edb.testbase import server as tb +from asyncio import taskgroups as taskgroup +import unittest class MyExc(Exception): pass -class TestTaskGroup(tb.TestCase): +class TestTaskGroup(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + self.loop = asyncio.get_event_loop() async def test_taskgroup_01(self): From a87275a694fd4e40077472b7df92070584ea4e94 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 15:47:00 -0800 Subject: [PATCH 03/23] Rename taskgroup to taskgroups in the test code --- Lib/test/test_asyncio/test_taskgroups.py | 72 ++++++++++++------------ 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 8c11784e18f05f..dcf7bde9e31f47 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -19,7 +19,7 @@ import asyncio -from asyncio import taskgroups as taskgroup +from asyncio import taskgroups import unittest @@ -42,7 +42,7 @@ async def foo2(): await asyncio.sleep(0.2) return 11 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: t1 = g.create_task(foo1()) t2 = g.create_task(foo2()) @@ -59,7 +59,7 @@ async def foo2(): await asyncio.sleep(0.2) return 11 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: t1 = g.create_task(foo1()) await asyncio.sleep(0.15) t2 = g.create_task(foo2()) @@ -77,7 +77,7 @@ async def foo2(): await asyncio.sleep(0.2) return 11 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: t1 = g.create_task(foo1()) await asyncio.sleep(0.15) # cancel t1 explicitly, i.e. everything should continue @@ -111,13 +111,13 @@ async def foo2(): async def runner(): nonlocal NUM, t2 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(foo1()) t2 = g.create_task(foo2()) NUM += 10 - with self.assertRaisesRegex(taskgroup.TaskGroupError, + with self.assertRaisesRegex(taskgroups.TaskGroupError, r'1 sub errors: \(ZeroDivisionError\)'): await self.loop.create_task(runner()) @@ -147,7 +147,7 @@ async def foo2(): async def runner(): nonlocal NUM, runner_cancel - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(foo1()) g.create_task(foo1()) g.create_task(foo1()) @@ -163,7 +163,7 @@ async def runner(): # The 3 foo1 sub tasks can be racy when the host is busy - if the # cancellation happens in the middle, we'll see partial sub errors here with self.assertRaisesRegex( - taskgroup.TaskGroupError, + taskgroups.TaskGroupError, r'(1|2|3) sub errors: \(ZeroDivisionError\)', ): await self.loop.create_task(runner()) @@ -185,7 +185,7 @@ async def foo(): raise async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: for _ in range(5): g.create_task(foo()) @@ -213,7 +213,7 @@ async def foo(): async def runner(): nonlocal NUM - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: for _ in range(5): g.create_task(foo()) @@ -240,7 +240,7 @@ async def foo(): 1 / 0 async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: for _ in range(5): g.create_task(foo()) @@ -271,7 +271,7 @@ async def foo2(): async def runner(): nonlocal t1, t2 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: t1 = g.create_task(foo1()) t2 = g.create_task(foo2()) await asyncio.sleep(0.1) @@ -279,7 +279,7 @@ async def runner(): try: await runner() - except taskgroup.TaskGroupError as t: + except taskgroups.TaskGroupError as t: self.assertEqual(t.get_error_types(), {ZeroDivisionError}) else: self.fail('TaskGroupError was not raised') @@ -301,14 +301,14 @@ async def foo2(): async def runner(): nonlocal t1, t2 - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: t1 = g.create_task(foo1()) t2 = g.create_task(foo2()) 1 / 0 try: await runner() - except taskgroup.TaskGroupError as t: + except taskgroups.TaskGroupError as t: self.assertEqual(t.get_error_types(), {ZeroDivisionError}) else: self.fail('TaskGroupError was not raised') @@ -323,8 +323,8 @@ async def foo(): 1 / 0 async def runner(): - async with taskgroup.TaskGroup(): - async with taskgroup.TaskGroup() as g2: + async with taskgroups.TaskGroup(): + async with taskgroups.TaskGroup() as g2: for _ in range(5): g2.create_task(foo()) @@ -348,10 +348,10 @@ async def foo(): 1 / 0 async def runner(): - async with taskgroup.TaskGroup() as g1: + async with taskgroups.TaskGroup() as g1: g1.create_task(asyncio.sleep(10)) - async with taskgroup.TaskGroup() as g2: + async with taskgroups.TaskGroup() as g2: for _ in range(5): g2.create_task(foo()) @@ -375,14 +375,14 @@ async def crash_after(t): raise ValueError(t) async def runner(): - async with taskgroup.TaskGroup(name='g1') as g1: + async with taskgroups.TaskGroup(name='g1') as g1: g1.create_task(crash_after(0.1)) - async with taskgroup.TaskGroup(name='g2') as g2: + async with taskgroups.TaskGroup(name='g2') as g2: g2.create_task(crash_after(0.2)) r = self.loop.create_task(runner()) - with self.assertRaisesRegex(taskgroup.TaskGroupError, r'1 sub errors'): + with self.assertRaisesRegex(taskgroups.TaskGroupError, r'1 sub errors'): await r async def test_taskgroup_14(self): @@ -392,14 +392,14 @@ async def crash_after(t): raise ValueError(t) async def runner(): - async with taskgroup.TaskGroup(name='g1') as g1: + async with taskgroups.TaskGroup(name='g1') as g1: g1.create_task(crash_after(0.2)) - async with taskgroup.TaskGroup(name='g2') as g2: + async with taskgroups.TaskGroup(name='g2') as g2: g2.create_task(crash_after(0.1)) r = self.loop.create_task(runner()) - with self.assertRaisesRegex(taskgroup.TaskGroupError, r'1 sub errors'): + with self.assertRaisesRegex(taskgroups.TaskGroupError, r'1 sub errors'): await r async def test_taskgroup_15(self): @@ -409,7 +409,7 @@ async def crash_soon(): 1 / 0 async def runner(): - async with taskgroup.TaskGroup(name='g1') as g1: + async with taskgroups.TaskGroup(name='g1') as g1: g1.create_task(crash_soon()) try: await asyncio.sleep(10) @@ -432,7 +432,7 @@ async def crash_soon(): 1 / 0 async def nested_runner(): - async with taskgroup.TaskGroup(name='g1') as g1: + async with taskgroups.TaskGroup(name='g1') as g1: g1.create_task(crash_soon()) try: await asyncio.sleep(10) @@ -457,7 +457,7 @@ async def test_taskgroup_17(self): async def runner(): nonlocal NUM - async with taskgroup.TaskGroup(): + async with taskgroups.TaskGroup(): try: await asyncio.sleep(10) except asyncio.CancelledError: @@ -479,7 +479,7 @@ async def test_taskgroup_18(self): async def runner(): nonlocal NUM - async with taskgroup.TaskGroup(): + async with taskgroups.TaskGroup(): try: await asyncio.sleep(10) except asyncio.CancelledError: @@ -496,7 +496,7 @@ async def runner(): try: await r - except taskgroup.TaskGroupError as t: + except taskgroups.TaskGroupError as t: self.assertEqual(t.get_error_types(), {MyExc}) else: self.fail('TaskGroupError was not raised') @@ -515,14 +515,14 @@ async def nested(): raise MyExc async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(crash_soon()) await nested() r = self.loop.create_task(runner()) try: await r - except taskgroup.TaskGroupError as t: + except taskgroups.TaskGroupError as t: self.assertEqual(t.get_error_types(), {MyExc, ZeroDivisionError}) else: self.fail('TasgGroupError was not raised') @@ -539,7 +539,7 @@ async def nested(): raise KeyboardInterrupt async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(crash_soon()) await nested() @@ -561,7 +561,7 @@ async def nested(): raise TypeError async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(crash_soon()) await nested() @@ -579,7 +579,7 @@ async def foo2(): return 11 async def runner(): - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: g.create_task(foo1()) g.create_task(foo2()) @@ -595,7 +595,7 @@ async def test_taskgroup_23(self): async def do_job(delay): await asyncio.sleep(delay) - async with taskgroup.TaskGroup() as g: + async with taskgroups.TaskGroup() as g: for count in range(10): await asyncio.sleep(0.1) g.create_task(do_job(0.3)) From 4df0accad4163881765d25bc454d24540de8e3cd Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 15:52:41 -0800 Subject: [PATCH 04/23] Export TaskGroup from asyncio; remove __future__ import --- Lib/asyncio/__init__.py | 1 + Lib/asyncio/taskgroups.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 200b14c2a3f21e..db1124cc9bd1ee 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -17,6 +17,7 @@ from .streams import * from .subprocess import * from .tasks import * +from .taskgroups import * from .threads import * from .transports import * diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 731ce7238e0519..edc21681eec0ae 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -17,7 +17,7 @@ # -from __future__ import annotations +__all__ = ["TaskGroup"] import asyncio import itertools From 56db921d7278672cba9a6e4aac9b61a690a75fb9 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 15:59:13 -0800 Subject: [PATCH 05/23] Only keep the newest _is_base_error() and _task_cancel() --- Lib/asyncio/taskgroups.py | 38 ++++++++++---------------------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index edc21681eec0ae..af8417fc6ad3a4 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -21,7 +21,6 @@ import asyncio import itertools -import sys import textwrap import traceback import types @@ -180,41 +179,24 @@ def create_task(self, coro): self._tasks.add(task) return task - if sys.version_info >= (3, 8): + # In Python 3.8 Tasks propagate all exceptions correctly, + # except for KeybaordInterrupt and SystemExit which are + # still considered special. - # In Python 3.8 Tasks propagate all exceptions correctly, - # except for KeybaordInterrupt and SystemExit which are - # still considered special. - - def _is_base_error(self, exc: BaseException) -> bool: - assert isinstance(exc, BaseException) - return isinstance(exc, (SystemExit, KeyboardInterrupt)) - - else: - - # In Python prior to 3.8 all BaseExceptions are special and - # are bypassing the proper propagation through async/await - # code, essentially aborting the execution. - - def _is_base_error(self, exc: BaseException) -> bool: - assert isinstance(exc, BaseException) - return not isinstance(exc, Exception) + def _is_base_error(self, exc: BaseException) -> bool: + assert isinstance(exc, BaseException) + return isinstance(exc, (SystemExit, KeyboardInterrupt)) def _patch_task(self, task): - # In Python 3.8 we'll need proper API on asyncio.Task to + # In the future we'll need proper API on asyncio.Task to # make TaskGroups possible. We need to be able to access # information about task cancellation, more specifically, # we need a flag to say if a task was cancelled or not. # We also need to be able to flip that flag. - if sys.version_info >= (3, 9): - def _task_cancel(self, msg=None): - self.__cancel_requested__ = True - return asyncio.Task.cancel(self, msg) - else: - def _task_cancel(self): - self.__cancel_requested__ = True - return asyncio.Task.cancel(self) + def _task_cancel(self, msg=None): + self.__cancel_requested__ = True + return asyncio.Task.cancel(self, msg) if hasattr(task, '__cancel_requested__'): return From 500581eb0c788043756ff1127160e8c5827cb1d2 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 16:35:24 -0800 Subject: [PATCH 06/23] Get rid of MultiError in favor of ExceptionGroup This required some changes to the tests since EdgeDb's MultiError has some ergonomic conveniences that ExceptionGroup doesn't: - A helper method to get the types of the exceptions - It puts the number and types of the exceptions in the message Also, in one case (test_taskgroup_14) an EG nested inside another EG was raised, whereas the original code just raised one EG. This remains to be investigated. --- Lib/asyncio/taskgroups.py | 27 ++----------------- Lib/test/test_asyncio/test_taskgroups.py | 33 +++++++++++++++--------- 2 files changed, 23 insertions(+), 37 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index af8417fc6ad3a4..d10eae6c0377e8 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -164,8 +164,7 @@ async def __aexit__(self, et, exc, tb): errors = self._errors self._errors = None - me = TaskGroupError('unhandled errors in a TaskGroup', - errors=errors) + me = TaskGroupError('unhandled errors in a TaskGroup', errors) raise me from None def create_task(self, coro): @@ -268,29 +267,7 @@ def _on_task_done(self, task): self._parent_task.cancel() -class MultiError(Exception): - - def __init__(self, msg, *args, errors=()): - if errors: - types = set(type(e).__name__ for e in errors) - msg = f'{msg}; {len(errors)} sub errors: ({", ".join(types)})' - for er in errors: - msg += f'\n + {type(er).__name__}: {er}' - if er.__traceback__: - er_tb = ''.join(traceback.format_tb(er.__traceback__)) - er_tb = textwrap.indent(er_tb, ' | ') - msg += f'\n{er_tb}\n' - super().__init__(msg, *args) - self.__errors__ = tuple(errors) - - def get_error_types(self): - return {type(e) for e in self.__errors__} - - def __reduce__(self): - return (type(self), (self.args,), {'__errors__': self.__errors__}) - - -class TaskGroupError(MultiError): +class TaskGroupError(ExceptionGroup): pass diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index dcf7bde9e31f47..fd6fec8c3b7d2a 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -27,6 +27,10 @@ class MyExc(Exception): pass +def get_error_types(eg): + return {type(exc) for exc in eg.exceptions} + + class TestTaskGroup(unittest.IsolatedAsyncioTestCase): def setUp(self): @@ -117,10 +121,11 @@ async def runner(): NUM += 10 - with self.assertRaisesRegex(taskgroups.TaskGroupError, - r'1 sub errors: \(ZeroDivisionError\)'): + with self.assertRaises(taskgroups.TaskGroupError) as cm: await self.loop.create_task(runner()) + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + self.assertEqual(NUM, 0) self.assertTrue(t2_cancel) self.assertTrue(t2.cancelled()) @@ -162,12 +167,10 @@ async def runner(): # The 3 foo1 sub tasks can be racy when the host is busy - if the # cancellation happens in the middle, we'll see partial sub errors here - with self.assertRaisesRegex( - taskgroups.TaskGroupError, - r'(1|2|3) sub errors: \(ZeroDivisionError\)', - ): + with self.assertRaises(taskgroups.TaskGroupError) as cm: await self.loop.create_task(runner()) + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) self.assertEqual(NUM, 0) self.assertTrue(t2_cancel) self.assertTrue(runner_cancel) @@ -280,7 +283,7 @@ async def runner(): try: await runner() except taskgroups.TaskGroupError as t: - self.assertEqual(t.get_error_types(), {ZeroDivisionError}) + self.assertEqual(get_error_types(t), {ZeroDivisionError}) else: self.fail('TaskGroupError was not raised') @@ -309,7 +312,7 @@ async def runner(): try: await runner() except taskgroups.TaskGroupError as t: - self.assertEqual(t.get_error_types(), {ZeroDivisionError}) + self.assertEqual(get_error_types(t), {ZeroDivisionError}) else: self.fail('TaskGroupError was not raised') @@ -382,9 +385,11 @@ async def runner(): g2.create_task(crash_after(0.2)) r = self.loop.create_task(runner()) - with self.assertRaisesRegex(taskgroups.TaskGroupError, r'1 sub errors'): + with self.assertRaises(taskgroups.TaskGroupError) as cm: await r + self.assertEqual(get_error_types(cm.exception), {ValueError}) + async def test_taskgroup_14(self): async def crash_after(t): @@ -399,9 +404,13 @@ async def runner(): g2.create_task(crash_after(0.1)) r = self.loop.create_task(runner()) - with self.assertRaisesRegex(taskgroups.TaskGroupError, r'1 sub errors'): + with self.assertRaises(taskgroups.TaskGroupError) as cm: await r + # TODO(guido): Check that the nested exception group is expected + self.assertEqual(get_error_types(cm.exception), {taskgroups.TaskGroupError}) + self.assertEqual(get_error_types(cm.exception.exceptions[0]), {ValueError}) + async def test_taskgroup_15(self): async def crash_soon(): @@ -497,7 +506,7 @@ async def runner(): try: await r except taskgroups.TaskGroupError as t: - self.assertEqual(t.get_error_types(), {MyExc}) + self.assertEqual(get_error_types(t),{MyExc}) else: self.fail('TaskGroupError was not raised') @@ -523,7 +532,7 @@ async def runner(): try: await r except taskgroups.TaskGroupError as t: - self.assertEqual(t.get_error_types(), {MyExc, ZeroDivisionError}) + self.assertEqual(get_error_types(t), {MyExc, ZeroDivisionError}) else: self.fail('TasgGroupError was not raised') From 4843e94dfc00350de44e0612e704a63dcd162c97 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 10 Feb 2022 18:11:13 -0800 Subject: [PATCH 07/23] Add TaskGroupError to __all__ --- Lib/asyncio/taskgroups.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index d10eae6c0377e8..f457bbf566b16d 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -17,7 +17,7 @@ # -__all__ = ["TaskGroup"] +__all__ = ["TaskGroup", "TaskGroupError"] import asyncio import itertools From 63e712dc2d381f9b91cb5fe2644b17823671952a Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 11 Feb 2022 13:55:15 -0800 Subject: [PATCH 08/23] Avoid DeprecationWarning: There is no current event loop --- Lib/test/test_asyncio/test_taskgroups.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index fd6fec8c3b7d2a..e53e1265724b6a 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -33,7 +33,7 @@ def get_error_types(eg): class TestTaskGroup(unittest.IsolatedAsyncioTestCase): - def setUp(self): + async def asyncSetUp(self): self.loop = asyncio.get_event_loop() async def test_taskgroup_01(self): From d233dd199ecfeb36fcd230589b4f435da820d9b2 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 11 Feb 2022 14:10:09 -0800 Subject: [PATCH 09/23] Prevent warning "test altered the execution environment" --- Lib/test/test_asyncio/test_taskgroups.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index e53e1265724b6a..4b5c409090a42a 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -23,6 +23,11 @@ import unittest +# To prevent a warning "test altered the execution environment" +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + class MyExc(Exception): pass From af574d58300b8920127cca23d8f1878f5d614b3b Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 11 Feb 2022 15:30:20 -0800 Subject: [PATCH 10/23] Get rid of custom TaskGroupError Just use [Base]ExceptionGroup --- Lib/asyncio/taskgroups.py | 8 ++------ Lib/test/test_asyncio/test_taskgroups.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index f457bbf566b16d..9512282aff73b5 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -17,7 +17,7 @@ # -__all__ = ["TaskGroup", "TaskGroupError"] +__all__ = ["TaskGroup"] import asyncio import itertools @@ -164,7 +164,7 @@ async def __aexit__(self, et, exc, tb): errors = self._errors self._errors = None - me = TaskGroupError('unhandled errors in a TaskGroup', errors) + me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors) raise me from None def create_task(self, coro): @@ -267,8 +267,4 @@ def _on_task_done(self, task): self._parent_task.cancel() -class TaskGroupError(ExceptionGroup): - pass - - _name_counter = itertools.count(1).__next__ diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 4b5c409090a42a..0b0c62958f5d03 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -126,7 +126,7 @@ async def runner(): NUM += 10 - with self.assertRaises(taskgroups.TaskGroupError) as cm: + with self.assertRaises(ExceptionGroup) as cm: await self.loop.create_task(runner()) self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) @@ -172,7 +172,7 @@ async def runner(): # The 3 foo1 sub tasks can be racy when the host is busy - if the # cancellation happens in the middle, we'll see partial sub errors here - with self.assertRaises(taskgroups.TaskGroupError) as cm: + with self.assertRaises(ExceptionGroup) as cm: await self.loop.create_task(runner()) self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) @@ -287,10 +287,10 @@ async def runner(): try: await runner() - except taskgroups.TaskGroupError as t: + except ExceptionGroup as t: self.assertEqual(get_error_types(t), {ZeroDivisionError}) else: - self.fail('TaskGroupError was not raised') + self.fail('ExceptionGroup was not raised') self.assertTrue(t1.cancelled()) self.assertTrue(t2.cancelled()) @@ -316,10 +316,10 @@ async def runner(): try: await runner() - except taskgroups.TaskGroupError as t: + except ExceptionGroup as t: self.assertEqual(get_error_types(t), {ZeroDivisionError}) else: - self.fail('TaskGroupError was not raised') + self.fail('ExceptionGroup was not raised') self.assertTrue(t1.cancelled()) self.assertTrue(t2.cancelled()) @@ -390,7 +390,7 @@ async def runner(): g2.create_task(crash_after(0.2)) r = self.loop.create_task(runner()) - with self.assertRaises(taskgroups.TaskGroupError) as cm: + with self.assertRaises(ExceptionGroup) as cm: await r self.assertEqual(get_error_types(cm.exception), {ValueError}) @@ -409,11 +409,11 @@ async def runner(): g2.create_task(crash_after(0.1)) r = self.loop.create_task(runner()) - with self.assertRaises(taskgroups.TaskGroupError) as cm: + with self.assertRaises(ExceptionGroup) as cm: await r # TODO(guido): Check that the nested exception group is expected - self.assertEqual(get_error_types(cm.exception), {taskgroups.TaskGroupError}) + self.assertEqual(get_error_types(cm.exception), {ExceptionGroup}) self.assertEqual(get_error_types(cm.exception.exceptions[0]), {ValueError}) async def test_taskgroup_15(self): @@ -510,10 +510,10 @@ async def runner(): try: await r - except taskgroups.TaskGroupError as t: + except ExceptionGroup as t: self.assertEqual(get_error_types(t),{MyExc}) else: - self.fail('TaskGroupError was not raised') + self.fail('ExceptionGroup was not raised') self.assertEqual(NUM, 10) @@ -536,7 +536,7 @@ async def runner(): r = self.loop.create_task(runner()) try: await r - except taskgroups.TaskGroupError as t: + except ExceptionGroup as t: self.assertEqual(get_error_types(t), {MyExc, ZeroDivisionError}) else: self.fail('TasgGroupError was not raised') From 299f3663dcd0c0e5174d3f29e8a03681335b40d6 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 11 Feb 2022 17:28:59 -0800 Subject: [PATCH 11/23] Update comments explaining why test 21 doesn't work --- Lib/asyncio/taskgroups.py | 4 ++-- Lib/test/test_asyncio/test_taskgroups.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 9512282aff73b5..0b7e0a412e7b2f 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -178,8 +178,8 @@ def create_task(self, coro): self._tasks.add(task) return task - # In Python 3.8 Tasks propagate all exceptions correctly, - # except for KeybaordInterrupt and SystemExit which are + # Since Python 3.8 Tasks propagate all exceptions correctly, + # except for KeyboardInterrupt and SystemExit which are # still considered special. def _is_base_error(self, exc: BaseException) -> bool: diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 0b0c62958f5d03..9d5a235bfe6d69 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -562,7 +562,11 @@ async def runner(): async def _test_taskgroup_21(self): # This test doesn't work as asyncio, currently, doesn't - # know how to handle BaseExceptions. + # correctly propagate KeyboardInterrupt (or SystemExit) -- + # those cause the event loop itself to crash. + # (Compare to the previous (passing) test -- that one raises + # a plain exception but raises KeyboardInterrupt in nested(); + # this test does it the other way around.) async def crash_soon(): await asyncio.sleep(0.1) From 9de3c878d1cba85bb5a21d93a22d2bbc50e12c2a Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Fri, 11 Feb 2022 17:43:01 -0800 Subject: [PATCH 12/23] Add tests showing that 'plain' BaseExceptions work --- Lib/test/test_asyncio/test_taskgroups.py | 49 ++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 9d5a235bfe6d69..a0c6f15647ab67 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -32,6 +32,10 @@ class MyExc(Exception): pass +class MyBaseExc(BaseException): + pass + + def get_error_types(eg): return {type(exc) for exc in eg.exceptions} @@ -560,6 +564,29 @@ async def runner(): with self.assertRaises(KeyboardInterrupt): await runner() + async def test_taskgroup_20a(self): + async def crash_soon(): + await asyncio.sleep(0.1) + 1 / 0 + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise MyBaseExc + + async def runner(): + async with taskgroups.TaskGroup() as g: + g.create_task(crash_soon()) + await nested() + + with self.assertRaises(BaseExceptionGroup) as cm: + await runner() + + self.assertEqual( + get_error_types(cm.exception), {MyBaseExc, ZeroDivisionError} + ) + async def _test_taskgroup_21(self): # This test doesn't work as asyncio, currently, doesn't # correctly propagate KeyboardInterrupt (or SystemExit) -- @@ -586,6 +613,28 @@ async def runner(): with self.assertRaises(KeyboardInterrupt): await runner() + async def test_taskgroup_21a(self): + + async def crash_soon(): + await asyncio.sleep(0.1) + raise MyBaseExc + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise TypeError + + async def runner(): + async with taskgroups.TaskGroup() as g: + g.create_task(crash_soon()) + await nested() + + with self.assertRaises(BaseExceptionGroup) as cm: + await runner() + + self.assertEqual(get_error_types(cm.exception), {MyBaseExc, TypeError}) + async def test_taskgroup_22(self): async def foo1(): From 0e1355d55509d083ec911995ef2d7837ff888017 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sat, 12 Feb 2022 09:53:42 -0800 Subject: [PATCH 13/23] Allow creating new tasks while __aexit__ is waiting We stop when there are no unfinished tasks, and then no new tasks can be created. (TODO: more thorough testing of edge cases?) --- Lib/asyncio/taskgroups.py | 6 ++-- Lib/test/test_asyncio/test_taskgroups.py | 42 ++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 0b7e0a412e7b2f..4f946eb0e49e1b 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -170,8 +170,8 @@ async def __aexit__(self, et, exc, tb): def create_task(self, coro): if not self._entered: raise RuntimeError(f"TaskGroup {self!r} has not been entered") - if self._exiting: - raise RuntimeError(f"TaskGroup {self!r} is awaiting in exit") + if self._exiting and self._unfinished_tasks == 0: + raise RuntimeError(f"TaskGroup {self!r} is finished") task = self._loop.create_task(coro) task.add_done_callback(self._on_task_done) self._unfinished_tasks += 1 @@ -217,7 +217,7 @@ def _on_task_done(self, task): self._unfinished_tasks -= 1 assert self._unfinished_tasks >= 0 - if self._exiting and not self._unfinished_tasks: + if self._on_completed_fut is not None and not self._unfinished_tasks: if not self._on_completed_fut.done(): self._on_completed_fut.set_result(True) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index a0c6f15647ab67..8a01985c064a7f 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -670,3 +670,45 @@ async def do_job(delay): self.assertLess(len(g._tasks), 5) await asyncio.sleep(1.35) self.assertEqual(len(g._tasks), 0) + + async def test_taskgroup_24(self): + + async def root(g): + await asyncio.sleep(0.1) + g.create_task(coro1(0.1)) + g.create_task(coro1(0.2)) + + async def coro1(delay): + await asyncio.sleep(delay) + + async def runner(): + async with taskgroups.TaskGroup() as g: + g.create_task(root(g)) + + await runner() + + async def test_taskgroup_25(self): + nhydras = 0 + + async def hydra(g): + nonlocal nhydras + nhydras += 1 + await asyncio.sleep(0.01) + g.create_task(hydra(g)) + g.create_task(hydra(g)) + + async def hercules(): + while nhydras < 10: + await asyncio.sleep(0.015) + 1 / 0 + + async def runner(): + async with taskgroups.TaskGroup() as g: + g.create_task(hydra(g)) + g.create_task(hercules()) + + with self.assertRaises(ExceptionGroup) as cm: + await runner() + + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + self.assertGreaterEqual(nhydras, 10) From 77ec0e42b3fcd8a928ab89651707d6384971add7 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 14 Feb 2022 11:12:43 -0800 Subject: [PATCH 14/23] Add an API to Task to manage 'cancel_requested' flag This means we no longer have to monkey-patch the parent task. It does introduce new semantics for task cancellation: When a task is cancelled, further attempts to cancel it have *no* effect unless the task calls self.uncancel(). Borrowed from GH-31313 by @asvetlov. --- Lib/asyncio/taskgroups.py | 25 ++----------- Lib/asyncio/tasks.py | 16 ++++++++- Modules/_asynciomodule.c | 59 +++++++++++++++++++++++++++++++ Modules/clinic/_asynciomodule.c.h | 49 ++++++++++++++++++++++++- 4 files changed, 124 insertions(+), 25 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 4f946eb0e49e1b..f2a223b57860aa 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -78,7 +78,6 @@ async def __aenter__(self): if self._parent_task is None: raise RuntimeError( f'TaskGroup {self!r} cannot determine the parent task') - self._patch_task(self._parent_task) return self @@ -95,7 +94,7 @@ async def __aexit__(self, et, exc, tb): if self._parent_cancel_requested: # Only if we did request task to cancel ourselves # we mark it as no longer cancelled. - self._parent_task.__cancel_requested__ = False + self._parent_task.uncancel() else: propagate_cancellation_error = et @@ -186,26 +185,6 @@ def _is_base_error(self, exc: BaseException) -> bool: assert isinstance(exc, BaseException) return isinstance(exc, (SystemExit, KeyboardInterrupt)) - def _patch_task(self, task): - # In the future we'll need proper API on asyncio.Task to - # make TaskGroups possible. We need to be able to access - # information about task cancellation, more specifically, - # we need a flag to say if a task was cancelled or not. - # We also need to be able to flip that flag. - - def _task_cancel(self, msg=None): - self.__cancel_requested__ = True - return asyncio.Task.cancel(self, msg) - - if hasattr(task, '__cancel_requested__'): - return - - task.__cancel_requested__ = False - # confirm that we were successful at adding the new attribute: - assert not task.__cancel_requested__ - - task.cancel = types.MethodType(_task_cancel, task) - def _abort(self): self._aborting = True @@ -244,7 +223,7 @@ def _on_task_done(self, task): return self._abort() - if not self._parent_task.__cancel_requested__: + if not self._parent_task.cancelling(): # If parent task *is not* being cancelled, it means that we want # to manually cancel it to abort whatever is being run right now # in the TaskGroup. But we want to mark parent task as diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 2bee5c050ded7d..c11d0daaefea7e 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -105,6 +105,7 @@ def __init__(self, coro, *, loop=None, name=None): else: self._name = str(name) + self._cancel_requested = False self._must_cancel = False self._fut_waiter = None self._coro = coro @@ -201,6 +202,9 @@ def cancel(self, msg=None): self._log_traceback = False if self.done(): return False + if self._cancel_requested: + return False + self._cancel_requested = True if self._fut_waiter is not None: if self._fut_waiter.cancel(msg=msg): # Leave self._fut_waiter; it may be a Task that @@ -212,6 +216,16 @@ def cancel(self, msg=None): self._cancel_message = msg return True + def cancelling(self): + return self._cancel_requested + + def uncancel(self): + if self._cancel_requested: + self._cancel_requested = False + return True + else: + return False + def __step(self, exc=None): if self.done(): raise exceptions.InvalidStateError( @@ -634,7 +648,7 @@ def _ensure_future(coro_or_future, *, loop=None): loop = events._get_event_loop(stacklevel=4) try: return loop.create_task(coro_or_future) - except RuntimeError: + except RuntimeError: if not called_wrap_awaitable: coro_or_future.close() raise diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 2216dd0178173a..adde627803ed2a 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -90,6 +90,7 @@ typedef struct { PyObject *task_context; int task_must_cancel; int task_log_destroy_pending; + int task_cancel_requested; } TaskObj; typedef struct { @@ -2038,6 +2039,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, Py_CLEAR(self->task_fut_waiter); self->task_must_cancel = 0; self->task_log_destroy_pending = 1; + self->task_cancel_requested = 0; Py_INCREF(coro); Py_XSETREF(self->task_coro, coro); @@ -2204,6 +2206,11 @@ _asyncio_Task_cancel_impl(TaskObj *self, PyObject *msg) Py_RETURN_FALSE; } + if (self->task_cancel_requested) { + Py_RETURN_FALSE; + } + self->task_cancel_requested = 1; + if (self->task_fut_waiter) { PyObject *res; int is_true; @@ -2231,6 +2238,56 @@ _asyncio_Task_cancel_impl(TaskObj *self, PyObject *msg) Py_RETURN_TRUE; } +/*[clinic input] +_asyncio.Task.cancelling + +Return True if the task is in the process of being cancelled. + +This is set once .cancel() is called +and remains set until .uncancel() is called. + +As long as this flag is set, further .cancel() calls will be ignored, +until .uncancel() is called to reset it. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_cancelling_impl(TaskObj *self) +/*[clinic end generated code: output=803b3af96f917d7e input=c50e50f9c3ca4676]*/ +/*[clinic end generated code]*/ +{ + if (self->task_cancel_requested) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +/*[clinic input] +_asyncio.Task.uncancel + +Reset the flag returned by cancelling(). + +This should be used by tasks that catch CancelledError +and wish to continue indefinitely until they are cancelled again. + +Returns the previous value of the flag. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_uncancel_impl(TaskObj *self) +/*[clinic end generated code: output=58184d236a817d3c input=5db95e28fcb6f7cd]*/ +/*[clinic end generated code]*/ +{ + if (self->task_cancel_requested) { + self->task_cancel_requested = 0; + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + /*[clinic input] _asyncio.Task.get_stack @@ -2454,6 +2511,8 @@ static PyMethodDef TaskType_methods[] = { _ASYNCIO_TASK_SET_RESULT_METHODDEF _ASYNCIO_TASK_SET_EXCEPTION_METHODDEF _ASYNCIO_TASK_CANCEL_METHODDEF + _ASYNCIO_TASK_CANCELLING_METHODDEF + _ASYNCIO_TASK_UNCANCEL_METHODDEF _ASYNCIO_TASK_GET_STACK_METHODDEF _ASYNCIO_TASK_PRINT_STACK_METHODDEF _ASYNCIO_TASK__MAKE_CANCELLED_ERROR_METHODDEF diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index c472e652fb7c56..5648e14f337f7f 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -447,6 +447,53 @@ _asyncio_Task_cancel(TaskObj *self, PyObject *const *args, Py_ssize_t nargs, PyO return return_value; } +PyDoc_STRVAR(_asyncio_Task_cancelling__doc__, +"cancelling($self, /)\n" +"--\n" +"\n" +"Return True if the task is in the process of being cancelled.\n" +"\n" +"This is set once .cancel() is called\n" +"and remains set until .uncancel() is called.\n" +"\n" +"As long as this flag is set, further .cancel() calls will be ignored,\n" +"until .uncancel() is called to reset it."); + +#define _ASYNCIO_TASK_CANCELLING_METHODDEF \ + {"cancelling", (PyCFunction)_asyncio_Task_cancelling, METH_NOARGS, _asyncio_Task_cancelling__doc__}, + +static PyObject * +_asyncio_Task_cancelling_impl(TaskObj *self); + +static PyObject * +_asyncio_Task_cancelling(TaskObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Task_cancelling_impl(self); +} + +PyDoc_STRVAR(_asyncio_Task_uncancel__doc__, +"uncancel($self, /)\n" +"--\n" +"\n" +"Reset the flag returned by cancelling().\n" +"\n" +"This should be used by tasks that catch CancelledError\n" +"and wish to continue indefinitely until they are cancelled again.\n" +"\n" +"Returns the previous value of the flag."); + +#define _ASYNCIO_TASK_UNCANCEL_METHODDEF \ + {"uncancel", (PyCFunction)_asyncio_Task_uncancel, METH_NOARGS, _asyncio_Task_uncancel__doc__}, + +static PyObject * +_asyncio_Task_uncancel_impl(TaskObj *self); + +static PyObject * +_asyncio_Task_uncancel(TaskObj *self, PyObject *Py_UNUSED(ignored)) +{ + return _asyncio_Task_uncancel_impl(self); +} + PyDoc_STRVAR(_asyncio_Task_get_stack__doc__, "get_stack($self, /, *, limit=None)\n" "--\n" @@ -871,4 +918,4 @@ _asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, exit: return return_value; } -/*[clinic end generated code: output=0d127162ac92e0c0 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=c02708a9d6a774cc input=a9049054013a1b77]*/ From 17b64b54ad9dd929c2ae9250a05354d0c289e949 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 14 Feb 2022 11:31:57 -0800 Subject: [PATCH 15/23] Add tests for .cancelling() and .uncancel() --- Lib/test/test_asyncio/test_tasks.py | 41 +++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 8c4dceacdeec96..6948e3fee66ea7 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -496,6 +496,47 @@ async def run(): # This also distinguishes from the initial has_cycle=None. self.assertEqual(has_cycle, False) + + def test_cancelling(self): + loop = asyncio.new_event_loop() + + async def task(): + await asyncio.sleep(10) + + try: + t = self.new_task(loop, task()) + self.assertFalse(t.cancelling()) + self.assertTrue(t.cancel()) + self.assertTrue(t.cancelling()) + self.assertFalse(t.cancel()) + + with self.assertRaises(asyncio.CancelledError): + loop.run_until_complete(t) + finally: + loop.close() + + def test_uncancel(self): + loop = asyncio.new_event_loop() + + async def task(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + asyncio.current_task().uncancel() + await asyncio.sleep(10) + + try: + t = self.new_task(loop, task()) + loop.run_until_complete(asyncio.sleep(0.01)) + self.assertTrue(t.cancel()) # Cancel first sleep + loop.run_until_complete(asyncio.sleep(0.01)) + self.assertTrue(t.cancel()) # Cancel second sleep + + with self.assertRaises(asyncio.CancelledError): + loop.run_until_complete(t) + finally: + loop.close() + def test_cancel(self): def gen(): From 0b9bccd71d9b3dc0e05850e7792fa42933696b11 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Mon, 14 Feb 2022 21:21:52 +0000 Subject: [PATCH 16/23] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-02-14-21-21-49.bpo-46752.m6ldTm.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2022-02-14-21-21-49.bpo-46752.m6ldTm.rst diff --git a/Misc/NEWS.d/next/Library/2022-02-14-21-21-49.bpo-46752.m6ldTm.rst b/Misc/NEWS.d/next/Library/2022-02-14-21-21-49.bpo-46752.m6ldTm.rst new file mode 100644 index 00000000000000..f460600c8c9dde --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-02-14-21-21-49.bpo-46752.m6ldTm.rst @@ -0,0 +1,2 @@ +Add task groups to asyncio (structured concurrency, inspired by Trio's nurseries). +This also introduces a change to task cancellation, where a cancelled task can't be cancelled again until it calls .uncancel(). From 137ebe6de92a3c7995c434e01f0fe501ed441123 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 14 Feb 2022 20:28:07 -0800 Subject: [PATCH 17/23] Replace EdgeDb copyright with a simpler attribution Yury, the author, re-licenses the code under PSFL, and since he's signed Python's contributor agreement he has no problem assigning his copyright on the implementation and tests to the PSF. --- Lib/asyncio/taskgroups.py | 18 +----------------- Lib/test/test_asyncio/test_taskgroups.py | 18 +----------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index f2a223b57860aa..521fe5e5137c35 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -1,20 +1,4 @@ -# -# This source file is part of the EdgeDB open source project. -# -# Copyright 2016-present MagicStack Inc. and the EdgeDB authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# +# Adapted with permission from the EdgeDb project. __all__ = ["TaskGroup"] diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 8a01985c064a7f..82728b33eb7904 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -1,20 +1,4 @@ -# -# This source file is part of the EdgeDB open source project. -# -# Copyright 2016-present MagicStack Inc. and the EdgeDB authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# +# Adapted with permission from the EdgeDb project. import asyncio From f693c1c989ad732aed0e742a6171c8ef2448068b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 15 Feb 2022 15:57:52 +0200 Subject: [PATCH 18/23] Use task.cancelling() in task repr instead of access to private attributes --- Lib/asyncio/base_tasks.py | 2 +- Lib/test/test_asyncio/test_tasks.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py index 09bb171a2ce750..1d623899f69a9d 100644 --- a/Lib/asyncio/base_tasks.py +++ b/Lib/asyncio/base_tasks.py @@ -8,7 +8,7 @@ def _task_repr_info(task): info = base_futures._future_repr_info(task) - if task._must_cancel: + if task.cancelling() and not task.done(): # replace status info[0] = 'cancelling' diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 6948e3fee66ea7..fe6bfb363f1c67 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -506,8 +506,10 @@ async def task(): try: t = self.new_task(loop, task()) self.assertFalse(t.cancelling()) + self.assertNotIn(" cancelling ", repr(t)) self.assertTrue(t.cancel()) self.assertTrue(t.cancelling()) + self.assertIn(" cancelling ", repr(t)) self.assertFalse(t.cancel()) with self.assertRaises(asyncio.CancelledError): @@ -529,7 +531,9 @@ async def task(): t = self.new_task(loop, task()) loop.run_until_complete(asyncio.sleep(0.01)) self.assertTrue(t.cancel()) # Cancel first sleep + self.assertIn(" cancelling ", repr(t)) loop.run_until_complete(asyncio.sleep(0.01)) + self.assertNotIn(" cancelling ", repr(t)) # after .uncancel() self.assertTrue(t.cancel()) # Cancel second sleep with self.assertRaises(asyncio.CancelledError): From b83734cf1efec6cce890c3f3ae65892e1612bb36 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Tue, 15 Feb 2022 08:56:27 -0800 Subject: [PATCH 19/23] Change the internal imports --- Lib/asyncio/taskgroups.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 521fe5e5137c35..79eee8add90cf9 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -3,13 +3,15 @@ __all__ = ["TaskGroup"] -import asyncio import itertools import textwrap import traceback import types import weakref +from . import events +from . import exceptions +from . import tasks class TaskGroup: @@ -56,9 +58,9 @@ async def __aenter__(self): self._entered = True if self._loop is None: - self._loop = asyncio.get_running_loop() + self._loop = events.get_running_loop() - self._parent_task = asyncio.current_task(self._loop) + self._parent_task = tasks.current_task(self._loop) if self._parent_task is None: raise RuntimeError( f'TaskGroup {self!r} cannot determine the parent task') @@ -74,7 +76,7 @@ async def __aexit__(self, et, exc, tb): self._base_error is None): self._base_error = exc - if et is asyncio.CancelledError: + if et is exceptions.CancelledError: if self._parent_cancel_requested: # Only if we did request task to cancel ourselves # we mark it as no longer cancelled. @@ -89,7 +91,7 @@ async def __aexit__(self, et, exc, tb): # g.create_task(...) # await ... # <- CancelledError # - if et is asyncio.CancelledError: + if et is exceptions.CancelledError: propagate_cancellation_error = et # or there's an exception in "async with": @@ -110,7 +112,7 @@ async def __aexit__(self, et, exc, tb): try: await self._on_completed_fut - except asyncio.CancelledError as ex: + except exceptions.CancelledError as ex: if not self._aborting: # Our parent task is being cancelled: # @@ -137,7 +139,7 @@ async def __aexit__(self, et, exc, tb): # request now. raise propagate_cancellation_error - if et is not None and et is not asyncio.CancelledError: + if et is not None and et is not exceptions.CancelledError: self._errors.append(exc) if self._errors: From de3d82012d3d97bed8ea50e7081404eb51ae8923 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Tue, 15 Feb 2022 09:00:24 -0800 Subject: [PATCH 20/23] Avoid needing self.loop in test --- Lib/test/test_asyncio/test_taskgroups.py | 35 +++++++++++------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 82728b33eb7904..9e0e5be88eb637 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -26,9 +26,6 @@ def get_error_types(eg): class TestTaskGroup(unittest.IsolatedAsyncioTestCase): - async def asyncSetUp(self): - self.loop = asyncio.get_event_loop() - async def test_taskgroup_01(self): async def foo1(): @@ -115,7 +112,7 @@ async def runner(): NUM += 10 with self.assertRaises(ExceptionGroup) as cm: - await self.loop.create_task(runner()) + await asyncio.create_task(runner()) self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) @@ -161,7 +158,7 @@ async def runner(): # The 3 foo1 sub tasks can be racy when the host is busy - if the # cancellation happens in the middle, we'll see partial sub errors here with self.assertRaises(ExceptionGroup) as cm: - await self.loop.create_task(runner()) + await asyncio.create_task(runner()) self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) self.assertEqual(NUM, 0) @@ -185,7 +182,7 @@ async def runner(): for _ in range(5): g.create_task(foo()) - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -219,7 +216,7 @@ async def runner(): NUM += 10 raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -245,7 +242,7 @@ async def runner(): except asyncio.CancelledError: raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -329,7 +326,7 @@ async def runner(): except asyncio.CancelledError: raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -356,7 +353,7 @@ async def runner(): except asyncio.CancelledError: raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -377,7 +374,7 @@ async def runner(): async with taskgroups.TaskGroup(name='g2') as g2: g2.create_task(crash_after(0.2)) - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) with self.assertRaises(ExceptionGroup) as cm: await r @@ -396,7 +393,7 @@ async def runner(): async with taskgroups.TaskGroup(name='g2') as g2: g2.create_task(crash_after(0.1)) - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) with self.assertRaises(ExceptionGroup) as cm: await r @@ -419,7 +416,7 @@ async def runner(): await asyncio.sleep(0.5) raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -443,10 +440,10 @@ async def nested_runner(): raise async def runner(): - t = self.loop.create_task(nested_runner()) + t = asyncio.create_task(nested_runner()) await t - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -466,7 +463,7 @@ async def runner(): NUM += 10 raise - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -490,7 +487,7 @@ async def runner(): # this weird case. raise MyExc - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.1) self.assertFalse(r.done()) @@ -521,7 +518,7 @@ async def runner(): g.create_task(crash_soon()) await nested() - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) try: await r except ExceptionGroup as t: @@ -634,7 +631,7 @@ async def runner(): g.create_task(foo1()) g.create_task(foo2()) - r = self.loop.create_task(runner()) + r = asyncio.create_task(runner()) await asyncio.sleep(0.05) r.cancel() From 9712241fed0c203fc6a1612a0040b68fb7779481 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Tue, 15 Feb 2022 11:45:05 -0800 Subject: [PATCH 21/23] Make test 14 more robust (By making the sleep in the outer task longer.) --- Lib/test/test_asyncio/test_taskgroups.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 9e0e5be88eb637..7cefabb1bad55f 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -388,7 +388,7 @@ async def crash_after(t): async def runner(): async with taskgroups.TaskGroup(name='g1') as g1: - g1.create_task(crash_after(0.2)) + g1.create_task(crash_after(10)) async with taskgroups.TaskGroup(name='g2') as g2: g2.create_task(crash_after(0.1)) @@ -397,7 +397,6 @@ async def runner(): with self.assertRaises(ExceptionGroup) as cm: await r - # TODO(guido): Check that the nested exception group is expected self.assertEqual(get_error_types(cm.exception), {ExceptionGroup}) self.assertEqual(get_error_types(cm.exception.exceptions[0]), {ValueError}) From b3d4d18f04780a88193257d3f62f7f9e284d91a4 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 15 Feb 2022 14:58:54 -0800 Subject: [PATCH 22/23] Update Lib/asyncio/taskgroups.py --- Lib/asyncio/taskgroups.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 79eee8add90cf9..718277892c51c9 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -1,4 +1,4 @@ -# Adapted with permission from the EdgeDb project. +# Adapted with permission from the EdgeDB project. __all__ = ["TaskGroup"] From c1e5d64a27325f3c08668aa5d255038b991bd30e Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Tue, 15 Feb 2022 14:59:05 -0800 Subject: [PATCH 23/23] Update Lib/test/test_asyncio/test_taskgroups.py --- Lib/test/test_asyncio/test_taskgroups.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py index 7cefabb1bad55f..ea6ee2ed43d2f8 100644 --- a/Lib/test/test_asyncio/test_taskgroups.py +++ b/Lib/test/test_asyncio/test_taskgroups.py @@ -1,4 +1,4 @@ -# Adapted with permission from the EdgeDb project. +# Adapted with permission from the EdgeDB project. import asyncio