# Monitoring de Production et Detection de Data Drift

## Contexte et demarche

Dans le notebook precedent (`04_data_drift_analysis.ipynb`), nous avons etudie le concept de data drift en comparant nos donnees de train et de test. Cette analyse avait une limite importante : **les deux jeux de donnees proviennent du meme dataset Kaggle**, donc toute "derive" detectee est un artefact statistique, pas du vrai drift.

En production reelle, le data drift est un phenomene **temporel** : la distribution des donnees recues par l'API change au fil du temps (evolution economique, nouveau segment de clientele, changement de fournisseur de donnees, etc.).

### Ce que fait ce notebook

Nous avons mis en place une **architecture complete de monitoring** :

1. **Stockage en production** : chaque prediction de l'API est stockee dans **Neon Postgres** avec :
   - Les **inputs** (419 features en JSONB)
   - Les **outputs** (probabilite, decision)
   - Le **temps d'execution** (inference_time_ms)
   - Un **timestamp** (dimension temporelle)

2. **Simulation de trafic** : le script `simulate_production_traffic.py` envoie des predictions par batches avec du drift progressif, reproduisant ce qui se passerait en production si la population de clients changeait.

3. **Detection de drift** : le script `run_drift_detection.py` compare les features des predictions recentes (depuis Postgres) contre la distribution de reference (training data) via le test de Kolmogorov-Smirnov.

4. **Collecte de logs** : Fluentd collecte les logs structures de l'API et les stocke dans Postgres.

5. **Visualisation** : Grafana et Streamlit permettent de suivre le drift et les metriques operationnelles en temps reel.

Ce notebook connecte directement a la base de production pour analyser les resultats.

---

## Architecture de monitoring

```
Client  -->  FastAPI (8000)
                 |
                 +-->  Neon Postgres (cloud)
                 |       +-- predictions (inputs JSONB, outputs, latence, timestamp)
                 |       +-- drift_reports (metriques par fenetre temporelle)
                 |       +-- api_logs (logs structures via Fluentd)
                 |       +-- training_reference (stats de reference)
                 |
                 +-->  stdout (JSON structure)
                 |       +-->  Fluentd --> Postgres (api_logs)
                 |
                 +-->  JSONL (backup local)

Scripts batch :
  simulate_production_traffic.py  -->  Envoie le test set par batches avec drift progressif
  run_drift_detection.py         -->  Compare predictions recentes vs reference (training)

Visualisation :
  Grafana (3000)    -->  Dashboards SQL sur Postgres (logs, drift, latence)
  Streamlit (8501)  -->  Dashboard interactif existant
```

---

## 1. Connexion a la base de donnees de production

In [None]:
import json
import os
import sys
import warnings
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
from dotenv import load_dotenv

sys.path.insert(0, str(Path.cwd().parent))
from monitoring.drift import compute_drift_report, simulate_drift

warnings.filterwarnings("ignore", category=FutureWarning)

# Charger les variables d'environnement
load_dotenv(Path.cwd().parent / ".env")
DATABASE_URL = os.environ.get("DATABASE_URL", "")

ARTIFACTS_DIR = Path("../artifacts")
DATA_DIR = Path("../data")

print(f"Base de donnees configuree : {'Oui' if DATABASE_URL else 'Non'}")

In [None]:
# Connexion a Neon Postgres
conn = psycopg2.connect(DATABASE_URL)

# Vue d'ensemble des donnees de production
stats = pd.read_sql("""
    SELECT 
        count(*) AS total_predictions,
        min(created_at) AS premiere_prediction,
        max(created_at) AS derniere_prediction,
        round(avg(inference_time_ms)::numeric, 2) AS latence_moyenne_ms,
        round(avg(probability)::numeric, 4) AS probabilite_moyenne,
        round(avg(CASE WHEN decision = 'REFUSED' THEN 1.0 ELSE 0.0 END)::numeric * 100, 1) AS taux_refus_pct
    FROM predictions
""", conn)

print("=== Donnees de production en base ===")
for col in stats.columns:
    print(f"  {col}: {stats[col].iloc[0]}")

### Que stocke-t-on ?

Chaque appel a l'endpoint `/predict` de l'API enregistre dans la table `predictions` :

| Colonne | Description | Pourquoi |
|---------|------------|----------|
| `sk_id_curr` | Identifiant du client | Tracabilite |
| `features` | 419 features en JSONB | Detection de drift (inputs) |
| `probability` | Score de defaut predit | Monitoring des outputs |
| `decision` | APPROVED / REFUSED | Suivi des decisions |
| `inference_time_ms` | Temps d'inference | Detection d'anomalies operationnelles |
| `created_at` | Timestamp | Dimension temporelle pour le drift |

C'est le `created_at` qui nous donne la **dimension temporelle** : en regroupant les predictions par fenetre de temps, on peut observer l'evolution de la distribution des features et detecter le drift.

---

## 2. Extraction et analyse des features de production

Chaque prediction stockee contient les 419 features envoyees a l'API en format JSONB. On les extrait pour les comparer a la reference.

In [None]:
# Charger les predictions avec leurs features depuis Postgres
prod_raw = pd.read_sql("""
    SELECT features, probability, decision, inference_time_ms, created_at
    FROM predictions
    WHERE features IS NOT NULL
    ORDER BY created_at
""", conn)

print(f"Predictions recuperees : {len(prod_raw)}")

# Extraire les features JSONB en colonnes pandas
prod_features = pd.json_normalize(prod_raw["features"])
prod_features.index = prod_raw.index

print(f"Features extraites : {prod_features.shape[1]} colonnes")
print(f"\nApercu :")
prod_features.iloc[:3, :5]

In [None]:
# Charger les donnees de reference (echantillon du training data)
reference = pd.read_csv(DATA_DIR / "train_preprocessed.csv", nrows=5000)
drop_cols = [c for c in ["SK_ID_CURR", "TARGET"] if c in reference.columns]
reference = reference.drop(columns=drop_cols)

# Aligner les colonnes entre reference et production
common_cols = sorted(set(reference.columns) & set(prod_features.columns))
reference = reference[common_cols]
prod_aligned = prod_features[common_cols]

print(f"Reference (training data) : {reference.shape[0]} lignes, {reference.shape[1]} features")
print(f"Production (predictions)  : {prod_aligned.shape[0]} lignes, {prod_aligned.shape[1]} features")
print(f"\nLes features sont alignees : on peut comparer les distributions.")

---

## 3. Detection de drift : Reference vs Production

On compare la distribution des features d'entrainement (reference) avec celles recues en production via le **test de Kolmogorov-Smirnov** (KS).

- **KS statistic** : mesure la distance maximale entre les deux distributions (0 = identiques, 1 = completement differentes)
- **p-value < 0.05** : on rejette l'hypothese que les deux distributions sont identiques → drift detecte

Le module `monitoring/drift.py` (fonction `compute_drift_report`) encapsule cette logique.

In [None]:
# KS test sur les top 30 features (reutilise monitoring/drift.py)
drift_report = compute_drift_report(reference, prod_aligned, top_n=30)

n_analyzed = len(drift_report)
n_drifted = int(drift_report["drift_detected"].sum())
drift_pct = n_drifted / n_analyzed * 100

# Determiner le status
if drift_pct > 30:
    status = "ALERT"
elif drift_pct > 10:
    status = "WARNING"
else:
    status = "OK"

print(f"=== Resultats de la detection de drift ===")
print(f"Features analysees : {n_analyzed}")
print(f"Features en drift  : {n_drifted} ({drift_pct:.1f}%)")
print(f"Status             : {status}")
print(f"")
print(f"Rappel des seuils :")
print(f"  < 10%  → OK (surveillance normale)")
print(f"  10-30% → WARNING (investigation necessaire)")
print(f"  > 30%  → ALERT (envisager un re-entrainement)")
print(f"\nTop 15 features par KS statistic :")
drift_report.head(15)

In [None]:
# Visualisation des KS statistics
top_20 = drift_report.head(20)

fig, ax = plt.subplots(figsize=(12, 8))
colors = ['#D32F2F' if d else '#388E3C' for d in top_20['drift_detected']]
ax.barh(range(len(top_20)), top_20['ks_statistic'], color=colors)
ax.set_yticks(range(len(top_20)))
ax.set_yticklabels([f[:35] for f in top_20['feature']], fontsize=9)
ax.set_xlabel('KS Statistic')
ax.set_title('Top 20 Features — Test KS (Reference vs Production)', fontweight='bold', fontsize=13)
ax.axvline(x=0.1, color='orange', linestyle='--', alpha=0.7, label='Seuil modere')
ax.legend()
ax.invert_yaxis()
plt.tight_layout()
plt.show()

print("Rouge = drift detecte (p-value < 0.05), Vert = pas de drift")

### Interpretation

Le drift detecte ici est **attendu et voulu** : notre script de simulation `simulate_production_traffic.py` a envoye 8 batches de predictions a l'API :

- **Batch 1-3** : donnees propres du test set (pas de drift)
- **Batch 4-6** : drift graduel avec intensite croissante (0.1, 0.2, 0.3)
- **Batch 7-8** : drift soudain avec forte intensite (0.5, 0.7)

Comme on analyse toutes les predictions ensemble (y compris les batches driftes), le drift global est eleve. En production reelle, on analyserait par **fenetre temporelle** (ex: les predictions des dernieres 24h) pour detecter le drift au moment ou il apparait.

---

## 4. Rapport Evidently AI

Evidently AI genere un rapport interactif detaille comparant les distributions feature par feature.

In [None]:
from evidently import Report
from evidently.presets import DataDriftPreset

# Rapport Evidently : reference (training) vs production (predictions recentes)
ev_report = Report([DataDriftPreset()])
snapshot = ev_report.run(reference, prod_aligned)

# Sauvegarder en HTML
report_path = Path("../monitoring/drift_report_evidently.html")
snapshot.save_html(str(report_path))
print(f"Rapport Evidently sauvegarde : {report_path}")

# Afficher dans le notebook
snapshot

---

## 5. Distributions comparees — Features critiques du modele

Toutes les features n'ont pas la meme importance. Les **top features** (celles qui influencent le plus les predictions du modele LightGBM) meritent une surveillance prioritaire.

In [None]:
# Top features du modele par importance
model = joblib.load(ARTIFACTS_DIR / "model.pkl")
with open(ARTIFACTS_DIR / "feature_names.json") as f:
    feature_names = json.load(f)

importances = sorted(
    zip(feature_names, model.feature_importances_),
    key=lambda x: x[1], reverse=True
)
top_features = [name for name, _ in importances[:6] if name in common_cols]

print("Top 10 features du modele (par importance) :\n")
for i, (name, imp) in enumerate(importances[:10], 1):
    flag = " <-- critique" if i <= 3 else ""
    print(f"  {i:2d}. {name:<35s} importance = {imp}{flag}")

In [None]:
# Distributions comparees : reference (vert) vs production (rouge)
n_plots = min(6, len(top_features))
fig, axes = plt.subplots(2, 3, figsize=(16, 8))
axes = axes.flatten()

for i, feat in enumerate(top_features[:n_plots]):
    ax = axes[i]
    ax.hist(reference[feat].dropna(), bins=50, alpha=0.6, label='Reference (train)',
            color='#1D6A4B', density=True)
    ax.hist(prod_aligned[feat].dropna(), bins=50, alpha=0.6, label='Production',
            color='#8B2D2D', density=True)
    
    # Ajouter la KS stat
    ks_row = drift_report[drift_report['feature'] == feat]
    ks_val = f"{ks_row['ks_statistic'].iloc[0]:.3f}" if len(ks_row) > 0 else 'N/A'
    ax.set_title(f"{feat}\nKS = {ks_val}", fontsize=10, fontweight='bold')
    ax.legend(fontsize=8)

for j in range(n_plots, len(axes)):
    axes[j].set_visible(False)

plt.suptitle('Distributions comparees — Top Features (Reference vs Production)',
             fontsize=14, fontweight='bold', y=1.02)
plt.tight_layout()
plt.show()

---

## 6. Analyse operationnelle

Au-dela du drift sur les features, le cahier des charges demande de detecter des **problemes operationnels** : taux d'erreur et latence anormale.

In [None]:
# Metriques operationnelles
operational = pd.read_sql("""
    SELECT probability, decision, inference_time_ms, created_at
    FROM predictions
    ORDER BY created_at
""", conn)

fig, axes = plt.subplots(1, 3, figsize=(16, 4))

# 1. Latence au fil du temps
axes[0].plot(range(len(operational)), operational['inference_time_ms'],
             alpha=0.5, linewidth=0.8, color='#1B2A4A')
p95 = operational['inference_time_ms'].quantile(0.95)
axes[0].axhline(y=p95, color='red', linestyle='--', label=f'P95 = {p95:.1f} ms')
axes[0].set_title('Latence d\'inference (ms)', fontweight='bold')
axes[0].set_xlabel('# prediction')
axes[0].set_ylabel('ms')
axes[0].legend()

# 2. Distribution des scores
approved = operational[operational['decision'] == 'APPROVED']['probability']
refused = operational[operational['decision'] == 'REFUSED']['probability']
axes[1].hist(approved, bins=30, alpha=0.7, label='APPROVED', color='#388E3C')
axes[1].hist(refused, bins=30, alpha=0.7, label='REFUSED', color='#D32F2F')
axes[1].axvline(x=0.494, color='black', linestyle='--', label='Seuil (0.494)')
axes[1].set_title('Distribution des scores', fontweight='bold')
axes[1].legend(fontsize=9)

# 3. Repartition des decisions
decision_counts = operational['decision'].value_counts()
colors = ['#388E3C' if d == 'APPROVED' else '#D32F2F' for d in decision_counts.index]
axes[2].pie(decision_counts, labels=decision_counts.index,
            autopct='%1.1f%%', colors=colors)
axes[2].set_title('Repartition des decisions', fontweight='bold')

plt.tight_layout()
plt.show()

print(f"Metriques operationnelles :")
print(f"  Latence moyenne : {operational['inference_time_ms'].mean():.2f} ms")
print(f"  Latence P50     : {operational['inference_time_ms'].median():.2f} ms")
print(f"  Latence P95     : {p95:.2f} ms")
print(f"  Latence max     : {operational['inference_time_ms'].max():.2f} ms")
print(f"  Taux de refus   : {(operational['decision'] == 'REFUSED').mean() * 100:.1f}%")

---

## 7. Historique des rapports de drift

Les rapports de drift sont stockes dans la table `drift_reports` de Postgres. Le script `run_drift_detection.py` les genere automatiquement en comparant les predictions recentes contre la reference.

In [None]:
# Historique des rapports de drift
drift_history = pd.read_sql("""
    SELECT report_date, n_predictions, n_features_analyzed,
           n_features_drifted, drift_percentage, status
    FROM drift_reports
    ORDER BY report_date
""", conn)

if len(drift_history) > 0:
    print(f"Rapports de drift en base : {len(drift_history)}")
    display(drift_history)
else:
    print("Aucun rapport de drift encore. Executez :")
    print("  DATABASE_URL=... python scripts/run_drift_detection.py")

---

## 8. Statistiques de reference (training data)

Les stats descriptives du training data sont pre-calculees et stockees dans la table `training_reference`. Cela permet au script de drift detection de comparer sans recharger le CSV complet a chaque fois.

In [None]:
# Stats de reference stockees en base
ref_stats = pd.read_sql("""
    SELECT feature_name, mean, std, median, q25, q75, n_samples
    FROM training_reference
    ORDER BY feature_name
    LIMIT 10
""", conn)

print(f"Features de reference en base : {pd.read_sql('SELECT count(*) FROM training_reference', conn).iloc[0, 0]}")
print(f"\nApercu (10 premieres) :")
ref_stats

---

## 9. Comment reproduire cette analyse

### Etape 1 : Initialiser la base
```bash
DATABASE_URL=... python scripts/init_db.py
```
Cree les tables et insere les stats de reference du training data.

### Etape 2 : Lancer l'API
```bash
DATABASE_URL=... uvicorn api.app:app --port 8000
```
L'API stocke maintenant chaque prediction dans Postgres.

### Etape 3 : Simuler du trafic avec drift
```bash
python scripts/simulate_production_traffic.py --batch-size 50 --n-batches 8 --delay 2
```
Envoie 400 predictions avec drift progressif :
- Batch 1-3 : donnees propres
- Batch 4-6 : drift graduel (intensite 0.1 → 0.3)
- Batch 7-8 : drift soudain (intensite 0.5 → 0.7)

### Etape 4 : Analyser le drift
```bash
DATABASE_URL=... python scripts/run_drift_detection.py
```
Compare les predictions recentes vs la reference et stocke le rapport dans `drift_reports`.

### Etape 5 : Visualiser
```bash
docker compose up  # Lance API + Dashboard + Fluentd + Grafana
```
- **Grafana** : http://localhost:3000 (admin/admin)
- **Streamlit** : http://localhost:8501

---

## 10. Points de vigilance et recommandations

### Resultats cles

| Aspect | Constat |
|--------|--------|
| **Stockage** | Chaque prediction stockee avec inputs (419 features JSONB), outputs, latence, timestamp |
| **Drift temporel** | Le timestamp des predictions permet une analyse par fenetre temporelle |
| **Features critiques** | EXT_SOURCE_*, AMT_CREDIT, DAYS_BIRTH doivent etre monitorees en priorite |
| **Detection** | Le drift simule est correctement detecte, proportionnellement a l'intensite |

### Seuils d'alerte

| Drift % | Status | Action |
|---------|--------|--------|
| < 10% | OK | Surveillance normale |
| 10-30% | WARNING | Investigation necessaire |
| > 30% | ALERT | Envisager un re-entrainement |

### Stack de monitoring

| Composant | Role |
|-----------|------|
| **Neon Postgres** | Stockage : predictions, drift_reports, api_logs, training_reference |
| **Fluentd** | Collecte des logs API structures → Postgres |
| **Evidently AI** | Rapports de drift detailles (KS test, distributions) |
| **Grafana** | Dashboards temps reel : latence, volume, drift |
| **Streamlit** | Dashboard interactif existant |

### Limites

- **Drift != degradation** : un drift statistique ne signifie pas forcement que le modele performe moins bien. Les deux doivent etre surveilles conjointement.
- **Drift simule vs reel** : nos simulations sont artificielles. En production, le drift est souvent plus subtil et multidimensionnel.
- **Pas de concept drift** : le test set Kaggle n'a pas de TARGET, donc on ne peut mesurer que le data drift, pas l'impact sur la performance du modele.
- **Conformite RGPD** : les features sont anonymisees, les rapports analysent des distributions statistiques, pas des donnees individuelles.

In [None]:
conn.close()
print("Analyse terminee.")
print("\nFichiers generes :")
print("  - monitoring/drift_report_evidently.html")
print("\nTables Postgres :")
print("  - predictions  (inputs, outputs, latence, timestamp)")
print("  - drift_reports (rapports par fenetre temporelle)")
print("  - training_reference (stats de reference du training data)")
print("  - api_logs (logs structures via Fluentd)")