Skip to content

Commit

Permalink
Rework cancel_remaining() and expose blocking argument.
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Booth committed Mar 9, 2021
1 parent 1c69eca commit e3cb386
Showing 1 changed file with 14 additions and 17 deletions.
31 changes: 14 additions & 17 deletions aiorpcx/curio.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,33 +252,30 @@ async def join(self):
finally:
# Cancel everything but don't wait as cancellation can be ignored and our
# exception could be e.g. a timeout.
await self._cancel_remaining(wait=False)
await self.cancel_remaining(blocking=False)
self._closed = True

async def _cancel_remaining(self, wait):
async def cancel_remaining(self, blocking=True):
'''Cancel all remaining tasks including daemons. Wait for them to complete if wait is
True.
'''
def pop_task(task):
unfinished.remove(task)
if not unfinished:
all_done.set()

unfinished = self._pending.copy()
unfinished.update(self.daemons)
unfinished = self._pending.union(self.daemons)
for task in unfinished:
task.cancel()
# Let the loop process the cancellations so the tasks are marked cancelled
await sleep(0)
if wait and unfinished:
all_done = Event()

if blocking and unfinished:
def pop_task(task):
unfinished.remove(task)
if not unfinished:
all_done.set()

for task in unfinished:
task.add_done_callback(pop_task)
all_done = Event()
await all_done.wait()

async def cancel_remaining(self):
'''Cancel all remaining tasks including daemons.'''
await self._cancel_remaining(wait=True)
else:
# So loop processes cancellations
await sleep(0)

def closed(self):
return self._closed
Expand Down

0 comments on commit e3cb386

Please sign in to comment.