# asyncio_lock.py
# Origin: fork of adafruit/circuitpython.
# Test Lock class
#
# This test has a corresponding .exp file because it tests a very specific ordering of
# events when cancelling a task waiting on a lock, and that ordering should not change
# (even though it does match CPython's output).
# If asyncio cannot be imported, print "SKIP" and exit instead of failing
# with an ImportError.
try:
    import asyncio
except ImportError:
    print("SKIP")
    raise SystemExit
async def task_loop(id, lock):
    """Take and release ``lock`` three times in a row, printing progress.

    Prints "task start <id>" first, then for each of the three rounds enters
    ``async with lock`` and prints "task have <id> <round>" while the lock is
    held, and finally prints "task end <id>".

    Args:
        id: Label included in every line this task prints.
        lock: The lock to acquire on each round.
    """
    print("task start", id)
    for round_no in range(3):
        # The lock is released at the end of each round, so other tasks
        # waiting on it get a chance to run between rounds.
        async with lock:
            print("task have", id, round_no)
    print("task end", id)
async def task_sleep(lock):
    """Hold ``lock`` across a sleep, then take it a second time explicitly.

    The first acquisition uses ``async with`` and keeps the lock for 0.2 s;
    the second uses explicit ``acquire()`` / ``release()`` calls, so both
    ways of taking the lock are exercised by the same task.

    Args:
        lock: The lock shared with the other task(s) running this coroutine.
    """
    async with lock:
        print("task have", lock.locked())
        # Sleep while still holding the lock so that any other task wanting
        # it has to wait.
        await asyncio.sleep(0.2)
    # Printed after leaving the "async with" block; the value shown depends
    # on whether another task has taken the lock in the meantime.
    print("task release", lock.locked())
    await lock.acquire()
    print("task have again")
    lock.release()
async def task_cancel(id, lock, to_cancel=None):
    """Hold ``lock`` for 0.1 s, optionally cancel another task, report cancellation.

    Prints "task got <id>" once the lock is held and "task release <id>"
    after leaving the ``async with`` block. If this task is itself cancelled
    at any point inside the ``try`` (while waiting for the lock, while
    sleeping, or afterwards), the ``CancelledError`` is caught and
    "task cancel <id>" is printed instead of propagating.

    Args:
        id: Label included in every line this task prints.
        lock: The lock to acquire.
        to_cancel: Optional list whose first element has ``cancel()`` called
            on it after this task has released the lock. main() passes a
            one-element list that it fills in after creating this task.
    """
    try:
        async with lock:
            print("task got", id)
            await asyncio.sleep(0.1)
        print("task release", id)
        if to_cancel:
            to_cancel[0].cancel()
    except asyncio.CancelledError:
        # Swallow the cancellation so the task ends normally after reporting it.
        print("task cancel", id)
async def main():
    """Run every lock scenario in sequence on one shared ``asyncio.Lock``.

    Scenarios are separated in the output by a "----" line. The order of the
    statements and the sleep durations below determine how the tasks
    interleave, and therefore the exact printed output, so they should not
    be reordered or retimed.
    """
    lock = asyncio.Lock()

    # Basic acquire/release
    print(lock.locked())
    await lock.acquire()
    print(lock.locked())
    await asyncio.sleep(0)
    lock.release()
    print(lock.locked())
    await asyncio.sleep(0)

    # Use with "async with"
    async with lock:
        print("have lock")

    # 3 tasks wanting the lock
    print("----")
    asyncio.create_task(task_loop(1, lock))
    asyncio.create_task(task_loop(2, lock))
    t3 = asyncio.create_task(task_loop(3, lock))
    # main takes the lock first and yields once while holding it, then
    # releases it and waits for the last-created task to finish.
    await lock.acquire()
    await asyncio.sleep(0)
    lock.release()
    await t3

    # 2 sleeping tasks both wanting the lock
    print("----")
    asyncio.create_task(task_sleep(lock))
    # Start the second task_sleep 0.1 s later, i.e. part-way through the
    # 0.2 s sleep that task_sleep performs while holding the lock.
    await asyncio.sleep(0.1)
    await task_sleep(lock)

    # 3 tasks, the first cancelling the second, the third should still run
    print("----")
    # One-element list used as a mutable slot: task 0 is created before the
    # task it has to cancel exists, so the target is filled in afterwards.
    ts = [None]
    asyncio.create_task(task_cancel(0, lock, ts))
    ts[0] = asyncio.create_task(task_cancel(1, lock))
    asyncio.create_task(task_cancel(2, lock))
    await asyncio.sleep(0.3)
    print(lock.locked())

    # 3 tasks, the second and third being cancelled while waiting on the lock
    # NOTE(review): going by the timings below, task 0 releases the lock at
    # about 0.1 s and task 2 is cancelled at about 0.15 s, so task 2 appears
    # to be cancelled while holding the lock rather than while waiting for
    # it -- confirm against the .exp file.
    print("----")
    t0 = asyncio.create_task(task_cancel(0, lock))  # t0 is not referenced again
    t1 = asyncio.create_task(task_cancel(1, lock))
    t2 = asyncio.create_task(task_cancel(2, lock))
    await asyncio.sleep(0.05)
    t1.cancel()
    await asyncio.sleep(0.1)
    t2.cancel()
    await asyncio.sleep(0.1)
    print(lock.locked())
# Entry point: run the main() coroutine to completion.
asyncio.run(main())