-
-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathtest_concurrent.py
73 lines (56 loc) · 2.45 KB
/
test_concurrent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import trio
import trio_asyncio
import asyncio
import pytest
from trio_asyncio._loop import TrioPolicy
# Tests for concurrent or nested loops
@pytest.mark.trio
async def test_parallel():
loops = [None, None]
async with trio.open_nursery() as n:
async def gen_loop(i, task_status=trio.TASK_STATUS_IGNORED):
task_status.started()
async with trio_asyncio.open_loop() as loop:
loops[i] = loop
assert not isinstance(asyncio._get_running_loop(), trio_asyncio.TrioEventLoop)
await n.start(gen_loop, 0)
await n.start(gen_loop, 1)
assert isinstance(loops[0], trio_asyncio.TrioEventLoop)
assert isinstance(loops[1], trio_asyncio.TrioEventLoop)
assert loops[0] is not loops[1]
@pytest.mark.trio
async def test_nested():
loops = [None, None]
async with trio.open_nursery() as n:
async def gen_loop(i, task_status=trio.TASK_STATUS_IGNORED):
task_status.started()
async with trio_asyncio.open_loop() as loop:
loops[i] = loop
if i > 0:
await n.start(gen_loop, i - 1)
assert not isinstance(asyncio._get_running_loop(), trio_asyncio.TrioEventLoop)
await n.start(gen_loop, 1)
assert not isinstance(asyncio._get_running_loop(), trio_asyncio.TrioEventLoop)
assert isinstance(loops[0], trio_asyncio.TrioEventLoop)
assert isinstance(loops[1], trio_asyncio.TrioEventLoop)
assert loops[0] is not loops[1]
async def _test_same_task():
assert isinstance(asyncio.get_event_loop_policy(), TrioPolicy)
def get_loop(i, loop, policy):
assert loop == asyncio.get_event_loop()
assert policy == asyncio.get_event_loop_policy()
async with trio.open_nursery():
async with trio_asyncio.open_loop() as loop1:
policy = asyncio.get_event_loop_policy()
assert isinstance(policy, TrioPolicy)
async with trio_asyncio.open_loop() as loop2:
p2 = asyncio.get_event_loop_policy()
assert policy is p2, (policy, p2)
loop1.call_later(0.1, get_loop, 0, loop1, policy)
loop2.call_later(0.1, get_loop, 1, loop2, policy)
await trio.sleep(0.2)
assert isinstance(asyncio.get_event_loop_policy(), TrioPolicy)
assert asyncio._get_running_loop() is None
def test_same_task():
assert not isinstance(asyncio.get_event_loop_policy(), TrioPolicy)
trio.run(_test_same_task)