# Комплексное тестирование библиотеки dynamic_refitting

**Цель:** пошаговая проверка всех модулей библиотеки на крупном внешнем датасете с замерами производительности и проверками корректности.

**Датасет:** [Adult (Census Income)](https://www.openml.org/d/1590) — 48 842 записи, бинарная классификация (доход >50K), смесь числовых и категориальных признаков.

### Содержание
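1. Загрузка данных и подготовка
2. Модуль метрик (`dynamic_refitting.utils.metrics`)
3. Шаги валидации (`dynamic_refitting.validation_steps`)
4. Feature Engineering (`dynamic_refitting.feature_engineering`)
5. AutoPipeBoost (LightGBM + Optuna)
6. AutoPipeLogreg (Logistic Regression)
7. Мониторинг дрифта (`dynamic_refitting.monitoring`)
8. Динамический рефиттинг (`dynamic_refitting.refit`)
9. Объясняемость (`dynamic_refitting.explainability`)
10. Model Registry и ExperimentTracker
11. Plugin Architecture (`StepRegistry`)
12. Сериализация (save / load)
13. Custom Pipeline (ручная сборка)
14. TimeSeriesSplitter
15. Synthetic Data Generator
16. Adversarial Validation
17. Сводная таблица по времени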

In [None]:
import time
import warnings
import tempfile
import shutil
from pathlib import Path
from contextlib import contextmanager

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore")

# ── timing infrastructure ───────────────────────────────────
# Maps a step label to its wall-clock duration in seconds. Filled by
# ``timer`` and read by the summary cell at the end of the notebook.
TIMING: dict = {}


@contextmanager
def timer(label: str):
    """Measure the wall-clock time of the enclosed ``with`` block.

    The duration is stored in ``TIMING[label]`` and printed. Recording
    happens in a ``finally`` clause, so a step that raises (for example a
    failing ``assert`` inside the block) is still timed before the exception
    propagates to the caller.

    Args:
        label: Key under which the duration is stored. Reusing a label
            overwrites the previous measurement.
    """
    t0 = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - t0
        TIMING[label] = elapsed
        print(f"⏱  {label}: {elapsed:.2f} s")


print("Imports OK")

## 1. Загрузка внешнего датасета и подготовка данных

Загружаем датасет **Adult / Census Income** через `sklearn.datasets.fetch_openml`.
Он содержит ~48 000 строк, 14 признаков (числовые + категориальные) и бинарную цель.

In [None]:
from sklearn.datasets import fetch_openml

with timer("1. Загрузка датасета (fetch_openml)"):
    data = fetch_openml("adult", version=2, as_frame=True, parser="auto")
    df_raw = data.data.copy()
    # Binarise the target: strip whitespace, remove any "." characters,
    # then encode ">50K" as 1 and everything else as 0.
    _labels = data.target.astype(str).str.strip()
    _labels = _labels.str.replace(".", "", regex=False)
    df_raw["target"] = _labels.eq(">50K").astype(int)

print(f"Размер: {df_raw.shape}")
print(f"Доля target=1: {df_raw['target'].mean():.3f}")
df_raw.head()

In [None]:
# Attach a synthetic month-start date column (12 months); the time-based
# parts of the library need a time axis.
rng = np.random.RandomState(42)
dates = pd.date_range("2023-01-01", periods=12, freq="MS")
df_raw["date"] = rng.choice(dates, size=len(df_raw))

# Split by month: train = first 8 months, test = months 9-10,
# monitor = months 11-12.
sorted_dates = sorted(dates)
train_dates = set(sorted_dates[:8])
test_dates = set(sorted_dates[8:10])
mon_dates = set(sorted_dates[10:])


def _rows_in(periods):
    """Return the rows of ``df_raw`` dated within *periods*, index reset."""
    mask = df_raw["date"].isin(periods)
    return df_raw[mask].reset_index(drop=True)


df_train = _rows_in(train_dates)
df_test = _rows_in(test_dates)
df_monitor = _rows_in(mon_dates)

# Features keep the "date" column; only the target is separated out.
X_train = df_train.drop(columns=["target"])
y_train = df_train["target"]
X_test = df_test.drop(columns=["target"])
y_test = df_test["target"]
X_mon = df_monitor.drop(columns=["target"])
y_mon = df_monitor["target"]

_n_numeric = X_train.select_dtypes(include=[np.number]).shape[1]
_n_categorical = X_train.select_dtypes(include=['object', 'category']).shape[1]

print(f"Train: {len(df_train):,}  |  Test: {len(df_test):,}  |  Monitor: {len(df_monitor):,}")
print(f"Train target rate: {y_train.mean():.3f}")
print(f"Числовые: {_n_numeric}")
print(f"Категориальные: {_n_categorical}")

## 2. Модуль метрик (`dynamic_refitting.utils.metrics`)

In [None]:
from dynamic_refitting.utils.metrics import (
    calc_auc, calc_ks, calc_brier, calc_psi, calc_metrics,
)

# Random scores are sufficient: this cell exercises the metric functions,
# not a model. The order of the two rng draws is kept as-is.
rng = np.random.RandomState(0)
y_dummy = y_test.values
p_dummy = rng.rand(len(y_dummy))
ref_dummy = rng.rand(len(y_train))

with timer("2. calc_metrics (AUC + KS + Brier + PSI)"):
    m = calc_metrics(
        y_dummy,
        p_dummy,
        metric_names=["auc", "ks", "brier", "psi"],
        reference_scores=ref_dummy,
    )

# Range checks: AUC, KS and Brier must be present and lie in [0, 1];
# PSI must be present and non-negative.
_bounded = (
    ("auc", "AUC вне диапазона [0,1]"),
    ("ks", "KS вне диапазона [0,1]"),
    ("brier", "Brier вне диапазона [0,1]"),
)
for _name, _message in _bounded:
    assert _name in m and 0 <= m[_name] <= 1, _message
assert "psi" in m and m["psi"] >= 0, "PSI < 0"

_rounded = {k: round(v, 4) for k, v in m.items()}
print("Метрики:", _rounded)

# PSI of a distribution against itself should be (almost) zero.
psi_same = calc_psi(ref_dummy, ref_dummy)
assert psi_same < 0.01, f"PSI одинаковых распределений слишком велик: {psi_same}"
print(f"PSI(identical) = {psi_same:.6f}  ✓")
print("Все проверки метрик пройдены ✓")

## 3. Шаги валидации данных (`dynamic_refitting.validation_steps`)

In [None]:
from dynamic_refitting.validation_steps import (
    FeatureCleanerConst,
    FeatureCleanerNan,
    FeatureCleanerUnivariate,
    HitrateChecker,
    PopulationStabilityIndex,
    WoEStabChecker,
)

# Smoke-test every validation step on the training split. The cleaners are
# chained (X_train -> X_cc -> X_cn -> X_cu), so the order of the sections
# below matters, and each `with timer(...)` body is what gets measured.

# ── FeatureCleanerConst ─────────────────────────────────────
with timer("3a. FeatureCleanerConst"):
    cleaner_const = FeatureCleanerConst()
    cleaner_const.fit(X_train, y_train)
    X_cc = cleaner_const.transform(X_train)

print(f"  Удалено константных: {len(cleaner_const.constant_cols_)}  →  {cleaner_const.constant_cols_}")
# The step may drop columns but must never change the number of rows.
assert X_cc.shape[0] == X_train.shape[0], "Число строк изменилось"
assert X_cc.shape[1] <= X_train.shape[1], "Столбцов стало больше"

# ── FeatureCleanerNan ───────────────────────────────────────
# Runs on the output of the constant-column cleaner.
with timer("3b. FeatureCleanerNan"):
    cleaner_nan = FeatureCleanerNan(nan_threshold=0.5, fill_strategy="median")
    cleaner_nan.fit(X_cc, y_train)
    X_cn = cleaner_nan.transform(X_cc)

print(f"  Удалено (>50% NaN): {len(cleaner_nan.drop_cols_)}  |  Заполнено: {len(cleaner_nan._fill_values)}")
# Informational only: count of NaNs left in numeric columns (not asserted).
num_nans_after = X_cn.select_dtypes(include=[np.number]).isna().sum().sum()
print(f"  NaN в числовых после обработки: {num_nans_after}")

# ── FeatureCleanerUnivariate ────────────────────────────────
with timer("3c. FeatureCleanerUnivariate"):
    cleaner_uni = FeatureCleanerUnivariate(min_auc=0.52)
    cleaner_uni.fit(X_cn, y_train)
    X_cu = cleaner_uni.transform(X_cn)

print(f"  Отобрано признаков: {len(cleaner_uni.selected_features_)} / {len(cleaner_uni.feature_aucs_)}")
# At least one feature has to survive the univariate filter.
assert len(cleaner_uni.selected_features_) > 0, "Ни один признак не прошёл фильтр"

# ── HitrateChecker ──────────────────────────────────────────
# Only fitted (no transform); the check reads the resulting event rate.
with timer("3d. HitrateChecker"):
    hr = HitrateChecker(min_rate=0.01, max_rate=0.5)
    hr.fit(X_cn, y_train)

print(f"  Event rate: {hr.actual_rate_:.4f}")
assert hr.actual_rate_ > 0, "Event rate = 0"

# ── PopulationStabilityIndex ────────────────────────────────
with timer("3e. PopulationStabilityIndex"):
    psi_step = PopulationStabilityIndex(n_bins=10, threshold=0.25)
    psi_step.fit(X_cn)
    # Align the test frame to the cleaned training columns; columns absent
    # from X_test are created and filled with 0.
    psi_step.transform(X_test.reindex(columns=X_cn.columns, fill_value=0))

# Count features whose PSI exceeds the same 0.25 threshold passed above.
n_high = sum(1 for v in psi_step.psi_values_.values() if v > 0.25)
print(f"  Проверено фичей: {len(psi_step.psi_values_)}  |  Высокий PSI: {n_high}")

# ── WoEStabChecker ──────────────────────────────────────────
with timer("3f. WoEStabChecker"):
    woe_stab = WoEStabChecker(time_col="date", target_col="target", psi_threshold=0.25)
    # Work on a copy and (re)attach the training dates so the time column
    # is guaranteed to be present under the name the checker expects.
    df_for_stab = X_cn.copy()
    df_for_stab["date"] = df_train["date"].values
    woe_stab.fit(df_for_stab, y_train)

print(f"  Фичей в отчёте: {len(woe_stab.stability_report_)}")
print("Все проверки валидации пройдены ✓")

## 4. Feature Engineering

In [None]:
from dynamic_refitting.feature_engineering import (
    TargetEncoderCV,
    FrequencyEncoder,
    CategoryEmbedder,
    InteractionGenerator,
    DatetimeFeatures,
    LagFeatureGenerator,
    RollingStatGenerator,
)

# Column groups used by the encoders and generators below, derived from the
# raw training features by dtype.
cat_cols = X_train.select_dtypes(include=["object", "category"]).columns.tolist()
num_cols = X_train.select_dtypes(include=[np.number]).columns.tolist()

# Every transformer below receives X_train.copy() in transform/fit_transform
# so that the shared training frame is never modified by a step.

# ── TargetEncoderCV ─────────────────────────────────────────
with timer("4a. TargetEncoderCV fit_transform"):
    te = TargetEncoderCV(cols=cat_cols, n_folds=5, smoothing=10.0)
    X_te = te.fit_transform(X_train.copy(), y_train)

# After target encoding every categorical column must hold floats.
assert X_te[cat_cols].dtypes.apply(lambda d: np.issubdtype(d, np.floating)).all(),     "Не все кат. столбцы стали float"
print(f"  Кодировано {len(cat_cols)} категориальных столбцов")

# ── FrequencyEncoder ────────────────────────────────────────
with timer("4b. FrequencyEncoder"):
    fe = FrequencyEncoder(cols=cat_cols, normalize=True)
    fe.fit(X_train, y_train)
    X_fe = fe.transform(X_train.copy())

# With normalize=True the encoded values are expected not to exceed 1.
assert X_fe[cat_cols].max().max() <= 1.0, "Нормализованные частоты > 1"
print(f"  Кодировано {len(fe._freq_map)} столбцов")

# ── CategoryEmbedder ────────────────────────────────────────
with timer("4c. CategoryEmbedder"):
    ce = CategoryEmbedder(cols=cat_cols)
    ce.fit(X_train)
    X_ce = ce.transform(X_train.copy())

# The embedder must turn every categorical column into an integer dtype.
assert X_ce[cat_cols].dtypes.apply(lambda d: np.issubdtype(d, np.integer)).all(),     "Не все столбцы стали int"
print(f"  Кодировано {len(ce._mapping)} столбцов")

# ── InteractionGenerator ────────────────────────────────────
# Limited to the first five numeric columns and at most ten pairs.
with timer("4d. InteractionGenerator"):
    ig = InteractionGenerator(cols=num_cols[:5], max_pairs=10, interaction_type="multiply")
    ig.fit(X_train)
    X_ig = ig.transform(X_train.copy())

# Interaction columns are recognised by the "_x_" marker in their name.
new_int_cols = [c for c in X_ig.columns if "_x_" in c]
print(f"  Создано интеракций: {len(new_int_cols)}")
assert len(new_int_cols) == len(ig._pairs), "Число интеракций не совпадает"

# ── DatetimeFeatures ────────────────────────────────────────
with timer("4e. DatetimeFeatures"):
    dtf = DatetimeFeatures(datetime_col="date", features=["month", "dayofweek", "quarter"])
    dtf.fit(X_train)
    X_dt = dtf.transform(X_train.copy())

# Generated columns are expected to be named "<datetime_col>_<feature>".
assert "date_month" in X_dt.columns, "date_month не создан"
assert "date_quarter" in X_dt.columns, "date_quarter не создан"
print(f"  Создано dt-признаков: {sum(1 for c in X_dt.columns if c.startswith('date_'))}")

# ── LagFeatureGenerator ─────────────────────────────────────
with timer("4f. LagFeatureGenerator"):
    lfg = LagFeatureGenerator(lag_cols=num_cols[:3], lags=[1, 3], sort_col="date")
    lfg.fit(X_train)
    X_lag = lfg.transform(X_train.copy())

# 3 source columns x 2 lags = 6 new columns, identified by "_lag".
lag_cols_created = [c for c in X_lag.columns if "_lag" in c]
print(f"  Создано лаговых признаков: {len(lag_cols_created)}")
assert len(lag_cols_created) == 3 * 2, "Ожидалось 6 лаговых признаков"

# ── RollingStatGenerator ────────────────────────────────────
with timer("4g. RollingStatGenerator"):
    rsg = RollingStatGenerator(
        stat_cols=num_cols[:2], windows=[3, 5], funcs=["mean", "std"], sort_col="date"
    )
    rsg.fit(X_train)
    X_roll = rsg.transform(X_train.copy())

# 2 source columns x 2 windows x 2 functions = 8 new columns ("_roll").
roll_cols = [c for c in X_roll.columns if "_roll" in c]
print(f"  Создано rolling-признаков: {len(roll_cols)}")
assert len(roll_cols) == 2 * 2 * 2, "Ожидалось 8 rolling-признаков"

print("Все проверки Feature Engineering пройдены ✓")

## 5. AutoPipeBoost (LightGBM + Optuna)

Обучение полного бустингового пайплайна: очистка → предселекция → корреляции → хвосты → Optuna-LightGBM.
Используем `optuna_n_trials=10` и `n_splits=3` для ускорения теста.

In [None]:
from dynamic_refitting.config import PipelineConfig
from dynamic_refitting.autopipe import AutoPipeBoost

# Configuration shared with later cells (refit manager, serialization,
# custom pipeline).
config_boost = PipelineConfig(
    metrics=["auc", "ks", "brier"],
    time_col="date",
    target_col="target",
    random_state=42,
)
pipe_boost = AutoPipeBoost(optuna_n_trials=10, config=config_boost)

with timer("5. AutoPipeBoost.fit (10 Optuna trials, 3-fold CV)"):
    pipe_boost.fit(X_train, y_train, n_splits=3, run_cv=True)

# ── Sanity checks on the fitted object ──────────────────────
assert pipe_boost.pipeline_ is not None, "Pipeline не создан"
assert pipe_boost.pipeline_._fitted, "Pipeline не обучен"
assert "auc" in pipe_boost.train_metrics_, "AUC не посчитан"
assert pipe_boost.train_metrics_["auc"] > 0.5, "Train AUC <= 0.5"
assert len(pipe_boost.cv_results_) == 3, "CV results не 3 фолда"

_train_rounded = {k: round(v, 4) for k, v in pipe_boost.train_metrics_.items()}
print(f"Train metrics: {_train_rounded}")

cv_aucs = [fold["auc"] for fold in pipe_boost.cv_results_]
_cv_rounded = [round(a, 4) for a in cv_aucs]
print(f"CV AUCs: {_cv_rounded}")
print(f"CV AUC mean: {np.mean(cv_aucs):.4f}  std: {np.std(cv_aucs):.4f}")

# Score the held-out test split; column 1 is the positive-class probability.
with timer("5b. AutoPipeBoost.predict_proba (test)"):
    proba_boost_test = pipe_boost.predict_proba(X_test)[:, 1]

_y_true = y_test.values
test_auc_boost = calc_auc(_y_true, proba_boost_test)
test_ks_boost = calc_ks(_y_true, proba_boost_test)
assert test_auc_boost > 0.5, f"Test AUC={test_auc_boost:.4f} <= 0.5"
print(f"Test AUC: {test_auc_boost:.4f}  |  KS: {test_ks_boost:.4f}")
print("AutoPipeBoost ✓")

## 6. AutoPipeLogreg (Logistic Regression)

WoE-кодирование, монотонный биннинг, StandardScaler, LogisticRegression.

In [None]:
from dynamic_refitting.autopipe import AutoPipeLogreg

# Same settings as the boosting configuration, kept as a separate object.
config_lr = PipelineConfig(
    metrics=["auc", "ks", "brier"],
    time_col="date",
    target_col="target",
    random_state=42,
)
pipe_lr = AutoPipeLogreg(penalty="l2", C=1.0, config=config_lr)

with timer("6. AutoPipeLogreg.fit (3-fold CV)"):
    pipe_lr.fit(X_train, y_train, n_splits=3, run_cv=True)

assert pipe_lr.pipeline_ is not None, "Pipeline не создан"
assert pipe_lr.train_metrics_["auc"] > 0.5, "Train AUC <= 0.5"

_lr_train_rounded = {k: round(v, 4) for k, v in pipe_lr.train_metrics_.items()}
print(f"Train metrics: {_lr_train_rounded}")

cv_aucs_lr = [fold["auc"] for fold in pipe_lr.cv_results_]
_lr_cv_rounded = [round(a, 4) for a in cv_aucs_lr]
print(f"CV AUCs: {_lr_cv_rounded}")
print(f"CV AUC mean: {np.mean(cv_aucs_lr):.4f}")

# Score the held-out test split; column 1 is the positive-class probability.
with timer("6b. AutoPipeLogreg.predict_proba (test)"):
    proba_lr_test = pipe_lr.predict_proba(X_test)[:, 1]

_y_true_lr = y_test.values
test_auc_lr = calc_auc(_y_true_lr, proba_lr_test)
test_ks_lr = calc_ks(_y_true_lr, proba_lr_test)
print(f"Test AUC: {test_auc_lr:.4f}  |  KS: {test_ks_lr:.4f}")
print("AutoPipeLogreg ✓")

## 7. Мониторинг дрифта

Тестируем `FeatureDriftDetector`, `PredictionDriftMonitor`, `ModelPerformanceMonitor`, `DriftReportGenerator`.

In [None]:
from dynamic_refitting.monitoring import (
    FeatureDriftDetector,
    PredictionDriftMonitor,
    ModelPerformanceMonitor,
    DriftReportGenerator,
)

# ── FeatureDriftDetector ────────────────────────────────────
# Reference = training features, candidate = monitoring features.
with timer("7a. FeatureDriftDetector (fit + transform)"):
    drift_det = FeatureDriftDetector(ks_threshold=0.05, psi_threshold=0.2)
    drift_det.fit(X_train)
    drift_det.transform(X_mon)

print(f"  Проверено фичей: {len(drift_det.drift_report_)}")
print(f"  Дрифтовых: {len(drift_det.drifted_features)}")
if drift_det.drifted_features:
    print(f"  Список: {drift_det.drifted_features[:5]}...")

# Spot-check the report schema on the first entry only (if there is one).
_required_keys = ("psi", "ks_stat", "ks_pvalue", "drifted")
for col, info in list(drift_det.drift_report_.items())[:1]:
    assert all(key in info for key in _required_keys), "Неполный drift_report"
    assert info["psi"] >= 0, "PSI < 0"

# ── PredictionDriftMonitor ──────────────────────────────────
# Scores are computed outside the timer so that only the monitor is timed.
proba_train_boost = pipe_boost.predict_proba(X_train)[:, 1]
proba_mon_boost = pipe_boost.predict_proba(X_mon)[:, 1]

with timer("7b. PredictionDriftMonitor"):
    pred_drift = PredictionDriftMonitor(psi_threshold=0.2)
    pred_drift.fit(X_train, reference_scores=proba_train_boost)
    check = pred_drift.check(proba_mon_boost)

print(f"  Prediction PSI: {check['psi']:.4f}  |  Drifted: {check['drifted']}")
assert "psi" in check and "drifted" in check, "Неполный check"
# ── ModelPerformanceMonitor ─────────────────────────────────
# Per-period performance of the boosting model.
# NOTE(review): the monitor is fitted on X_mon but evaluated on the test
# labels/scores; proba_mon_boost and df_monitor are available, so this mix
# may be unintentional — confirm before changing.
with timer("7c. ModelPerformanceMonitor"):
    perf_mon = ModelPerformanceMonitor(
        time_col="date", metrics=["auc", "ks", "brier"], auc_threshold=0.6
    )
    perf_mon.fit(X_mon)
    perf_results = perf_mon.evaluate(
        y_test, proba_boost_test,
        time_values=df_test["date"]
    )

print(f"  Периодов: {len(perf_results)}")
for r in perf_results[:3]:
    # Format only real numbers: applying ":.4f" to the "N/A" fallback string
    # raises ValueError, which would crash the cell exactly when a period
    # has no AUC entry.
    auc_value = r.get("auc")
    auc_text = "N/A" if auc_value is None else f"{auc_value:.4f}"
    print(f"    {r['period']}: AUC={auc_text}")
assert len(perf_mon.performance_history_) > 0, "History пуст"

rolling_auc = perf_mon.get_rolling_auc(window=2)
print(f"  Rolling AUC (w=2): {[round(a, 4) for a in rolling_auc]}")
print("Monitoring ✓")

In [None]:
# ── DriftReportGenerator ─────────────────────────────────────
# Combine the three monitors built in the previous cell into one report.
with timer("7d. DriftReportGenerator"):
    reporter = DriftReportGenerator()
    reporter.fit(X_train)
    report = reporter.generate(
        feature_drift=drift_det,
        prediction_drift=pred_drift,
        performance_monitor=perf_mon,
        metadata=dict(dataset="adult", split="monitor"),
    )

# Every top-level section of the report has to be present.
_expected_sections = (
    "timestamp",
    "feature_drift",
    "prediction_drift",
    "performance",
    "summary",
)
for _section in _expected_sections:
    assert _section in report, f"Нет {_section}"

_summary = report["summary"]
_feature_part = report["feature_drift"]
print(f"Needs refit: {_summary['needs_refit']}")
print(f"Reasons: {_summary['reasons']}")
print(f"Feature drift — total: {_feature_part['total_features_checked']}, "
      f"drifted: {_feature_part['n_drifted']}")
print("DriftReportGenerator ✓")

## 8. Динамический рефиттинг (`dynamic_refitting.refit`)

Тестируем триггеры (Performance / Time / DataVolume) и `RefitManager`.

In [None]:
from dynamic_refitting.refit import (
    PerformanceTriggeredRefit,
    TimeBasedRefit,
    DataVolumeTriggeredRefit,
    RefitManager,
    RefitScheduler,
)

# Each trigger is checked in both directions: a configuration that must
# fire and one that must not. The triggers keep state, so call order matters.

# ── PerformanceTriggeredRefit ───────────────────────────────
with timer("8a. PerformanceTriggeredRefit"):
    perf_trig = PerformanceTriggeredRefit(auc_threshold=0.99, psi_threshold=0.01)
    # AUC threshold of 0.99: no realistic model is expected to reach it,
    # so the trigger should fire.
    fired = perf_trig.should_refit(
        y_true=y_test.values,
        y_score=proba_boost_test,
        reference_scores=proba_train_boost,
    )

print(f"  Fired (AUC<0.99): {fired}  |  Reason: {perf_trig.get_reason()}")
assert fired, "Trigger должен был сработать при AUC<0.99"

# Lenient thresholds (AUC 0.5, PSI 10.0): the trigger must stay silent.
perf_trig2 = PerformanceTriggeredRefit(auc_threshold=0.5, psi_threshold=10.0)
fired2 = perf_trig2.should_refit(
    y_true=y_test.values, y_score=proba_boost_test, reference_scores=proba_train_boost,
)
print(f"  Fired (AUC<0.5): {fired2}  → ожидаем False")
assert not fired2, "Trigger не должен был сработать при AUC<0.5"

# ── TimeBasedRefit ──────────────────────────────────────────
with timer("8b. TimeBasedRefit"):
    time_trig = TimeBasedRefit(interval_days=0)  # zero-day interval → fires immediately
    fired_t = time_trig.should_refit()

print(f"  Fired (0 days): {fired_t}")
assert fired_t, "TimeBasedRefit(0 days) должен сработать"

# reset() is exercised on the first trigger; the negative case uses a fresh
# trigger with a very long interval.
time_trig.reset()
time_trig2 = TimeBasedRefit(interval_days=999)
assert not time_trig2.should_refit(), "999 days не должен сработать сразу"
print("  TimeBasedRefit(999 days): False ✓")

# ── DataVolumeTriggeredRefit ────────────────────────────────
# Walk the counter through: empty -> 50 -> 110 (fires) -> reset (silent).
with timer("8c. DataVolumeTriggeredRefit"):
    vol_trig = DataVolumeTriggeredRefit(min_new_samples=100)
    assert not vol_trig.should_refit(), "Не должен сработать без данных"
    vol_trig.add_samples(50)
    assert not vol_trig.should_refit(), "50 < 100"
    vol_trig.add_samples(60)
    assert vol_trig.should_refit(), "110 >= 100, должен сработать"
    vol_trig.reset()
    assert not vol_trig.should_refit(), "После reset не должен"

print("  DataVolumeTriggeredRefit ✓")
print("Все триггеры пройдены ✓")

In [None]:
# ── RefitManager ─────────────────────────────────────────────
# Two triggers that are both expected to fire: an unreachable AUC threshold
# and a volume trigger that is pre-filled above its minimum.
triggers = [
    PerformanceTriggeredRefit(auc_threshold=0.99, psi_threshold=0.01),
    DataVolumeTriggeredRefit(min_new_samples=10),
]
triggers[1].add_samples(1000)  # fill the volume trigger past its minimum

# NOTE(review): the manager receives the same object as pipe_boost.pipeline_.
# If refit() mutates it in place, later cells that compare predictions with
# proba_boost_test rely on the refit being deterministic — confirm.
assert pipe_boost.pipeline_ is not None
manager = RefitManager(
    pipeline=pipe_boost.pipeline_,
    triggers=triggers,
    config=config_boost,
)
manager.set_reference_scores(proba_train_boost)

# check_triggers is expected to return a non-empty collection of reasons.
reasons = manager.check_triggers(
    y_true=y_test.values,
    y_score=proba_boost_test,
)
print(f"Reasons: {reasons}")
assert len(reasons) > 0, "Должны быть причины для рефита"

# Run a manual refit on the training data.
with timer("8d. RefitManager.refit"):
    refitted = manager.refit(X_train, y_train)

assert refitted is not None, "refit вернул None"
# Exactly one history record is expected after a single refit.
assert len(manager.refit_history_) == 1, "History не обновлён"
rec = manager.refit_history_[0]
print(f"  Refit: {rec['n_samples']} samples, {rec['elapsed_seconds']:.2f}s")
print(f"  Metrics after refit: { {k: round(v, 4) for k, v in rec['metrics'].items()} }")

# auto_refit with a strict AUC threshold (0.99) — expected to trigger.
triggers2 = [PerformanceTriggeredRefit(auc_threshold=0.99, psi_threshold=0.01)]
manager2 = RefitManager(pipeline=pipe_boost.pipeline_, triggers=triggers2, config=config_boost)
manager2.set_reference_scores(proba_train_boost)

with timer("8e. RefitManager.auto_refit"):
    result = manager2.auto_refit(
        X_monitor=X_test, y_monitor=y_test,
        X_train=X_train, y_train=y_train,
    )

assert result is not None, "auto_refit должен был сработать"
print(f"  auto_refit: {len(manager2.refit_history_)} записей в истории")

# ── RefitScheduler ──────────────────────────────────────────
# Zero-second interval and three checks keep the run short.
with timer("8f. RefitScheduler"):
    scheduler = RefitScheduler(check_interval_seconds=0, max_checks=3)
    call_count = 0
    def cb():
        # Counts invocations in the module-level counter and reports a
        # refit only on the second call.
        global call_count
        call_count += 1
        return call_count == 2  # refit on the second call
    scheduler.register_callback(cb)
    scheduler.run()

# One history entry per check; the second entry must be flagged as a refit.
assert len(scheduler.history_) == 3, f"Ожидалось 3 проверки, получено {len(scheduler.history_)}"
assert scheduler.history_[1]["refit_triggered"], "Второй вызов должен был trigger"
print(f"  Scheduler: {len(scheduler.history_)} checks done")
print("RefitManager + Scheduler ✓")

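## 9. Объясняемость (`dynamic_refitting.explainability`)

`PermutationImportance`, `PartialDependence`, `CounterfactualGenerator`, `ModelCardGenerator`.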

In [None]:
from dynamic_refitting.explainability import (
    PermutationImportance,
    PartialDependence,
    CounterfactualGenerator,
    ModelCardGenerator,
)

# Pull the fitted model and its feature list out of the boosting pipeline's
# final estimator step.
lgb_step = pipe_boost.pipeline_.estimator
lgb_model = lgb_step.model_
feat_names = lgb_step._feature_names

# Prepare model inputs for the explainers: run the test split through the
# pipeline's transform, then take the model's features with NaNs set to 0.
X_test_transformed = pipe_boost.pipeline_.transform(X_test)
X_test_num = X_test_transformed[feat_names].fillna(0)

# NOTE(review): the explainers below are fitted on X_test_transformed, while
# X_test_num (model features only, NaN-free) is used just for the
# counterfactual instance — confirm that this is intended.

# ── PermutationImportance ───────────────────────────────────
with timer("9a. PermutationImportance (5 repeats)"):
    perm_imp = PermutationImportance(n_repeats=5, scoring="roc_auc")
    perm_imp.fit(X_test_transformed, y=y_test, model=lgb_model)

top5 = perm_imp.get_top_features(5)
print(f"  Top-5 features: {top5}")
assert len(top5) == 5, "Ожидалось 5 фичей"
assert perm_imp.importances_ is not None
# The first row of the importance table must have a non-negative mean.
assert perm_imp.importances_["mean"].iloc[0] >= 0, "Top importance < 0"

# ── PartialDependence ───────────────────────────────────────
pdp_features = feat_names[:3]  # first three model features
with timer("9b. PartialDependence (3 features, grid=30)"):
    pdp = PartialDependence(features=pdp_features, grid_resolution=30)
    pdp.fit(X_test_transformed, model=lgb_model)

assert len(pdp.pdp_results_) > 0, "PDP пуст"
for feat, res in pdp.pdp_results_.items():
    # Each result must carry a grid of the requested resolution plus the
    # averaged predictions.
    assert "grid" in res and "avg_prediction" in res
    assert len(res["grid"]) == 30
    print(f"  PDP {feat}: pred range [{res['avg_prediction'].min():.4f}, {res['avg_prediction'].max():.4f}]")

# ── CounterfactualGenerator ─────────────────────────────────
with timer("9c. CounterfactualGenerator"):
    cf_gen = CounterfactualGenerator(target_class=0, max_features_to_change=5, step_size=0.5, max_iterations=50)
    cf_gen.fit(X_test_transformed, model=lgb_model)
    # Explain the first test row, restricting changes to five features.
    instance = X_test_num.iloc[0]
    cf_result = cf_gen.explain(instance, feature_names=feat_names[:5])

print(f"  Counterfactual success: {cf_result['success']}")
print(f"  Changes: {len(cf_result['changes'])} features modified")
assert "success" in cf_result and "changes" in cf_result

# ── ModelCardGenerator ──────────────────────────────────────
with timer("9d. ModelCardGenerator"):
    mcg = ModelCardGenerator(model_name="BoostModel_Adult", model_version="1.0")
    mcg.fit(X_train)
    card = mcg.generate(
        model_type="LightGBM",
        features=feat_names,
        training_metrics=pipe_boost.train_metrics_,
        validation_metrics={"auc": test_auc_boost, "ks": test_ks_boost},
        dataset_description="Adult Census Income, 48K rows",
        intended_use="Binary classification (income >50K)",
    )

assert "model_details" in card and "metrics" in card
print(f"  Model card: {card['model_details']['name']} v{card['model_details']['version']}")
print(f"  Features: {card['model_details']['n_features']}")
print("Explainability ✓")

## 10. Model Registry и ExperimentTracker

In [None]:
from dynamic_refitting.registry import ModelRegistry
from dynamic_refitting.registry.experiment_tracker import ExperimentTracker

# Scratch directory shared by the registry and the experiment tracker.
# Everything that writes to it runs inside try/finally so the directory is
# removed even when one of the assertions below fails.
tmp_dir = tempfile.mkdtemp(prefix="dr_test_")

try:
    # ── ModelRegistry ───────────────────────────────────────────
    # Lifecycle under test: register v1 -> promote -> register v2 ->
    # promote (v1 gets archived) -> rollback to v1 -> load -> tag -> list.
    with timer("10a. ModelRegistry (register + promote + rollback)"):
        registry = ModelRegistry(root_dir=Path(tmp_dir) / "registry")

        mv1 = registry.register(
            "scoring_adult", pipe_boost.pipeline_,
            metrics={"auc": test_auc_boost}, tags={"env": "test"},
        )
        assert mv1.version == 1
        assert mv1.stage == "staging"

        registry.promote("scoring_adult", version=1)
        prod = registry.get_production("scoring_adult")
        assert prod is not None and prod.version == 1

        mv2 = registry.register(
            "scoring_adult", pipe_lr.pipeline_,
            metrics={"auc": test_auc_lr}, tags={"env": "test"},
        )
        assert mv2.version == 2
        registry.promote("scoring_adult", version=2)
        assert registry.get_production("scoring_adult").version == 2
        # Promoting v2 is expected to archive the previous production version.
        assert registry.get_version("scoring_adult", 1).stage == "archived"

        # Rollback must bring version 1 back into production.
        prev = registry.rollback("scoring_adult")
        assert prev is not None and prev.version == 1
        assert prev.stage == "production"

        # Loading by name must return a model object.
        loaded = registry.load_model("scoring_adult")
        assert loaded is not None

        # Tags added after registration must be visible on the version.
        registry.tag("scoring_adult", version=1, tags={"team": "risk"})
        v1 = registry.get_version("scoring_adult", 1)
        assert v1.tags["team"] == "risk"

        versions = registry.list_versions("scoring_adult")
        assert len(versions) == 2

    print(f"  Versions: {[(v.version, v.stage) for v in versions]}")
    print("ModelRegistry ✓")

    # ── ExperimentTracker ───────────────────────────────────────
    # Start a run, log metrics, close it, then read it back from storage.
    with timer("10b. ExperimentTracker"):
        tracker = ExperimentTracker(storage_dir=Path(tmp_dir) / "experiments")
        run = tracker.start_run(
            "boost_test",
            params={"n_trials": 10, "n_splits": 3},
            tags={"dataset": "adult"},
        )
        tracker.log_metrics(run.run_id, {"auc": test_auc_boost, "ks": test_ks_boost})
        tracker.end_run(run.run_id, status="completed")

        loaded_run = tracker.get_run(run.run_id)
        assert loaded_run.status == "completed"
        assert loaded_run.metrics["auc"] == test_auc_boost

        runs = tracker.list_runs("boost_test")
        assert len(runs) >= 1

    print(f"  Run: {loaded_run.run_id}, status={loaded_run.status}")
    print(f"  Metrics: {loaded_run.metrics}")
    print("ExperimentTracker ✓")
finally:
    # Cleanup runs on success and on failure alike.
    shutil.rmtree(tmp_dir, ignore_errors=True)

## 11. Plugin Architecture (`StepRegistry`)

In [None]:
from dynamic_refitting.registry import StepRegistry
from dynamic_refitting.config import BaseStep

# NOTE(review): StepRegistry.clear() is called at the start and at the end of
# this cell. If the library registers built-in steps in the same registry,
# they are presumably wiped for the rest of the session — confirm.
with timer("11. StepRegistry"):
    StepRegistry.clear()  # start from an empty registry for this test

    # Minimal custom step: fit only marks the step as fitted, transform is
    # the identity.
    class MyCustomStep(BaseStep):
        def fit(self, X, y=None, **kw):
            self._fitted = True
            return self
        def transform(self, X):
            return X

    # Register the step and check that it is listed.
    StepRegistry.register("custom", "my_step", MyCustomStep)
    assert "custom" in StepRegistry.list_categories()
    assert "my_step" in StepRegistry.list_steps("custom")

    # Look the class up again, instantiate it and run it end to end.
    cls = StepRegistry.get("custom", "my_step")
    assert cls is MyCustomStep
    step = cls(random_state=42)
    step.fit(X_train)
    X_out = step.transform(X_train)
    assert X_out.shape == X_train.shape, "Custom step изменил shape"

    # Registering the same name again without overwrite must raise KeyError.
    # The `assert False` raises AssertionError, which the handler below does
    # not catch, so a missing error fails the cell.
    try:
        StepRegistry.register("custom", "my_step", MyCustomStep)
        assert False, "Должна была быть ошибка"
    except KeyError:
        pass  # expected

    # With overwrite=True the second registration must succeed.
    StepRegistry.register("custom", "my_step", MyCustomStep, overwrite=True)

    # After clear() no categories may remain.
    StepRegistry.clear()
    assert len(StepRegistry.list_categories()) == 0

print("StepRegistry ✓")

## 12. Сериализация (save / load)

Проверяем сохранение и загрузку PipelineConfig и ScoringPipeline.

In [None]:
from dynamic_refitting.config import ScoringPipeline

# Scratch directory for the serialized artefacts. The whole round-trip runs
# inside try/finally so the directory is removed even when a check fails.
tmp_dir = tempfile.mkdtemp(prefix="dr_serial_")

try:
    # Config round-trip: the reloaded object must carry the same settings.
    with timer("12a. Save/Load PipelineConfig"):
        cfg_path = Path(tmp_dir) / "config.json"
        config_boost.save(cfg_path)
        loaded_cfg = PipelineConfig.load(cfg_path)
        assert loaded_cfg.random_state == config_boost.random_state
        assert loaded_cfg.target_col == config_boost.target_col
        assert loaded_cfg.metrics == config_boost.metrics

    print(f"  Config saved/loaded: random_state={loaded_cfg.random_state}, metrics={loaded_cfg.metrics}")

    with timer("12b. Save/Load ScoringPipeline (boost)"):
        pipe_path = Path(tmp_dir) / "boost_pipe.joblib"
        pipe_boost.pipeline_.save(pipe_path)
        loaded_pipe = ScoringPipeline.load(pipe_path)

    # Score the test split with the reloaded pipeline.
    with timer("12c. Predict from loaded pipeline"):
        proba_loaded = loaded_pipe.predict_proba(X_test)[:, 1]

    # The reloaded pipeline must reproduce the earlier test predictions
    # within an absolute tolerance of 1e-6.
    assert np.allclose(proba_loaded, proba_boost_test, atol=1e-6), \
        "Предсказания загруженного пайплайна отличаются!"
    print(f"  Predictions match: max diff = {np.max(np.abs(proba_loaded - proba_boost_test)):.2e}")

    # Same round-trip for the logistic-regression pipeline.
    with timer("12d. Save/Load ScoringPipeline (logreg)"):
        lr_path = Path(tmp_dir) / "lr_pipe.joblib"
        pipe_lr.pipeline_.save(lr_path)
        loaded_lr = ScoringPipeline.load(lr_path)
        proba_lr_loaded = loaded_lr.predict_proba(X_test)[:, 1]
        assert np.allclose(proba_lr_loaded, proba_lr_test, atol=1e-6)

    print(f"  LR predictions match ✓")
    print("Serialization ✓")
finally:
    # Cleanup runs on success and on failure alike.
    shutil.rmtree(tmp_dir, ignore_errors=True)

## 13. Custom Pipeline (ручная сборка)

Собираем пайплайн вручную через `ScoringPipeline` и проверяем работу.

In [None]:
from dynamic_refitting.config import ScoringPipeline
from dynamic_refitting.validation_steps import FeatureCleanerConst, FeatureCleanerNan
from dynamic_refitting.boost_pipeline_steps import (
    FeaturePreSelector,
    ClearCorrelatedFeatures,
    ClearTailFeatures,
    OptunaBoostingFitter,
)

# Hand-assembled pipeline: cleaning steps first, the boosting fitter last.
# Every step gets the same seed.
_seed = 42
custom_steps = [
    FeatureCleanerConst(random_state=_seed),
    FeatureCleanerNan(fill_strategy="median", random_state=_seed),
    FeaturePreSelector(nan_threshold=0.8, random_state=_seed),
    ClearCorrelatedFeatures(threshold=0.95, random_state=_seed),
    ClearTailFeatures(random_state=_seed),
    OptunaBoostingFitter(time_col="date", n_trials=5, random_state=_seed),
]
custom_pipe = ScoringPipeline(config=config_boost, steps=custom_steps)

with timer("13. Custom ScoringPipeline.fit (5 Optuna trials)"):
    custom_pipe.fit(X_train, y_train)

assert custom_pipe._fitted

# Score the test split; the model only has to beat a random ranking.
_custom_scores = custom_pipe.predict_proba(X_test)
proba_custom = _custom_scores[:, 1]
custom_auc = calc_auc(y_test.values, proba_custom)
print(f"  Custom pipeline Test AUC: {custom_auc:.4f}")
assert custom_auc > 0.5
print("Custom Pipeline ✓")

## 14. TimeSeriesSplitter

Проверяем корректность time-based кросс-валидации.

In [None]:
from dynamic_refitting.utils.time_split import TimeSeriesSplitter

with timer("14. TimeSeriesSplitter"):
    splitter = TimeSeriesSplitter(expanding=True, gap=0, time_col="date", n_splits=5)
    splits = list(splitter.split(X_train))

print(f"  Splits: {len(splits)}")

_date_col = X_train["date"]
for i, (tr_idx, val_idx) in enumerate(splits):
    tr_dates = _date_col.iloc[tr_idx].unique()
    val_dates = _date_col.iloc[val_idx].unique()
    # No training date may be later than the earliest validation date.
    assert tr_dates.max() <= val_dates.min(), f"Fold {i}: train dates overlap val dates"
    print(f"  Fold {i}: train={len(tr_idx):,} ({len(tr_dates)} periods)  |  "
          f"val={len(val_idx):,} ({len(val_dates)} periods)")

# Expanding window: the training part never shrinks from fold to fold.
_train_sizes = [len(train_part) for train_part, _ in splits]
for i in range(1, len(_train_sizes)):
    assert _train_sizes[i] >= _train_sizes[i - 1], "Expanding window нарушен"

print("TimeSeriesSplitter ✓")

## 15. Synthetic Data Generator

Проверяем `make_scoring_dataset` — генератор синтетических данных.

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

from dynamic_refitting.utils.data_gen import make_scoring_dataset

with timer("15. make_scoring_dataset (50K rows)"):
    df_synth = make_scoring_dataset(
        n_samples=50_000,
        n_features=30,
        n_informative=10,
        n_time_periods=12,
        imbalance_ratio=10.0,
        random_state=42,
    )

# Structural checks: row count, required columns, binary target.
assert len(df_synth) == 50_000
assert "target" in df_synth.columns
assert "date" in df_synth.columns
assert df_synth["target"].nunique() == 2

pos_rate = df_synth["target"].mean()
_n_numeric_synth = df_synth.select_dtypes(include=[np.number]).shape[1]
_n_object_synth = df_synth.select_dtypes(include=['object']).shape[1]

print(f"  Shape: {df_synth.shape}")
print(f"  Positive rate: {pos_rate:.3f} (expected ~{1/(1+10):.3f})")
print(f"  NaN fraction: {df_synth.isna().mean().mean():.4f}")
print(f"  Numeric cols: {_n_numeric_synth}")
print(f"  Categorical cols: {_n_object_synth}")
print(f"  Time periods: {df_synth['date'].nunique()}")

# Signal check: a plain logistic regression on the numeric columns
# (NaNs set to 0) must do visibly better than random on its own training data.
X_s = df_synth.drop(columns=["target", "date"])
X_s = X_s.select_dtypes(include=[np.number]).fillna(0)
y_s = df_synth["target"]

lr_check = LogisticRegression(random_state=42, max_iter=200)
lr_check.fit(X_s, y_s)
_synth_scores = lr_check.predict_proba(X_s)[:, 1]
auc_check = roc_auc_score(y_s, _synth_scores)
print(f"  Sanity check AUC (LR on synthetic): {auc_check:.4f}")
assert auc_check > 0.55, "Синтетические данные не содержат достаточно сигнала"
print("make_scoring_dataset ✓")

## 16. Adversarial Validation

Проверяем `AdversarialValidation` — обнаружение covariate shift между train и holdout.

In [None]:
from dynamic_refitting.boost_pipeline_steps import AdversarialValidation

# Train-vs-test adversarial check: the test split plays the holdout role.
with timer("16. AdversarialValidation"):
    av = AdversarialValidation(n_estimators=50, auc_threshold=0.7)
    av.fit(X_train, X_holdout=X_test)

print(f"  Adversarial AUC: {av.adversarial_auc_:.4f}")
_top_drifted = av.feature_importances_.head(5).to_dict()
print(f"  Top drifted features: {_top_drifted}")

# The AUC must be a valid value and the importances must exist.
assert 0 <= av.adversarial_auc_ <= 1
assert av.feature_importances_ is not None
print("AdversarialValidation ✓")

## 17. Сводная таблица по времени

Итоговый отчёт по скорости выполнения всех модулей.

In [None]:
import re

# Final report: one row per timed step with a proportional bar, followed by
# totals aggregated per notebook section.
print("=" * 72)
print("  СВОДНАЯ ТАБЛИЦА ЗАМЕРОВ ВРЕМЕНИ")
print("=" * 72)
print(f"{' Этап':<55} {'Время (с)':>10}")
print("-" * 72)

# Loop-invariant: the slowest step defines the full bar length (30 cells).
# `default` keeps the cell usable when nothing has been timed yet.
slowest = max(TIMING.values(), default=0.0)

total = 0
for label, elapsed in TIMING.items():
    total += elapsed
    # Guard against a zero denominator (all recorded durations equal to 0).
    bar_len = int(min(elapsed / slowest * 30, 30)) if slowest > 0 else 0
    bar = "█" * bar_len
    print(f"  {label:<53} {elapsed:>8.2f}  {bar}")

print("-" * 72)
print(f"  {'ИТОГО':<53} {total:>8.2f}")
print("=" * 72)
print()

# Group by section number. Labels look like "3a. FeatureCleanerConst" or
# "10b. ExperimentTracker"; only the leading digits identify the section, so
# sub-steps "3a", "3b", ... are summed under "3". Labels without a leading
# number fall back to the text before the first dot.
groups = {}
for label, elapsed in TIMING.items():
    number = re.match(r"\d+", label)
    prefix = number.group() if number else label.split(".")[0]
    groups[prefix] = groups.get(prefix, 0) + elapsed

print("  ВРЕМЯ ПО МОДУЛЯМ:")
for g, t in sorted(groups.items(), key=lambda x: -x[1]):
    # Avoid ZeroDivisionError when the total duration is zero.
    pct = t / total * 100 if total > 0 else 0.0
    print(f"    {g:>4}: {t:>8.2f} s  ({pct:>5.1f}%)")
print()
print(f"Все {len(TIMING)} тестов пройдены успешно ✓")