forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_metrics.py
63 lines (52 loc) · 1.62 KB
/
test_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import sys
import threading
import time
from distributed import metrics
from distributed.utils_test import run_for
def test_wall_clock():
    """Check ordering, resolution and accuracy of ``metrics.time()``.

    Three rounds are run, each preceded by a short sleep.  In every round
    50 consecutive samples of ``metrics.time()`` are taken and compared
    against each other and against a ``time.time()`` reading taken just
    before sampling.
    """
    for _ in range(3):
        time.sleep(0.01)
        t = time.time()
        samples = [metrics.time() for _ in range(50)]
        # Resolution
        deltas = [later - earlier for earlier, later in zip(samples, samples[1:])]
        # Consecutive samples must never go backwards...
        assert min(deltas) >= 0.0, deltas
        # ...nor jump forward by more than a second.
        assert max(deltas) <= 1.0, deltas
        # At least one consecutive pair must differ by a positive amount
        # below 0.1 ms.  The predicate is evaluated directly for each delta:
        # yielding a lambda here would make every element truthy and the
        # assertion could never fail.
        assert any(0.0 < d < 0.0001 for d in deltas), deltas
        # Close to time.time()
        assert t - 0.5 < samples[0] < t + 0.5
def test_process_time():
    """Check ``metrics.process_time()`` deltas around ``run_for`` and a sleep.

    Three measurements are taken: ``run_for`` on the calling thread,
    ``run_for`` on a separate thread, and a plain ``time.sleep``.
    """
    # run_for on the calling thread
    # NOTE(review): run_for presumably keeps the CPU busy for the given
    # duration -- confirm against distributed.utils_test.
    began = metrics.process_time()
    run_for(0.05)
    elapsed = metrics.process_time() - began
    assert 0.03 <= elapsed <= 0.2

    # All threads counted
    worker = threading.Thread(target=run_for, args=(0.1,))
    began = metrics.process_time()
    worker.start()
    worker.join()
    elapsed = metrics.process_time() - began
    assert elapsed >= 0.05

    # Sleep time not counted
    began = metrics.process_time()
    time.sleep(0.1)
    elapsed = metrics.process_time() - began
    assert elapsed <= 0.05
def test_thread_time():
    """Check ``metrics.thread_time()`` deltas around ``run_for`` and a sleep.

    On Linux an extra measurement verifies that ``run_for`` executed on
    another thread adds at most 0.05 to the calling thread's reading.
    """
    # run_for on the calling thread
    # NOTE(review): run_for presumably keeps the CPU busy for the given
    # duration -- confirm against distributed.utils_test.
    began = metrics.thread_time()
    run_for(0.05)
    elapsed = metrics.thread_time() - began
    assert 0.03 <= elapsed <= 0.2

    # Sleep time not counted
    began = metrics.thread_time()
    time.sleep(0.1)
    elapsed = metrics.thread_time() - began
    assert elapsed <= 0.05

    if sys.platform == "linux":
        # Always per-thread on Linux
        worker = threading.Thread(target=run_for, args=(0.1,))
        began = metrics.thread_time()
        worker.start()
        worker.join()
        elapsed = metrics.thread_time() - began
        assert elapsed <= 0.05