diff --git a/popmon/alerting/alerts_summary.py b/popmon/alerting/alerts_summary.py
index 88a35343..870abcce 100644
--- a/popmon/alerting/alerts_summary.py
+++ b/popmon/alerting/alerts_summary.py
@@ -19,6 +19,7 @@
import fnmatch
+from typing import Optional
import numpy as np
import pandas as pd
@@ -31,6 +32,8 @@ class AlertsSummary(Module):
It combines the alerts-summaries of all individual features into an artificial feature "_AGGREGATE_".
"""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -50,21 +53,16 @@ def __init__(
"""
super().__init__()
self.read_key = read_key
- self.store_key = store_key
- if not self.store_key:
- self.store_key = self.read_key
+ self.store_key = store_key or self.read_key
self.features = features or []
self.ignore_features = ignore_features or []
self.combined_variable = combined_variable
- def transform(self, datastore):
- # fetch and check input data
- data = self.get_datastore_object(datastore, self.read_key, dtype=dict)
-
+ def transform(self, data: dict) -> Optional[dict]:
# determine all possible features, used for the comparison below
- features = self.get_features(data.keys())
+ features = self.get_features(list(data.keys()))
if len(features) == 0:
- return datastore
+ return None
self.logger.info(
f'Combining alerts into artificial variable "{self.combined_variable}"'
@@ -88,7 +86,7 @@ def transform(self, datastore):
self.logger.warning(
"indices of features are different. no alerts summary generated."
)
- return datastore
+ return None
# STEP 2: Concatenate the dataframes, there was one for each original feature.
tlv = pd.concat(df_list, axis=1)
@@ -104,6 +102,4 @@ def transform(self, datastore):
# store combination of traffic alerts
data[self.combined_variable] = dfc
- datastore[self.store_key] = data
-
- return datastore
+ return data
diff --git a/popmon/alerting/compute_tl_bounds.py b/popmon/alerting/compute_tl_bounds.py
index ef7fb5bb..e8c77480 100644
--- a/popmon/alerting/compute_tl_bounds.py
+++ b/popmon/alerting/compute_tl_bounds.py
@@ -18,11 +18,10 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-import collections
import copy
import fnmatch
-import uuid
from collections import defaultdict
+from typing import Tuple, Any
import numpy as np
import pandas as pd
@@ -117,6 +116,8 @@ class ComputeTLBounds(Module):
meant to be generic. Then bounds can be stored as either raw
values or as directly calculated values on the statistics of the data.
"""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", "apply_funcs_key")
def __init__(
self,
@@ -133,7 +134,7 @@ def __init__(
entire=False,
**kwargs,
):
- """Initialize an instance of TafficLightBounds module.
+ """Initialize an instance of the ComputeTLBounds module.
:param str read_key: key of input data to read from datastore
:param str store_key: key of output data to store in datastore (optional)
@@ -152,12 +153,13 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+ self.apply_funcs_key = apply_funcs_key
+
self.monitoring_rules = monitoring_rules or {}
self.features = features or []
self.ignore_features = ignore_features or []
self.traffic_lights = {}
self.traffic_light_funcs = []
- self.apply_funcs_key = apply_funcs_key
self.traffic_light_func = func if func is not None else traffic_light
self.metrics_wide = metrics_wide
self.prefix = prefix
@@ -165,10 +167,12 @@ def __init__(
self.entire = entire
self.kwargs = copy.copy(kwargs)
- # check inputs
- if not isinstance(self.traffic_light_func, collections.Callable):
+ if not callable(self.traffic_light_func):
raise TypeError("supplied function must be callable object")
+ def get_description(self):
+ return self.traffic_light_func.__name__
+
def _set_traffic_lights(self, feature, cols, pattern, rule_name):
process_cols = fnmatch.filter(cols, pattern)
@@ -195,12 +199,9 @@ def _set_traffic_lights(self, feature, cols, pattern, rule_name):
}
)
- def transform(self, datastore):
- # fetch and check input data
- test_data = self.get_datastore_object(datastore, self.read_key, dtype=dict)
-
+ def transform(self, test_data: dict) -> Tuple[Any, Any]:
# determine all possible features, used for the comparison below
- features = self.get_features(test_data.keys())
+ features = self.get_features(list(test_data.keys()))
pkeys, nkeys = collect_traffic_light_bounds(self.monitoring_rules)
@@ -212,7 +213,9 @@ def transform(self, datastore):
# --- 1. tl bounds explicitly defined for a particular feature
if feature in pkeys:
explicit_cols = [
- pcol for pcol in pkeys[feature] if pcol in test_df.columns
+ pcol
+ for pcol in pkeys[feature]
+ if pcol in test_df.columns
]
implicit_cols = set(pkeys[feature]) - set(explicit_cols)
@@ -237,13 +240,7 @@ def transform(self, datastore):
feature, test_df.columns, pattern, rule_name="pattern"
)
- # storage
- if self.store_key:
- datastore[self.store_key] = self.traffic_lights
- if self.apply_funcs_key:
- datastore[self.apply_funcs_key] = self.traffic_light_funcs
-
- return datastore
+ return self.traffic_lights, self.traffic_light_funcs
def pull_bounds(
@@ -338,7 +335,12 @@ class DynamicBounds(Pipeline):
"""Calculate dynamic traffic light bounds based on pull thresholds and dynamic mean and std.deviation."""
def __init__(
- self, read_key, rules, store_key="", suffix_mean="_mean", suffix_std="_std"
+ self,
+ read_key,
+ rules,
+ store_key="",
+ suffix_mean="_mean",
+ suffix_std="_std",
):
"""Initialize an instance of DynamicTrafficLightBounds.
@@ -348,10 +350,8 @@ def __init__(
:param str suffix_mean: suffix of mean. mean column = metric + suffix_mean
:param str suffix_std: suffix of std. std column = metric + suffix_std
"""
- super().__init__(modules=[])
self.read_key = read_key
-
- apply_funcs_key = str(uuid.uuid4())
+ apply_funcs_key = f"{read_key}__{store_key}"
expand_bounds = ComputeTLBounds(
read_key=read_key,
@@ -368,8 +368,7 @@ def __init__(
assign_to_key=store_key,
apply_funcs_key=apply_funcs_key,
)
-
- self.modules = [expand_bounds, calc_bounds]
+ super().__init__(modules=[expand_bounds, calc_bounds])
def transform(self, datastore):
self.logger.info(f'Calculating dynamic bounds for "{self.read_key}"')
@@ -380,7 +379,12 @@ class StaticBounds(Pipeline):
"""Calculate static traffic light bounds based on pull thresholds and static mean and std.deviation."""
def __init__(
- self, read_key, rules, store_key="", suffix_mean="_mean", suffix_std="_std"
+ self,
+ read_key,
+ rules,
+ store_key="",
+ suffix_mean="_mean",
+ suffix_std="_std",
):
"""Initialize an instance of StaticBounds.
@@ -390,10 +394,8 @@ def __init__(
:param str suffix_mean: suffix of mean. mean column = metric + suffix_mean
:param str suffix_std: suffix of std. std column = metric + suffix_std
"""
- super().__init__(modules=[])
self.read_key = read_key
-
- apply_funcs_key = str(uuid.uuid4())
+ apply_funcs_key = f"{read_key}__{store_key}"
expand_bounds = ComputeTLBounds(
read_key=read_key,
@@ -411,7 +413,7 @@ def __init__(
apply_funcs_key=apply_funcs_key,
)
- self.modules = [expand_bounds, calc_bounds]
+ super().__init__(modules=[expand_bounds, calc_bounds])
def transform(self, datastore):
self.logger.info(f'Calculating static bounds for "{self.read_key}"')
@@ -437,10 +439,8 @@ def __init__(self, read_key, store_key, rules, expanded_rules_key=""):
:param str expanded_rules_key: store key of expanded monitoring rules to store in data store,
eg. these can be used for plotting. (optional)
"""
- super().__init__(modules=[])
self.read_key = read_key
-
- apply_funcs_key = str(uuid.uuid4())
+ apply_funcs_key = f"{read_key}__{store_key}"
# generate static traffic light bounds by expanding the wildcarded monitoring rules
expand_bounds = ComputeTLBounds(
@@ -457,7 +457,7 @@ def __init__(self, read_key, store_key, rules, expanded_rules_key=""):
apply_funcs_key=apply_funcs_key,
)
- self.modules = [expand_bounds, apply_bounds]
+ super().__init__(modules=[expand_bounds, apply_bounds])
def transform(self, datastore):
self.logger.info(f'Calculating traffic light alerts for "{self.read_key}"')
diff --git a/popmon/analysis/apply_func.py b/popmon/analysis/apply_func.py
index 0ed7ae4e..617fc669 100644
--- a/popmon/analysis/apply_func.py
+++ b/popmon/analysis/apply_func.py
@@ -19,6 +19,7 @@
import warnings
+from typing import Optional
import numpy as np
import pandas as pd
@@ -32,6 +33,8 @@ class ApplyFunc(Module):
Extra parameters (kwargs) can be passed to the apply function.
"""
+ _input_keys = ("apply_to_key", "assign_to_key", "apply_funcs_key")
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -67,9 +70,10 @@ def __init__(
"""
super().__init__()
self.apply_to_key = apply_to_key
- self.assign_to_key = self.apply_to_key if not assign_to_key else assign_to_key
- self.store_key = self.assign_to_key if not store_key else store_key
+ self.assign_to_key = assign_to_key or apply_to_key
self.apply_funcs_key = apply_funcs_key
+ self.store_key = store_key or self.assign_to_key
+
self.features = features or []
self.metrics = metrics or []
self.msg = msg
@@ -79,6 +83,14 @@ def __init__(
for af in apply_funcs:
self.add_apply_func(**af)
+ def get_description(self):
+ if len(self.apply_funcs) > 0:
+ return " and ".join([x['func'].__name__ for x in self.apply_funcs])
+ elif self.apply_funcs_key:
+ return f"functions from arg '{self.apply_funcs_key}'"
+ else:
+ raise NotImplementedError
+
def add_apply_func(
self,
func,
@@ -127,7 +139,7 @@ def add_apply_func(
}
)
- def transform(self, datastore):
+ def transform(self, apply_to_data: dict, assign_to_data: Optional[dict] = None, apply_funcs: Optional[list] = None):
"""
Apply functions to specified feature and metrics
@@ -137,23 +149,17 @@ def transform(self, datastore):
:return: updated datastore
:rtype: dict
"""
- if self.msg:
- self.logger.info(self.msg)
+ assert isinstance(apply_to_data, dict)
+ if assign_to_data is None:
+ assign_to_data = {}
- apply_to_data = self.get_datastore_object(
- datastore, self.apply_to_key, dtype=dict
- )
- assign_to_data = self.get_datastore_object(
- datastore, self.assign_to_key, dtype=dict, default={}
- )
-
- if self.apply_funcs_key:
- apply_funcs = self.get_datastore_object(
- datastore, self.apply_funcs_key, dtype=list
- )
+ if apply_funcs is not None:
self.apply_funcs += apply_funcs
- features = self.get_features(apply_to_data.keys())
+ if self.msg:
+ self.logger.info(self.msg)
+
+ features = self.get_features(list(apply_to_data.keys()))
same_key = self.assign_to_key == self.apply_to_key
@@ -181,10 +187,7 @@ def transform(self, datastore):
]
result = parallel(apply_func_array, args, mode="kwargs")
new_metrics = dict(result)
-
- # storage
- datastore[self.store_key] = new_metrics
- return datastore
+ return new_metrics
def apply_func_array(
diff --git a/popmon/analysis/comparison/hist_comparer.py b/popmon/analysis/comparison/hist_comparer.py
index abdbc6ef..9f51fb36 100644
--- a/popmon/analysis/comparison/hist_comparer.py
+++ b/popmon/analysis/comparison/hist_comparer.py
@@ -162,13 +162,14 @@ def __init__(
:param args: (tuple, optional): residual args passed on to func_mean and func_std
:param kwargs: (dict, optional): residual kwargs passed on to func_mean and func_std
"""
- super().__init__(modules=[])
-
if assign_to_key is None:
assign_to_key = read_key
# make reference histogram(s)
- hist_collector = ApplyFunc(apply_to_key=read_key, assign_to_key=assign_to_key)
+ hist_collector = ApplyFunc(
+ apply_to_key=read_key,
+ assign_to_key=assign_to_key,
+ )
hist_collector.add_apply_func(
func=func_hist_collector, entire=True, suffix=suffix, *args, **kwargs
)
@@ -187,7 +188,8 @@ def __init__(
}
],
)
- self.modules = [hist_collector, hist_comparer]
+
+ super().__init__(modules=[hist_collector, hist_comparer])
class RollingHistComparer(HistComparer):
@@ -374,15 +376,20 @@ def __init__(
:param args: (tuple, optional): residual args passed on to func_hist_collector
:param kwargs: (dict, optional): residual kwargs passed on to func_hist_collector
"""
- super().__init__(modules=[])
-
if assign_to_key is None:
assign_to_key = read_key
# make reference histogram(s)
- hist_collector = ApplyFunc(apply_to_key=read_key, assign_to_key=assign_to_key)
+ hist_collector = ApplyFunc(
+ apply_to_key=read_key,
+ assign_to_key=assign_to_key
+ )
hist_collector.add_apply_func(
- func=func_hist_collector, hist_name=hist_col, suffix="", *args, **kwargs
+ func=func_hist_collector,
+ hist_name=hist_col,
+ suffix="",
+ *args,
+ **kwargs
)
# do histogram comparison
@@ -399,7 +406,7 @@ def __init__(
],
)
- self.modules = [hist_collector, hist_comparer]
+ super().__init__(modules=[hist_collector, hist_comparer])
class RollingNormHistComparer(NormHistComparer):
diff --git a/popmon/analysis/functions.py b/popmon/analysis/functions.py
index 75a2938b..bc7054e6 100644
--- a/popmon/analysis/functions.py
+++ b/popmon/analysis/functions.py
@@ -46,7 +46,7 @@ def pull(row, suffix_mean="_mean", suffix_std="_std", cols=None):
"""
x = pd.Series()
if cols is None or len(cols) == 0:
- # if no columns are given, find colums for which pulls can be calculated.
+ # if no columns are given, find columns for which pulls can be calculated.
# e.g. to calculate x_pull, need to have [x, x_mean, x_std] present. If so, put x in cols.
cols = []
for m in row.index.to_list()[:]:
diff --git a/popmon/analysis/merge_statistics.py b/popmon/analysis/merge_statistics.py
index 188158b3..3d6eb3be 100644
--- a/popmon/analysis/merge_statistics.py
+++ b/popmon/analysis/merge_statistics.py
@@ -18,6 +18,8 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+from typing import List
+
import pandas as pd
from ..base import Module
@@ -25,22 +27,20 @@
class MergeStatistics(Module):
"""Merging dictionaries of features containing dataframes with statistics as its values."""
+ _input_keys = ("read_keys", )
+ _output_keys = ("store_key", )
- def __init__(self, read_keys, store_key):
+ def __init__(self, read_keys: List[str], store_key: str):
"""Initialize an instance of MergeStatistics.
- :param str read_keys: list of keys of input data to read from the datastore
+ :param list read_keys: list of keys of input data to read from the datastore
:param str store_key: key of output data to store in the datastore
"""
super().__init__()
self.read_keys = read_keys
self.store_key = store_key
- def transform(self, datastore):
- dicts = [
- self.get_datastore_object(datastore, read_key, dtype=dict)
- for read_key in self.read_keys
- ]
+ def transform(self, dicts: list):
merged_stats = {}
for dict_ in dicts:
for feature in dict_.keys():
@@ -53,5 +53,4 @@ def transform(self, datastore):
)
else:
merged_stats[feature] = dict_[feature]
- datastore[self.store_key] = merged_stats
- return datastore
+ return merged_stats
diff --git a/popmon/analysis/profiling/hist_profiler.py b/popmon/analysis/profiling/hist_profiler.py
index afd5862a..45571ac8 100644
--- a/popmon/analysis/profiling/hist_profiler.py
+++ b/popmon/analysis/profiling/hist_profiler.py
@@ -57,6 +57,8 @@ class HistProfiler(Module):
:param str index_col: key for index in split dictionary
:param dict stats_functions: function_name, function(bin_labels, bin_counts) dictionary
"""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -72,12 +74,12 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+
self.features = features or []
self.ignore_features = ignore_features or []
self.var_timestamp = var_timestamp or []
self.hist_col = hist_col
self.index_col = index_col
-
self.general_stats_1d = [
"count",
"filled",
@@ -89,7 +91,6 @@ def __init__(
]
self.general_stats_2d = ["count", "phik"]
self.category_stats_1d = ["fraction_true"]
-
self.stats_functions = stats_functions
if self.stats_functions is None:
self.stats_functions = DEFAULT_STATS
@@ -222,15 +223,13 @@ def _profile_hist(self, split, hist_name):
return profile_list
- def transform(self, datastore):
+ def transform(self, data: dict) -> dict:
self.logger.info(
f'Profiling histograms "{self.read_key}" as "{self.store_key}"'
)
- data = self.get_datastore_object(datastore, self.read_key, dtype=dict)
- profiled = {}
-
- features = self.get_features(data.keys())
+ features = self.get_features(list(data.keys()))
+ profiled = {}
for feature in features[:]:
df = self.get_datastore_object(data, feature, dtype=pd.DataFrame)
hist_split_list = df.reset_index().to_dict("records")
@@ -242,5 +241,4 @@ def transform(self, datastore):
[self.index_col]
)
- datastore[self.store_key] = profiled
- return datastore
+ return profiled
diff --git a/popmon/analysis/profiling/pull_calculator.py b/popmon/analysis/profiling/pull_calculator.py
index 17936872..3e266545 100644
--- a/popmon/analysis/profiling/pull_calculator.py
+++ b/popmon/analysis/profiling/pull_calculator.py
@@ -131,8 +131,11 @@ def __init__(
:param args: (tuple, optional): residual args passed on to mean and std functions
:param kwargs: (dict, optional): residual kwargs passed on to mean and std functions
"""
- kws = {"window": window, "shift": shift}
- kws.update(kwargs)
+ kws = {
+ "window": window,
+ "shift": shift,
+ **kwargs
+ }
super().__init__(
rolling_mean,
rolling_std,
@@ -183,8 +186,10 @@ def __init__(
:param args: (tuple, optional): residual args passed on to mean and std functions
:param kwargs: (dict, optional): residual kwargs passed on to mean and std functions
"""
- kws = {"shift": shift}
- kws.update(kwargs)
+ kws = {
+ "shift": shift,
+ **kwargs
+ }
super().__init__(
expanding_mean,
expanding_std,
diff --git a/popmon/base/module.py b/popmon/base/module.py
index 150db6b6..56b5f33c 100644
--- a/popmon/base/module.py
+++ b/popmon/base/module.py
@@ -19,10 +19,13 @@
import logging
+from abc import ABC, abstractmethod
-class Module:
+class Module(ABC):
"""Base class used for modules in a pipeline."""
+ _input_keys = None
+ _output_keys = None
def __init__(self):
"""Module initialization"""
@@ -31,6 +34,26 @@ def __init__(self):
self.feature_begins_with = []
self.ignore_features = []
+ def get_inputs(self):
+ in_keys = {}
+ for x in self._input_keys:
+ in_key = self.__dict__[x]
+ if in_key != "" and in_key is not None and in_key not in in_keys:
+ in_keys[x] = in_key
+ return in_keys
+
+ def get_outputs(self):
+ out_keys = {}
+ for x in self._output_keys:
+ out_key = self.__dict__[x]
+ if out_key != "" and out_key is not None and out_key not in out_keys:
+ out_keys[x] = out_key
+ return out_keys
+
+ # @abstractmethod
+ def get_description(self):
+ return ""
+
def set_logger(self, logger):
"""Set logger of module
@@ -38,7 +61,8 @@ def set_logger(self, logger):
"""
self.logger = logger
- def get_datastore_object(self, datastore, feature, dtype, default=None):
+ @staticmethod
+ def get_datastore_object(datastore, feature, dtype, default=None):
"""Get object from datastore.
Bit more advanced than dict.get()
@@ -49,17 +73,19 @@ def get_datastore_object(self, datastore, feature, dtype, default=None):
:param obj default: object to default to in case key not found.
:return: retrieved object
"""
- obj = datastore.get(feature)
- if obj is None:
- if default is not None:
- obj = default
- else:
+ if default is not None:
+ obj = datastore.get(feature, default)
+ else:
+ try:
+ obj = datastore[feature]
+ except KeyError:
raise ValueError(f"`{feature}` not found in the datastore!")
+
if not isinstance(obj, dtype):
raise TypeError(f"obj `{feature}` is not an instance of `{dtype}`!")
return obj
- def get_features(self, all_features):
+ def get_features(self, all_features: list) -> list:
"""Get all features that meet feature_begins_with and ignore_features requirements
:param list all_features: input features list
@@ -67,25 +93,65 @@ def get_features(self, all_features):
:rtype: list
"""
all_features = sorted(all_features)
- features = self.features
- if not self.features:
- features = all_features
+ features = self.features or all_features
+
if self.feature_begins_with:
features = [k for k in features if k.startswith(self.feature_begins_with)]
if self.ignore_features:
features = [k for k in features if k not in self.ignore_features]
features_not_in_input = [
- feature for feature in features if feature not in all_features
+ feature
+ for feature in features
+ if feature not in all_features
]
- features = [feature for feature in features if feature in all_features]
-
for feature in features_not_in_input:
self.logger.warning(f'Feature "{feature}" not in input data; skipping.')
+ features = [
+ feature
+ for feature in features
+ if feature in all_features
+ ]
return features
- def transform(self, datastore):
+ def _transform(self, datastore):
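+ """Load the inputs named in `_input_keys` from the datastore, pass them to `transform`, and store its outputs (if not None) under the keys named in `_output_keys`."""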
+
+ inputs = {}
+ self.logger.debug(f"load from: {type(self)}")
+ for key in self._input_keys:
+ key_value = self.__dict__[key]
+ if key_value and len(key_value) > 0:
+ if isinstance(key_value, list):
+ inputs[key] = [datastore.get(k) for k in key_value]
+ else:
+ inputs[key] = datastore.get(key_value)
+ else:
+ inputs[key] = None
+
+ self.logger.debug(f"load(key={key}, key_value={key_value}, value={str(inputs[key]):.100s})")
+
+ # cache datastore
+ self._datastore = datastore
+
+ # transformation
+ outputs = self.transform(*list(inputs.values()))
+
+ # transform returns None if no update needs to be made
+ if outputs is not None:
+ if len(self._output_keys) == 1:
+ outputs = (outputs,)
+
+ for k, v in zip(self._output_keys, outputs):
+ key_value = self.__dict__[k]
+ self.logger.debug(f"store(key={k}, key_value={key_value}, value={str(v):.100s})")
+ if key_value and len(key_value) > 0: # and v is not None:
+ datastore[key_value] = v
+
+ return datastore
+
+ def transform(self, *args):
"""Central function of the module.
Typically transform() takes something from the datastore, does something to it, and puts the results
@@ -95,4 +161,4 @@ def transform(self, datastore):
:return: updated output datastore
:rtype: dict
"""
- return datastore
+ raise NotImplementedError
diff --git a/popmon/base/pipeline.py b/popmon/base/pipeline.py
index 31a83afe..3995235a 100644
--- a/popmon/base/pipeline.py
+++ b/popmon/base/pipeline.py
@@ -17,13 +17,14 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
+import json
import logging
+from pathlib import Path
from ..base import Module
-class Pipeline(Module):
+class Pipeline:
"""Base class used for to run modules in a pipeline."""
def __init__(self, modules, logger=None):
@@ -32,7 +33,6 @@ def __init__(self, modules, logger=None):
:param list modules: modules of the pipeline.
:param logger: logger to be used by each module.
"""
- super().__init__()
self.modules = modules
self.set_logger(logger)
@@ -68,5 +68,76 @@ def transform(self, datastore):
"""
for module in self.modules:
- datastore = module.transform(datastore)
+ self.logger.debug(f"transform {module.__class__.__name__}")
+ if isinstance(module, Pipeline):
+ datastore = module.transform(datastore)
+ else:
+ datastore = module._transform(datastore)
return datastore
+
+ def visualize(self, versioned=True, funcs=None, dsets=None):
+ if dsets is None:
+ dsets = {}
+ if funcs is None:
+ funcs = {}
+
+ modules = []
+ for module in self.modules:
+ name = module.__class__.__name__
+ if isinstance(module, Pipeline):
+ modules.append(
+ module.visualize(versioned, funcs, dsets)
+ )
+ else:
+ in_keys = module.get_inputs()
+
+ if versioned:
+ new_ins = {}
+ for k, in_key in in_keys.items():
+ if in_key not in dsets:
+ dsets[in_key] = 1
+ in_key += f" (v{dsets[in_key]})"
+ new_ins[k] = in_key
+ in_keys = new_ins
+
+ out_keys = module.get_outputs()
+ if versioned:
+ new_outs = {}
+ for k, out_key in out_keys.items():
+ if out_key in dsets:
+ dsets[out_key] += 1
+ else:
+ dsets[out_key] = 1
+ out_key += f" (v{dsets[out_key]})"
+ new_outs[k] = out_key
+ out_keys = new_outs
+
+ self.logger.debug(f"{name}(inputs={in_keys}, outputs={out_keys})")
+
+ # number each module instance sequentially within its class name, keyed by id(module)
+ if name not in funcs:
+ funcs[name] = {}
+ if id(module) not in funcs[name]:
+ funcs[name][id(module)] = len(funcs[name]) + 1
+
+ modules.append(
+ {
+ 'type': 'module',
+ 'name': f'{name}',
+ 'i': f'{funcs[name][id(module)]}',
+ 'desc': module.get_description(),
+ 'in': in_keys,
+ 'out': out_keys
+ }
+ )
+ data = {
+ 'type': 'subgraph',
+ 'name': self.__class__.__name__,
+ 'modules': modules
+ }
+ return data
+
+ def to_json(self, file_name, versioned=True):
+ d = self.visualize(versioned=versioned)
+ data = json.dumps(d, indent=4, sort_keys=True)
+ Path(file_name).write_text(data)
diff --git a/popmon/hist/hist_splitter.py b/popmon/hist/hist_splitter.py
index 4d11260e..43163414 100644
--- a/popmon/hist/hist_splitter.py
+++ b/popmon/hist/hist_splitter.py
@@ -37,6 +37,9 @@ class HistSplitter(Module):
where time is the index and each row is a x:y histogram.
"""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", )
+
def __init__(
self,
read_key,
@@ -70,6 +73,7 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+
self.features = features or []
self.ignore_features = ignore_features or []
self.feature_begins_with = feature_begins_with
@@ -86,6 +90,9 @@ def __init__(
"flatten_output requires short_keys attribute to be False."
)
+ def get_description(self):
+ return ""
+
def update_divided(self, divided, split, yname):
if self.flatten_output:
divided.update(split)
@@ -95,18 +102,16 @@ def update_divided(self, divided, split, yname):
]
return divided
- def transform(self, datastore):
- divided = {}
-
+ def transform(self, data: dict) -> dict:
self.logger.info(
f'Splitting histograms "{self.read_key}" as "{self.store_key}"'
)
- data = self.get_datastore_object(datastore, self.read_key, dtype=dict)
# determine all possible features, used for comparison below
- features = self.get_features(data.keys())
+ features = self.get_features(list(data.keys()))
# if so requested split selected histograms along first axis, and then divide
+ divided = {}
for feature in features[:]:
self.logger.debug(f'Now splitting histogram "{feature}"')
hist = get_histogram(data[feature])
@@ -147,9 +152,8 @@ def transform(self, datastore):
self.update_divided(divided=divided, split=split, yname=yname)
# turn divided dicts into dataframes with index
- keys = list(divided.keys())
- for k in keys:
- divided[k] = pd.DataFrame(divided.pop(k)).set_index(self.index_col)
-
- datastore[self.store_key] = divided
- return datastore
+ divided = {
+ k: pd.DataFrame(v).set_index(self.index_col)
+ for k, v in divided.items()
+ }
+ return divided
diff --git a/popmon/io/file_reader.py b/popmon/io/file_reader.py
index 06c5e0f6..929bec0b 100644
--- a/popmon/io/file_reader.py
+++ b/popmon/io/file_reader.py
@@ -28,6 +28,9 @@
class FileReader(Module):
"""Module to read contents from a file, transform the contents with a function and write them to the datastore."""
+ _input_keys = tuple()
+ _output_keys = ("store_key", )
+
def __init__(
self,
store_key: str,
@@ -45,9 +48,7 @@ def __init__(
super().__init__()
if not isinstance(file_path, (str, Path)):
raise TypeError("file_path should be of type `str` or `pathlib.Path`")
- if apply_func is not None and not isinstance(
- apply_func, collections.abc.Callable
- ):
+ if apply_func is not None and not callable(apply_func):
raise TypeError("transformation function must be a callable object")
self.store_key = store_key
@@ -55,7 +56,10 @@ def __init__(
self.apply_func = apply_func
self.kwargs = kwargs
- def transform(self, datastore):
+ def get_description(self):
+ return self.file_path
+
+ def transform(self):
with open(self.file_path) as file:
data = file.read()
@@ -68,5 +72,4 @@ def transform(self, datastore):
)
# store the transformed/original contents
- datastore[self.store_key] = data
- return datastore
+ return data
diff --git a/popmon/io/file_writer.py b/popmon/io/file_writer.py
index 2408b032..800729c2 100644
--- a/popmon/io/file_writer.py
+++ b/popmon/io/file_writer.py
@@ -28,6 +28,8 @@
class FileWriter(Module):
"""Module transforms specific datastore content and writes it to a file."""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -48,18 +50,20 @@ def __init__(
super().__init__()
if file_path is not None and not isinstance(file_path, (str, Path)):
raise TypeError("file_path should be of type `str` or `pathlib.Path`")
- if apply_func is not None and not isinstance(
- apply_func, collections.abc.Callable
- ):
+ if apply_func is not None and not callable(apply_func):
raise TypeError("transformation function must be a callable object")
self.read_key = read_key
- self.store_key = store_key
+ self.store_key = store_key or read_key
+
self.file_path = file_path
self.apply_func = apply_func
self.kwargs = kwargs
- def transform(self, datastore):
- data = copy.deepcopy(datastore[self.read_key])
+ def get_description(self):
+ return self.file_path
+
+ def transform(self, data):
+ data = copy.deepcopy(data)
# if a transformation function is provided, transform the data
if self.apply_func is not None:
@@ -67,14 +71,11 @@ def transform(self, datastore):
# if file path is provided, write data to a file. Otherwise, write data into the datastore
if self.file_path is None:
- datastore[
- self.read_key if self.store_key is None else self.store_key
- ] = data
+ return data
else:
with open(self.file_path, "w+") as file:
file.write(data)
self.logger.info(
f'Object "{self.read_key}" written to file "{self.file_path}".'
)
-
- return datastore
+ return None
diff --git a/popmon/io/json_reader.py b/popmon/io/json_reader.py
index 6fe4f7f2..aaf0c492 100644
--- a/popmon/io/json_reader.py
+++ b/popmon/io/json_reader.py
@@ -36,5 +36,5 @@ def __init__(self, file_path: Union[str, Path], store_key: str):
"""
super().__init__(store_key, file_path, apply_func=json.loads)
- def transform(self, datastore):
- return super().transform(datastore)
+ def transform(self, *args):
+ return super().transform(*args)
diff --git a/popmon/pipeline/metrics_pipelines.py b/popmon/pipeline/metrics_pipelines.py
index 3de19b23..ba0bff9d 100644
--- a/popmon/pipeline/metrics_pipelines.py
+++ b/popmon/pipeline/metrics_pipelines.py
@@ -382,7 +382,13 @@ def metrics_rolling_reference(
),
ApplyFunc(
apply_to_key="traffic_lights",
- apply_funcs=[{"func": traffic_light_summary, "axis": 1, "suffix": ""}],
+ apply_funcs=[
+ {
+ "func": traffic_light_summary,
+ "axis": 1,
+ "suffix": ""
+ }
+ ],
assign_to_key="alerts",
msg="Generating traffic light alerts summary.",
),
diff --git a/popmon/pipeline/report.py b/popmon/pipeline/report.py
index 9329924b..7b31a9bf 100644
--- a/popmon/pipeline/report.py
+++ b/popmon/pipeline/report.py
@@ -27,7 +27,7 @@
make_histograms,
)
-from ..base import Module
+from ..base import Module, Pipeline
from ..config import config
from ..pipeline.report_pipelines import (
ReportPipe,
@@ -425,6 +425,8 @@ class StabilityReport(Module):
after running the pipeline and generating the report. Report can be represented
as a HTML string, HTML file or Jupyter notebook's cell output.
"""
+ _input_keys = ("read_key", )
+ _output_keys = tuple()
def __init__(self, read_key="html_report"):
"""Initialize an instance of StabilityReport.
diff --git a/popmon/pipeline/report_pipelines.py b/popmon/pipeline/report_pipelines.py
index 2b66c8ac..ad71727f 100644
--- a/popmon/pipeline/report_pipelines.py
+++ b/popmon/pipeline/report_pipelines.py
@@ -90,6 +90,8 @@ def self_reference(
]
pipeline = Pipeline(modules)
+ # pipeline.to_json("pipeline_self_reference_versioned.json", versioned=True)
+ # pipeline.to_json("pipeline_self_reference_unversioned.json", versioned=False)
return pipeline
diff --git a/popmon/stitching/hist_stitcher.py b/popmon/stitching/hist_stitcher.py
index 7843842e..8b482682 100644
--- a/popmon/stitching/hist_stitcher.py
+++ b/popmon/stitching/hist_stitcher.py
@@ -28,6 +28,9 @@
class HistStitcher(Module):
"""Module stitches histograms by date"""
+ _input_keys = ("read_key", "delta_key")
+ _output_keys = ("store_key", )
+
def __init__(
self,
mode="add",
@@ -51,28 +54,25 @@ def __init__(
(only required when calling transform(datastore) as module)
"""
super().__init__()
- self.mode = mode
- self.time_axis = time_axis
- self.time_bin_idx = time_bin_idx
self.read_key = read_key
self.delta_key = delta_key
self.store_key = store_key
+ self.mode = mode
+ self.time_axis = time_axis
+ self.time_bin_idx = time_bin_idx
self.allowed_modes = ["add", "replace"]
- assert self.mode in self.allowed_modes
+ if self.mode not in self.allowed_modes:
+ raise ValueError("mode should be either 'add' or 'replace'")
+
+ def get_description(self):
+ return f"{self.mode}"
- def transform(self, datastore):
- # --- get input dict lists
+ def transform(self, hists_basis: dict, hists_delta: dict) -> dict:
self.logger.info(
f'Stitching histograms "{self.read_key}" and "{self.delta_key}" as "{self.store_key}"'
)
-
- hists_basis = self.get_datastore_object(datastore, self.read_key, dtype=dict)
- hists_delta = self.get_datastore_object(datastore, self.delta_key, dtype=dict)
-
stitched = self.stitch_histograms(self.mode, hists_basis, hists_delta)
-
- datastore[self.store_key] = stitched
- return datastore
+ return stitched
def stitch_histograms(
self,
diff --git a/popmon/visualization/alert_section_generator.py b/popmon/visualization/alert_section_generator.py
index 23cc5a58..ca5712da 100644
--- a/popmon/visualization/alert_section_generator.py
+++ b/popmon/visualization/alert_section_generator.py
@@ -18,6 +18,8 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+from typing import Optional
+
import numpy as np
import pandas as pd
from tqdm import tqdm
@@ -34,6 +36,8 @@ class AlertSectionGenerator(Module):
combines all the plots into a list which is stored together with the section name in a dictionary
which later will be used for the report generation.
"""
+ _input_keys = ("read_key", "static_bounds", "dynamic_bounds", "store_key")
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -76,14 +80,15 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+ self.dynamic_bounds = dynamic_bounds
+ self.static_bounds = static_bounds
+
self.features = features or []
self.ignore_features = ignore_features or []
self.section_name = section_name
self.last_n = last_n
self.skip_first_n = skip_first_n
self.skip_last_n = skip_last_n
- self.dynamic_bounds = dynamic_bounds
- self.static_bounds = static_bounds
self.prefix = prefix
self.suffices = suffices
self.ignore_stat_endswith = ignore_stat_endswith or []
@@ -93,17 +98,28 @@ def __init__(
self.plot_overview = True
self.plot_metrics = False
- def transform(self, datastore):
- data_obj = self.get_datastore_object(datastore, self.read_key, dtype=dict)
-
- static_bounds = self.get_datastore_object(
- datastore, self.static_bounds, dtype=dict, default={}
- )
- dynamic_bounds = self.get_datastore_object(
- datastore, self.dynamic_bounds, dtype=dict, default={}
- )
+ def get_description(self):
+ return self.section_name
- features = self.get_features(data_obj.keys())
+ def transform(
+ self,
+ data_obj: dict,
+ static_bounds: Optional[dict] = None,
+ dynamic_bounds: Optional[dict] = None,
+ sections: Optional[list] = None
+ ):
+ assert isinstance(data_obj, dict)
+ if static_bounds is None:
+ static_bounds = {}
+ assert isinstance(static_bounds, dict)
+ if dynamic_bounds is None:
+ dynamic_bounds = {}
+ assert isinstance(dynamic_bounds, dict)
+ if sections is None:
+ sections = []
+ assert isinstance(sections, list)
+
+ features = self.get_features(list(data_obj.keys()))
features_w_metrics = []
self.logger.info(
@@ -170,18 +186,14 @@ def transform(self, datastore):
{"name": feature, "plots": sorted(plots, key=lambda plot: plot["name"])}
)
- params = {
- "section_title": self.section_name,
- "section_description": self.description,
- "features": features_w_metrics,
- }
-
- if self.store_key in datastore:
- datastore[self.store_key].append(params)
- else:
- datastore[self.store_key] = [params]
-
- return datastore
+ sections.append(
+ {
+ "section_title": self.section_name,
+ "section_description": self.description,
+ "features": features_w_metrics,
+ }
+ )
+ return sections
def _plot_metric(
diff --git a/popmon/visualization/histogram_section.py b/popmon/visualization/histogram_section.py
index 3be3dc38..e5cb75ee 100644
--- a/popmon/visualization/histogram_section.py
+++ b/popmon/visualization/histogram_section.py
@@ -18,6 +18,8 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+from typing import Optional
+
import pandas as pd
from histogrammar.util import get_hist_props
from tqdm import tqdm
@@ -35,6 +37,8 @@
class HistogramSection(Module):
"""This module plots histograms of all selected features for the last 'n' periods."""
+ _input_keys = ("read_key", "store_key")
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -63,6 +67,7 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+
self.features = features or []
self.ignore_features = ignore_features or []
self.section_name = section_name
@@ -71,10 +76,14 @@ def __init__(
self.hist_name_starts_with = hist_name_starts_with
self.description = description
- def transform(self, datastore):
- data_obj = self.get_datastore_object(datastore, self.read_key, dtype=dict)
+ def get_description(self):
+ return self.section_name
- features = self.get_features(data_obj.keys())
+ def transform(self, data_obj: dict, sections: Optional[list] = None):
+ if sections is None:
+ sections = []
+
+ features = self.get_features(list(data_obj.keys()))
features_w_metrics = []
self.logger.info(f'Generating section "{self.section_name}".')
@@ -110,18 +119,14 @@ def transform(self, datastore):
{"name": feature, "plots": sorted(plots, key=lambda plot: plot["name"])}
)
- params = {
- "section_title": self.section_name,
- "section_description": self.description,
- "features": features_w_metrics,
- }
-
- if self.store_key in datastore:
- datastore[self.store_key].append(params)
- else:
- datastore[self.store_key] = [params]
-
- return datastore
+ sections.append(
+ {
+ "section_title": self.section_name,
+ "section_description": self.description,
+ "features": features_w_metrics,
+ }
+ )
+ return sections
def _plot_histograms(feature, date, hc_list, hist_names):
diff --git a/popmon/visualization/report_generator.py b/popmon/visualization/report_generator.py
index eec0f158..b95ac0b3 100644
--- a/popmon/visualization/report_generator.py
+++ b/popmon/visualization/report_generator.py
@@ -29,6 +29,8 @@ class ReportGenerator(Module):
"""This module takes already prepared section data, renders HTML section template with the data and
glues sections together into one compressed report which is created based on the provided template.
"""
+ _input_keys = ("read_key", )
+ _output_keys = ("store_key", )
def __init__(self, read_key, store_key):
"""Initialize an instance of ReportGenerator.
@@ -40,9 +42,10 @@ def __init__(self, read_key, store_key):
self.read_key = read_key
self.store_key = store_key
- def transform(self, datastore):
- sections = self.get_datastore_object(datastore, self.read_key, dtype=list)
+ def get_description(self):
+ return "HTML Report"
+ def transform(self, sections: list) -> str:
# concatenate HTML sections' code
sections_html = ""
for i, section_info in enumerate(sections):
@@ -51,11 +54,10 @@ def transform(self, datastore):
)
# get HTML template for the final report, insert placeholder data and compress the code
- datastore[self.store_key] = htmlmin.minify(
+ return htmlmin.minify(
templates_env(
filename="core.html",
generator=f"popmon {version}",
sections=sections_html,
)
)
- return datastore
diff --git a/popmon/visualization/section_generator.py b/popmon/visualization/section_generator.py
index 3f94bca3..f1049858 100644
--- a/popmon/visualization/section_generator.py
+++ b/popmon/visualization/section_generator.py
@@ -18,6 +18,8 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+from typing import Optional
+
import numpy as np
import pandas as pd
from tqdm import tqdm
@@ -33,6 +35,8 @@ class SectionGenerator(Module):
combines all the plots into a list which is stored together with the section name in a dictionary
which later will be used for the report generation.
"""
+ _input_keys = ("read_key", "static_bounds", "dynamic_bounds", "store_key")
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -75,14 +79,15 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+ self.dynamic_bounds = dynamic_bounds
+ self.static_bounds = static_bounds
+
self.features = features or []
self.ignore_features = ignore_features or []
self.section_name = section_name
self.last_n = last_n
self.skip_first_n = skip_first_n
self.skip_last_n = skip_last_n
- self.dynamic_bounds = dynamic_bounds
- self.static_bounds = static_bounds
self.prefix = prefix
self.suffices = suffices
self.ignore_stat_endswith = ignore_stat_endswith or []
@@ -90,17 +95,18 @@ def __init__(
self.description = description
self.show_stats = show_stats
- def transform(self, datastore):
- data_obj = self.get_datastore_object(datastore, self.read_key, dtype=dict)
+ def get_description(self):
+ return self.section_name
- static_bounds = self.get_datastore_object(
- datastore, self.static_bounds, dtype=dict, default={}
- )
- dynamic_bounds = self.get_datastore_object(
- datastore, self.dynamic_bounds, dtype=dict, default={}
- )
+ def transform(self, data_obj: dict, static_bounds: Optional[dict] = None, dynamic_bounds: Optional[dict] = None, sections: Optional[list] = None):
+ if static_bounds is None:
+ static_bounds = {}
+ if dynamic_bounds is None:
+ dynamic_bounds = {}
+ if sections is None:
+ sections = []
- features = self.get_features(data_obj.keys())
+ features = self.get_features(list(data_obj.keys()))
features_w_metrics = []
self.logger.info(
@@ -151,18 +157,14 @@ def transform(self, datastore):
{"name": feature, "plots": sorted(plots, key=lambda plot: plot["name"])}
)
- params = {
- "section_title": self.section_name,
- "section_description": self.description,
- "features": features_w_metrics,
- }
-
- if self.store_key not in datastore:
- datastore[self.store_key] = []
-
- datastore[self.store_key].append(params)
-
- return datastore
+ sections.append(
+ {
+ "section_title": self.section_name,
+ "section_description": self.description,
+ "features": features_w_metrics,
+ }
+ )
+ return sections
def _plot_metric(
diff --git a/popmon/visualization/traffic_light_section_generator.py b/popmon/visualization/traffic_light_section_generator.py
index ca5ce1d7..56d19d26 100644
--- a/popmon/visualization/traffic_light_section_generator.py
+++ b/popmon/visualization/traffic_light_section_generator.py
@@ -18,6 +18,8 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+from typing import Optional
+
import numpy as np
import pandas as pd
from tqdm import tqdm
@@ -38,6 +40,8 @@ class TrafficLightSectionGenerator(Module):
combines all the plots into a list which is stored together with the section name in a dictionary
which later will be used for the report generation.
"""
+ _input_keys = ("read_key", "dynamic_bounds", "store_key")
+ _output_keys = ("store_key", )
def __init__(
self,
@@ -84,14 +88,15 @@ def __init__(
super().__init__()
self.read_key = read_key
self.store_key = store_key
+ self.dynamic_bounds = dynamic_bounds
+ self.static_bounds = static_bounds
+
self.features = features or []
self.ignore_features = ignore_features or []
self.section_name = section_name
self.last_n = last_n
self.skip_first_n = skip_first_n
self.skip_last_n = skip_last_n
- self.dynamic_bounds = dynamic_bounds
- self.static_bounds = static_bounds
self.prefix = prefix
self.suffices = suffices
self.ignore_stat_endswith = ignore_stat_endswith or []
@@ -101,14 +106,19 @@ def __init__(
self.plot_overview = plot_overview
self.plot_metrics = plot_metrics
- def transform(self, datastore):
- data_obj = self.get_datastore_object(datastore, self.read_key, dtype=dict)
+ def get_description(self):
+ return self.section_name
- dynamic_bounds = self.get_datastore_object(
- datastore, self.dynamic_bounds, dtype=dict, default={}
- )
+ def transform(self, data_obj: dict, dynamic_bounds: Optional[dict] = None, sections: Optional[list] = None):
+ assert isinstance(data_obj, dict)
+ if dynamic_bounds is None:
+ dynamic_bounds = {}
+ assert isinstance(dynamic_bounds, dict)
+ if sections is None:
+ sections = []
+ assert isinstance(sections, list)
- features = self.get_features(data_obj.keys())
+ features = self.get_features(list(data_obj.keys()))
features_w_metrics = []
self.logger.info(
@@ -170,18 +180,14 @@ def transform(self, datastore):
{"name": feature, "plots": sorted(plots, key=lambda plot: plot["name"])}
)
- params = {
- "section_title": self.section_name,
- "section_description": self.description,
- "features": features_w_metrics,
- }
-
- if self.store_key in datastore:
- datastore[self.store_key].append(params)
- else:
- datastore[self.store_key] = [params]
-
- return datastore
+ sections.append(
+ {
+ "section_title": self.section_name,
+ "section_description": self.description,
+ "features": features_w_metrics,
+ }
+ )
+ return sections
def _plot_metric(metric, dates, values, last_n, skip_first_n, skip_last_n, skip_empty):
diff --git a/tests/popmon/alerting/test_compute_tl_bounds.py b/tests/popmon/alerting/test_compute_tl_bounds.py
index c9b392dc..b2211866 100644
--- a/tests/popmon/alerting/test_compute_tl_bounds.py
+++ b/tests/popmon/alerting/test_compute_tl_bounds.py
@@ -35,14 +35,13 @@ def test_compute_traffic_light_bounds():
monitoring_rules=conf["monitoring_rules"],
)
- output = module.transform(datastore)["output_data"]
+ output = module._transform(datastore)["output_data"]
assert "dummy_feature:mae" not in output.keys()
assert output["the_feature:mae"] == [8, 4, 2, 2]
assert output["the_feature:mse"] == [0.2, 0.11, 0.09, 0]
def test_compute_traffic_light_funcs():
-
datastore = {"test_data": pytest.test_comparer_df}
conf = {
@@ -61,7 +60,7 @@ def test_compute_traffic_light_funcs():
monitoring_rules=conf["monitoring_rules"],
)
- output = module.transform(datastore)["output_data"]
+ output = module._transform(datastore)["output_data"]
assert len(output) == 3
assert output[0]["features"] == ["dummy_feature"]
diff --git a/tests/popmon/analysis/profiling/test_apply_func.py b/tests/popmon/analysis/profiling/test_apply_func.py
index 8a53e87e..4adff82e 100644
--- a/tests/popmon/analysis/profiling/test_apply_func.py
+++ b/tests/popmon/analysis/profiling/test_apply_func.py
@@ -60,7 +60,7 @@ def func(x):
module.add_apply_func(np.mean, entire=True)
module.add_apply_func(func)
- datastore = module.transform(datastore)
+ datastore = module._transform(datastore)
p = datastore["profiled"]["asc_numbers"]
diff --git a/tests/popmon/analysis/test_merge_statistics.py b/tests/popmon/analysis/test_merge_statistics.py
index cc7c1a54..ff474311 100644
--- a/tests/popmon/analysis/test_merge_statistics.py
+++ b/tests/popmon/analysis/test_merge_statistics.py
@@ -40,7 +40,7 @@ def test_merge_statistics():
}
datastore = MergeStatistics(
read_keys=["first_df", "second_df"], store_key="output_df"
- ).transform(datastore)
+ )._transform(datastore)
pd.testing.assert_frame_equal(df1.combine_first(df2), out)
pd.testing.assert_frame_equal(datastore["output_df"]["feature_1"], out)
diff --git a/tests/popmon/base/test_pipeline.py b/tests/popmon/base/test_pipeline.py
index 613182e1..650a1c71 100644
--- a/tests/popmon/base/test_pipeline.py
+++ b/tests/popmon/base/test_pipeline.py
@@ -6,66 +6,63 @@
class LogTransformer(Module):
+ _input_keys = ("input_key", )
+ _output_keys = ("output_key", )
+
def __init__(self, input_key, output_key):
super().__init__()
self.input_key = input_key
self.output_key = output_key
- def transform(self, datastore):
- input_array = self.get_datastore_object(
- datastore, self.input_key, dtype=np.ndarray
- )
- datastore[self.output_key] = np.log(input_array)
+ def transform(self, input_array: np.ndarray):
+ output = np.log(input_array)
self.logger.info(f"{self.__class__.__name__} is calculated.")
- return datastore
+ return output
class PowerTransformer(Module):
+ _input_keys = ("input_key",)
+ _output_keys = ("output_key",)
+
def __init__(self, input_key, output_key, power):
super().__init__()
self.input_key = input_key
self.output_key = output_key
self.power = power
- def transform(self, datastore):
- input_array = self.get_datastore_object(
- datastore, self.input_key, dtype=np.ndarray
- )
- datastore[self.output_key] = np.power(input_array, self.power)
- return datastore
+ def transform(self, input_array: np.ndarray):
+ result = np.power(input_array, self.power)
+ return result
class SumNormalizer(Module):
+ _input_keys = ("input_key",)
+ _output_keys = ("output_key",)
+
def __init__(self, input_key, output_key):
super().__init__()
self.input_key = input_key
self.output_key = output_key
- def transform(self, datastore):
- input_array = self.get_datastore_object(
- datastore, self.input_key, dtype=np.ndarray
- )
- datastore[self.output_key] = input_array / input_array.sum()
- return datastore
+ def transform(self, input_array: np.ndarray):
+ result = input_array / input_array.sum()
+ return result
class WeightedSum(Module):
+ _input_keys = ("input_key", "weight_key")
+ _output_keys = ("output_key",)
+
def __init__(self, input_key, weight_key, output_key):
super().__init__()
self.input_key = input_key
self.weight_key = weight_key
self.output_key = output_key
- def transform(self, datastore):
- input_array = self.get_datastore_object(
- datastore, self.input_key, dtype=np.ndarray
- )
- weights = self.get_datastore_object(
- datastore, self.weight_key, dtype=np.ndarray
- )
- datastore[self.output_key] = np.sum(input_array * weights)
+ def transform(self, input_array: np.ndarray, weights: np.ndarray):
+ result = np.sum(input_array * weights)
self.logger.info(f"{self.__class__.__name__} is calculated.")
- return datastore
+ return result
def test_popmon_pipeline():
diff --git a/tests/popmon/io/test_file_reader.py b/tests/popmon/io/test_file_reader.py
index 9ad91703..d953d3d2 100644
--- a/tests/popmon/io/test_file_reader.py
+++ b/tests/popmon/io/test_file_reader.py
@@ -10,7 +10,7 @@ def test_file_reader_json():
store_key="example",
apply_func=json.loads,
)
- datastore = fr.transform(datastore={})
+ datastore = fr._transform(datastore={})
assert datastore["example"]["boolean"]
assert len(datastore["example"]["array"]) == 3
diff --git a/tests/popmon/io/test_file_writer.py b/tests/popmon/io/test_file_writer.py
index c00fa308..b505b4d0 100644
--- a/tests/popmon/io/test_file_writer.py
+++ b/tests/popmon/io/test_file_writer.py
@@ -23,25 +23,25 @@ def to_pandas(data):
def test_file_writer_json():
datastore = get_ready_ds()
- FileWriter("my_data", apply_func=to_json).transform(datastore)
+ FileWriter("my_data", apply_func=to_json)._transform(datastore)
assert datastore["my_data"] == to_json(DATA)
def test_file_writer_json_with_kwargument():
datastore = get_ready_ds()
- FileWriter("my_data", apply_func=to_json, indent=4).transform(datastore)
+ FileWriter("my_data", apply_func=to_json, indent=4)._transform(datastore)
assert datastore["my_data"] == to_json(DATA, indent=4)
def test_file_writer_not_a_func():
datastore = get_ready_ds()
with pytest.raises(TypeError):
- FileWriter("my_data", apply_func={}).transform(datastore)
+ FileWriter("my_data", apply_func={})._transform(datastore)
def test_file_writer_df():
datastore = get_ready_ds()
- FileWriter("my_data", store_key="transformed_data", apply_func=to_pandas).transform(
+ FileWriter("my_data", store_key="transformed_data", apply_func=to_pandas)._transform(
datastore
)
assert datastore["my_data"] == DATA
diff --git a/tests/popmon/io/test_json_reader.py b/tests/popmon/io/test_json_reader.py
index 4a46651b..d47e155b 100644
--- a/tests/popmon/io/test_json_reader.py
+++ b/tests/popmon/io/test_json_reader.py
@@ -4,7 +4,7 @@
def test_json_reader():
jr = JsonReader(file_path=resources.data("example.json"), store_key="example")
- datastore = jr.transform(datastore={})
+ datastore = jr._transform(datastore={})
assert datastore["example"]["boolean"]
assert len(datastore["example"]["array"]) == 3
diff --git a/tools/pipeline_viz.py b/tools/pipeline_viz.py
new file mode 100644
index 00000000..69f2f117
--- /dev/null
+++ b/tools/pipeline_viz.py
@@ -0,0 +1,105 @@
+"""Render a popmon pipeline, serialized as JSON, as a Graphviz diagram."""
+
+import json
+from pathlib import Path
+
+import networkx as nx
+import pygraphviz  # noqa: F401 -- not referenced directly; to_agraph requires it to be installed
+from networkx.drawing.nx_agraph import to_agraph
+
+
+def generate_pipeline_vizualisation(
+    input_file,
+    output_file,
+    include_subgraphs: bool = False,
+    include_labels: bool = False,
+):
+    """Draw the pipeline described by a JSON file as a directed graph.
+
+    The JSON document is a tree of items. An item of type ``"subgraph"`` has a
+    ``name`` and a list of child ``modules``; an item of type ``"module"`` has a
+    ``name``, an index ``i``, an optional ``desc`` and two mappings, ``in`` and
+    ``out``, whose values name the nodes the module reads from and writes to.
+
+    :param input_file: path of the JSON file to read
+    :param output_file: path of the image to write; passed to ``AGraph.draw``
+    :param bool include_subgraphs: draw a dotted blue cluster around the nodes of each subgraph
+    :param bool include_labels: annotate each edge with the key it is stored under in ``in``/``out``
+    :raises ValueError: if an item is neither a ``"subgraph"`` nor a ``"module"``
+    """
+    data = json.loads(Path(input_file).read_text())
+
+    subgraphs = []
+    modules = []
+
+    def populate(item):
+        """Register ``item`` and its children; return the node names it contains."""
+        if item["type"] == "subgraph":
+            mods = []
+            for m in item["modules"]:
+                mods += populate(m)
+
+            subgraphs.append({"modules": mods, "name": item["name"]})
+            return mods
+        if item["type"] == "module":
+            modules.append(item)
+            name = f"{item['name']}_{item['i']}"
+            return [name] + list(item["out"].values())
+        raise ValueError(f"unknown pipeline item type: {item['type']!r}")
+
+    populate(data)
+
+    G = nx.DiGraph()
+    for module in modules:
+        # Graphviz HTML-like label (delimited by <...>): the module name,
+        # followed by its description on a second line when there is one.
+        label = f"<{module['name']}"
+        d = module.get("desc", "")
+        if d:
+            label += f"<br/>{d}"
+        label += ">"
+
+        # unique node name: module name plus its index
+        name = f"{module['name']}_{module['i']}"
+
+        G.add_node(
+            name, shape="rectangle", fillcolor="chartreuse", style="filled", label=label
+        )
+
+        for k, v in module["in"].items():
+            kwargs = {"headlabel": k} if include_labels else {}
+            G.add_edge(v, name, **kwargs)
+        for k, v in module["out"].items():
+            kwargs = {"taillabel": k} if include_labels else {}
+            G.add_edge(name, v, **kwargs)
+
+    # defaults for the graph and for nodes/edges without explicit attributes
+    G.graph["graph"] = {"rankdir": "TD"}
+    G.graph["node"] = {"shape": "oval", "fillcolor": "orange", "style": "filled"}
+    G.graph["edge"] = {"fontcolor": "gray50"}
+
+    A = to_agraph(G)
+    if include_subgraphs:
+        for idx, subgraph in enumerate(subgraphs):
+            cluster_name = f'cluster_{idx}_{subgraph["name"].lower().replace(" ", "_")}'
+            H = A.subgraph(subgraph["modules"], name=cluster_name)
+            H.graph_attr["color"] = "blue"
+            H.graph_attr["label"] = subgraph["name"]
+            H.graph_attr["style"] = "dotted"
+
+    A.layout("dot")
+    A.draw(output_file)
+
+
+if __name__ == "__main__":
+    data_path = Path("<...>")
+
+    # render every serialized pipeline both with and without subgraph clusters
+    for version in ("unversioned", "versioned"):
+        input_file = data_path / f"pipeline_self_reference_{version}.json"
+        for include_subgraphs in (True, False):
+            infix = "subgraphs-" if include_subgraphs else ""
+            output_file = f"popmon-report-pipeline-{infix}{version}.pdf"
+            generate_pipeline_vizualisation(
+                input_file, output_file, include_subgraphs=include_subgraphs
+            )