-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtest_monitor_fixtures.py
131 lines (99 loc) · 2.94 KB
/
test_monitor_fixtures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# ruff: noqa: D100, D101, D102, D103, D104, D107
from __future__ import annotations
from dataclasses import replace
from typing import TYPE_CHECKING
import pytest
from immutable import Immutable
from redux.basic_types import (
BaseAction,
BaseEvent,
CompleteReducerResult,
CreateStoreOptions,
FinishAction,
FinishEvent,
InitAction,
InitializationActionError,
)
from redux.main import Store
if TYPE_CHECKING:
from redux_pytest.fixtures.monitor import StoreMonitor
from redux_pytest.fixtures.snapshot import StoreSnapshot
from redux_pytest.fixtures.wait_for import WaitFor
class StateType(Immutable):
    """State object managed by the store in these tests."""

    # Counter that `reducer` raises by one for every `IncrementAction`.
    value: int
    # Set to 0 on initialisation; the reducer shown here never modifies it.
    mirrored_value: int
class IncrementAction(BaseAction):
    """Action that makes `reducer` add one to the state's `value`."""
class DummyEvent(BaseEvent):
    """Payload-free event used to exercise the monitor's event recording."""
# Union of the action types that `reducer` is annotated to accept.
Action = IncrementAction | InitAction | FinishAction
@pytest.fixture
def snapshot_prefix() -> str:
    """Provide the fixed string ``'prefix'`` under the `snapshot_prefix` name.

    NOTE(review): presumably consumed by the snapshot fixture to name its
    output -- confirm against the redux_pytest plugin.
    """
    prefix = 'prefix'
    return prefix
def reducer(
    state: StateType | None,
    action: Action,
) -> StateType | CompleteReducerResult[StateType, Action, DummyEvent | FinishEvent]:
    """Compute the next state for the test store.

    Args:
        state: Current state, or ``None`` when the store has no state yet.
        action: Action being applied.

    Returns:
        A zeroed `StateType` for `InitAction` on an empty store, a copy with
        `value` increased by one for `IncrementAction`, otherwise `state`
        itself.

    Raises:
        InitializationActionError: If `state` is ``None`` and `action` is not
            an `InitAction`.
    """
    if state is None:
        # Only an InitAction may create the very first state.
        if not isinstance(action, InitAction):
            raise InitializationActionError(action)
        return StateType(value=0, mirrored_value=0)

    if not isinstance(action, IncrementAction):
        # Any other action leaves the state untouched.
        return state

    incremented = state.value + 1
    return replace(state, value=incremented)
@pytest.fixture
def store() -> Store:
    """Provide a `Store` driven by `reducer` with `auto_init` switched on."""
    options = CreateStoreOptions(auto_init=True)
    return Store(reducer, options=options)
def test_monitor_action(
    store: Store,
    store_monitor: StoreMonitor,
    needs_finish: None,
) -> None:
    """Check that a dispatched `IncrementAction` is recorded exactly once.

    NOTE(review): relies on `store_monitor` already observing the `store`
    fixture -- confirm against the redux_pytest plugin.
    """
    _ = needs_finish  # requested as a fixture; its value is not used here

    store.dispatch(IncrementAction())

    recorded_actions = store_monitor.dispatched_actions
    recorded_actions.assert_called_once_with(IncrementAction())
def test_monitor_event(
    store: Store,
    store_monitor: StoreMonitor,
    needs_finish: None,
) -> None:
    """Check that a dispatched `DummyEvent` is recorded exactly once.

    NOTE(review): relies on `store_monitor` already observing the `store`
    fixture -- confirm against the redux_pytest plugin.
    """
    _ = needs_finish  # requested as a fixture; its value is not used here

    store.dispatch(DummyEvent())

    recorded_events = store_monitor.dispatched_events
    recorded_events.assert_called_once_with(DummyEvent())
def test_multiple_stores(
    store: Store,
    store_monitor: StoreMonitor,
    needs_finish: None,
) -> None:
    """Check that a second store is only recorded after `monitor` is called."""
    _ = needs_finish  # requested as a fixture; its value is not used here

    # Rebind `store` to a fresh instance that is separate from the fixture.
    options = CreateStoreOptions(auto_init=True)
    store = Store(reducer, options=options)

    # Before registration, dispatching on the new store must not be recorded.
    store.dispatch(IncrementAction())
    store_monitor.dispatched_actions.assert_not_called()

    # After registration, the very next dispatch is the only recorded call.
    store_monitor.monitor(store)
    store.dispatch(IncrementAction())
    store_monitor.dispatched_actions.assert_called_once_with(IncrementAction())

    # This store was created locally, so the test finishes it explicitly.
    store.dispatch(FinishAction())
def test_closed_snapshot_store(
    store_snapshot: StoreSnapshot,
    wait_for: WaitFor,
    store: Store,
) -> None:
    """Check that `take` raises `RuntimeError` once the snapshot is closed."""
    closed_error_pattern = (
        '^Snapshot context is closed, make sure you are not calling `take` '
        'after `FinishEvent` is dispatched.$'
    )

    store.dispatch(FinishAction())

    # NOTE(review): `wait_for` presumably retries the wrapped checks until
    # they pass -- confirm against the redux_pytest plugin.
    @wait_for
    def is_closed() -> None:
        assert store_snapshot._is_closed  # noqa: SLF001

        with pytest.raises(RuntimeError, match=closed_error_pattern):
            store_snapshot.take()

    is_closed()