# Задача: построение системы алертов

## Цель задачи

Реализовать автоматизированную **систему алертов**, которая каждые **15 минут** будет проверять ключевые метрики приложения и **уведомлять в Telegram** в случае обнаружения аномалий.

---

## Ключевые метрики для мониторинга

Следующие метрики должны проверяться каждые 15 минут:

| Метрика | Описание |
|--------|----------|
| **DAU ленты** | Число уникальных пользователей новостной ленты за 15-минутный интервал |
| **DAU мессенджера** | Число уникальных пользователей мессенджера за 15-минутный интервал |
| **Просмотры** | Общее количество просмотров постов |
| **Лайки** | Общее количество лайков |
| **CTR** | Отношение лайков к просмотрам |
| **Количество отправленных сообщений** | Число отправленных сообщений за период |

---

## Как определять аномалии?

### Основные подходы:
- **Сравнение с предыдущим днём:**  
  Сравнивать текущее значение метрики с аналогичным 15-минутным интервалом вчера.  
  Например: если сегодня в 10:00 было 5000 просмотров, а вчера в 10:00 — 10000, можно считать это аномалией (в зависимости от порога).

- **Скользящее среднее / стандартное отклонение:**  
  Использовать среднее значение метрики за последние N дней и сигнализировать, если текущее значение выходит за границы доверительного интервала.

- **Seasonal decomposition (STL, SARIMA и т.д.):**  
  Для более сложных случаев можно использовать разложение временного ряда и анализ остатков.

В моем проекте я использую метод трех сигм и IQR

файл формата py

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import telegram
import pandahouse as ph
from datetime import date
import io
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from airflow.operators.python import get_current_context
# Подключение к ClickHouse
connection = {
    'host': '*****',
    'password': '*****',
    'user': '*****',
    'database': '*****'
}

default_args = {
    'owner': 'm_kruzhalin',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
    'start_date': datetime(2025, 7, 5),
}

schedule_interval = '*/15 * * * *'

# Метод 3 сигмы
def check_anomaly_3q(df, metric, a=3, n=5):
    df = df.copy()
    df['mean'] = df[metric].shift(1).rolling(n).mean()
    df['std'] = df[metric].shift(1).rolling(n).std()

    df['mean'] = df['mean'].rolling(n, center=True, min_periods=1).mean()
    df['std'] = df['std'].rolling(n, center=True, min_periods=1).mean()

    df['up'] = df['mean'] + a * df['std']
    df['low'] = df['mean'] - a * df['std']

    current_value = df[metric].iloc[-1]
    upper_bound = df['up'].iloc[-1]
    lower_bound = df['low'].iloc[-1]

    is_alert = 1 if (current_value < lower_bound or current_value > upper_bound) else 0

    return is_alert, df

def check_anomaly_IQR(df, metric, a=3, n=5):
    df = df.copy()
    df['q25'] = df[metric].shift(1).rolling(n).quantile(0.25)
    df['q75'] = df[metric].shift(1).rolling(n).quantile(0.75)
    df['iqr'] = df['q75'] - df['q25']
    df['up'] = df['q75'] + a * df['iqr']
    df['low'] = df['q25'] - a * df['iqr']

    df['up'] = df['up'].rolling(n, center=True, min_periods=1).mean()
    df['low'] = df['low'].rolling(n, center=True, min_periods=1).mean()

    current_value = df[metric].iloc[-1]
    upper_bound = df['up'].iloc[-1]
    lower_bound = df['low'].iloc[-1]

    is_alert = 1 if (current_value < lower_bound or current_value > upper_bound) else 0

    return is_alert, df

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def maximkruzhalin_alert_bot():
    
    @task()
    def run_alerts(chat=None):
        chat_id = chat or '*****'
        bot = telegram.Bot(token='*****')

        q = """
        SELECT
            toStartOfFifteenMinutes(f.time) AS ts,
            toDate(ts) AS date,
            formatDateTime(ts, '%R') AS hm,
            uniqExact(f.user_id) AS users_feed,
            countIf(f.user_id, f.action = 'view') AS views,
            countIf(f.user_id, f.action = 'like') AS likes,
            (likes / views) AS ctr,
            sum(m.messages_per_user) AS messages
        FROM simulator_20250520.feed_actions f
        LEFT JOIN (
            SELECT
                user_id,
                toStartOfFifteenMinutes(time) AS ts,
                count(*) AS messages_per_user
            FROM simulator_20250520.message_actions
            WHERE time >= today() - 1 AND time < toStartOfFifteenMinutes(now())
            GROUP BY user_id, ts
        ) m ON f.user_id = m.user_id AND toStartOfFifteenMinutes(f.time) = m.ts
        WHERE f.time >= today() - 1 AND f.time < toStartOfFifteenMinutes(now())
        GROUP BY ts, date, hm
        ORDER BY ts
        """

        data = ph.read_clickhouse(q, connection=connection)

        metrics_list = ['users_feed', 'views', 'likes', 'ctr', 'messages']

        for metric in metrics_list:
            df_metric = data[['ts', 'date', 'hm', metric]].copy()

            is_alert_3q, df_3q = check_anomaly_3q(df_metric, metric)
            is_alert_iqr, df_iqr = check_anomaly_IQR(df_metric, metric)

            if is_alert_3q or is_alert_iqr:
                last_val = df_metric[metric].iloc[-2] if len(df_metric) >= 2 else 0
                current_val = df_metric[metric].iloc[-1]

                try:
                    diff_percent = abs(current_val / last_val - 1)
                except ZeroDivisionError:
                    diff_percent = float('inf')

                msg = f'''🚨 Аномалия обнаружена!
    Метрика: {metric}
    Текущее значение: {current_val:.2f}
    Предыдущее значение: {last_val:.2f}
    Отклонение: {diff_percent:.2%}'''

                sns.set(rc={'figure.figsize': (16, 10)})
                plt.figure(figsize=(16, 10))
                ax = sns.lineplot(x=df_metric['ts'], y=df_metric[metric], label=metric)
                sns.lineplot(x=df_metric['ts'], y=df_3q['up'], label='Верхняя граница (3σ)')
                sns.lineplot(x=df_metric['ts'], y=df_3q['low'], label='Нижняя граница (3σ)')
                sns.lineplot(x=df_metric['ts'], y=df_iqr['up'], label='Верхняя граница (IQR)')
                sns.lineplot(x=df_metric['ts'], y=df_iqr['low'], label='Нижняя граница (IQR)')

                ax.set_title(metric)
                ax.set_xlabel('Время')
                ax.set_ylabel(metric)
                ax.set_ylim(0, None)

                for ind, label in enumerate(ax.get_xticklabels()):
                    if ind % 2 == 0:
                        label.set_visible(True)
                    else:
                        label.set_visible(False)

                plt.tight_layout()

                plot_object = io.BytesIO()
                plt.savefig(plot_object)
                plot_object.seek(0)
                plt.close()

                bot.send_message(chat_id=chat_id, text=msg)
                bot.send_photo(chat_id=chat_id, photo=plot_object)

        return

    run_alerts()

maximkruzhalin_alert_bot = maximkruzhalin_alert_bot()

Пример сообщения в телеграм:

🚨 Аномалия обнаружена!
    Метрика: messages
    Текущее значение: 15.00
    Предыдущее значение: 21.00
    Отклонение: 28.57%
    
![tg_bot3](screenshots/alert3.jpg)

🚨 Аномалия обнаружена!
    Метрика: users_feed
    Текущее значение: 491.00
    Предыдущее значение: 474.00
    Отклонение: 3.59%

![tg_bot3_1](screenshots/alert2.jpg)

🚨 Аномалия обнаружена!
    Метрика: ctr
    Текущее значение: 0.21
    Предыдущее значение: 0.21
    Отклонение: 1.87%
    
![tg_bot3_2](screenshots/alert1.jpg)