In [None]:
%load_ext autoreload
%autoreload 2

import env

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# --- 1. Load configuration ---
service = env.get_gservice()

# Fail fast: nothing below can run without a Google API handle.
if not service:
    raise ConnectionError("Не удалось подключиться к Google API")

df_sheet = env.read_df_from_spreadsheet(service, env.SHEET_ID, env.SHEET_NAME)
print("Данные из Google Sheets загружены")

# Target table/schema for results and the name of this alert's row in the sheet.
RS_TABLE = 'incent_opex_check_universal'
RS_SCHEMA = 'ma_data'
ALERT_NAME = "05-incent.ret"

# .iloc[0] raises IndexError when no sheet row carries this alert name.
try:
    config_row = df_sheet[df_sheet['name'] == ALERT_NAME].iloc[0]
except IndexError:
    raise ValueError(f"Алерт '{ALERT_NAME}' не найден в Google Sheet")

# Status message only; parameter parsing below runs in both cases and the
# flag itself is stored as ALERT_ACTIVE_FLAG.
if config_row['active_flag'] == 'Enabled':
    print(f"Запуск алерта '{ALERT_NAME}'...")
else:
    print(f"Алерт '{ALERT_NAME}' отключен. Пропуск.")

# --- 2. Parse parameters ---
ALERT_ACTIVE_FLAG = config_row['active_flag']
N_SIGMAS = abs(float(config_row['n_sigmas']))
MIN_INSTALLS = int(config_row['threshold_installs'])
THRESHOLD_FIXED = int(config_row['threshold_conv'])
ALERT_CATEGORY = config_row['metric_crit_category']

# Alerting criterion: 'ci' or 'change'. A missing or NaN cell means 'ci'.
_criteria = config_row.get('criteria', 'ci')
if pd.notna(_criteria):
    CRITERIA = str(_criteria).strip().lower()
else:
    CRITERIA = 'ci'

if CRITERIA != 'change':
    # Anything other than 'change' is normalised to 'ci'; thresholds are zeroed.
    CRITERIA = 'ci'
    THRESHOLD_WARNING_PCT = 0
    THRESHOLD_CRIT_PCT = 0
else:
    # Thresholds are stored as absolute values; a NaN cell falls back to 0.
    _tw = config_row.get('threshold_warning', 0)
    _tc = config_row.get('threshold_crit', 0)
    THRESHOLD_WARNING_PCT = 0 if pd.isna(_tw) else abs(float(_tw))
    THRESHOLD_CRIT_PCT = 0 if pd.isna(_tc) else abs(float(_tc))

# Хелпер для SQL списков
def to_sql_list(items):
    """Render values as a parenthesised SQL list literal, e.g. ``('a', 1)``.

    Args:
        items: A list, tuple, set or frozenset of values, or a single value
            (which is treated as a one-element collection).

    Returns:
        str: The values joined by ``", "`` inside parentheses. Strings are
        wrapped in single quotes with embedded single quotes doubled; any
        other value is rendered with ``str()``. An empty collection yields
        ``"()"``.
    """
    if isinstance(items, (list, tuple, set, frozenset)):
        values = list(items)
    else:
        values = [items]

    rendered = []
    for value in values:
        if isinstance(value, str):
            # Double embedded quotes so the value cannot terminate the literal.
            escaped = value.replace("'", "''")
            rendered.append(f"'{escaped}'")
        else:
            rendered.append(str(value))

    return f"({', '.join(rendered)})"

# config_json is expected to be a JSON object with a "partners" mapping; the
# mapping's keys become the partner filter used in the SQL IN (...) clause.
try:
    params = json.loads(config_row['config_json'])
    PARTNER_NAMES = params['partners']
    CONFIG_PARTNERS = to_sql_list(list(PARTNER_NAMES.keys()))

except json.JSONDecodeError as e:
    raise ValueError(f"Ошибка JSON в ячейке config_json: {e}") from e
except KeyError as e:
    raise ValueError(f"В JSON отсутствует обязательный ключ: {e}") from e
except (TypeError, AttributeError) as e:
    # TypeError: the cell is not a string (e.g. NaN) or the JSON root is not
    # an object. AttributeError: "partners" is not a mapping (no .keys()).
    raise ValueError(f"Некорректная структура config_json: {e}") from e

print(f"Настройки: N_SIGMAS={N_SIGMAS}, MIN_INSTALLS={MIN_INSTALLS}, THRESHOLD_FIXED={THRESHOLD_FIXED}")
print(f"Partners: {CONFIG_PARTNERS}")
if CRITERIA == 'change':
    print(f"Критерий: CHANGE (warning={THRESHOLD_WARNING_PCT:.1%}, crit={THRESHOLD_CRIT_PCT:.1%})")
else:
    print(f"Критерий: CI (n_sigmas={N_SIGMAS})")

# --- 3. Функции расчёта периодов ---

def get_metric_periods(maturity_days, today=None):
    """Return the current and reference cohort windows for a metric.

    The current window is the 3 consecutive cohort days ending
    ``maturity_days`` before ``today``. The reference window is the 14 days
    that end the day before the current window starts.

    Args:
        maturity_days: Days a cohort must age before the metric is read
            (this file uses 4 and 8).
        today: Optional anchor date (``date`` or ``datetime``). Defaults to
            the local current date. Pass it explicitly for reproducible runs
            or back-fills.

    Returns:
        dict: ``date`` values under the keys ``current_start``,
        ``current_end``, ``reference_start`` and ``reference_end``.
    """
    if today is None:
        today = datetime.now().date()
    elif isinstance(today, datetime):
        # Only the calendar day matters for cohort windows.
        today = today.date()

    current_end = today - timedelta(days=maturity_days)
    current_start = current_end - timedelta(days=2)

    reference_end = current_start - timedelta(days=1)
    reference_start = reference_end - timedelta(days=13)

    return {
        'current_start': current_start,
        'current_end': current_end,
        'reference_start': reference_start,
        'reference_end': reference_end,
    }

# --- 4. Функции расчёта CI ---

def calc_binomial_ci(p, n, z):
    """Return the half-width of a binomial confidence interval.

    Computes ``z * sqrt(p * (1 - p) / n)`` element-wise.

    Args:
        p: Proportion(s), scalar or array-like. Values that cannot be parsed
            as numbers, and missing values, are treated as 0.
        n: Sample size(s), broadcastable against ``p``. Where ``n`` is 0 the
            result is 0 instead of a division error.
        z: Multiplier applied to the standard error.

    Returns:
        A float NumPy array (a NumPy float for scalar inputs), positionally
        aligned with the inputs.
    """
    def _as_float_array(values):
        # Always work on float64 so the division buffer below can never be an
        # integer array, and so scalars are accepted as well as Series.
        arr = np.asarray(pd.to_numeric(values, errors='coerce'), dtype=float)
        return np.where(np.isnan(arr), 0.0, arr)

    p_arr = _as_float_array(p)
    n_arr = _as_float_array(n)

    numerator = p_arr * (1 - p_arr)
    out_buf = np.zeros(np.broadcast(numerator, n_arr).shape, dtype=float)
    variance = np.divide(numerator, n_arr, out=out_buf, where=n_arr != 0)

    # p outside [0, 1] would give a negative variance; clip before the sqrt.
    se = np.sqrt(np.clip(variance, 0, None))
    return z * se

# --- 5. SQL запрос ---

def _period_cte(cte_name, activity_col, activity_alias, start, end):
    """Render one CTE summing installs and one activity column over a window."""
    return f"""
    {cte_name} AS (
        SELECT
            app_short, partner_id, operation_segment_nm, country_cd,
            SUM(installs_cnt) as installs,
            SUM({activity_col}) as {activity_alias}
        FROM raw_data
        WHERE install_dt BETWEEN '{start}' AND '{end}'
        GROUP BY app_short, partner_id, operation_segment_nm, country_cd
    )"""


def build_metrics_query(periods_3d, periods_7d, partners_sql):
    """Build the SQL that returns current/reference counts for Ret3 and Ret7.

    The query reads ``core.base_metrics`` once, aggregates it into four
    per-window CTEs (current/reference for each maturity) and FULL OUTER
    JOINs them on (app_short, partner_id, operation_segment_nm, country_cd).
    Missing sides are returned as 0.

    Args:
        periods_3d: Window dict for the 3-day metric with keys
            ``current_start``, ``current_end``, ``reference_start`` and
            ``reference_end``.
        periods_7d: Same structure for the 7-day metric.
        partners_sql: Pre-rendered SQL list such as ``"(1, 2)"``. It is
            interpolated verbatim, so it must come from a trusted source.

    Returns:
        str: The SQL text.

    NOTE(review): join keys are compared with ``=``, so rows whose key
    columns are NULL are not matched across the CTEs.
    """
    # Cover every sub-window regardless of which period dict starts earlier
    # or ends later, instead of assuming 7d is the oldest and 3d the newest.
    raw_start = min(periods_3d['reference_start'], periods_7d['reference_start'])
    raw_end = max(periods_3d['current_end'], periods_7d['current_end'])

    period_ctes = ",\n".join([
        _period_cte('current_3d', 'user_activity_3_cnt', 'active_users_3',
                    periods_3d['current_start'], periods_3d['current_end']),
        _period_cte('reference_3d', 'user_activity_3_cnt', 'active_users_3',
                    periods_3d['reference_start'], periods_3d['reference_end']),
        _period_cte('current_7d', 'user_activity_7_cnt', 'active_users_7',
                    periods_7d['current_start'], periods_7d['current_end']),
        _period_cte('reference_7d', 'user_activity_7_cnt', 'active_users_7',
                    periods_7d['reference_start'], periods_7d['reference_end']),
    ])

    sql_query = f"""
    WITH raw_data AS (
        SELECT
            app_short,
            partner_id,
            operation_segment_nm,
            country_cd,
            install_dt,
            installs_cnt,
            user_activity_3_cnt,
            user_activity_7_cnt
        FROM core.base_metrics
        WHERE
            partner_id IN {partners_sql}
            AND install_dt BETWEEN '{raw_start}' AND '{raw_end}'
    ),
    {period_ctes}

    SELECT
        COALESCE(c3.app_short, r3.app_short, c7.app_short, r7.app_short) as app_short,
        COALESCE(c3.partner_id, r3.partner_id, c7.partner_id, r7.partner_id) as partner_id,
        COALESCE(c3.operation_segment_nm, r3.operation_segment_nm, c7.operation_segment_nm, r7.operation_segment_nm) as operation_segment_nm,
        COALESCE(c3.country_cd, r3.country_cd, c7.country_cd, r7.country_cd) as country_cd,

        COALESCE(c3.installs, 0) as curr_installs_3d,
        COALESCE(c3.active_users_3, 0) as curr_active_3,

        COALESCE(r3.installs, 0) as ref_installs_3d,
        COALESCE(r3.active_users_3, 0) as ref_active_3,

        COALESCE(c7.installs, 0) as curr_installs_7d,
        COALESCE(c7.active_users_7, 0) as curr_active_7,

        COALESCE(r7.installs, 0) as ref_installs_7d,
        COALESCE(r7.active_users_7, 0) as ref_active_7

    FROM current_3d c3
    FULL OUTER JOIN reference_3d r3 ON c3.app_short = r3.app_short
        AND c3.partner_id = r3.partner_id
        AND c3.operation_segment_nm = r3.operation_segment_nm
        AND c3.country_cd = r3.country_cd
    FULL OUTER JOIN current_7d c7 ON COALESCE(c3.app_short, r3.app_short) = c7.app_short
        AND COALESCE(c3.partner_id, r3.partner_id) = c7.partner_id
        AND COALESCE(c3.operation_segment_nm, r3.operation_segment_nm) = c7.operation_segment_nm
        AND COALESCE(c3.country_cd, r3.country_cd) = c7.country_cd
    FULL OUTER JOIN reference_7d r7 ON COALESCE(c3.app_short, r3.app_short, c7.app_short) = r7.app_short
        AND COALESCE(c3.partner_id, r3.partner_id, c7.partner_id) = r7.partner_id
        AND COALESCE(c3.operation_segment_nm, r3.operation_segment_nm, c7.operation_segment_nm) = r7.operation_segment_nm
        AND COALESCE(c3.country_cd, r3.country_cd, c7.country_cd) = r7.country_cd
    """

    return sql_query

# --- 6. Основная логика проверки ---

def run_metrics_check():
    """Fetch raw counts, add 'ALL'-country totals and compute Ret3/Ret7.

    Builds the windows for maturity 4 and 8, runs the metrics query for the
    configured partners, appends one ``country_cd == 'ALL'`` row per
    (app_short, partner_id, operation_segment_nm) and derives
    ``curr_ret3``, ``ref_ret3``, ``curr_ret7`` and ``ref_ret7`` as
    active / installs (0 where installs is 0).

    Returns:
        tuple: ``(df, periods_3d, periods_7d)``, or
        ``(empty DataFrame, None, None)`` when the query returns no rows.
    """
    periods_3d = get_metric_periods(maturity_days=4)
    periods_7d = get_metric_periods(maturity_days=8)

    print(f"\nПериоды для Ret3: {periods_3d['current_start']} - {periods_3d['current_end']}")
    print(f"Периоды для Ret7: {periods_7d['current_start']} - {periods_7d['current_end']}")

    sql = build_metrics_query(periods_3d, periods_7d, CONFIG_PARTNERS)
    # presumably returns a pandas DataFrame -- it is used as one below
    df = env.execute_sql(sql)

    if df.empty:
        print("Нет данных.")
        return pd.DataFrame(), None, None

    print(f"Получено строк: {len(df)}")

    keys = ['app_short', 'partner_id', 'operation_segment_nm', 'country_cd']
    numeric_cols = [c for c in df.columns if c not in keys]

    # Force float64. Integer count columns would otherwise stay integer and
    # the ratio buffers below would truncate or reject fractional results.
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0).astype(float)

    # Aggregate an 'ALL' row across countries for every app/partner/segment.
    # NOTE(review): groupby drops rows whose group keys are NaN by default,
    # so such rows do not contribute to the 'ALL' totals -- confirm intent.
    group_cols = ['app_short', 'partner_id', 'operation_segment_nm']
    df_all = df.groupby(group_cols, as_index=False)[numeric_cols].sum()
    df_all['country_cd'] = 'ALL'

    df = pd.concat([df, df_all], ignore_index=True)

    def safe_div(num, den):
        """Element-wise num / den as floats, yielding 0 where den is 0."""
        num_arr = np.asarray(num, dtype=float)
        den_arr = np.asarray(den, dtype=float)
        return np.divide(num_arr, den_arr, out=np.zeros_like(num_arr), where=den_arr != 0)

    # RET = active / installs
    df['curr_ret3'] = safe_div(df['curr_active_3'], df['curr_installs_3d'])
    df['ref_ret3'] = safe_div(df['ref_active_3'], df['ref_installs_3d'])

    df['curr_ret7'] = safe_div(df['curr_active_7'], df['curr_installs_7d'])
    df['ref_ret7'] = safe_div(df['ref_active_7'], df['ref_installs_7d'])

    return df, periods_3d, periods_7d

# --- 7. Формирование результатов для записи в БД ---

def build_results_dataframe(df, periods_3d, periods_7d):
    """Turn the metrics frame into one result row per slice and metric.

    For each of ``ret3`` and ``ret7`` the rows are filtered by the
    module-level ``MIN_INSTALLS`` and ``THRESHOLD_FIXED`` limits (applied to
    both the current and the reference window). The binomial CI and the
    relative change are then computed and the alert flags derived according
    to ``CRITERIA``:

    * ``'change'``: alert when ``|change| >= THRESHOLD_WARNING_PCT``, critical
      when ``THRESHOLD_CRIT_PCT > 0`` and ``|change| >= THRESHOLD_CRIT_PCT``.
    * otherwise: alert when the current value falls outside
      ``reference +/- CI``; never critical.

    Args:
        df: Frame with the key columns, the ``curr_*``/``ref_*`` counts and
            the ``curr_ret*``/``ref_ret*`` ratios.
        periods_3d: Window dict for Ret3; ``current_end`` is stored as the
            cohort date.
        periods_7d: Window dict for Ret7, used the same way.

    Returns:
        pandas.DataFrame: One row per kept slice and metric. It has no
        columns when nothing passes the filters.
    """
    # Both metrics are proportions, so the CI is always the binomial one.
    metrics_config = {
        'ret3': {
            'current_col': 'curr_ret3',
            'reference_col': 'ref_ret3',
            'n_col': 'ref_installs_3d',
            'cohort_date': periods_3d['current_end'],
            'installs_col': 'curr_installs_3d',
            'ref_installs_col': 'ref_installs_3d',
            'threshold_col': 'curr_active_3',
            'ref_threshold_col': 'ref_active_3',
        },
        'ret7': {
            'current_col': 'curr_ret7',
            'reference_col': 'ref_ret7',
            'n_col': 'ref_installs_7d',
            'cohort_date': periods_7d['current_end'],
            'installs_col': 'curr_installs_7d',
            'ref_installs_col': 'ref_installs_7d',
            'threshold_col': 'curr_active_7',
            'ref_threshold_col': 'ref_active_7',
        },
    }

    # One timestamp for the whole run, so every row written by this
    # execution shares the same 'date' value.
    run_ts = datetime.now()
    all_rows = []

    for metric_name, config in metrics_config.items():
        # Require enough installs in both the current and reference windows.
        df_filtered = df[
            (df[config['installs_col']] >= MIN_INSTALLS) &
            (df[config['ref_installs_col']] >= MIN_INSTALLS)
        ].copy()

        # Optionally require enough active users in both windows as well.
        if THRESHOLD_FIXED > 0:
            df_filtered = df_filtered[
                (df_filtered[config['threshold_col']] >= THRESHOLD_FIXED) &
                (df_filtered[config['ref_threshold_col']] >= THRESHOLD_FIXED)
            ].copy()

        if df_filtered.empty:
            continue

        # The CI is always computed because it is stored with every row,
        # whichever criterion drives the alert.
        df_filtered['reference_value_ci'] = calc_binomial_ci(
            df_filtered[config['reference_col']],
            df_filtered[config['n_col']],
            N_SIGMAS
        )

        # Relative change versus the reference; 0 where the reference is 0.
        ref_values = df_filtered[config['reference_col']].astype(float).values
        curr_values = df_filtered[config['current_col']].astype(float).values
        diff = curr_values - ref_values
        df_filtered['change_perc'] = np.divide(
            diff, ref_values, out=np.zeros_like(diff), where=ref_values != 0
        )

        if CRITERIA == 'change':
            abs_change = np.abs(df_filtered['change_perc'].values)
            is_critical = (THRESHOLD_CRIT_PCT > 0) & (abs_change >= THRESHOLD_CRIT_PCT)
            # A critical row is always an alert, even if the warning
            # threshold is configured above the critical one.
            # NOTE(review): a warning threshold of 0 flags every row.
            df_filtered['is_alert'] = (abs_change >= THRESHOLD_WARNING_PCT) | is_critical
            df_filtered['is_critical'] = is_critical
        else:
            lower_bound = df_filtered[config['reference_col']] - df_filtered['reference_value_ci']
            upper_bound = df_filtered[config['reference_col']] + df_filtered['reference_value_ci']
            df_filtered['is_alert'] = (
                (df_filtered[config['current_col']] < lower_bound) |
                (df_filtered[config['current_col']] > upper_bound)
            )
            df_filtered['is_critical'] = False

        for _, row in df_filtered.iterrows():
            is_alert = bool(row['is_alert'])
            if is_alert and bool(row['is_critical']):
                alert_cat = 'CRITICAL'
            elif is_alert:
                alert_cat = 'WARNING'
            else:
                alert_cat = None

            all_rows.append({
                'date': run_ts,
                'check_name': ALERT_NAME,
                'metric': metric_name,
                'partner_id': int(row['partner_id']),
                'app_short': row['app_short'],
                'country': row['country_cd'],
                'segment': row['operation_segment_nm'],
                'slice1': None,
                'slice2': None,
                'slice3': None,
                'slice4': None,
                'cohort_date': config['cohort_date'],
                'metric_crit_category': ALERT_CATEGORY,
                'current_value': float(row[config['current_col']]),
                'reference_value': float(row[config['reference_col']]),
                'reference_value_ci': float(row['reference_value_ci']),
                'change_perc': float(row['change_perc']),
                'is_alert': is_alert,
                'alert_category': alert_cat,
            })

    return pd.DataFrame(all_rows)

# --- 8. Run the check ---

if ALERT_ACTIVE_FLAG == 'Enabled':
    df_metrics, periods_3d, periods_7d = run_metrics_check()

    if df_metrics.empty:
        print("Нет данных для анализа.")
    else:
        all_results = build_results_dataframe(df_metrics, periods_3d, periods_7d)

        if all_results.empty:
            print("Нет данных для записи.")
        else:
            alerts_count = all_results['is_alert'].sum()
            print(f"\n[{ALERT_CATEGORY.upper()}] Всего записей: {len(all_results)}, из них алертов: {alerts_count}")

            # Columns passed to the Redshift insert, in this order.
            db_cols = [
                'date', 'check_name', 'metric',
                'partner_id', 'app_short', 'country', 'segment',
                'slice1', 'slice2', 'slice3', 'slice4',
                'cohort_date', 'metric_crit_category',
                'current_value', 'reference_value', 'reference_value_ci',
                'change_perc', 'is_alert', 'alert_category'
            ]
            df_to_write = all_results[db_cols].copy()

            # Every evaluated row is stored, not only the alerting ones.
            if not df_to_write.empty:
                print(f"Запись {len(df_to_write)} строк в Redshift...")
                env.insert_table_into_rs(df_to_write, RS_TABLE, RS_SCHEMA, 10000)
                print("Успешно записано.")

            # Only alerting rows are rendered in the notebook output.
            alerts_final = all_results[all_results['is_alert'] == True].copy()
            if alerts_final.empty:
                print("Алертов не найдено.")
            else:
                display_cols = [
                    'date', 'check_name', 'metric',
                    'partner_id', 'app_short', 'country', 'segment',
                    'current_value', 'reference_value', 'reference_value_ci',
                    'change_perc', 'is_alert', 'alert_category'
                ]
                percent_formats = {
                    'current_value': '{:.2%}',
                    'reference_value': '{:.2%}',
                    'reference_value_ci': '{:.2%}',
                    'change_perc': '{:+.1%}'
                }

                styled_df = (
                    alerts_final[display_cols]
                    .style
                    .hide(axis='index')
                    .format(percent_formats)
                )
                display(styled_df)