Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-112202: Ensure that condition.notify() succeeds even when racing with Task.cancel() #112201

Merged
merged 8 commits into from
Feb 3, 2024
10 changes: 8 additions & 2 deletions Doc/library/asyncio-sync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ Condition

.. method:: notify(n=1)

Wake up at most *n* tasks (1 by default) waiting on this
Wake up *n* tasks (1 by default) waiting on this
condition. The method is no-op if no tasks are waiting.
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved

The lock must be acquired before this method is called and
Expand Down Expand Up @@ -257,12 +257,18 @@ Condition
Once awakened, the Condition re-acquires its lock and this method
returns ``True``.

Note that a task *may* return from this call without having been
explicity awoken, which is why the caller should always re-check the state
and be prepared to :meth:`wait` again. For this reason, you may
prefer to use :meth:`wait_for` instead.
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved

.. coroutinemethod:: wait_for(predicate)

Wait until a predicate becomes *true*.

The predicate must be a callable which result will be
interpreted as a boolean value. The final value is the
interpreted as a boolean value. The method will repeatedly
:meth:`wait` until the predicate evaluates to *true*. The final value is the
return value.


Expand Down
64 changes: 40 additions & 24 deletions Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,43 +257,57 @@ async def wait(self):
until it is awakened by a notify() or notify_all() call for
the same condition variable in another coroutine. Once
awakened, it re-acquires the lock and returns True.

This method may return without having been explicitly awoken by
a notify() or notify_all(), which is why the caller should always
re-check the state and be prepared to wait() again.
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved
"""
if not self.locked():
raise RuntimeError('cannot wait on un-acquired lock')

fut = self._get_loop().create_future()
self.release()
try:
fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
return True
finally:
self._waiters.remove(fut)

finally:
# Must re-acquire lock even if wait is cancelled.
# We only catch CancelledError here, since we don't want any
# other (fatal) errors with the future to cause us to spin.
err = None
while True:
try:
await self.acquire()
break
except exceptions.CancelledError as e:
err = e

if err:
self._waiters.append(fut)
try:
raise err # Re-raise most recent exception instance.
await fut
return True
finally:
err = None # Break reference cycles.
self._waiters.remove(fut)

finally:
# Must re-acquire lock even if wait is cancelled.
# We only catch CancelledError here, since we don't want any
# other (fatal) errors with the future to cause us to spin.
err = None
while True:
try:
await self.acquire()
break
except exceptions.CancelledError as e:
err = e

if err:
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved
try:
raise err # Re-raise most recent exception instance.
finally:
err = None # Break reference cycles.
except BaseException:
# Any error raised out of here _may_ have occurred after this Task
# believed to have been successfully notified.
# Make sure to notify another Task instead. This may result
# in a "spurious wakeup", which is allowed as part of the
# Condition Variable protocol.
self._notify(1)
raise

async def wait_for(self, predicate):
"""Wait until a predicate becomes true.

The predicate should be a callable which result will be
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved
interpreted as a boolean value. The final predicate value is
interpreted as a boolean value. The method will repeatedly
wait() until it evaluates to true. The final predicate value is
the return value.
"""
result = predicate()
Expand All @@ -307,7 +321,7 @@ def notify(self, n=1):
If the calling coroutine has not acquired the lock when this method
is called, a RuntimeError is raised.

This method wakes up at most n of the coroutines waiting for the
This method wakes up n of the coroutines waiting for the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as in the docs above.

condition variable; it is a no-op if no coroutines are waiting.
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved

Note: an awakened coroutine does not actually return from its
Expand All @@ -316,7 +330,9 @@ def notify(self, n=1):
"""
if not self.locked():
raise RuntimeError('cannot notify on un-acquired lock')
self._notify(n)

def _notify(self, n):
idx = 0
for fut in self._waiters:
if idx >= n:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the docstring for notify_all() below mentions threads (twice). That should probably be changed to tasks. (Or coroutines, like above? Though IMO that should also be tasks -- in practice all coroutines are wrapped by tasks, and tasks are the unit of control that users are encouraged to think in terms of.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think the mention of "coroutines" is a relic from the very old days.
The locks.py discusses coroutines in a lot of the docstrings where "tasks" are more appropriate. scheduling works on Task objects, not coroutines. I can change it wholesale, do I use "Task" or "task" when doing so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use "task" -- it doesn't really matter whether they are technically instances of asyncio.Task, and in fact IIRC even loops may override create_task() to return instances of some other class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I take it you approve of me going over the other inline docs/comments and making that correction, I'll do that in a separate commit.

Expand Down
92 changes: 92 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,98 @@ async def func():
# originally raised.
self.assertIs(err.exception, raised)

async def test_cancelled_wakeup(self):
# Test that a task cancelled at the "same" time as it is woken
# up as part of a Condition.notify() does not result in a lost wakeup.
# This test simulates a cancel while the target task is awaiting initial
# wakeup on the wakeup queue.
condition = asyncio.Condition()
state = 0
async def consumer():
nonlocal state
async with condition:
while True:
await condition.wait_for(lambda: state != 0)
if state < 0:
return
state -= 1

# create two consumers
c = [asyncio.create_task(consumer()) for _ in range(2)]
# wait for them to settle
await asyncio.sleep(0)
async with condition:
# produce one item and wake up one
state += 1
condition.notify(1)

# Cancel it while it is awaiting to be run.
# This cancellation could come from the outside
c[0].cancel()

# now wait for the item to be consumed
# if it doesn't means that our "notify" didn"t take hold.
# because it raced with a cancel()
try:
async with asyncio.timeout(0.01):
await condition.wait_for(lambda: state == 0)
except TimeoutError:
pass
self.assertEqual(state, 0)

# clean up
state = -1
condition.notify_all()
await c[1]

async def test_cancelled_wakeup_relock(self):
# Test that a task cancelled at the "same" time as it is woken
# up as part of a Condition.notify() does not result in a lost wakeup.
# This test simulates a cancel while the target task is acquiring the lock
# again.
condition = asyncio.Condition()
state = 0
async def consumer():
nonlocal state
async with condition:
while True:
await condition.wait_for(lambda: state != 0)
if state < 0:
return
state -= 1

# create two consumers
c = [asyncio.create_task(consumer()) for _ in range(2)]
# wait for them to settle
await asyncio.sleep(0)
async with condition:
# produce one item and wake up one
state += 1
condition.notify(1)

# now we sleep for a bit. This allows the target task to wake up and
# settle on re-aquiring the lock
await asyncio.sleep(0)

# Cancel it while awaiting the lock
# This cancel could come the outside.
c[0].cancel()

# now wait for the item to be consumed
# if it doesn't means that our "notify" didn"t take hold.
# because it raced with a cancel()
try:
async with asyncio.timeout(0.01):
await condition.wait_for(lambda: state == 0)
except TimeoutError:
pass
self.assertEqual(state, 0)

# clean up
state = -1
condition.notify_all()
await c[1]

class SemaphoreTests(unittest.IsolatedAsyncioTestCase):

def test_initial_value_zero(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that a :func:`asyncio.Condition.notify` call does not get lost if the notified ``Task`` is simultaneously cancelled or encounters any other error.
kristjanvalur marked this conversation as resolved.
Show resolved Hide resolved