Skip to content

Commit

Permalink
Anomaly Detection (anomaly model, scorer, detector, aggregator) (#1256)
Browse files Browse the repository at this point in the history
* AD first Ver

* AD first Version

* added ForecastingAnomalyModel/FilteringAnomalyModel, and scorers: KmeansAnomaly/LocalOutlierFactorAnomaly

* implemented GaussianMixtureScorer and allow multiple scorer inputs

* Added comments and possibility to input a list of scorers in AnomalyModel

* Clean whitespace

* Clean whitespace2

* Clean whitespace2

* Clean whitespace with VScode

* Changed diff() position and added characteristic_length parameters

* renamed submodule

* small changes

* small improvements

* small changes

* Accepts all types UTS, MTS, list(UTS or MTS)

* move _diff() in child, so that scorers have all the same signature

* replaced L1, L2, and Abs_diff with Norm

* add component_wise to WassersteinScorer

* add component_wise to Kmeans

* add component_wise to LOF

* add component_wise to GaussianMixture

* Accept num_samples for probabilistic models forecasting

* Minor changes

* add comments, add likelihood

* add laplace, + window parameter + parameter alllow_retrain

* add cauchy and gamma likelihood

* add utils.py, detectors, aggregators

* removed show function for now

* add show_anomalies() and show_anomalies_from_scores()

* small changes

* Some docstring improvements to AnomalyModels

* corrected Kmeans, LFO and Gaussian Scorer + added input from PR

* test commit

* negative LFO and gaussian

* pre pull

* from prediciton structure

* improved show_anomalies, changed structure _from_prediction

* small mistake in eval_accuracy in utils.py

* return type of eval_acc

* changed way eval_acc returns in anomaly_model

* added test for agg, dect, and scorers. upgrade agg trainable

* added parameter return_UTS, and added test for scorers and anomaly_model

* small mistake in anomaly_model

* New structure in files

* Added warnings

* small change in wasserstein

* filtering_am and forecasting_am

* Small improvements

* Fix test names

* add pyod to requirements

* rename scorers

* scorers imports

* Changed handling of kwargs in AD models

* update tests

* return single TimeSeries from score() in some cases

* small naming improvements

* Some improvements to anomaly models

* Small improvements to scorers

* Some small improvements

* Fix tests

* Norm scorer docstring

* test toy example agg and detectors

* small docstring improvements

* Add vectorization todos

* test toy example scorers

* test toy example scorers

* test toy example PyOD

* test toy example NLL scorers

* test toy example poisson nll scorer

* test toy example univariate anomaly_models

* test toy example univariate covariates forecasting_anomaly_models

* update threshold detector docstring

* change way to output string messages

* first implementation of julien H's PR review

* first implementation of julien H's PR review 2

* anomaly_model forecasting multivariate test

* anomaly_model multivariate, w=1,2, len()=2 test for NLL scorers

* changed NLL scorers: call scipy.stats function

* changed in anomaly_models (inner to outer for series and scorers)

* Small changes to PyOD detector

* improvements to wasserstein scorer docstring

* change in eval acc

* change in eval acc, new function _eval_accuracy_from_scores

* Small improvements to aggregators

* Small docstrings improvements

* Utils docstring

* change in detectors (vectorization and accepts list of param if multivariate)

* remove exp in PyODScorer... and updated test

* new test with np.testing

* agg accept only MTS or sequence of MTS

* removed old detectors

* new multivariate test for filtering anomaly model

* small changes to utils docstrings

* test assert_array_almost_equal decimal 2

* test assert_array_almost_equal decimal 1

* test assert_array_almost_equal decimal 1

* second implementation of julien H's PR review

* vectorization of NLL scorers

* problem with test_univariate_FilteringAnomalyModel

* replace abs by __abs__ in test_univariate_covariate_ForecastingAnomalyModel

* replace abs by __abs__ in ALL test_univariate_covariate_ForecastingAnomalyModel

* Increase coverage of scorers tests

* Imports in submodules

* Some improvements to utils

* Some improvements

* significant rework of quantile detector

* Rework threshold detector

* Rework NLL scorers

* Rename NLL scorers files

* vectorize windowing in k-means

* vectorization of windowing in PyOD and Wasserstein

* Docstring improvements

* Update darts/ad/anomaly_model/filtering_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/filtering_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/filtering_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/filtering_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/filtering_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/__init__.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/__init__.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/anomaly_model/forecasting_am.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/scorers/__init__.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/scorers/scorers.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/scorers/scorers.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/scorers/scorers.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Update darts/ad/scorers/kmeans_scorer.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* PR comments

* Formatting

* Update darts/ad/scorers/pyod_scorer.py

Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>

* Small docstring improvement

Co-authored-by: Julien Adda <julienadda@Juliens-MacBook-Pro.local>
Co-authored-by: Julien Adda <julienadda@Juliens-MBP.localdomain>
Co-authored-by: Julien Herzen <julien@unit8.co>
Co-authored-by: Julien Adda <julienadda@juliens-macbook-pro.home>
Co-authored-by: eliane-maalouf <112691612+eliane-maalouf@users.noreply.github.com>
  • Loading branch information
6 people committed Dec 22, 2022
1 parent 919f214 commit 67edbeb
Show file tree
Hide file tree
Showing 35 changed files with 10,664 additions and 1 deletion.
51 changes: 51 additions & 0 deletions darts/ad/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Anomaly Detection
-----------------
A suite of tools for performing anomaly detection and classification
on time series.
* `Anomaly Scorers <https://unit8co.github.io/darts/generated_api/darts.ad.scorers.html>`_
are at the core of the anomaly detection module. They
produce anomaly scores time series, either for single series (``score()``),
or for series accompanied by some predictions (``score_from_prediction()``).
Scorers can be trainable (e.g., ``KMeansScorer``) or not (e.g., ``NormScorer``).
* `Anomaly Models <https://unit8co.github.io/darts/generated_api/darts.ad.anomaly_model.html>`_
offer a convenient way to produce anomaly scores from any of Darts
forecasting models (``ForecastingAnomalyModel``) or filtering models (``FilteringAnomalyModel``),
by comparing models' predictions with actual observations.
These classes take as parameters one Darts model, and one or multiple scorers, and can be readily
used to produce anomaly scores with the ``score()`` method.
* `Anomaly Detectors <https://unit8co.github.io/darts/generated_api/darts.ad.detectors.html>`_:
transform raw time series (such as anaomly scores) into binary anomaly time series.
* `Anomaly Aggregators <https://unit8co.github.io/darts/generated_api/darts.ad.aggregators.html>`_:
combine multiple binary anomaly time series (in the form of multivariate time series)
into a single binary anomaly time series applying boolean logic.
"""

# anomaly aggregators
from .aggregators import AndAggregator, EnsembleSklearnAggregator, OrAggregator

# anomaly models
from .anomaly_model import FilteringAnomalyModel, ForecastingAnomalyModel

# anomaly detectors
from .detectors import QuantileDetector, ThresholdDetector

# anomaly scorers
from .scorers import (
CauchyNLLScorer,
DifferenceScorer,
ExponentialNLLScorer,
GammaNLLScorer,
GaussianNLLScorer,
KMeansScorer,
LaplaceNLLScorer,
NormScorer,
PoissonNLLScorer,
PyODScorer,
WassersteinScorer,
)
18 changes: 18 additions & 0 deletions darts/ad/aggregators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Anomaly Aggregators
-------------------
An anomaly aggregator can take multiple detected anomalies
(in the form of binary TimeSeries, as coming from an anomaly detector)
and combine them into one. It can typically be used to combine
the detections of multiple models into one final detection.
The key method is ``predict()``, which takes as input one (or multiple)
multivariate binary TimeSeries where each component represents the
detection of a single model, and returns one (or multiple) univariate
binary TimeSeries representing the final detection.
"""

from .and_aggregator import AndAggregator
from .ensemble_sklearn_aggregator import EnsembleSklearnAggregator
from .or_aggregator import OrAggregator
300 changes: 300 additions & 0 deletions darts/ad/aggregators/aggregators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
"""
Anomaly aggregators base classes
"""

# TODO:
# - add customize aggregators
# - add in trainable aggregators
# - log regression
# - decision tree
# - create show_all_combined (info about correlation, and from what path did
# the anomaly alarm came from)

from abc import ABC, abstractmethod
from typing import Any, Sequence, Union

import numpy as np

from darts import TimeSeries
from darts.ad.utils import _to_list, eval_accuracy_from_binary_prediction
from darts.logging import raise_if_not


class Aggregator(ABC):
def __init__(self, *args: Any, **kwargs: Any) -> None:
pass

@abstractmethod
def __str__(self):
"""returns the name of the aggregator"""
pass

@abstractmethod
def _predict_core(self):
"""returns the aggregated results"""
pass

@abstractmethod
def predict(
self, series: Union[TimeSeries, Sequence[TimeSeries]]
) -> Union[TimeSeries, Sequence[TimeSeries]]:
"""Aggregates the (sequence of) multivariate binary series given as
input into a (sequence of) univariate binary series.
Parameters
----------
series
The (sequence of) multivariate binary series to aggregate
Returns
-------
TimeSeries
(Sequence of) aggregated results
"""
pass

def _check_input(self, series: Union[TimeSeries, Sequence[TimeSeries]]):
"""
Checks for input if:
- it is a (sequence of) multivariate series (width>1)
- (sequence of) series must be:
* a deterministic TimeSeries
* binary (only values equal to 0 or 1)
"""

list_series = _to_list(series)

raise_if_not(
all([isinstance(s, TimeSeries) for s in list_series]),
"all series in `series` must be of type TimeSeries.",
)

raise_if_not(
all([s.width > 1 for s in list_series]),
"all series in `series` must be multivariate (width>1).",
)

raise_if_not(
all([s.is_deterministic for s in list_series]),
"all series in `series` must be deterministic (number of samples=1).",
)

raise_if_not(
all(
[
np.array_equal(
s.values(copy=False), s.values(copy=False).astype(bool)
)
for s in list_series
]
),
"all series in `series` must be binary (only 0 and 1 values).",
)

return list_series

def eval_accuracy(
self,
actual_anomalies: Sequence[TimeSeries],
series: Sequence[TimeSeries],
window: int = 1,
metric: str = "recall",
) -> Union[float, Sequence[float]]:
"""Aggregates the (sequence of) multivariate series given as input into one (sequence of)
series and evaluates the results against true anomalies.
Parameters
----------
actual_anomalies
The (sequence of) ground truth of the anomalies (1 if it is an anomaly and 0 if not)
series
The (sequence of) multivariate binary series to aggregate
window
(Sequence of) integer value indicating the number of past samples each point
represents in the (sequence of) series. The parameter will be used by the
function ``_window_adjustment_anomalies()`` in darts.ad.utils to transform
actual_anomalies.
metric
Metric function to use. Must be one of "recall", "precision",
"f1", and "accuracy".
Default: "recall"
Returns
-------
Union[float, Sequence[float]]
(Sequence of) score for the (sequence of) series
"""

list_actual_anomalies = _to_list(actual_anomalies)

raise_if_not(
all([isinstance(s, TimeSeries) for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be of type TimeSeries.",
)

raise_if_not(
all([s.is_deterministic for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be deterministic (number of samples=1).",
)

raise_if_not(
all([s.width == 1 for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be univariate (width=1).",
)

raise_if_not(
len(list_actual_anomalies) == len(_to_list(series)),
"`actual_anomalies` and `series` must contain the same number of series.",
)

preds = self.predict(series)

return eval_accuracy_from_binary_prediction(
list_actual_anomalies, preds, window, metric
)


class NonFittableAggregator(Aggregator):
"Base class of Aggregators that do not need training."

def __init__(self) -> None:
super().__init__()

# indicates if the Aggregator is trainable or not
self.trainable = False

def predict(
self, series: Union[TimeSeries, Sequence[TimeSeries]]
) -> Union[TimeSeries, Sequence[TimeSeries]]:
"""Aggregates the (sequence of) multivariate binary series given as
input into a (sequence of) univariate binary series.
Parameters
----------
series
The (sequence of) multivariate binary series to aggregate
Returns
-------
TimeSeries
(Sequence of) aggregated results
"""
list_series = self._check_input(series)

if isinstance(series, TimeSeries):
return self._predict_core(list_series)[0]
else:
return self._predict_core(list_series)


class FittableAggregator(Aggregator):
"Base class of Aggregators that do need training."

def __init__(self) -> None:
super().__init__()

# indicates if the Aggregator is trainable or not
self.trainable = True

# indicates if the Aggregator has been trained yet
self._fit_called = False

def _assert_fit_called(self):
"""Checks if the Aggregator has been fitted before calling its `score()` function."""

raise_if_not(
self._fit_called,
f"The Aggregator {self.__str__()} has not been fitted yet. Call `fit()` first.",
)

def fit(
self,
actual_anomalies: Union[TimeSeries, Sequence[TimeSeries]],
series: Union[TimeSeries, Sequence[TimeSeries]],
):
"""Fit the aggregators on the (sequence of) multivariate binary series.
If a list of series is given, they must have the same number of components.
Parameters
----------
actual_anomalies
The (sequence of) ground truth of the anomalies (1 if it is an anomaly and 0 if not)
series
The (sequence of) multivariate binary series
"""
list_series = self._check_input(series)
self.width_trained_on = list_series[0].width

raise_if_not(
all([s.width == self.width_trained_on for s in list_series]),
"all series in `list_series` must have the same number of components.",
)

list_actual_anomalies = _to_list(actual_anomalies)

raise_if_not(
all([isinstance(s, TimeSeries) for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be of type TimeSeries.",
)

raise_if_not(
all([s.is_deterministic for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be deterministic (width=1).",
)

raise_if_not(
all([s.width == 1 for s in list_actual_anomalies]),
"all series in `actual_anomalies` must be univariate (width=1).",
)

raise_if_not(
len(list_actual_anomalies) == len(list_series),
"`actual_anomalies` and `series` must contain the same number of series.",
)

same_intersection = list(
zip(
*[
[anomalies.slice_intersect(series), series.slice_intersect(series)]
for (anomalies, series) in zip(list_actual_anomalies, list_series)
]
)
)
list_actual_anomalies = list(same_intersection[0])
list_series = list(same_intersection[1])

ret = self._fit_core(list_actual_anomalies, list_series)
self._fit_called = True
return ret

def predict(
self, series: Union[TimeSeries, Sequence[TimeSeries]]
) -> Union[TimeSeries, Sequence[TimeSeries]]:
"""Aggregates the (sequence of) multivariate binary series given as
input into a (sequence of) univariate binary series.
Parameters
----------
series
The (sequence of) multivariate binary series to aggregate
Returns
-------
TimeSeries
(Sequence of) aggregated results
"""
self._assert_fit_called()
list_series = self._check_input(series)

raise_if_not(
all([s.width == self.width_trained_on for s in list_series]),
"all series in `series` must have the same number of components as the data"
+ " used for training the detector model, number of components in training:"
+ f" {self.width_trained_on}.",
)

if isinstance(series, TimeSeries):
return self._predict_core(list_series)[0]
else:
return self._predict_core(list_series)
25 changes: 25 additions & 0 deletions darts/ad/aggregators/and_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
AND Aggregator
--------------
Aggregator that identifies a time step as anomalous if all the components
are flagged as anomalous (logical AND).
"""

from typing import Sequence

from darts import TimeSeries
from darts.ad.aggregators.aggregators import NonFittableAggregator


class AndAggregator(NonFittableAggregator):
def __init__(self) -> None:
super().__init__()

def __str__(self):
return "AndAggregator"

def _predict_core(self, series: Sequence[TimeSeries]) -> Sequence[TimeSeries]:
return [
s.sum(axis=1).map(lambda x: (x >= s.width).astype(s.dtype)) for s in series
]

0 comments on commit 67edbeb

Please sign in to comment.