/
_algorithm_utils.py
427 lines (364 loc) · 19.5 KB
/
_algorithm_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
"""Basic utilities to work with algorithm and pipeline objects."""
from __future__ import annotations
import types
import warnings
from functools import wraps
from inspect import isclass
from pickle import PicklingError
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar, Union, cast, overload
from typing_extensions import Concatenate, ParamSpec
from tpcp import Algorithm
from tpcp._base import NOTHING, BaseTpcpObject, _get_annotated_fields_of_type, _Nothing
from tpcp._hash import custom_hash
from tpcp._parameters import _ParaTypes
from tpcp.exceptions import PotentialUserErrorWarning
# Attribute names set on wrapped methods to mark that the respective safety decorator
# (`make_action_safe` / `make_optimize_safe`) was already applied, so the checks are not duplicated.
ACTION_METHOD_INDICATOR = "__tpcp_action_method"
OPTIMIZE_METHOD_INDICATOR = "__tpcp_optimize_method"
if TYPE_CHECKING:
    from tpcp._algorithm import AlgorithmT
# Generic type variables used in the signatures below.
T = TypeVar("T")
K = TypeVar("K")
# ParamSpec to forward the full signature of wrapped action/optimize methods.
P = ParamSpec("P")
@overload
def _split_returns(values: tuple[T, K]) -> tuple[T, K]:
...
@overload
def _split_returns(values: T) -> tuple[T, tuple[_Nothing, _Nothing]]:
...
def _split_returns(values):
if isinstance(values, (list, tuple)):
value, other = values
else:
value = values
other = (NOTHING, NOTHING)
return value, other
def get_action_method(instance: Union[type[Algorithm], Algorithm], method_name: Optional[str] = None) -> Callable:
    """Get the action method for an Algorithm.

    If no explicit ``method_name`` is given, the primary action method is returned,
    i.e. the first one listed in `Algorithm._action_methods`.
    Otherwise, the action method with the given name is returned.

    Raises
    ------
    ValueError
        If ``method_name`` is given, but is not one of the registered action methods.
    """
    available_names = get_action_methods_names(instance)
    if method_name is None:
        method_name = available_names[0]
    elif method_name not in available_names:
        raise ValueError(
            "`method_name` must be one of the specified action methods of the algorithm. "
            f"Valid ones are {available_names}"
        )
    return getattr(instance, method_name)
def get_action_methods_names(instance_or_cls: Union[type[Algorithm], Algorithm]) -> tuple[str, ...]:
    """Get the names of all action methods of a class.

    This basically returns `instance_or_cls._action_methods`, but normalizes a single string
    to a one-element tuple and validates that the result is a non-empty tuple of names.

    Returns
    -------
    tuple of str
        The names of all registered action methods.

    Raises
    ------
    ValueError
        If `_action_methods` is neither a string nor a non-empty tuple.
    """
    method_names = instance_or_cls._action_methods
    if isinstance(method_names, str):
        method_names = (method_names,)
    # Bugfix: this check previously used `and`, which silently let non-tuple containers (e.g. lists)
    # and empty tuples pass through, violating the documented `tuple[str, ...]` return contract.
    if not isinstance(method_names, tuple) or len(method_names) == 0:
        if isclass(instance_or_cls):
            instance_or_cls = cast(type[Algorithm], instance_or_cls)
            name = instance_or_cls.__name__
        else:
            name = type(instance_or_cls).__name__
        raise ValueError(f"`_action_methods` of {name} must either be a string or a tuple of strings.")
    return method_names
def get_action_params(instance: Algorithm) -> dict[str, Any]:
    """Get all "Action Params" / "Other Parameters" of the Algorithm.

    Action params are all parameters passed as input to the action method.
    Concretely, these are all instance attributes that are neither results (trailing ``_``),
    nor private (leading ``_``), nor init parameters (reported by ``get_params``).

    Note: We do not magically set these values on the algorithm instance. Instead, the developer of the algorithms
    must implement the algorithm to follow this convention.
    In general, this function is not that useful but might be used for debugging purposes.

    Returns
    -------
    params : dict
        Parameter names mapped to their values.
    """
    init_params = instance.get_params()
    action_params = {}
    for name, value in vars(instance).items():
        if name.startswith("_") or name.endswith("_") or name in init_params:
            continue
        action_params[name] = value
    return action_params
def get_results(instance: Algorithm) -> dict[str, Any]:
    """Get all Results of the Algorithm.

    "Results" or "Attributes" are all values considered results of the algorithm.
    They are indicated by a trailing "_" in their name.
    The values are only populated after the action method of the algorithm was called.

    Returns
    -------
    params : dict
        Result names mapped to their values.

    Raises
    ------
    AttributeError
        If one or more of the attributes are not retrievable from the instance.
        This usually indicates that the action method was not called yet.
    """
    results = {}
    for name in dir(instance):
        # Results carry a single trailing underscore; dunder names are never results.
        if not name.endswith("_") or name.startswith("__"):
            continue
        value = getattr(instance, name)
        # Bound methods can also end in "_" by convention, but are never results.
        if isinstance(value, types.MethodType):
            continue
        results[name] = value
    return results
def is_action_applied(instance: Algorithm) -> bool:
    """Check if the action method was already called/results were generated.

    An algorithm counts as "applied" as soon as at least one result attribute
    (trailing ``_``) is set on the instance.
    """
    return bool(get_results(instance))
def _check_safe_run(algorithm: AlgorithmT, old_method: Callable, *args: Any, **kwargs: Any) -> AlgorithmT:
    """Run the pipeline and check that run behaved as expected.

    The following is verified after calling ``old_method``:

    - the parameters of ``algorithm`` did not change (compared via hash, so nested mutables are caught),
    - the return value is an instance of the algorithm's type (normally ``self``),
    - at least one result attribute (trailing ``_``) was set on the returned object.

    Raises
    ------
    ValueError
        If parameters were modified by the run, or no results were set.
    TypeError
        If the method did not return an instance of the algorithm's type.
    """
    # Hash the full (potentially nested/mutable) parameter state *before* running,
    # so any in-place modification during the run can be detected afterwards.
    before_paras = algorithm.get_params()
    before_paras_hash = custom_hash(before_paras)
    output: AlgorithmT
    # In this case the method is already bound and we do not need to pass the algo as first argument
    output = old_method(*args, **kwargs) if hasattr(old_method, "__self__") else old_method(algorithm, *args, **kwargs)
    after_paras = algorithm.get_params()
    after_paras_hash = custom_hash(after_paras)
    if not before_paras_hash == after_paras_hash:
        raise ValueError(
            f"Running `{old_method.__name__}` of {type(algorithm).__name__} did modify the parameters of the "
            "algorithm. "
            "This must not happen to make sure individual runs of the algorithm/pipeline are independent.\n\n"
            "This usually happens when you use an algorithm object or other mutable objects as a parameter to your "
            "algorithm/pipeline. "
            "In this case, make sure you call `algo_object.clone()` or more general `clone(mutable_input)` on the "
            f"object within the `{old_method.__name__}` method before modifying the mutable or running the nested "
            "algorithm."
        )
    # `isinstance` (not an exact type check) so returning an instance of a subclass is also accepted.
    if not isinstance(output, type(algorithm)):
        raise TypeError(
            f"The `{old_method.__name__}` method of {type(algorithm).__name__} must return `self` or in rare cases a "
            f"new instance of {type(algorithm).__name__}. "
            f"But the return value had the type {type(output)}."
        )
    # An action method that sets no result attributes almost certainly forgot the trailing `_` convention.
    if not is_action_applied(output):
        raise ValueError(
            f"Running the `{old_method.__name__}` method of {type(algorithm).__name__} did not set any results on the "
            "output. "
            f"Make sure the `{old_method.__name__}` method sets the result values as expected as class attributes and "
            f"all names of result attributes have a trailing `_` to mark them as such."
        )
    return output
def make_action_safe(action_method: Callable[P, T]) -> Callable[P, T]:
    """Mark a method as an "action" and apply a set of runtime checks to prevent implementation errors.

    This decorator marks a method as action.
    Each algorithm is expected to have at least one action method.
    For pipelines this action method is called "run".
    This means, when implementing a custom action or run method, it must always be wrapped in this decorator.

    Besides registering the method, the following things are checked at runtime:

    - The action method must return `self` (or at least an instance of the algorithm or pipeline)
    - The action method must set result attributes on the pipeline
    - All result attributes must have a trailing `_` in their name
    - The action method must not modify the input parameters of the pipeline

    In general, we recommend to just apply this decorator to all custom action methods.
    The runtime overhead is usually small enough to not make a difference.

    Examples
    --------
    >>> from tpcp import Algorithm, make_action_safe
    >>> class MyAlgorithm(Algorithm):
    ...     @make_action_safe
    ...     def detect(self, data, sampling_rate_hz):
    ...         ...
    ...         return self
    """
    if getattr(action_method, ACTION_METHOD_INDICATOR, False) is True:
        # It seems like the decorator was already applied and we do not want to apply it multiple times and run
        # duplicated checks.
        return action_method

    @wraps(action_method)
    def safe_wrapped(*args: P.args, **kwargs: P.kwargs) -> AlgorithmT:
        # The decorated function is a method, so the first positional argument is the instance.
        self: AlgorithmT = args[0]
        if action_method.__name__ not in get_action_methods_names(self):
            # Bugfix: the suggested snippet previously rendered the name unquoted (e.g. `(detect,)`),
            # but `_action_methods` must be a tuple of *strings* (see `get_action_methods_names`).
            warnings.warn(
                "The `make_action_safe` decorator should only be applied to an action method "
                f"({get_action_methods_names(self)} for {type(self)}) of an algorithm or methods. "
                f"To register an action method add the following to the class definition of {type(self)}:\n\n"
                f'` _action_methods = ("{action_method.__name__}",)`\n\n'
                "Or append it to the tuple, if it already exists.",
                PotentialUserErrorWarning,
                stacklevel=2,
            )
        return _check_safe_run(self, action_method, *args[1:], **kwargs)

    setattr(safe_wrapped, ACTION_METHOD_INDICATOR, True)
    return cast(Callable[P, T], safe_wrapped)
def _get_nested_opti_paras(
algorithm: BaseTpcpObject, opti_para_names: list[str]
) -> tuple[dict[str, Any], dict[str, Any]]:
paras = algorithm.get_params(deep=True)
optimizable_paras = {}
other_paras = {}
for p, v in paras.items():
if p in opti_para_names or any(p.startswith(o + "__") for o in opti_para_names):
# For each optimizable parameter, we also add all children, as they are also allowed to change,
# if the parent is allowed to.
optimizable_paras[p] = v
else:
other_paras[p] = v
# We need to exclude "parent" objects, when a nested para is marked as optimizable
# Because, if the nested para changes, the parent para will change as well, and we can not do anything about it.
for p in optimizable_paras:
parent_name = p.rsplit("__", 1)[0]
other_paras.pop(parent_name, None)
return optimizable_paras, other_paras
def _check_safe_optimize(  # noqa: C901, PLR0912
    algorithm: AlgorithmT,
    old_method: Callable[Concatenate[AlgorithmT, P], Union[AlgorithmT, tuple[AlgorithmT, T]]],
    *args: Any,
    **kwargs: Any,
) -> Union[AlgorithmT, tuple[AlgorithmT, T]]:
    """Run `self_optimize`/`self_optimize_with_info` and verify it only changed optimizable parameters.

    Parameters marked with the `OptiPara`/`OptimizableParameter` annotation (and their nested children)
    are allowed to change; everything else must stay untouched.
    The before/after comparison is done via hashes, so in-place changes to mutable parameters are caught.

    Raises
    ------
    ValueError
        If no parameter is marked as optimizable, or the method's return shape does not match its name.
    TypeError
        If the method did not return an instance of the algorithm's type.
    RuntimeError
        If non-parameter attributes or non-optimizable parameters were modified.
    """
    # record the hash of the pipeline to make an educated guess if the optimization works
    opti_para_names = _get_annotated_fields_of_type(algorithm, _ParaTypes.OPTI)
    optimizable_paras, other_paras = _get_nested_opti_paras(algorithm, opti_para_names)
    if len(optimizable_paras) == 0:
        # Message fixes: stray semicolon removed and annotation name aligned with the error text below
        # (`OptimizableParameter`, the actual annotation name).
        raise ValueError(
            f"No parameter of {type(algorithm).__name__} was marked as optimizable. "
            "Mark at least one parameter with the `OptiPara`/`OptimizableParameter` annotation to use "
            "`self_optimize`."
        )
    before_hash_optimizable = custom_hash(optimizable_paras)
    before_hash_other = custom_hash(other_paras)
    # We also precalculate the hash of the individual inputs here.
    # Otherwise, we can not capture the "before" state correctly, in case some parameters are mutables (container,
    # or custom object instances)
    before_hash_other_individual = {k: custom_hash(v) for k, v in other_paras.items()}
    optimized_algorithm: AlgorithmT
    if hasattr(old_method, "__self__"):
        # In this case the method is already bound and we do not need to pass the algo as first argument
        optimized_algorithm, other_returns = _split_returns(old_method(*args, **kwargs))
    else:
        optimized_algorithm, other_returns = _split_returns(old_method(algorithm, *args, **kwargs))
    # `self_optimize` must return only `self`; `self_optimize_with_info` must return `(self, info)`.
    # (Message fixes below: missing spaces between concatenated sentences and a "pipline" typo.)
    if old_method.__name__ == "self_optimize" and other_returns != (NOTHING, NOTHING):
        raise ValueError(
            "Calling `self_optimize` returned further return values besides `self`. "
            "If you want to return other results besides the optimized pipeline itself, implement and "
            "use `self_optimize_with_info` instead of `self_optimize`."
        )
    if old_method.__name__ == "self_optimize_with_info" and other_returns == (NOTHING, NOTHING):
        raise ValueError(
            "Calling `self_optimize_with_info` returned only a single result. "
            "This method is expected to return the optimized pipeline/algorithm AND additional "
            "information from the optimization process. "
            "If you don't have additional information to return, use/implement `self_optimize` instead "
            "of `self_optimize_with_info` or return `None` as additional information."
        )
    if not isinstance(optimized_algorithm, type(algorithm)):
        raise TypeError(
            "Calling `self_optimize`/`self_optimize_with_info` did not return an instance of the algorithm/pipeline "
            "itself! Normally, this method should return `self`."
        )
    # We calculate the hash afterwards twice.
    # Once directly after the optimization and once after cloning.
    # The first hash records any changes to the object.
    # The second hash only records changes to the parameters, because everything else is removed by clone.
    # Hence, if we see differences between the hashes, other things besides the parameters are changed.
    after_hash = custom_hash(optimized_algorithm)
    after_hash_after_clone = custom_hash(optimized_algorithm.clone())
    if after_hash_after_clone != after_hash:
        raise RuntimeError(
            "Optimizing seems to have changed class attributes that are not parameters (i.e. not provided in the "
            "`__init__`). "
            "This can lead to unexpected issues!"
        )
    # Now we need to check, which parameters have been modified.
    # We only expect/allow parameters that are marked as "Optimizable".
    # Therefore, we calculate the hash of all other parameters and check if they have changed.
    # We also consider parameter changed, that did not exist or were completely removed.
    # Most of the complicated magic here is in _get_nested_opti_paras.
    # It takes care of including and excluding the correct parameters in the other list, even if nested paras are
    # marked as "Optimizable"
    # NOTE(review): this re-inspects `algorithm`, not `optimized_algorithm`. Both are the same object in the
    # common case (`self` returned), but a method that legally returns a *new* instance would not have its
    # changes detected here — confirm this is intended.
    after_optimizable_paras, after_other_paras = _get_nested_opti_paras(algorithm, opti_para_names)
    after_hash_optimizable = custom_hash(after_optimizable_paras)
    after_hash_other = custom_hash(after_other_paras)
    if before_hash_other != after_hash_other:
        # In this case we raise an error anyway, so lets go deep:
        removed_paras = set(other_paras) - set(after_other_paras)
        added_paras = set(after_other_paras) - set(other_paras)
        changed_paras = []
        for k in set(other_paras) - set(removed_paras):
            if before_hash_other_individual[k] != custom_hash(after_other_paras[k]):
                changed_paras.append(k)
        changed_paras = sorted(changed_paras)
        changed_paras.extend([f"{p} (removed)" for p in sorted(removed_paras)])
        changed_paras.extend([f"{p} (added)" for p in sorted(added_paras)])
        if not removed_paras and not added_paras and not changed_paras:
            # The aggregate hash changed, but no individual parameter did — likely a hashing inconsistency.
            raise ValueError(
                "Optimizing the pipeline has modified parameters that are not marked as optimizable. "
                "However, we could not determine, which parameter actual changed. "
                "This could hint at a bug with the way `tpcp` hashes objects. "
                "Consider submitting a bug report for tpcp on github with a minimal example to reproduce this issue"
            )
        raise RuntimeError(
            "Optimizing the pipeline has modified the following parameters, that were not marked as optimizable: "
            f"{changed_paras}. "
            "Double check the implementation of `self_optimize`/`self_optimize_with_info` and either mark the changing "
            "parameters as optimizable by adding `OptiPara`/`OptimizableParameter` as type annotation or make sure "
            "that they are not accidentally modified in your implementation."
        )
    if before_hash_optimizable == after_hash_optimizable:
        # If the hash didn't change the object didn't change.
        # Something might have gone wrong.
        warnings.warn(
            "Optimizing the algorithm doesn't seem to have changed any of the parameters marked as optimizable "
            f"({optimizable_paras}). "
            "This could indicate an implementation error of the `self_optimize` method.",
            PotentialUserErrorWarning,
            stacklevel=2,
        )
    if other_returns != (NOTHING, NOTHING):
        return optimized_algorithm, other_returns
    return optimized_algorithm
def make_optimize_safe(self_optimize_method: Callable[P, T]) -> Callable[P, T]:
    """Apply a set of runtime checks to a custom `self_optimize` method to prevent implementation errors.

    The following things are checked:

    - The `self_optimize` method must return `self` (or at least an instance of the algorithm or pipeline).
    - The `self_optimize` method must only modify input parameters of the pipeline and not any other attributes.
    - The `self_optimize` method should modify at least one of the input parameters (this doesn't raise an error,
      but just a warning).

    In general, we recommend to just apply this decorator to all custom `self_optimize` methods.
    The runtime overhead is usually small enough to not make a difference.

    The only exception are custom pipelines that you only optimize using the :class:`~tpcp.optimize.Optimize` wrapper.
    This wrapper will apply the same runtime checks anyway.
    However, it doesn't hurt to apply it as decorator as well.
    We make sure that the checks will still only be performed once.

    Examples
    --------
    >>> from tpcp import Algorithm, make_optimize_safe
    >>> class MyAlgorithm(Algorithm):
    ...     def __init__(self, para_1: int = 4):
    ...         self.para_1 = para_1
    ...
    ...     @make_optimize_safe
    ...     def self_optimize(self, train_data, **kwargs):
    ...         # find a better value for para_1 based on the provided trainings data
    ...         better_value_for_para_1 = 5
    ...         self.para_1 = better_value_for_para_1
    ...         return self
    """
    if getattr(self_optimize_method, OPTIMIZE_METHOD_INDICATOR, False) is True:
        # It seems like the decorator was already applied, and we do not want to apply it multiple times and run
        # duplicated checks.
        return self_optimize_method

    @wraps(self_optimize_method)
    def safe_wrapped(self: Algorithm, *args: P.args, **kwargs: P.kwargs) -> T:
        # The checks only make sense for the two optimize entry points; warn on anything else.
        if self_optimize_method.__name__ not in ("self_optimize", "self_optimize_with_info"):
            warnings.warn(
                "The `make_optimize_safe` decorator is only meant for the `self_optimize` method, but you applied it "
                f"to the `{self_optimize_method.__name__}` method.",
                PotentialUserErrorWarning,
                stacklevel=2,
            )
        try:
            return _check_safe_optimize(self, self_optimize_method, *args, **kwargs)
        except PicklingError as e:
            # The safety checks rely on hashing (pickling) the instance; translate the failure
            # into an actionable error for the user.
            # (Message fix: added the missing space after "instance.")
            raise ValueError(
                "We had trouble hashing your class instance. "
                "This is required to run the safety checks for the optimize method. "
                "This usually happens, if your pipeline or algorithm or one of its parameters is based "
                "on a dynamically defined class (e.g. a class defined within a function). "
                "Try defining your classes on a module level. "
                "If this is not possible for you, you need to disable the safety checks."
            ) from e

    setattr(safe_wrapped, OPTIMIZE_METHOD_INDICATOR, True)
    return cast(Callable[P, T], safe_wrapped)