Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694

Merged
merged 14 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ def _make_cancelled_error(self):
exc = exceptions.CancelledError()
else:
exc = exceptions.CancelledError(self._cancel_message)
exc.__context__ = self._cancelled_exc
# Remove the reference since we don't need this anymore.
self._cancelled_exc = None
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
return exc

def cancel(self, msg=None):
Expand Down
49 changes: 26 additions & 23 deletions Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ async def acquire(self):
This method blocks until the lock is unlocked, then sets it to
locked and returns True.
"""
# Implement fair scheduling, where thread always waits
# its turn.
# Jumping the queue if all are cancelled is an optimization.
if (not self._locked and (self._waiters is None or
all(w.cancelled() for w in self._waiters))):
self._locked = True
Expand All @@ -105,19 +108,16 @@ async def acquire(self):
fut = self._get_loop().create_future()
self._waiters.append(fut)

# Finally block should be called before the CancelledError
# handling as we don't want CancelledError to call
# _wake_up_first() and attempt to wake up itself.
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
try:
try:
await fut
finally:
self._waiters.remove(fut)
except exceptions.CancelledError:
await fut
except BaseException:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not very comfortable with widening this exception net for the purpose of sneaking in support for your library. If we want that to be supported we should make it an explicit feature, everywhere, rather than just tweaking an except clause here and there until your tests pass. Without any comments explaining the intended guarantee, what's to stop the next clever maintainer from narrowing the exception being caught to CancelledError again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I quite understand that you are reluctant to touch this just to humour an eccentric experimental library which is trying to push the envelope of the original design. And I'm happy to have this rejected as long as we have had the chance to discuss it. I'll add a comment here in the mean time, just for safety.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you're planning to make your library to use a subclass of CancelledError, so you don't need this to be more general anyway. So I'd like to see CancelledError here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Of course, IMO, it would be even greater if in some future version, we would have something like:

class InterruptError(BaseException):
    pass

class CancelledError(InterruptError):
   pass

but that is future music :)

self._waiters.remove(fut)
if not self._locked:
# Error occurred after release() was called, must re-do release.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment makes things more mysterious to me, not less. If we're precise, we get to this exception handles whenever await fut raised something -- that can currently only be a cancellation, unless something reached into private data, extracted the future, and called set_exception() on it. (Apparently this is what you're planning to do in your own library?)

If I had to explain in plain English what was going on here, I'd have to say something like "The acquiring task was cancelled after the lock was released, but before it resumed execution. (This is one of these things that's easy to forget is possible when casually reading async code.) We're going to re-raise the cancellation here, but we should first wake up the next task waiting for the lock, if there are any." I guess you summarized the condition as "after release() was called", which I think is too concise.

Thinking all this over, I am also thinking there's an incredibly subtle invariant going on here regarding self._waiters. Skimming the code of release() and _wake_up_first() one starts wondering, "what if two or more waiters are cancelled, the first waiter wakes up (being cancelled), and its _wake_up_first() sees that the second waiter is also cancelled (and hence done), so it doesn't wake up the third waiter." My guess is that in this case the second waiter will also wake up and call _wake_up_first(), at which point the (originally) third waiter will be woken up. But maybe this deserves a reassuring comment somewhere?

Maybe there's a way of thinking about this that makes it easier to prove to oneself that the code here is correct, without such a "global" analysis? Maybe the invariant is something like "every cancelled future in the list corresponds to a task that will eventually be woken up, at which point it will remove itself from the list and call _wake_up_first(). Although that immediately makes me wonder if it is possible for the second waiter to be woken up first. Fortunately in that case things also work out, once the first waiter is also woken up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your thoughts. Yes, the comment was too concise and your version is better, but thinking more about it, it is not really true either. This here is not a case of "cancelled after lock was released". this is because release() does not do any modification of the _waiters queue itself. All it does is

  1. set self._locked=False
  2. Ensure that the Task owning the future at the head of the queue will wake up.

I go over the self._waiters in a comment below. Here is how I follow the logic:

  1. _wake_up_first() always wakes up the same Task, at the head of the queue.
  2. Only when a task resumes execution, does it remove itself from the queue, opening the possbility for a different Task to run. When it resumes execution, it either claims the lock for itself, or gives the task at the new head of the queue the chance to claim it.
  3. When a task isn't about to claim the lock, and self._locked() is False, it is always safe to call _wake_up_first(), even multiple times.

Looking at the case you mention:

  1. two waiters are cancelled. They still are in the self._waiters queue.
  2. first one wakes up. Removes itself from self._waiters. Decides it doesn't want the lock, sees that self._locked is False (no one else has it), and so calls `self._wake_up_first(). This is a no-op, since the future has already been cancelled.
  3. Second one wakes up. Removes itself from self._waiters. Decides it doesn't want the lock, seelf that self._locked is False, calls self._wake_up_first.
  4. If there was a Task at the head, then it is now awoken.

Not sure how to formulate that into an invariant, but something like this:

if not self._locked and not claiming_lock, and self._waiters:
  # ensure that the head of the queue will run
  head = self.waiters[0]
  head.set_result(True)

In other words, if the lock is not claimed, then the head of the queue, if any, should no longer be blocked.

self._wake_up_first()
raise

self._waiters.remove(fut)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I look at this more carefully, I think I like the original version using nested try blocks better. Reading the new code I have to correlate the two separate removals to prove to myself that as soon as we've awaited fut (whether it got a return value or was cancelled) it is removed from the list of waiters. And that's why it was written like that originally.

(Note that try blocks cause no overhead unless an exception is actually raised -- the compiler even duplicates the finally block to ensure this.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I guess it makes it simpler to reason about, good to know that nested blocks have no overhead anymore.

self._locked = True
return True

Expand Down Expand Up @@ -269,17 +269,22 @@ async def wait(self):
self._waiters.remove(fut)

finally:
# Must reacquire lock even if wait is cancelled
cancelled = False
# Must reacquire lock even if wait is cancelled.
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
# We only catch CancelledError here, since we don't want any
# other (fatal) errors with the future to cause us to spin.
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
err = None
while True:
try:
await self.acquire()
break
except exceptions.CancelledError:
cancelled = True
except exceptions.CancelledError as e:
err = e

if cancelled:
raise exceptions.CancelledError
if err:
try:
raise err # re-raise same exception instance
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved
finally:
err = None # brake reference cycles
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved

async def wait_for(self, predicate):
"""Wait until a predicate becomes true.
Expand Down Expand Up @@ -378,20 +383,18 @@ async def acquire(self):
fut = self._get_loop().create_future()
self._waiters.append(fut)

# Finally block should be called before the CancelledError
# handling as we don't want CancelledError to call
# _wake_up_first() and attempt to wake up itself.
try:
try:
await fut
finally:
self._waiters.remove(fut)
except exceptions.CancelledError:
if not fut.cancelled():
await fut
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing as before -- I actually like the nested try/finally phrasing better. Also, again, sneaky about catching all errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, especially if it costs nothing.

except BaseException:
self._waiters.remove(fut)
if fut.done() and not fut.cancelled():
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
# Error occurred after release() was called for us. Must undo
# the bookkeeping done there and retry.
self._value += 1
self._wake_up_next()
raise

self._waiters.remove(fut)
if self._value > 0:
self._wake_up_next()
return True
Expand Down
57 changes: 57 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,63 @@ async def test_timeout_in_block(self):
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(condition.wait(), timeout=0.5)

async def test_cancelled_error_wakeup(self):
"""Test that a cancelled error, received when awaiting wakeup
will be re-raised un-modified.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please convert the docstrings on the tests to comments -- test docstrings tend to be printed by the test runner, messing up the neat output. Note how no other tests here have docstrings.

wake = False
raised = None
cond = asyncio.Condition()

async def func():
nonlocal raised
async with cond:
with self.assertRaises(asyncio.CancelledError) as err:
await cond.wait_for(lambda: wake)
raised = err.exception
raise raised

task = asyncio.create_task(func())
await asyncio.sleep(0)
# Task is waiting on the condition, cancel it there
task.cancel(msg="foo")
with self.assertRaises(asyncio.CancelledError) as err:
await task
self.assertEqual(err.exception.args, ("foo",))
# we should have got the _same_ exception instance as the one originally raised
self.assertIs(err.exception, raised)

async def test_cancelled_error_re_aquire(self):
"""Test that a cancelled error, received when re-aquiring lock,
will be re-raised un-modified.
"""
wake = False
raised = None
cond = asyncio.Condition()

async def func():
nonlocal raised
async with cond:
with self.assertRaises(asyncio.CancelledError) as err:
await cond.wait_for(lambda: wake)
raised = err.exception
raise raised

task = asyncio.create_task(func())
await asyncio.sleep(0)
# Task is waiting on the condition
await cond.acquire()
wake = True
cond.notify()
await asyncio.sleep(0)
# task is now trying to re-acquire the lock, cancel it there
task.cancel(msg="foo")
cond.release()
with self.assertRaises(asyncio.CancelledError) as err:
await task
self.assertEqual(err.exception.args, ("foo",))
# we should have got the _same_ exception instance as the one originally raised
self.assertIs(err.exception, raised)

class SemaphoreTests(unittest.IsolatedAsyncioTestCase):

Expand Down