-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtest_scheduler.py
153 lines (116 loc) · 3.88 KB
/
test_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# ruff: noqa: D100, D101, D102, D103, D104, D107
from __future__ import annotations
import asyncio
import threading
from dataclasses import replace
from typing import TYPE_CHECKING, TypeAlias
from unittest.mock import call
from immutable import Immutable
from redux.basic_types import (
BaseAction,
BaseEvent,
CompleteReducerResult,
CreateStoreOptions,
FinishAction,
FinishEvent,
InitAction,
InitializationActionError,
ReducerResult,
)
from redux.main import Store
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine
from pytest_mock import MockerFixture
class StateType(Immutable):
value: int
class IncrementAction(BaseAction): ...
class WaitEvent(BaseEvent): ...
Action = IncrementAction | InitAction | FinishAction
Event = WaitEvent
def reducer(
state: StateType | None,
action: Action,
) -> ReducerResult[StateType, Action, Event]:
if state is None:
if isinstance(action, InitAction):
return StateType(value=0)
raise InitializationActionError(action)
if isinstance(action, IncrementAction):
return CompleteReducerResult(
state=replace(state, value=state.value + 1),
events=[WaitEvent()],
)
return state
StoreType: TypeAlias = Store[StateType, IncrementAction | InitAction, FinishEvent]
class Scheduler(threading.Thread):
def __init__(self: Scheduler) -> None:
super().__init__()
self.stopped = False
self._callbacks: list[tuple[Callable[[], None], float]] = []
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tasks: set[asyncio.Task] = set()
def run(self: Scheduler) -> None:
self.loop.run_forever()
def set(self: Scheduler, callback: Callable[[], None], *, interval: bool) -> None:
self.loop.call_soon_threadsafe(
self.loop.create_task,
self.call_callback(callback, interval=interval),
)
async def call_callback(
self: Scheduler,
callback: Callable[[], None],
*,
interval: bool,
) -> None:
if self.stopped:
return
self.tasks.add(self.loop.create_task(asyncio.to_thread(callback)))
if interval:
await asyncio.sleep(0.01)
self.tasks.add(
self.loop.create_task(self.call_callback(callback, interval=interval)),
)
async def graceful_stop(self: Scheduler) -> None:
await asyncio.sleep(0.05)
self.loop.stop()
def schedule_stop(self: Scheduler) -> None:
self.stopped = True
self.loop.call_soon_threadsafe(self.loop.create_task, self.graceful_stop())
def test_scheduler(mocker: MockerFixture) -> None:
scheduler = Scheduler()
scheduler.start()
def _create_task_with_callback(
coro: Coroutine,
callback: Callable[[asyncio.Task], None] | None = None,
) -> None:
def create_task_with_callback() -> None:
task = scheduler.loop.create_task(coro)
if callback:
callback(task)
scheduler.loop.call_soon_threadsafe(create_task_with_callback)
store = Store(
reducer,
options=CreateStoreOptions(
auto_init=True,
scheduler=scheduler.set,
task_creator=_create_task_with_callback,
on_finish=scheduler.schedule_stop,
grace_time_in_seconds=0.2,
),
)
render = mocker.stub()
store.subscribe_event(
FinishEvent,
lambda _: time.sleep(0.1) or store.dispatch(IncrementAction()),
)
store._subscribe(render) # noqa: SLF001
import time
time.sleep(0.1)
for _ in range(10):
store.dispatch(IncrementAction())
store.dispatch(FinishAction())
scheduler.join()
render.assert_has_calls(
[call(StateType(value=i)) for i in range(11)] + [call(StateType(value=10))],
)