/
utils.py
257 lines (211 loc) · 7.47 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""Utilities for state machine execution."""
import datetime
import functools
import contextlib
from typing import Any, Dict, List, Tuple, Optional, Sequence, Collection
import dateutil.tz
from sqlalchemy import func
from routemaster.db import Label, History
from routemaster.app import App
from routemaster.feeds import feeds_for_state_machine
from routemaster.config import Gate, State, StateMachine, ContextNextStates
from routemaster.context import Context
from routemaster.logging import BaseLogger
from routemaster.state_machine.types import LabelRef, Metadata
from routemaster.state_machine.exceptions import (
UnknownLabel,
UnknownStateMachine,
)
def get_state_machine(app: App, label: LabelRef) -> StateMachine:
"""Finds the state machine instance by name in the app config."""
try:
return app.config.state_machines[label.state_machine]
except KeyError:
raise UnknownStateMachine(label.state_machine)
def choose_next_state(
state_machine: StateMachine,
current_state: State,
context: Context,
) -> State:
"""Assuming a transition is valid, choose a next state."""
next_state_name = current_state.next_states.next_state_for_label(context)
return state_machine.get_state(next_state_name)
def get_label_metadata(
app: App,
label: LabelRef,
state_machine: StateMachine,
) -> Tuple[Dict[str, Any], bool]:
"""Get the metadata and whether the label has been deleted."""
return app.session.query(Label.metadata, Label.deleted).filter_by(
name=label.name,
state_machine=state_machine.name,
).first()
def get_current_state(
app: App,
label: LabelRef,
state_machine: StateMachine,
) -> Optional[State]:
"""Get the current state of a label, based on its last history entry."""
history_entry = get_current_history(app, label)
if history_entry.new_state is None:
# label has been deleted
return None
return state_machine.get_state(history_entry.new_state)
def get_current_history(app: App, label: LabelRef) -> History:
"""Get a label's last history entry."""
history_entry = app.session.query(History).filter_by(
label_name=label.name,
label_state_machine=label.state_machine,
).order_by(
# Our model type stubs define the `id` attribute as `int`, yet
# sqlalchemy actually allows the attribute to be used for ordering like
# this; ignore the type check here specifically rather than complicate
# our type definitions.
History.id.desc(), # type: ignore
).first()
if history_entry is None:
raise UnknownLabel(label)
return history_entry
def needs_gate_evaluation_for_metadata_change(
app: App,
state_machine: StateMachine,
label: LabelRef,
update: Metadata,
) -> Tuple[bool, State]:
"""
Given a change to the metadata, should the gate evaluation be triggered.
"""
current_state = get_current_state(app, label, state_machine)
if current_state is None:
raise ValueError(
f"Cannot determine gate evaluation for deleted label {label} "
"(deleted labels have no current state)",
)
if not isinstance(current_state, Gate):
# Label is not a gate state so there's no trigger to resolve.
return False, current_state
if any(
trigger.should_trigger_for_update(update)
for trigger in current_state.metadata_triggers
):
return True, current_state
return False, current_state
def lock_label(app: App, label: LabelRef) -> Label:
"""Lock a label in the current transaction."""
row = app.session.query(Label).filter_by(
name=label.name,
state_machine=label.state_machine,
).with_for_update().first()
if row is None:
raise UnknownLabel(label)
return row
def labels_in_state(
app: App,
state_machine: StateMachine,
state: State,
) -> List[str]:
"""Util to get all the labels in an action state that need retrying."""
return _labels_in_state(app, state_machine, state, True)
def labels_in_state_with_metadata(
app: App,
state_machine: StateMachine,
state: State,
path: Sequence[str],
values: Collection[str],
) -> List[str]:
"""
Util to get all the labels in a given state with some metadata value.
The metadata lookup happens at the given path, allowing for any of the
posible values given.
"""
if not values:
raise ValueError("Must specify at least one possible value")
metadata_lookup = Label.metadata
for part in path:
metadata_lookup = metadata_lookup[part] # type: ignore
return _labels_in_state(
app,
state_machine,
state,
metadata_lookup.astext.in_(values), # type: ignore
)
def labels_needing_metadata_update_retry_in_gate(
app: App,
state_machine: StateMachine,
state: State,
) -> List[str]:
"""Util to get all the labels in a gate state that need retrying."""
if not isinstance(state, Gate): # pragma: no branch
raise ValueError( # pragma: no cover
f"labels_needing_metadata_update_retry_in_gate called with "
f"{state.name} which is not a Gate",
)
return _labels_in_state(
app,
state_machine,
state,
~Label.metadata_triggers_processed,
)
def _labels_in_state(
app: App,
state_machine: StateMachine,
state: State,
filter_: Any,
) -> List[str]:
"""Util to get all the labels in an action state that need retrying."""
states_by_rank = app.session.query(
History.label_name,
History.new_state,
func.row_number().over(
# Our model type stubs define the `id` attribute as `int`, yet
# sqlalchemy actually allows the attribute to be used for ordering
# like this; ignore the type check here specifically rather than
# complicate our type definitions.
order_by=History.id.desc(), # type: ignore
partition_by=History.label_name,
).label('rank'),
).filter_by(
label_state_machine=state_machine.name,
).subquery()
ranked_transitions = app.session.query(
states_by_rank.c.label_name,
).filter(
states_by_rank.c.rank == 1,
states_by_rank.c.new_state == state.name,
).join(Label).filter(
filter_,
)
return [x for x, in ranked_transitions]
def context_for_label(
label: LabelRef,
metadata: Metadata,
state_machine: StateMachine,
state: State,
history_entry: Any,
logger: BaseLogger,
) -> Context:
"""Util to build the context for a label in a state."""
feeds = feeds_for_state_machine(state_machine)
accessed_variables: List[str] = []
if isinstance(state, Gate):
accessed_variables.extend(state.exit_condition.accessed_variables())
if isinstance(state.next_states, ContextNextStates):
accessed_variables.append(state.next_states.path)
@contextlib.contextmanager
def feed_logging_context(feed_url):
with logger.process_feed(state_machine, state, feed_url):
yield functools.partial(
logger.feed_response,
state_machine,
state,
feed_url,
)
return Context(
label=label.name,
metadata=metadata,
now=datetime.datetime.now(dateutil.tz.tzutc()),
feeds=feeds,
accessed_variables=accessed_variables,
current_history_entry=history_entry,
feed_logging_context=feed_logging_context,
)