forked from RustPython/RustPython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_stuff.py
72 lines (56 loc) · 1.17 KB
/
async_stuff.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
class ContextManager:
    """Async context manager that records each protocol step in the module-level ``ls``.

    Entering appends ``1`` and leaving appends ``3``; ``__str__`` appends ``2``.
    """

    # --- async context-manager protocol ---

    async def __aenter__(self):
        """Print the entry banner, append ``1`` to ``ls`` and return ``1``.

        The returned ``1`` (not the manager itself) is what ``async with ... as x``
        binds to ``x``.
        """
        print("Entrada")
        ls.append(1)
        return 1

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Append ``3`` to ``ls`` and print the exit banner.

        Returns ``None`` implicitly, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        ls.append(3)
        print("Wiedersehen")

    # --- string conversion ---

    def __str__(self):
        """Append ``2`` to ``ls`` and return the fixed display string."""
        ls.append(2)
        return "c'est moi!"
# Shared event log: ContextManager appends 1/2/3 to it, the `a` coroutine appends
# its message, and the module-level assertion checks the recorded order.
ls = []
class AIterWrap:
    """Adapt a synchronous iterable to the asynchronous iterator protocol.

    An instance can be consumed with ``async for``; each step pulls the next item
    from the wrapped synchronous iterator.
    """

    def __init__(self, obj):
        """Wrap *obj*.

        ``iter(obj)`` is called immediately, so a non-iterable argument raises
        ``TypeError`` here rather than on first use.
        """
        self._it = iter(obj)

    def __aiter__(self):
        """Return ``self``; the wrapper is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next item of the wrapped iterator.

        Raises:
            StopAsyncIteration: when the wrapped iterator is exhausted.
        """
        try:
            value = next(self._it)
        except StopIteration:
            # Translate the sync end-of-iteration signal into the async one.
            # `from None` keeps the internal StopIteration out of the traceback
            # instead of reporting it as "during handling of the above exception".
            raise StopAsyncIteration from None
        return value
async def a(s, m):
    """Exercise ``async with`` and ``async for``, logging progress to ``ls``.

    Args:
        s: seconds to sleep after the context manager has been exited.
        m: marker appended to ``ls`` once per loop iteration (two iterations).
    """
    # Enter and leave the context manager; `b` is whatever __aenter__ returned.
    async with ContextManager() as b:
        print(f"val = {b}")

    # Stagger this coroutine relative to its siblings.
    await asyncio.sleep(s)

    # Two iterations, one second apart, each leaving the marker in the log.
    async for item in AIterWrap(range(2)):
        print(item)
        ls.append(m)
        await asyncio.sleep(1)
async def main():
    """Run the four staggered ``a`` coroutines concurrently until all finish.

    ``asyncio.gather`` wraps each coroutine in a task and, unlike ``asyncio.wait``,
    re-raises the first exception from any of them instead of leaving it unretrieved.
    """
    await asyncio.gather(
        a(0, "hello1"),
        a(0.75, "hello2"),
        a(1.5, "hello3"),
        a(2.25, "hello4"),
    )


# asyncio.wait() rejects bare coroutines with TypeError on Python 3.11+, and
# asyncio.get_event_loop() without a running loop is deprecated; asyncio.run()
# creates the loop, runs main() to completion and closes the loop afterwards.
asyncio.run(main())

# Expected log: every coroutine enters and exits the context manager (1, 3) before
# any of them reaches its loop; after that the markers interleave according to the
# start offsets (0, 0.75, 1.5, 2.25 s) and the 1 s sleep between the two iterations.
assert ls == [
    1,
    3,
    1,
    3,
    1,
    3,
    1,
    3,
    "hello1",  # ~0.00 s
    "hello2",  # ~0.75 s
    "hello1",  # ~1.00 s
    "hello3",  # ~1.50 s
    "hello2",  # ~1.75 s
    "hello4",  # ~2.25 s
    "hello3",  # ~2.50 s
    "hello4",  # ~3.25 s
]