refactor: pipeline transformation structure
sbrugman committed Nov 24, 2021
1 parent 8977b1e commit 2534cea
Showing 31 changed files with 572 additions and 289 deletions.
22 changes: 9 additions & 13 deletions popmon/alerting/alerts_summary.py
@@ -19,6 +19,7 @@


import fnmatch
from typing import Optional

import numpy as np
import pandas as pd
@@ -31,6 +32,8 @@ class AlertsSummary(Module):
It combines the alerts-summaries of all individual features into an artificial feature "_AGGREGATE_".
"""
_input_keys = ("read_key", )
_output_keys = ("store_key", )

def __init__(
self,
@@ -50,21 +53,16 @@ def __init__(
"""
super().__init__()
self.read_key = read_key
self.store_key = store_key
if not self.store_key:
self.store_key = self.read_key
self.store_key = store_key or self.read_key
self.features = features or []
self.ignore_features = ignore_features or []
self.combined_variable = combined_variable

def transform(self, datastore):
# fetch and check input data
data = self.get_datastore_object(datastore, self.read_key, dtype=dict)

def transform(self, data: dict) -> Optional[dict]:
# determine all possible features, used for the comparison below
features = self.get_features(data.keys())
features = self.get_features(list(data.keys()))
if len(features) == 0:
return datastore
return None

self.logger.info(
f'Combining alerts into artificial variable "{self.combined_variable}"'
@@ -88,7 +86,7 @@ def transform(self, datastore):
self.logger.warning(
"indices of features are different. no alerts summary generated."
)
return datastore
return None

# STEP 2: Concatenate the dataframes, there was one for each original feature.
tlv = pd.concat(df_list, axis=1)
@@ -104,6 +102,4 @@

# store combination of traffic alerts
data[self.combined_variable] = dfc
datastore[self.store_key] = data

return datastore
return data
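
The pattern above repeats across the changed modules: transform() no longer receives the whole datastore, but the already-resolved values of the keys declared in _input_keys, and whatever it returns is written back under _output_keys. A minimal sketch of that contract, read off this diff rather than taken from popmon's documented API (the helper name _transform and the resolution details are assumptions):

class Module:
    _input_keys = ()
    _output_keys = ()

    def _transform(self, datastore: dict) -> dict:
        # Resolve each declared input attribute (e.g. self.read_key)
        # to the datastore entry it names; missing keys become None.
        inputs = [datastore.get(getattr(self, key)) for key in self._input_keys]
        outputs = self.transform(*inputs)
        if outputs is None:  # e.g. AlertsSummary when no features match
            return datastore
        if len(self._output_keys) == 1:
            outputs = (outputs,)
        # Store each returned value under the key named by the
        # corresponding output attribute (e.g. self.store_key).
        for key, value in zip(self._output_keys, outputs):
            datastore[getattr(self, key)] = value
        return datastore

One practical consequence of this reading: a transform can now be unit-tested with plain dicts, without building up a datastore first.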
68 changes: 34 additions & 34 deletions popmon/alerting/compute_tl_bounds.py
@@ -18,11 +18,10 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


import collections
import copy
import fnmatch
import uuid
from collections import defaultdict
from typing import Tuple, Any

import numpy as np
import pandas as pd
@@ -117,6 +116,8 @@ class ComputeTLBounds(Module):
meant to be generic. Then bounds can be stored as either raw
values or as directly calculated values on the statistics of the data.
"""
_input_keys = ("read_key", )
_output_keys = ("store_key", "apply_funcs_key")

def __init__(
self,
@@ -133,7 +134,7 @@ def __init__(
entire=False,
**kwargs,
):
"""Initialize an instance of TafficLightBounds module.
"""Initialize an instance of TrafficLightBounds module.
:param str read_key: key of input data to read from datastore
:param str store_key: key of output data to store in datastore (optional)
@@ -152,23 +153,26 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
self.apply_funcs_key = apply_funcs_key

self.monitoring_rules = monitoring_rules or {}
self.features = features or []
self.ignore_features = ignore_features or []
self.traffic_lights = {}
self.traffic_light_funcs = []
self.apply_funcs_key = apply_funcs_key
self.traffic_light_func = func if func is not None else traffic_light
self.metrics_wide = metrics_wide
self.prefix = prefix
self.suffix = suffix
self.entire = entire
self.kwargs = copy.copy(kwargs)

# check inputs
if not isinstance(self.traffic_light_func, collections.Callable):
if not callable(self.traffic_light_func):
raise TypeError("supplied function must be callable object")

def get_description(self):
return self.traffic_light_func.__name__

def _set_traffic_lights(self, feature, cols, pattern, rule_name):
process_cols = fnmatch.filter(cols, pattern)

@@ -195,12 +199,9 @@ def _set_traffic_lights(self, feature, cols, pattern, rule_name):
}
)

def transform(self, datastore):
# fetch and check input data
test_data = self.get_datastore_object(datastore, self.read_key, dtype=dict)

def transform(self, test_data: dict) -> Tuple[Any, Any]:
# determine all possible features, used for the comparison below
features = self.get_features(test_data.keys())
features = self.get_features(list(test_data.keys()))

pkeys, nkeys = collect_traffic_light_bounds(self.monitoring_rules)

@@ -212,7 +213,9 @@ def transform(self, datastore):
# --- 1. tl bounds explicitly defined for a particular feature
if feature in pkeys:
explicit_cols = [
pcol for pcol in pkeys[feature] if pcol in test_df.columns
pcol
for pcol in pkeys[feature]
if pcol in test_df.columns
]
implicit_cols = set(pkeys[feature]) - set(explicit_cols)

@@ -237,13 +240,7 @@ def transform(self, datastore):
feature, test_df.columns, pattern, rule_name="pattern"
)

# storage
if self.store_key:
datastore[self.store_key] = self.traffic_lights
if self.apply_funcs_key:
datastore[self.apply_funcs_key] = self.traffic_light_funcs

return datastore
return self.traffic_lights, self.traffic_light_funcs


def pull_bounds(
@@ -338,7 +335,12 @@ class DynamicBounds(Pipeline):
"""Calculate dynamic traffic light bounds based on pull thresholds and dynamic mean and std.deviation."""

def __init__(
self, read_key, rules, store_key="", suffix_mean="_mean", suffix_std="_std"
self,
read_key,
rules,
store_key="",
suffix_mean="_mean",
suffix_std="_std",
):
"""Initialize an instance of DynamicTrafficLightBounds.
@@ -348,10 +350,8 @@ def __init__(
:param str suffix_mean: suffix of mean. mean column = metric + suffix_mean
:param str suffix_std: suffix of std. std column = metric + suffix_std
"""
super().__init__(modules=[])
self.read_key = read_key

apply_funcs_key = str(uuid.uuid4())
apply_funcs_key = f"{read_key}__{store_key}"

expand_bounds = ComputeTLBounds(
read_key=read_key,
@@ -368,8 +368,7 @@ def __init__(
assign_to_key=store_key,
apply_funcs_key=apply_funcs_key,
)

self.modules = [expand_bounds, calc_bounds]
super().__init__(modules=[expand_bounds, calc_bounds])

def transform(self, datastore):
self.logger.info(f'Calculating dynamic bounds for "{self.read_key}"')
@@ -380,7 +379,12 @@ class StaticBounds(Pipeline):
"""Calculate static traffic light bounds based on pull thresholds and static mean and std.deviation."""

def __init__(
self, read_key, rules, store_key="", suffix_mean="_mean", suffix_std="_std"
self,
read_key,
rules,
store_key="",
suffix_mean="_mean",
suffix_std="_std",
):
"""Initialize an instance of StaticBounds.
@@ -390,10 +394,8 @@ def __init__(
:param str suffix_mean: suffix of mean. mean column = metric + suffix_mean
:param str suffix_std: suffix of std. std column = metric + suffix_std
"""
super().__init__(modules=[])
self.read_key = read_key

apply_funcs_key = str(uuid.uuid4())
apply_funcs_key = f"{read_key}__{store_key}"

expand_bounds = ComputeTLBounds(
read_key=read_key,
Expand All @@ -411,7 +413,7 @@ def __init__(
apply_funcs_key=apply_funcs_key,
)

self.modules = [expand_bounds, calc_bounds]
super().__init__(modules=[expand_bounds, calc_bounds])

def transform(self, datastore):
self.logger.info(f'Calculating static bounds for "{self.read_key}"')
@@ -437,10 +439,8 @@ def __init__(self, read_key, store_key, rules, expanded_rules_key=""):
:param str expanded_rules_key: store key of expanded monitoring rules to store in data store,
eg. these can be used for plotting. (optional)
"""
super().__init__(modules=[])
self.read_key = read_key

apply_funcs_key = str(uuid.uuid4())
apply_funcs_key = f"{read_key}__{store_key}"

# generate static traffic light bounds by expanding the wildcarded monitoring rules
expand_bounds = ComputeTLBounds(
Expand All @@ -457,7 +457,7 @@ def __init__(self, read_key, store_key, rules, expanded_rules_key=""):
apply_funcs_key=apply_funcs_key,
)

self.modules = [expand_bounds, apply_bounds]
super().__init__(modules=[expand_bounds, apply_bounds])

def transform(self, datastore):
self.logger.info(f'Calculating traffic light alerts for "{self.read_key}"')
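Two changes recur in this file: the random uuid4 intermediate key becomes the deterministic f"{read_key}__{store_key}", so rebuilding a pipeline reuses the same datastore slot instead of minting a new one per run, and ComputeTLBounds now declares two outputs and returns a matching tuple. A toy module in the same two-output style, runnable against the Module sketch above (TinyBounds and its placeholder bounds are invented for illustration):

class TinyBounds(Module):
    _input_keys = ("read_key",)
    _output_keys = ("store_key", "apply_funcs_key")

    def __init__(self, read_key, store_key, apply_funcs_key):
        self.read_key = read_key
        self.store_key = store_key
        self.apply_funcs_key = apply_funcs_key

    def transform(self, data: dict):
        bounds = {feature: (0.0, 1.0) for feature in data}  # placeholder bounds
        funcs = [{"func": max}]  # placeholder apply-functions
        # Tuple order matches _output_keys: bounds -> store_key,
        # funcs -> apply_funcs_key.
        return bounds, funcs

datastore = {"metrics": {"x": [1, 2, 3]}}
datastore = TinyBounds("metrics", "bounds", "metrics__bounds")._transform(datastore)
assert set(datastore) == {"metrics", "bounds", "metrics__bounds"}
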
45 changes: 24 additions & 21 deletions popmon/analysis/apply_func.py
@@ -19,6 +19,7 @@


import warnings
from typing import Optional

import numpy as np
import pandas as pd
@@ -32,6 +33,8 @@ class ApplyFunc(Module):
Extra parameters (kwargs) can be passed to the apply function.
"""
_input_keys = ("apply_to_key", "assign_to_key", "apply_funcs_key")
_output_keys = ("store_key", )

def __init__(
self,
@@ -67,9 +70,10 @@
"""
super().__init__()
self.apply_to_key = apply_to_key
self.assign_to_key = self.apply_to_key if not assign_to_key else assign_to_key
self.store_key = self.assign_to_key if not store_key else store_key
self.assign_to_key = assign_to_key or apply_to_key
self.apply_funcs_key = apply_funcs_key
self.store_key = store_key or self.assign_to_key

self.features = features or []
self.metrics = metrics or []
self.msg = msg
Expand All @@ -79,6 +83,14 @@ def __init__(
for af in apply_funcs:
self.add_apply_func(**af)

def get_description(self):
if len(self.apply_funcs) > 0:
return " and ".join([x['func'].__name__ for x in self.apply_funcs])
elif self.apply_funcs_key:
return f"functions from arg '{self.apply_funcs_key}'"
else:
raise NotImplementedError

def add_apply_func(
self,
func,
@@ -127,7 +139,7 @@ def add_apply_func(
}
)

def transform(self, datastore):
def transform(self, apply_to_data: dict, assign_to_data: Optional[dict] = None, apply_funcs: Optional[list] = None):
"""
Apply functions to specified feature and metrics
@@ -137,23 +149,17 @@ def transform(self, datastore):
:return: updated datastore
:rtype: dict
"""
if self.msg:
self.logger.info(self.msg)
assert isinstance(apply_to_data, dict)
if assign_to_data is None:
assign_to_data = {}

apply_to_data = self.get_datastore_object(
datastore, self.apply_to_key, dtype=dict
)
assign_to_data = self.get_datastore_object(
datastore, self.assign_to_key, dtype=dict, default={}
)

if self.apply_funcs_key:
apply_funcs = self.get_datastore_object(
datastore, self.apply_funcs_key, dtype=list
)
if apply_funcs is not None:
self.apply_funcs += apply_funcs

features = self.get_features(apply_to_data.keys())
if self.msg:
self.logger.info(self.msg)

features = self.get_features(list(apply_to_data.keys()))

same_key = self.assign_to_key == self.apply_to_key

@@ -181,10 +187,7 @@ def transform(self, datastore):
]
result = parallel(apply_func_array, args, mode="kwargs")
new_metrics = dict(result)

# storage
datastore[self.store_key] = new_metrics
return datastore
return new_metrics


def apply_func_array(
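ApplyFunc declares three input keys but tolerates missing ones: assign_to_data and apply_funcs arrive as None when their keys are unset, and the transform supplies defaults. A standalone sketch of that optional-input convention (the function body and example values are illustrative, not popmon code):

from typing import Optional

def transform(apply_to_data: dict,
              assign_to_data: Optional[dict] = None,
              apply_funcs: Optional[list] = None) -> dict:
    if assign_to_data is None:
        assign_to_data = {}
    funcs = apply_funcs or []
    # Start from existing entries, as when assign_to_key differs from
    # apply_to_key, then overwrite per feature with the function results.
    result = dict(assign_to_data)
    for feature, values in apply_to_data.items():
        result[feature] = [f(values) for f in funcs]
    return result

print(transform({"feature_a": [1, 2, 3]}, apply_funcs=[sum, len]))
# {'feature_a': [6, 3]}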
