Skip to content

Commit

Permalink
fix merge issues
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
  • Loading branch information
ab93 committed Feb 5, 2024
1 parent 114d6e3 commit 780f2e2
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 55 deletions.
1 change: 1 addition & 0 deletions numalogic/backtest/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def generate_scores(
for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)

# TODO support for multivariate thresholding functions
thresh_out = postproc_udf.compute_threshold(artifacts["threshold_clf"], x_recon[idx])
raw_scores[idx] = thresh_out

Expand Down
21 changes: 1 addition & 20 deletions numalogic/models/threshold/_median.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from collections.abc import Sequence
from typing import Optional

import numpy as np
import numpy.typing as npt
from typing_extensions import Self, Final
Expand All @@ -24,21 +21,17 @@ class MaxPercentileThreshold(BaseThresholdModel):
feature_weights: weights to be used for each feature (used only if aggregate=True)
"""

__slots__ = ("_max_percentile", "_min_thresh", "_thresh", "_agg", "_weights", "_is_fitted")
__slots__ = ("_max_percentile", "_min_thresh", "_thresh", "_is_fitted")

def __init__(
self,
max_inlier_percentile: float = 96.0,
min_threshold: float = 1e-4,
aggregate: bool = False,
feature_weights: Optional[Sequence[float]] = None,
):
super().__init__()
self._max_percentile = max_inlier_percentile
self._min_thresh = min_threshold
self._thresh = None
self._agg = aggregate
self._weights = feature_weights
self._is_fitted = False

@property
Expand Down Expand Up @@ -73,15 +66,3 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:

self._validate_input(x)
return x / self._thresh

# if self._agg:
# return self.agg_score_samples(scores, weights=self._weights)
# return scores

@staticmethod
def agg_score_samples(
y: npt.NDArray[float], weights: Optional[Sequence[float]] = None
) -> npt.NDArray[float]:
if weights:
return np.average(y, weights=weights, axis=1, keepdims=True)
return np.mean(y, axis=1, keepdims=True)
1 change: 1 addition & 0 deletions numalogic/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
DataClipper,
GaussianNoiseAdder,
DifferenceTransform,
FlattenVector,
)
from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator
from numalogic.transforms._postprocess import TanhNorm, tanh_norm
Expand Down
11 changes: 8 additions & 3 deletions numalogic/transforms/_stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ def _validate_args(
if lower is None and upper is None:
raise ValueError("At least one of lower or upper should be provided.")

Check warning on line 90 in numalogic/transforms/_stateless.py

View check run for this annotation

Codecov / codecov/patch

numalogic/transforms/_stateless.py#L90

Added line #L90 was not covered by tests

if isinstance(lower, Sequence) and isinstance(upper, Sequence):
if len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")
if isinstance(lower, Sequence) and isinstance(upper, Sequence) and len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")

Check warning on line 93 in numalogic/transforms/_stateless.py

View check run for this annotation

Codecov / codecov/patch

numalogic/transforms/_stateless.py#L93

Added line #L93 was not covered by tests

def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]:
_df = pd.DataFrame(x, dtype=np.float32)
Expand Down Expand Up @@ -129,6 +128,12 @@ def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]:


class DifferenceTransform(StatelessTransformer):
"""
Apply feature wise differencing.
Note: First value is backfilled with the first non-NAN value.
"""

def transform(self, input_: npt.NDArray, **__):
diff_df = pd.DataFrame(input_).diff().bfill()
return diff_df.to_numpy(dtype=np.float32)
Expand Down
9 changes: 5 additions & 4 deletions numalogic/udfs/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

import logging
import warnings
from typing import ClassVar

from typing import ClassVar, TypeVar

_LOGGER = logging.getLogger(__name__)

Expand All @@ -27,6 +26,8 @@ class UDFFactory:
from numalogic.udfs.preprocess import PreprocessUDF
from numalogic.udfs.trainer import DruidTrainerUDF, PromTrainerUDF

nl_udf_t = TypeVar("nl_udf_t", bound=NumalogicUDF, covariant=True)

_UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = {
"mlpipeline": PayloadTransformer,
"preprocess": PreprocessUDF,
Expand All @@ -37,7 +38,7 @@ class UDFFactory:
}

@classmethod
def get_udf_cls(cls, udf_name: str) -> type[NumalogicUDF]:
def get_udf_cls(cls, udf_name: str) -> type[nl_udf_t]:
"""
Get the UDF class.
Expand Down Expand Up @@ -67,7 +68,7 @@ def get_udf_cls(cls, udf_name: str) -> type[NumalogicUDF]:
raise ValueError(_msg) from err

@classmethod
def get_udf_instance(cls, udf_name: str, **kwargs) -> NumalogicUDF:
def get_udf_instance(cls, udf_name: str, **kwargs) -> nl_udf_t:
"""
Get the UDF instance.
Expand Down
2 changes: 2 additions & 0 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

@staticmethod
def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[str, float]:
if len(scores) != len(feat_names):
return {}
return dict(zip(feat_names, scores))

@classmethod
Expand Down
19 changes: 0 additions & 19 deletions tests/models/threshold/test_median.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,3 @@ def test_predict(data, fitted):
_, x_test = data
y_pred = fitted.predict(x_test)
assert y_pred.shape == (50, 3)


def test_agg_score_samples(data):
x_train, x_test = data
clf_1 = MaxPercentileThreshold(max_inlier_percentile=75, min_threshold=1e-3, aggregate=True)
clf_2 = MaxPercentileThreshold(
max_inlier_percentile=75,
min_threshold=1e-3,
aggregate=True,
feature_weights=[0.1, 0.7, 0.2],
)
clf_1.fit(x_train)
clf_2.fit(x_train)

y_scores_1 = clf_1.score_samples(x_test)
y_scores_2 = clf_2.score_samples(x_test)

assert y_scores_1.shape == y_scores_2.shape == (50, 1)
assert np.sum(y_scores_1) > np.sum(y_scores_2)
2 changes: 1 addition & 1 deletion tests/test_backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
NumalogicConf(
preprocess=[ModelInfo(name="LogTransformer")],
model=ModelInfo(name="Conv1dVAE", conf=dict(seq_len=12, n_features=3, latent_dim=1)),
threshold=ModelInfo(name="RobustMahalanobisThreshold"),
threshold=ModelInfo(name="MaxPercentileThreshold"),
trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(accelerator="cpu", max_epochs=1)),
)
)
Expand Down
2 changes: 1 addition & 1 deletion tests/udfs/resources/_config2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ stream_confs:
"degraded": 0
pltrainer_conf:
accelerator: cpu
max_epochs: 5
max_epochs: 1

redis_conf:
url: "isbsvc-redis-isbs-redis-svc.oss-analytics-numalogicosamfci-usw2-e2e.svc"
Expand Down
9 changes: 2 additions & 7 deletions tests/udfs/test_postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ def test_postprocess_all_model_present_02(self):

msg = udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW))
payload = OutputPayload(**orjson.loads(msg[0].value))
self.assertListEqual(data["metrics"], list(payload.data))
print(payload)
self.assertFalse(list(payload.data))
self.assertEqual(1, len(msg))

@patch("numalogic.udfs.postprocess.PostprocessUDF.compute", Mock(side_effect=RuntimeError))
Expand All @@ -181,12 +182,6 @@ def test_preprocess_4(self):
payload = TrainerPayload(**orjson.loads(msg[0].value))
self.assertEqual(payload.header, Header.TRAIN_REQUEST)

def test_postprocess_NotImplementedMethod(self):
arr = np.asarray([[1, 1], [2, 2]])
self.assertEqual(1.5, self.udf._calculate_unified_score(arr, "mean"))
self.assertEqual(1, self.udf._calculate_unified_score(arr, "min"))
self.assertEqual(2, self.udf._calculate_unified_score(arr, "max"))


if __name__ == "__main__":
unittest.main()

0 comments on commit 780f2e2

Please sign in to comment.