forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_threadpoolexecutor.py
128 lines (93 loc) · 3.06 KB
/
test_threadpoolexecutor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from time import sleep
import threading
from distributed.metrics import time
from distributed.threadpoolexecutor import ThreadPoolExecutor, secede, rejoin
def test_tpe():
    """Exercise a two-thread pool around a task that calls ``secede()``.

    The test snapshots the pool's threads, runs a seceding task, and then
    checks that the snapshot and the pool's current threads together contain
    three distinct threads. Finally it waits (at most about one second) until
    at least one of the snapshotted threads is no longer alive.
    """
    with ThreadPoolExecutor(2) as pool:
        # Run a first batch to completion before snapshotting the threads.
        list(pool.map(sleep, [0.01] * 4))

        original_threads = pool._threads.copy()
        assert len(original_threads) == 2

        def secede_and_return():
            secede()
            return 1

        # The task's return value must still be delivered after it secedes.
        assert pool.submit(secede_and_return).result() == 1

        list(pool.map(sleep, [0.01] * 4))
        # Old snapshot plus current membership must cover exactly 3 threads.
        assert len(original_threads | pool._threads) == 3

        # Poll until one of the original threads has stopped, failing the
        # test if that takes a second or more.
        deadline = time() + 1
        while all(t.is_alive() for t in original_threads):
            sleep(0.01)
            assert time() < deadline
def test_shutdown_timeout():
    """Check that a default ``shutdown()`` takes longer than 0.1 s here.

    Two sleeps (0.1 s and 0.2 s) are submitted to a one-thread pool, and the
    subsequent ``shutdown()`` call with default arguments is timed.
    """
    pool = ThreadPoolExecutor(1)
    for multiplier in (1, 2):
        pool.submit(sleep, 0.1 * multiplier)
    sleep(0.01)  # short pause before the timed shutdown call

    began = time()
    pool.shutdown()
    elapsed = time() - began
    assert elapsed > 0.1
def test_shutdown_timeout_raises():
    """Time ``shutdown(timeout=0.1)`` while submitted sleeps are outstanding.

    Two sleeps (0.1 s and 0.2 s) are submitted to a one-thread pool. Despite
    the test's name, no exception is checked for: the only assertion is that
    the ``shutdown(timeout=0.1)`` call took longer than 0.05 s.
    """
    pool = ThreadPoolExecutor(1)
    for multiplier in (1, 2):
        pool.submit(sleep, 0.1 * multiplier)
    sleep(0.05)  # pause before the timed shutdown call

    began = time()
    pool.shutdown(timeout=0.1)
    elapsed = time() - began
    assert elapsed > 0.05
def test_shutdown_wait():
    """Check that ``shutdown(wait=False)`` returns in under one second.

    A 1 s sleep is submitted to a one-thread pool first, so a shutdown that
    waited for it to finish could not meet the bound.
    """
    pool = ThreadPoolExecutor(1)
    pool.submit(sleep, 1)
    sleep(0.01)  # short pause before the timed shutdown call

    began = time()
    pool.shutdown(wait=False)
    elapsed = time() - began
    assert elapsed < 1
def test_secede_rejoin_busy():
    """Exercise ``secede()`` / ``rejoin()`` while the pool has queued work.

    A task checks that its thread is in ``e._threads``, secedes, checks that
    it is no longer in ``e._threads``, rejoins, and checks that it is a member
    again with the pool back at two threads. Ten 0.2 s sleeps are submitted
    behind it. The test then asserts that the task's result arrived between
    0.1 s and 0.3 s after submission finished, and that the thread which ran
    the task goes on to run at least one of ten later short tasks.
    """
    with ThreadPoolExecutor(2) as e:

        def secede_then_rejoin():
            assert threading.current_thread() in e._threads
            secede()
            sleep(0.1)
            assert threading.current_thread() not in e._threads
            rejoin()
            assert len(e._threads) == 2
            assert threading.current_thread() in e._threads
            return threading.current_thread()

        future = e.submit(secede_then_rejoin)
        # Queue a backlog of sleeps; their futures are not needed afterwards.
        for _ in range(10):
            e.submit(sleep, 0.2)
        start = time()
        special_thread = future.result()
        stop = time()

        assert 0.1 < stop - start < 0.3

        assert len(e._threads) == 2
        assert special_thread in e._threads

        # Distinct name so this helper does not shadow the one defined above.
        def report_thread():
            sleep(0.01)
            return threading.current_thread()

        futures = [e.submit(report_thread) for _ in range(10)]
        # The rejoined thread must be among those that ran the later tasks.
        assert special_thread in {fut.result() for fut in futures}
def test_secede_rejoin_quiet():
    """Exercise ``secede()`` / ``rejoin()`` with no other work submitted.

    The single task checks that its thread is in ``pool._threads``, secedes,
    checks that it is no longer a member, rejoins, and checks that it is a
    member again with the pool holding two threads.
    """
    with ThreadPoolExecutor(2) as pool:

        def secede_then_rejoin():
            me = threading.current_thread()
            assert me in pool._threads
            secede()
            sleep(0.1)
            assert me not in pool._threads
            rejoin()
            assert len(pool._threads) == 2
            assert me in pool._threads
            return me

        # Block until the task has finished.
        # NOTE(review): presumably .result() re-raises an assertion failure
        # from the worker thread, as concurrent.futures futures do - confirm.
        pool.submit(secede_then_rejoin).result()
def test_rejoin_idempotent():
    """Check that calling ``rejoin()`` repeatedly after one ``secede()`` works.

    The task secedes once, calls ``rejoin()`` five times, and returns 1. The
    test asserts that this return value is delivered through the future, so
    the task must have run to completion.
    """
    with ThreadPoolExecutor(2) as e:

        def f():
            secede()
            for _ in range(5):
                rejoin()
            return 1

        future = e.submit(f)
        # Check the value instead of discarding it, so a task that did not
        # finish normally fails the test.
        assert future.result() == 1
def test_thread_name():
    """Check that the threads of a two-thread pool carry two distinct names.

    Ten ``id`` calls are mapped onto the pool, then the set of ``name``
    values over ``pool._threads`` must have exactly two members.
    """
    with ThreadPoolExecutor(2) as pool:
        pool.map(id, range(10))
        distinct_names = {worker.name for worker in pool._threads}
        assert len(distinct_names) == 2