From 780f2e23cccf3648b385499c589347c3bed0461e Mon Sep 17 00:00:00 2001 From: Avik Basu Date: Mon, 5 Feb 2024 17:33:06 -0500 Subject: [PATCH] fix merge issues Signed-off-by: Avik Basu --- numalogic/backtest/_prom.py | 1 + numalogic/models/threshold/_median.py | 21 +-------------------- numalogic/transforms/__init__.py | 1 + numalogic/transforms/_stateless.py | 11 ++++++++--- numalogic/udfs/factory.py | 9 +++++---- numalogic/udfs/postprocess.py | 2 ++ tests/models/threshold/test_median.py | 19 ------------------- tests/test_backtest.py | 2 +- tests/udfs/resources/_config2.yaml | 2 +- tests/udfs/test_postprocess.py | 9 ++------- 10 files changed, 22 insertions(+), 55 deletions(-) diff --git a/numalogic/backtest/_prom.py b/numalogic/backtest/_prom.py index fa8bee60..7908f180 100644 --- a/numalogic/backtest/_prom.py +++ b/numalogic/backtest/_prom.py @@ -201,6 +201,7 @@ def generate_scores( for idx, arr in enumerate(ds): x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr) + # TODO support for multivariate thresholding functions thresh_out = postproc_udf.compute_threshold(artifacts["threshold_clf"], x_recon[idx]) raw_scores[idx] = thresh_out diff --git a/numalogic/models/threshold/_median.py b/numalogic/models/threshold/_median.py index 879e482f..f73af058 100644 --- a/numalogic/models/threshold/_median.py +++ b/numalogic/models/threshold/_median.py @@ -1,6 +1,3 @@ -from collections.abc import Sequence -from typing import Optional - import numpy as np import numpy.typing as npt from typing_extensions import Self, Final @@ -24,21 +21,17 @@ class MaxPercentileThreshold(BaseThresholdModel): feature_weights: weights to be used for each feature (used only if aggregate=True) """ - __slots__ = ("_max_percentile", "_min_thresh", "_thresh", "_agg", "_weights", "_is_fitted") + __slots__ = ("_max_percentile", "_min_thresh", "_thresh", "_is_fitted") def __init__( self, max_inlier_percentile: float = 96.0, min_threshold: float = 1e-4, - aggregate: bool = False, - feature_weights: Optional[Sequence[float]] = None, ): super().__init__() self._max_percentile = max_inlier_percentile self._min_thresh = min_threshold self._thresh = None - self._agg = aggregate - self._weights = feature_weights self._is_fitted = False @property @@ -73,15 +66,3 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]: self._validate_input(x) return x / self._thresh - - # if self._agg: - # return self.agg_score_samples(scores, weights=self._weights) - # return scores - - @staticmethod - def agg_score_samples( - y: npt.NDArray[float], weights: Optional[Sequence[float]] = None - ) -> npt.NDArray[float]: - if weights: - return np.average(y, weights=weights, axis=1, keepdims=True) - return np.mean(y, axis=1, keepdims=True) diff --git a/numalogic/transforms/__init__.py b/numalogic/transforms/__init__.py index b485da50..55186b83 100644 --- a/numalogic/transforms/__init__.py +++ b/numalogic/transforms/__init__.py @@ -21,6 +21,7 @@ DataClipper, GaussianNoiseAdder, DifferenceTransform, + FlattenVector, ) from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator from numalogic.transforms._postprocess import TanhNorm, tanh_norm diff --git a/numalogic/transforms/_stateless.py b/numalogic/transforms/_stateless.py index c3f62c5d..cb1ecfd8 100644 --- a/numalogic/transforms/_stateless.py +++ b/numalogic/transforms/_stateless.py @@ -89,9 +89,8 @@ def _validate_args( if lower is None and upper is None: raise ValueError("At least one of lower or upper should be provided.") - if isinstance(lower, Sequence) and isinstance(upper, Sequence): - if len(lower) != len(upper): - raise ValueError("lower and upper should have the same length.") + if isinstance(lower, Sequence) and isinstance(upper, Sequence) and len(lower) != len(upper): + raise ValueError("lower and upper should have the same length.") def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]: _df = pd.DataFrame(x, dtype=np.float32) @@ -129,6 +128,12 @@ def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]: class DifferenceTransform(StatelessTransformer): + """ + Apply feature wise differencing. + + Note: First value is backfilled with the first non-NAN value. + """ + def transform(self, input_: npt.NDArray, **__): diff_df = pd.DataFrame(input_).diff().bfill() return diff_df.to_numpy(dtype=np.float32) diff --git a/numalogic/udfs/factory.py b/numalogic/udfs/factory.py index 59bbd6a3..7040ee89 100644 --- a/numalogic/udfs/factory.py +++ b/numalogic/udfs/factory.py @@ -11,8 +11,7 @@ import logging import warnings -from typing import ClassVar - +from typing import ClassVar, TypeVar _LOGGER = logging.getLogger(__name__) @@ -27,6 +26,8 @@ class UDFFactory: from numalogic.udfs.preprocess import PreprocessUDF from numalogic.udfs.trainer import DruidTrainerUDF, PromTrainerUDF + nl_udf_t = TypeVar("nl_udf_t", bound=NumalogicUDF, covariant=True) + _UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = { "mlpipeline": PayloadTransformer, "preprocess": PreprocessUDF, @@ -37,7 +38,7 @@ class UDFFactory: } @classmethod - def get_udf_cls(cls, udf_name: str) -> type[NumalogicUDF]: + def get_udf_cls(cls, udf_name: str) -> type[nl_udf_t]: """ Get the UDF class. @@ -67,7 +68,7 @@ def get_udf_cls(cls, udf_name: str) -> type[NumalogicUDF]: raise ValueError(_msg) from err @classmethod - def get_udf_instance(cls, udf_name: str, **kwargs) -> NumalogicUDF: + def get_udf_instance(cls, udf_name: str, **kwargs) -> nl_udf_t: """ Get the UDF instance. diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py index decdb36e..c1767ca1 100644 --- a/numalogic/udfs/postprocess.py +++ b/numalogic/udfs/postprocess.py @@ -196,6 +196,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: @staticmethod def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[str, float]: + if len(scores) != len(feat_names): + return {} return dict(zip(feat_names, scores)) @classmethod diff --git a/tests/models/threshold/test_median.py b/tests/models/threshold/test_median.py index 41072de6..4b75d85d 100644 --- a/tests/models/threshold/test_median.py +++ b/tests/models/threshold/test_median.py @@ -37,22 +37,3 @@ def test_predict(data, fitted): _, x_test = data y_pred = fitted.predict(x_test) assert y_pred.shape == (50, 3) - - -def test_agg_score_samples(data): - x_train, x_test = data - clf_1 = MaxPercentileThreshold(max_inlier_percentile=75, min_threshold=1e-3, aggregate=True) - clf_2 = MaxPercentileThreshold( - max_inlier_percentile=75, - min_threshold=1e-3, - aggregate=True, - feature_weights=[0.1, 0.7, 0.2], - ) - clf_1.fit(x_train) - clf_2.fit(x_train) - - y_scores_1 = clf_1.score_samples(x_test) - y_scores_2 = clf_2.score_samples(x_test) - - assert y_scores_1.shape == y_scores_2.shape == (50, 1) - assert np.sum(y_scores_1) > np.sum(y_scores_2) diff --git a/tests/test_backtest.py b/tests/test_backtest.py index 0e608a73..31746878 100644 --- a/tests/test_backtest.py +++ b/tests/test_backtest.py @@ -14,7 +14,7 @@ NumalogicConf( preprocess=[ModelInfo(name="LogTransformer")], model=ModelInfo(name="Conv1dVAE", conf=dict(seq_len=12, n_features=3, latent_dim=1)), - threshold=ModelInfo(name="RobustMahalanobisThreshold"), + threshold=ModelInfo(name="MaxPercentileThreshold"), trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(accelerator="cpu", max_epochs=1)), ) ) diff --git a/tests/udfs/resources/_config2.yaml b/tests/udfs/resources/_config2.yaml index db43eac9..3b01f882 100644 --- a/tests/udfs/resources/_config2.yaml +++ b/tests/udfs/resources/_config2.yaml @@ -33,7 +33,7 @@ stream_confs: "degraded": 0 pltrainer_conf: accelerator: cpu - max_epochs: 5 + max_epochs: 1 redis_conf: url: "isbsvc-redis-isbs-redis-svc.oss-analytics-numalogicosamfci-usw2-e2e.svc" diff --git a/tests/udfs/test_postprocess.py b/tests/udfs/test_postprocess.py index 86d5b869..393783e1 100644 --- a/tests/udfs/test_postprocess.py +++ b/tests/udfs/test_postprocess.py @@ -157,7 +157,8 @@ def test_postprocess_all_model_present_02(self): msg = udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW)) payload = OutputPayload(**orjson.loads(msg[0].value)) - self.assertListEqual(data["metrics"], list(payload.data)) + print(payload) + self.assertFalse(list(payload.data)) self.assertEqual(1, len(msg)) @patch("numalogic.udfs.postprocess.PostprocessUDF.compute", Mock(side_effect=RuntimeError)) @@ -181,12 +182,6 @@ def test_preprocess_4(self): payload = TrainerPayload(**orjson.loads(msg[0].value)) self.assertEqual(payload.header, Header.TRAIN_REQUEST) - def test_postprocess_NotImplementedMethod(self): - arr = np.asarray([[1, 1], [2, 2]]) - self.assertEqual(1.5, self.udf._calculate_unified_score(arr, "mean")) - self.assertEqual(1, self.udf._calculate_unified_score(arr, "min")) - self.assertEqual(2, self.udf._calculate_unified_score(arr, "max")) - if __name__ == "__main__": unittest.main()