Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694

Merged
merged 14 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ def __repr__(self):

def locked(self):
"""Returns True if semaphore cannot be acquired immediately."""
# due to state, or FIFO rules (must allow others to run first)
return self._value == 0 or (
any(not w.cancelled() for w in (self._waiters or ())))

Expand All @@ -379,6 +380,7 @@ async def acquire(self):
True.
"""
if not self.locked():
# Maintain FIFO, wait for others to start even if _value > 0
self._value -= 1
return True

Expand All @@ -403,7 +405,8 @@ async def acquire(self):
raise

finally:
# new waiters may have arrived. Wake up as many as are allowed
# new waiters may have arrived but had to wait due to FIFO.
# Wake up as many as are allowed.
while self._value > 0:
if not self._wake_up_next():
break # there was no-one to wake up
Expand Down
55 changes: 55 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,61 @@ async def c3(result):
await asyncio.gather(*tasks, return_exceptions=True)
self.assertEqual([2, 3], result)

async def test_acquire_fifo_order_4(self):
# test that a successfule `acquire()` will wake up multiple Tasks
# that were waiting in the Semaphore queue due to FIFO rules
sem = asyncio.Semaphore(0)
result = []
count = 0

async def c1(result):
# first task immediatlly waits for semaphore. It will be awoken by c2
self.assertEqual(sem._value, 0)
await sem.acquire()
# we should have woken up all waiting tasks now
self.assertEqual(sem._value, 0)
# create a fourth task. It should run after c3, not c2
nonlocal t4
t4 = asyncio.create_task(c4(result))
result.append(1)
return True

async def c2(result):
# second task begins by releasing semaphore three times, for c1, c2, and c2
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
sem.release()
sem.release()
sem.release()
self.assertEqual(sem._value, 2)
# it is locked, because c1 hasn't woken up yet
self.assertTrue(sem.locked())
await sem.acquire()
result.append(2)
return True

async def c3(result):
await sem.acquire()
self.assertTrue(sem.locked())
result.append(3)
return True

async def c4(result):
result.append(4)
return True

t1 = asyncio.create_task(c1(result))
t2 = asyncio.create_task(c2(result))
t3 = asyncio.create_task(c3(result))
t4 = None

await asyncio.sleep(0)
# three tasks are in the queue, the first hasn't woken up yet
self.assertEqual(sem._value, 2)
self.assertEqual(len(sem._waiters), 3)
await asyncio.sleep(0)

tasks = [t1, t2, t3, t4]
await asyncio.gather(*tasks)
self.assertEqual([1, 2, 3, 4], result)

class BarrierTests(unittest.IsolatedAsyncioTestCase):

Expand Down