This repository was archived by the owner on Aug 25, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 138
/
Copy pathtest_monitor.py
81 lines (64 loc) · 2.49 KB
/
test_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Intel Corporation
import asyncio
from dffml.util.monitor import Monitor, Task
from dffml.util.asynctestcase import AsyncTestCase
async def test_task(task=None):
    """
    Report the integers 0 through 9 to ``task``, pausing before each one.

    For every value the coroutine first sleeps for 0.01 seconds and then
    awaits ``task.update(value)``.

    ``task`` is the object to report to; anything with an awaitable
    ``update(value)`` method works. When it is omitted a new ``Task()`` is
    created for this call. The earlier ``task=Task()`` default was built once,
    when the ``def`` statement ran, and that one instance was shared by every
    call that left the argument out.

    NOTE(review): the ``test_`` prefix means pytest-style collectors may try
    to collect this helper as a test -- confirm whether that matters for the
    runner used by this project.
    """
    if task is None:
        # Build the fallback lazily so no Task exists before it is needed
        # and no state leaks between calls.
        task = Task()
    for i in range(10):
        await asyncio.sleep(0.01)
        await task.update(i)
async def log_task(task=None):
    """
    Send ten log messages, for the values 0 through 9, to ``task``.

    Each message is emitted by awaiting ``task.log("i is now %d", value)``;
    the format string and the value are passed separately.

    ``task`` is the object to log to; anything with an awaitable
    ``log(fmt, *args)`` method works. When it is omitted a new ``Task()`` is
    created for this call. The earlier ``task=Task()`` default was built once,
    when the ``def`` statement ran, and that one instance was shared by every
    call that left the argument out.
    """
    if task is None:
        # Build the fallback lazily so no Task exists before it is needed
        # and no state leaks between calls.
        task = Task()
    for i in range(10):
        await task.log("i is now %d", i)
async def recv_statuses(status, sleep):
    """
    Wait ``sleep`` seconds, then drain ``status`` into a list.

    ``status`` is an asynchronous iterable; ``sleep`` is the delay, in
    seconds, applied before iteration starts. Returns every item ``status``
    yields, in the order received.
    """
    # Delay first so the caller controls how late this consumer attaches.
    await asyncio.sleep(sleep)
    return [update async for update in status]
class TestMonitor(AsyncTestCase):
    """
    Tests for Monitor's ``start``, ``complete``, ``status`` and ``log_status``
    calls, driven by the ``test_task`` and ``log_task`` helper coroutines.

    Individual tests are described with comments rather than docstrings so
    that unittest keeps reporting them by method name.
    """

    async def setUp(self):
        # Every test method gets its own Monitor instance.
        await super().setUp()
        self.monitor = Monitor()

    async def test_00_await_complete(self):
        # Start a task and wait for it to finish.
        started = await self.monitor.start(test_task)
        await self.monitor.complete(started._key)

    async def test_01_single_watching_status(self):
        # A single watcher that begins reading 0.05 s after the task was
        # started is expected to receive all ten updates, in order.
        started = await self.monitor.start(test_task)
        watcher = self.monitor.status(started._key)
        received = await recv_statuses(watcher, 0.05)
        self.assertEqual(len(received), 10)
        for expected, actual in enumerate(received):
            self.assertEqual(actual, expected)

    async def test_02_multiple_watching(self):
        # Five watchers with staggered start delays (0, 0.01, ... 0.04 s) are
        # each expected to receive all ten updates, in order.
        started = await self.monitor.start(test_task)
        watchers = [
            recv_statuses(self.monitor.status(started._key), step * 0.01)
            for step in range(5)
        ]
        for received in await asyncio.gather(*watchers):
            self.assertEqual(len(received), 10)
            for expected, actual in enumerate(received):
                self.assertEqual(actual, expected)

    async def test_03_log(self):
        # A task that only emits log messages can be awaited to completion.
        started = await self.monitor.start(log_task)
        await self.monitor.complete(started._key)

    async def test_04_already_complete(self):
        # Waiting a second time on the same key must also return normally.
        started = await self.monitor.start(log_task)
        key = started._key
        await self.monitor.complete(key)
        await self.monitor.complete(key)

    async def test_05_already_complete_status(self):
        # Asking for the status stream of a key that has already been
        # completed is expected to yield nothing.
        started = await self.monitor.start(log_task)
        await self.monitor.complete(started._key)
        leftovers = [msg async for msg in self.monitor.status(started._key)]
        self.assertFalse(leftovers)

    async def test_06_log_status(self):
        # log_status is expected to yield the ten update values in order.
        started = await self.monitor.start(test_task)
        seen = 0
        async for msg in self.monitor.log_status(started._key):
            self.assertEqual(msg, seen)
            seen += 1
        self.assertEqual(seen, 10)

    async def test_07_already_running(self):
        # Call start again with the task object returned by the first start
        # and its own key, then make sure the task can still be completed.
        started = await self.monitor.start(test_task)
        key = started._key
        await self.monitor.start(started, key)
        await self.monitor.complete(key)