In [1]:
%load_ext autoreload
%autoreload 2

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

%config InlineBackend.figure_format = 'retina'
plt.rcParams["figure.figsize"] = 8, 5
plt.rcParams["font.size"] = 12
plt.rcParams["savefig.format"] = "pdf"
sns.set_style("darkgrid")

Для удобства я оберну все шаги детектора аномалий в отдельные функции, в конце они будут применены целиком к разным рядам

## Считываем временной ряд

Формате хранения рядов: 
- timestamp (`int64`): Unix Timestamp (milliseconds)
- value_0 (`float64`): значения временного ряда

In [2]:
import pandas as pd


raw_time_series = pd.read_csv("time_series/time_series_1.csv")
raw_time_series.head()

Unnamed: 0,timestamp,value_0
0,1769787300000,7312.0
1,1769787600000,7242.6
2,1769787900000,7121.0
3,1769788200000,6990.2
4,1769788500000,6807.8


## Предобработка

Переведём временные метки в datetime, проверим на пропуски (посчитаем число уникальных интервалов между двумя последовательными точками)

In [3]:
pd.to_datetime(raw_time_series["timestamp"], unit="ms").diff()[1:].value_counts()

timestamp
0 days 00:05:00    4012
0 days 00:10:00       4
0 days 00:15:00       2
0 days 00:20:00       1
Name: count, dtype: int64

Видим, что очновная частота (гранулярность) - 5 мин., но есть интервалы в 10, 15 и 20 мин. (пропуски в данных). Математические модели ожидают на вход временной ряд с равномерной гранулярностью, поэтому нам надо восстановить её.

In [4]:
def get_granularity(time_series: pd.DataFrame):
    diffs = time_series.index.to_series().diff().dropna()
    non_zero_diffs = diffs[diffs > pd.Timedelta(0)]

    if non_zero_diffs.empty:
        granularity = "5min"
    else:
        granularity = non_zero_diffs.value_counts().index[0]
    return granularity


def preprocess_time_series(raw_time_series: pd.DataFrame) -> pd.DataFrame:
    time_series = raw_time_series.dropna(how="all")
    time_series["timestamp"] = pd.to_datetime(time_series["timestamp"], unit="ms")
    time_series = time_series.set_index("timestamp", drop=True)
    granularity = get_granularity(time_series)
    resampled = time_series.resample(granularity).mean().interpolate()
    return resampled


time_series = preprocess_time_series(raw_time_series)
time_series.head()

Unnamed: 0_level_0,value_0
timestamp,Unnamed: 1_level_1
2026-01-30 15:35:00,7312.0
2026-01-30 15:40:00,7242.6
2026-01-30 15:45:00,7121.0
2026-01-30 15:50:00,6990.2
2026-01-30 15:55:00,6807.8


Визуализируем получившийся ряд. График интерактивный, можете покрутить

In [5]:
from plots import plot_time_series

plot_time_series(time_series)

## Преобразования

Рассмотрим применение нескольких распространенных преобразований:
- Resample - ...
- Normalization - ...
- Smoothing - ...

In [6]:
def resample(time_series: pd.DataFrame, granularity: str = "5min") -> pd.DataFrame:
    resampled = time_series.resample(granularity).mean().interpolate()
    return resampled


def normalize(time_series: pd.DataFrame) -> pd.DataFrame:
    return (time_series - time_series.mean()) / (time_series.std() + 1e-7)


def smooth(time_series: pd.DataFrame, n_steps: int = 1) -> pd.DataFrame:
    return time_series.rolling(window=n_steps, min_periods=1).mean()


resampled = resample(time_series, "1h")
normalized = normalize(resampled)
smoothed = smooth(normalized, 5)

plot_time_series(smoothed, title="Transformed Time Series")

## Детекция Аномалий

Теперь рассмотрим как применять детекторы аномалий. Для этого используем z_scores


### Основная идея
Строим прогнозирование временного ряда в каждой точке по предыдущим значениям и анализируем отклонения (остатки)

### Прогнозирующая модель
$\hat{y}_t$ — прогноз модели в точке $t$ на основе предыдущих $p$ значений

### Расчёт остатков
$$r_t = y_t - \hat{y}_t$$

### Z-score для детекции аномалий
$$z_t = \frac{|r_t|}{\sigma_r}$$

где $\sigma_r$ — стандартное отклонение остатков. Для устойчивости его можно оценить через MAD (абсолютное медианное отклонение)

### Статистический тест
- **Гипотеза**: остатки распределены нормально $r_t \sim \mathcal{N}(0, \sigma_r^2)$
- **Правило**: точка аномальна, если $z_t > \text{threshold}$ (по умолчанию 3)
- **Интерпретация**: значение выходит за пределы 3-сигма интервала (99.7% вероятности для нормального распределения)

### Доверительные границы
$$[\hat{y}_t - \text{threshold} \cdot \sigma_r, \quad \hat{y}_t + \text{threshold} \cdot \sigma_r]$$
***


модель должна возвращать ...

In [7]:
from dataclasses import dataclass
import numpy as np
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.tsatools import detrend


@dataclass
class DetectionResult:
    is_anomaly: np.array
    anomaly_scores: np.array
    expected_values: np.array
    expected_bounds: np.array


def autoreg_detector(
    time_series: pd.DataFrame, threshold: float = 3.0, order: int = 20
) -> DetectionResult:
    time_series_values = time_series["value_0"]
    model = AutoReg(time_series_values, lags=order)
    model_fit = model.fit()
    prediction = np.concatenate((time_series_values[:order], model_fit.fittedvalues))

    residuals = time_series_values - prediction
    residuals_std = np.sqrt(np.mean(residuals**2)) + 1e-8  # assume that E(residuals) = 0

    z_scores = np.abs(residuals / residuals_std)
    expected_bounds = np.column_stack(
        (
            prediction - residuals_std * threshold,
            prediction + residuals_std * threshold,
        )
    )

    return DetectionResult(
        is_anomaly=(z_scores > threshold),
        anomaly_scores=z_scores,
        expected_values=prediction,
        expected_bounds=expected_bounds,
    )

In [8]:
THRESHOLD = 5.0

detection_result = autoreg_detector(time_series, threshold=THRESHOLD)

отрисуем чё вышло

In [9]:
from plots import add_anomalies


fig = plot_time_series(time_series)
add_anomalies(
    fig,
    time_series,
    detection_result.is_anomaly,
    detection_result.expected_values,
    detection_result.expected_bounds,
)

## Постобработка

Последний штрих. Если мы попробуем записать полученные предсказания в таблицу оригинального временного ряда, то у нас ничего не получится. Мы обрабатывали оригинальный временной ряд, добавляли туда пропущенные значения, поэтому его форма может отличаться от оригинальной. Надо интерполировать предсказания в оригинальную форму. 

In [10]:
print("Length of detection result:", detection_result.expected_values.shape[0])
print("Length of processed time series:", time_series.shape[0])
print("Length of original time series:", raw_time_series.shape[0])

Length of detection result: 4031
Length of processed time series: 4031
Length of original time series: 4020


In [11]:
def interpolate_detection_result(
    detection_result: DetectionResult,
    original_time_series: pd.DataFrame,
    processed_time_series: pd.DataFrame,
) -> tuple[np.ndarray, np.ndarray]:
    anomaly_scores = np.interp(
        original_time_series.index.astype(int),
        processed_time_series.index.astype(int),
        detection_result.anomaly_scores,
    )

    expected_values = np.interp(
        original_time_series.index.astype(int),
        processed_time_series.index.astype(int),
        detection_result.expected_values,
    )

    expected_bounds = np.column_stack(
        [
            np.interp(
                original_time_series.index.astype(int),
                processed_time_series.index.astype(int),
                detection_result.expected_bounds[:, i],
            )
            for i in range(2)
        ]
    )

    return anomaly_scores, expected_values, expected_bounds


anomaly_scores, expected_values, expected_bounds = interpolate_detection_result(
    detection_result,
    raw_time_series.set_index(pd.to_datetime(raw_time_series["timestamp"], unit="ms")),
    time_series,
)

is_anomaly = anomaly_scores > THRESHOLD

In [12]:
print("Length of detection result:", expected_values.shape[0])
print("Length of processed time series:", time_series.shape[0])
print("Length of original time series:", raw_time_series.shape[0])

Length of detection result: 4020
Length of processed time series: 4031
Length of original time series: 4020


Топчик. Теперь результаты соответствуют оригинальному временному ряду, который пришёл к нам as is

## Пайплайн целиком

прогоним пайплайн на других временных рядах

In [13]:
def anomaly_detection_pipeline(time_series_path: str):
    raw_time_series = pd.read_csv(time_series_path)
    time_series = preprocess_time_series(raw_time_series)
    detection_result = autoreg_detector(time_series, threshold=5)

    fig = plot_time_series(time_series)
    add_anomalies(
        fig,
        time_series,
        detection_result.is_anomaly,
        detection_result.expected_values,
        detection_result.expected_bounds,
    )
    fig.show()

    anomaly_scores, expected_values, expected_bounds = interpolate_detection_result(
        detection_result,
        raw_time_series.set_index(
            pd.to_datetime(raw_time_series["timestamp"], unit="ms")
        ),
        time_series,
    )

    is_anomaly = anomaly_scores > THRESHOLD

    return {
        "anomaly_scores": anomaly_scores,
        "expected_values": expected_values,
        "expected_bounds": expected_bounds,
        "is_anomaly": is_anomaly,
    }

In [14]:
import os

for root, dirs, files in os.walk("time_series"):
    for file in files:
        if file.endswith(".csv"):
            file_path = os.path.join(root, file)
            anomaly_detection_pipeline(file_path)

## Дополнительно: сезональный детектор аномалий

In [None]:
def seasonal_component(series, period):
    series = np.asarray(series)
    n_periods = len(series) // period
    trimmed = series[: n_periods * period]
    reshaped = trimmed.reshape(-1, period)
    seasonality = np.median(reshaped, axis=0)
    repeated = np.tile(seasonality, n_periods + 1)[: len(series)]
    return repeated


def seasonal_detector(
    time_series: pd.DataFrame, lag: str = "1w", threshold: float = 3.0
) -> DetectionResult:
    time_diff = time_series.index[1] - time_series.index[0]
    lag_timedelta = pd.Timedelta(lag)
    lag_points = int(lag_timedelta / time_diff)

    values = time_series["value_0"].values
    detrended_values = detrend(values, order=1)
    residuals = detrended_values - seasonal_component(
        detrended_values, lag_points
    )

    residuals_std = np.sqrt(np.mean(residuals**2))
    z_scores = np.abs(residuals / residuals_std)
    expected_values = values - residuals
    expected_bounds = np.column_stack(
        (
            expected_values - residuals_std * threshold,
            expected_values + residuals_std * threshold,
        )
    )

    return DetectionResult(
        anomaly_scores=z_scores,
        is_anomaly=(z_scores > threshold),
        expected_values=expected_values,
        expected_bounds=expected_bounds,
    )


raw_time_series = pd.read_csv("time_series/time_series_3.csv")
time_series = preprocess_time_series(raw_time_series)
detection_result = seasonal_detector(time_series, threshold=3)

fig = plot_time_series(time_series)
add_anomalies(
    fig,
    time_series,
    detection_result.is_anomaly,
    detection_result.expected_values,
    detection_result.expected_bounds,
)
fig.show()