# Unsupervised Price Anomaly Detection using R


In [None]:
import numpy as np
import pandas as pd
import datetime as dt

import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# first dataset
df1 = pd.read_csv('../input/benchmark-labeled-anomaly-detection-ts/cpu4.csv')
df1 = df1.sort_values(by='timestamp', ascending=True)
df1 = df1.replace({'label': {0.0: False, 1.0: True}})

# second dataset
df2 = pd.read_csv('../input/benchmark-labeled-anomaly-detection-ts/g.csv')
df2 = df2.sort_values(by='timestamp', ascending=True)
df2 = df2.replace({'label': {0.0: False, 1.0: True}})

df1.head(3)

## Frequency of Anomalies



In [None]:
print('Frequencies:')
print(df1.label.value_counts(normalize=True), '\n')
print(df2.label.value_counts(normalize=True))

## Anomalies in the Dataset



In [None]:
plt.figure(figsize=(12,4))
plt.title('Dataset 1')
sns.scatterplot(data=df1, x="timestamp", y="value", hue="label")
plt.show()

plt.figure(figsize=(12,4))
plt.title('Dataset 2')
sns.scatterplot(data=df2, x="timestamp", y="value", hue="label")
plt.show()

In [None]:
df1_zoom = df1[800:1100]
df1_zoom_exp = df1[300:1800]
plt.figure(figsize=(12,4))
sns.scatterplot(data=df1_zoom, x="timestamp", y="value", hue="label")
plt.show()

df2_zoom = df2[1900:2200]
plt.figure(figsize=(12,4))
sns.scatterplot(data=df2_zoom, x="timestamp", y="value", hue="label")
plt.show()

In [None]:
def plot_anomalies(df, algorithm, parameters, dumping=False, casting=None):
    '''Plot the Streaming Data (an Anomalies)'''
    Y = df.value
    X = df.timestamp
    X_pred = df.timestamp if casting is None else X.apply(casting)
    # predict anomalies
    model = algorithm(**parameters)
    preds = [model.detect(i, v, dumping=True) for i, v in zip(X_pred, Y)]
    pred, values, stds = tuple(zip(*preds))
    # plot the results
    plt.figure(figsize=(12,4))
    model_name = algorithm.__name__
    plt.title(f'Anomaly Detection - {model_name}')
    af  = pd.DataFrame(data={'x':X, 'value':Y, 'pred':pred})
    af2 = pd.DataFrame(data={'x':X, 'value':values, 'pred':pred, 'std': stds})
    af2['ymin'] = af2['value'] - af2['std']
    af2['ymax'] = af2['value'] + af2['std']
    size = (af.pred.astype(int)+1) * 40
    sns.scatterplot(data=af, x='x', y='value', hue='pred', s=size)
    if dumping: plt.fill_between(af2.x, af2.ymin, af2.ymax, facecolor='green', alpha=0.2)
    plt.show()

<a id="ma"></a>
## Moving Average


In [None]:
class StreamingMovingAverage:
    '''Moving Average algorithm'''
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.rolling.html

    def __init__(self, threshold=1.0) -> None:
        # Parameters
        self.max_deviation_from_expected = threshold
        self.min_nof_records_in_model = 3
        self.max_nof_records_in_model = 3 * self.min_nof_records_in_model

    def detect(self, timestamp: int, value: float, dumping: bool=False) -> bool:
        '''Detect if is a Anomaly'''
        self._update_state(timestamp, value)
        expected_value = self._expected_value(timestamp)
        # is there enough data and is not NaN value
        response, curr_value, deviation = False, value, 0.0
        if self._enough_data() and not np.isnan(expected_value):
            # is the value out of the boundary? when it decrease
            curr_value = expected_value
            deviation = self._standard_deviation() * self.max_deviation_from_expected
            # when it is higher than expected
            if expected_value + deviation < value or\
               expected_value - deviation > value:
                response = True
        # dumping or not
        if dumping: return (response, curr_value, deviation)
        else: return response

    def _update_state(self, timestamp: int, value: float) -> None:
        '''Update the model state'''
        # check if it is the first time the model is run or if there is a big interval between the timestamps
        if not hasattr(self, 'previous_timestamp'):
            self._init_state(timestamp)
        # update the model state
        self.previous_timestamp = timestamp
        self.data_streaming.append(value)
        # is there a lot of data? remove one record
        if len(self.data_streaming) > self.max_nof_records_in_model:
            self.data_streaming.pop(0)

    def _init_state(self, timestamp: int) -> None:
        '''Reset the parameters'''
        self.previous_timestamp = timestamp
        self.data_streaming = list()

    def _enough_data(self) -> bool:
        '''Check if there is enough data'''
        return len(self.data_streaming) >= self.min_nof_records_in_model

    def _expected_value(self, timestamp: int) -> float:
        '''Return the expected value'''
        data = self.data_streaming
        data = pd.Series(data=data, dtype=float)
        many = self.min_nof_records_in_model
        return data.rolling(many, min_periods=1).mean().iloc[-1]

    def _standard_deviation(self) -> float:
        '''Return the standard deviation'''
        data = self.data_streaming
        return np.std(data, axis=0)

    def get_state(self) -> dict:
        '''Get the state'''
        self_dict = {key: value for key, value in self.__dict__.items()}
        return pickle.dumps(self_dict, 4)

    def set_state(self, state) -> None:
        '''Set the state'''
        _self = self
        ad = pickle.loads(state)
        for key, value in ad.items():
            setattr(_self, key, value)

# Example
# ad = StreamingMovingAverage(threshold=1.5)
# ad.detect(timestamp=123, value=10)
# ad.detect(timestamp=124, value=7)

In [None]:
parameters = {'threshold': 1.5}
plot_anomalies(df1_zoom, StreamingMovingAverage, parameters, dumping=True)

parameters = {'threshold': 1.5}
plot_anomalies(df2_zoom, StreamingMovingAverage, parameters, dumping=True)

-----
<a id="ema"></a>

## Exponential Moving Average



In [None]:
class StreamingExponentialMovingAverage(StreamingMovingAverage):
    '''Exponential Weighted Moving Average (EWMA) algorithm'''
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.ewm.html

    def __init__(self, threshold=1.0, alpha=0.3) -> None:
        super().__init__()
        # Parameters
        self.max_deviation_from_expected = threshold
        self.alpha = alpha

    def _enough_data(self) -> bool:
        '''Check if there is enough data'''
        return len(self.data_streaming) > 0

    def _expected_value(self, timestamp: int) -> float:
        '''Return the expected value'''
        data = self.data_streaming
        data = pd.Series(data=data, dtype=float)
        return data.ewm(alpha=self.alpha, adjust=True).mean().iloc[-1]

# Example
# ad = StreamingExponentialMovingAverage(threshold=1.5, alpha=0.45)
# ad.detect(timestamp=123, value=10)
# ad.detect(timestamp=124, value=7)

In [None]:
parameters = {'threshold': 1.5, 'alpha': 0.45}
plot_anomalies(df1_zoom, StreamingExponentialMovingAverage, parameters, dumping=True)

parameters = {'threshold': 1.5, 'alpha': 0.45}
plot_anomalies(df2_zoom, StreamingExponentialMovingAverage, parameters, dumping=True)

-----
<a id="sma"></a>

## Seasonal Moving Average



In [None]:
import numpy as np
import pandas as pd
import datetime as dt

from typing import Union

MINUTES_HOUR = 60 # minutes per hour
MINUTES_DAY = 24 * MINUTES_HOUR # minutes per day
NUMBER_DAYS_WEEK = 7 # number days a week


class AnomalyDetectionSeasonalBucket(StreamingMovingAverage):

    def __init__(self, min_buckets: int, window: int=4, min_value: int=10, threshold: float=2) -> None:
        '''Min number of messages is 1 per minute'''
        super().__init__()
        # parameters
        self.min_buckets = min_buckets
        self._num_bucket = int(np.floor(MINUTES_DAY / min_buckets))
        self.window_size = window
        self.min_value = min_value
        self.max_deviation_from_expected = threshold

    def _get_cday_cbucket(self, timestamp: dt.datetime) -> tuple:
        '''Get cday, cbucket values from timestamp'''
        # compute bucket indexes
        cday, chour, cminute = timestamp.weekday(), timestamp.hour, timestamp.minute
        cbucket = int(np.floor((chour * MINUTES_HOUR + cminute) / self.min_buckets))
        return cday, cbucket
    
    def _create_copy_data(self, timestamp: dt.datetime) -> np.array:
        '''Get data based on timestamp'''
        # compute bucket indexes, get data
        cday, cbucket = self._get_cday_cbucket(timestamp = timestamp)
        data = np.copy(self._buckets[cday, cbucket])
        data[data == 0.0] = np.nan
        return data
    
    def _update_state(self, timestamp: dt.datetime, value: float) -> None:
        '''Update the model state'''
        # check if it is the first run
        if not hasattr(self, 'previous_timestamp'):
            self._init_state(timestamp)
        # compute bucket indexes
        cday, cbucket = self._get_cday_cbucket(timestamp = timestamp)
        # shift values, empty days
        pass_days = int(np.ceil((timestamp - self._buckets_timestamp[cday, cbucket]).days / NUMBER_DAYS_WEEK))
        self._buckets[cday, cbucket] = np.roll(self._buckets[cday, cbucket], pass_days)
        self._buckets[cday, cbucket, 0:pass_days] = 0
        # update the model state
        self.previous_timestamp = timestamp
        self._buckets_timestamp[cday, cbucket] = timestamp
        self._buckets[cday, cbucket, 0] = self._buckets[cday, cbucket, 0] + value

    def _init_state(self, timestamp: dt.datetime) -> None:
        '''Reset the parameters'''
        self.previous_timestamp = timestamp
        self._buckets = np.zeros((NUMBER_DAYS_WEEK, self._num_bucket, self.window_size))
        self._buckets_timestamp = np.full((NUMBER_DAYS_WEEK, self._num_bucket), timestamp, dtype=dt.datetime)

    def _expected_value(self, timestamp: dt.datetime) -> float:
        '''Return the expected value'''
        data = self._create_copy_data(timestamp = timestamp)
        return np.nanmean(data)

    def _enough_data(self) -> bool:
        '''Check if there is enough data'''
        data = self._create_copy_data(timestamp = self.previous_timestamp)
        records = self.window_size - np.sum(np.isnan(data))
        return records >= min(self.min_nof_records_in_model, self.window_size)

    def _standard_deviation(self) -> float:
        '''Return the standard deviation'''
        # compute bucket indexes, get data
        data = self._create_copy_data(timestamp = self.previous_timestamp)
        return np.nanstd(data)
    
# Example
# ad = AnomalyDetectionSeasonalBucket(min_buckets=60, window=5)
# ad.detect(timestamp=dt.datetime(2023, 1, 23, 23, 40), value=10)
# ad.detect(timestamp=dt.datetime(2023, 1, 30, 23, 40), value=7)

In [None]:
parameters = {'min_buckets': 30, 'window': 2}
plot_anomalies(df1_zoom, AnomalyDetectionSeasonalBucket, parameters, dumping=True, casting=dt.datetime.fromtimestamp)

parameters = {'min_buckets': 30, 'window': 2}
plot_anomalies(df2_zoom, AnomalyDetectionSeasonalBucket, parameters, dumping=True, casting=dt.datetime.fromtimestamp)

In [None]:
parameters = {'min_buckets': 30, 'window': 2}
plot_anomalies(df1.iloc[0:2050], AnomalyDetectionSeasonalBucket, parameters, dumping=True, casting=dt.datetime.fromtimestamp)

-----
<a id="arima"></a>

## Auto Regressive Integrated Moving Average - ARIMA

**ARIMA** is an acronym that stands for AutoRegressive Integrated Moving Average. It is a generalization of the simpler AutoRegressive Moving Average and adds the notion of integration. ARIMA is capable of predict values according to its previous observations. Further, we simple check if the new record is far from the expected value. The expected value range is computed using the formula `ARIMA + standard deviation * threshold`; if it is out of the expected value range, we report as an anomaly.


In [None]:
%%capture
!pip install statsmodels==0.13.1

In [None]:
import warnings
from statsmodels.tsa.arima.model import ARIMA

warnings.filterwarnings('ignore')

class StreamingARIMA(StreamingMovingAverage):
    '''ARIMA algorithm'''

    def __init__(self, threshold=1.0, order:tuple=(2, 1, 2)) -> None:
        # Parameters
        self.order = order
        self.model = None
        self.res = None
        self.max_deviation_from_expected = threshold
        self.min_nof_records_in_model = 10
        self.max_nof_records_in_model = 3 * self.min_nof_records_in_model

    def _expected_value(self, timestamp: int) -> float:
        '''Return the expected value'''
        if self._enough_data():
            data = self.data_streaming
            self.model = ARIMA(data, order=self.order)
            self.res = self.model.fit()
            output = self.res.forecast()
            return output[0]
        return np.nan

    def _standard_deviation(self) -> float:
        '''Return the standard deviation'''
        much = self.min_nof_records_in_model
        data = self.data_streaming
        if len(data) > much:
            data = data[-much:]
        return np.std(data, axis=0)

    def summary(self) -> None:
        '''Print the ARIMA summary'''
        if pd.notnull(self.res):
            print(self.res.summary())

In [None]:
from math import sqrt
from sklearn.metrics import mean_squared_error

# walk-forward validation
X = df1_zoom.iloc[0:50].value.tolist()
tests, predictions = list(), list()
model = StreamingARIMA()

for t in range(len(X)):
    obs = X[t]
    model.detect(t, obs)
    yhat = model._expected_value(0)
    if pd.notna(yhat):
        predictions.append(yhat)
        tests.append(obs)
# evaluate forecasts
rmse = sqrt(mean_squared_error(tests, predictions))

print('Test RMSE: %.3f' % rmse)
# plot forecasts against actual outcomes
plt.plot(tests, label='data')
plt.plot(predictions, color='gray', label='ARIMA')
plt.legend()
plt.show()

In [None]:
model.summary()

In [None]:
%%time
parameters = {'threshold': 1.5, 'order': (2, 1, 2)}
plot_anomalies(df1_zoom, StreamingARIMA, parameters, dumping=True)

parameters = {'threshold': 1.5, 'order': (2, 1, 2)}
plot_anomalies(df2_zoom, StreamingARIMA, parameters, dumping=True)

<a id="conclusion"></a>

---
# Conclusion

(1) We can see that **Exponential Moving Average** smooths the prediction better than the simple **Moving Average**. For the first dataset, the algorithm was able to detect the exact point of the anomaly and quickly adjust your prediction behaviour, avoiding multiples "sequential anomaly alerts".


In [None]:
parameters = {'threshold': 1.5}
plot_anomalies(df1_zoom, StreamingMovingAverage, parameters, dumping=True)

parameters = {'threshold': 1.5, 'alpha': 0.45}
plot_anomalies(df1_zoom, StreamingExponentialMovingAverage, parameters, dumping=True)

(2) Again, we can see that **Exponential Moving Average** smooths the prediction better than the simple **Moving Average**. However, the second dataset was more difficult than the first one, due to the high variation in the data. Anyway, the algorithm was able to detect the anomaly point, also produce a few false negative.


In [None]:
parameters = {'threshold': 1.5}
plot_anomalies(df2_zoom, StreamingMovingAverage, parameters, dumping=True)

parameters = {'threshold': 1.5, 'alpha': 0.45}
plot_anomalies(df2_zoom, StreamingExponentialMovingAverage, parameters, dumping=True)

(3) the **ARIMA** seems to be promising. However it still underperforming compared to **Exponential Moving Average**.

In [None]:
parameters = {'threshold': 1.5, 'alpha': 0.45}
plot_anomalies(df1_zoom, StreamingExponentialMovingAverage, parameters, dumping=True)

parameters = {'threshold': 1.5, 'order': (4, 1, 2)}
plot_anomalies(df1_zoom, StreamingARIMA, parameters, dumping=True)

(4) **Seasonal Moving Average** is a time-based algorithm that relies on previous days to make predictions. It did not fit for these two sample use cases because it is small amount of non-seasonal data.

(5) In summary, it is important to use the appropriate algorithm for the type of data being analyzed. In the case of non-seasonal data, algorithms such as the **Exponential Moving Average** or the **Holt-Winters** algorithm may be more suitable for anomaly detection. These algorithms are able to adapt to changes in the data and do not rely on seasonal patterns for their analysis. Related to seasonal data, you can explore **Seasonal Moving Average** or **Seasonal Exponential Moving Average**.