Skip to content

Commit

Permalink
TaskGroup.join(): don't wait for tasks to cancel if exiting by exception
Browse files Browse the repository at this point in the history
Add testcase from Issue 37 for cancel_remaining().  It used to happen
that the task waited on itself, which makes asyncio lose its mind.

Fixes #37.
  • Loading branch information
Neil Booth committed Mar 9, 2021
1 parent 85cf61e commit 0437440
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 10 deletions.
36 changes: 26 additions & 10 deletions aiorpcx/curio.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
CancelledError, get_event_loop, Queue, Event, Lock, Semaphore, sleep, current_task
)
from collections import deque
from contextlib import suppress

from aiorpcx.util import instantiate_coroutine

Expand Down Expand Up @@ -251,18 +250,35 @@ async def join(self):
and self.completed)):
return
finally:
await self.cancel_remaining()
# Cancel everything but don't wait as cancellation can be ignored and our
# exception could be e.g. a timeout.
await self._cancel_remaining(wait=False)
self._closed = True

async def _cancel_remaining(self, wait):
'''Cancel all remaining tasks including daemons. Wait for them to complete if wait is
True.
'''
def pop_task(task):
unfinished.remove(task)
if not unfinished:
all_done.set()

unfinished = self._pending.copy()
unfinished.update(self.daemons)
for task in unfinished:
task.cancel()
# Let the loop process the cancellations so the tasks are marked cancelled
await sleep(0)
if wait and unfinished:
all_done = Event()
for task in unfinished:
task.add_done_callback(pop_task)
await all_done.wait()

async def cancel_remaining(self):
'''Cancel all remaining tasks including daemons.'''
self._closed = True
task_list = list(self._pending)
task_list.extend(self.daemons)
for task in task_list:
task.cancel()
for task in task_list:
with suppress(BaseException):
await task
await self._cancel_remaining(wait=True)

def closed(self):
return self._closed
Expand Down
1 change: 1 addition & 0 deletions pycodestyle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pycodestyle --max-line-length=100 aiorpcx/*.py
54 changes: 54 additions & 0 deletions tests/test_curio.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,60 @@ async def test_daemon_task_errors_ignored():
assert g.exception is None


@pytest.mark.asyncio
async def test_timeout_on_join_with_stubborn_task():
evt = Event()

async def ignore_cancellation():
while True:
try:
await evt.wait()
break
except CancelledError as e:
pass


async with ignore_after(0.05):
async with TaskGroup() as g:
t = await g.spawn(ignore_cancellation)
# Clean teardown
evt.set()


# See https://github.com/kyuupichan/aiorpcX/issues/37
@pytest.mark.asyncio
async def test_cancel_remaining_on_group_with_stubborn_task():
evt = Event()

async def run_forever():
while True:
try:
await evt.wait()
break
except CancelledError as e:
pass

async def run_group():
async with group:
await group.spawn(run_forever)

from asyncio import create_task

group = TaskGroup()
create_task(run_group())
await sleep(0.01)

try:
async with timeout_after(0.01):
await group.cancel_remaining()
except TaskTimeout:
pass

# Clean teardown
evt.set()
await sleep(0.001)


def test_TaskTimeout_str():
t = TaskTimeout(0.5)
assert str(t) == 'task timed out after 0.5s'

0 comments on commit 0437440

Please sign in to comment.