# Projet 7 PREMIUM - Étape 4 : Score Métier et Optimisation Seuil

**Objectif** : Optimiser le seuil selon le coût métier (10×FN + 1×FP)

## Contenu
1. Fonction de coût métier
2. Balayage des seuils (201 valeurs)
3. Identification seuil optimal
4. Comparaison vs seuil par défaut (0.5)
5. Sauvegarde pipeline final

---

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import joblib
import warnings
warnings.filterwarnings('ignore')

print(' Imports OK')

## Chargement des données et modèle

In [None]:
# Charger données et meilleur modèle
DOSSIER_ARTIFACTS = Path('../artifacts').resolve()
X_train, X_valid, y_train, y_valid = joblib.load(DOSSIER_ARTIFACTS / 'data_split.joblib')
pipeline = joblib.load(DOSSIER_ARTIFACTS / 'meilleur_modele.joblib')

print("="*60)
print("PRÉPARATION DES DONNÉES")
print("="*60)

print(f"\nDimensions initiales:")
print(f" X_train: {X_train.shape}")
print(f" X_valid: {X_valid.shape}")
print(f" y_train: {len(y_train)}")
print(f" y_valid: {len(y_valid)}")

## Alignement des features avec le modèle

In [None]:
# Récupérer les features attendues par le modèle
try:
 expected_features = pipeline.feature_name_
 print(" Features récupérées via feature_name_")
except:
 try:
 expected_features = pipeline.booster_.feature_name()
 print(" Features récupérées via booster_.feature_name()")
 except:
 try:
 expected_features = pipeline._Booster.feature_name()
 print(" Features récupérées via _Booster.feature_name()")
 except:
 print(" Impossible de récupérer les features du modèle")
 raise ValueError("Cannot retrieve model features")

print(f"\nFeatures attendues par le modèle: {len(expected_features)}")
print(f"Features dans X_valid: {len(X_valid.columns)}")

# Identifier les différences
current_cols = set(X_valid.columns)
expected_cols = set(expected_features)

missing_cols = list(expected_cols - current_cols)
extra_cols = list(current_cols - expected_cols)

print(f"\nColonnes manquantes: {len(missing_cols)}")
if len(missing_cols) > 0 and len(missing_cols) <= 20:
 print(f" {missing_cols}")
elif len(missing_cols) > 20:
 print(f" {missing_cols[:5]}... (+{len(missing_cols)-5} autres)")

print(f"\nColonnes en trop: {len(extra_cols)}")
if len(extra_cols) > 0 and len(extra_cols) <= 20:
 print(f" {extra_cols}")
elif len(extra_cols) > 20:
 print(f" {extra_cols[:5]}... (+{len(extra_cols)-5} autres)")

In [None]:
# OPTIMISATION: Ajouter toutes les colonnes manquantes en une seule fois
if len(missing_cols) > 0:
 print(f"\nAjout de {len(missing_cols)} colonnes manquantes...")
 
 # Créer un DataFrame avec toutes les colonnes manquantes
 missing_df_valid = pd.DataFrame(0, index=X_valid.index, columns=missing_cols)
 missing_df_train = pd.DataFrame(0, index=X_train.index, columns=missing_cols)
 
 # Concaténer en une seule fois (beaucoup plus efficace)
 X_valid = pd.concat([X_valid, missing_df_valid], axis=1)
 X_train = pd.concat([X_train, missing_df_train], axis=1)
 
 print(" Colonnes manquantes ajoutées")

# Supprimer les colonnes en trop
if len(extra_cols) > 0:
 print(f"\nSuppression de {len(extra_cols)} colonnes en trop...")
 X_valid = X_valid.drop(columns=extra_cols)
 X_train = X_train.drop(columns=extra_cols)
 print(" Colonnes en trop supprimées")

# Réordonner les colonnes dans le bon ordre
print("\nRéordonnancement des colonnes...")
X_valid = X_valid[expected_features]
X_train = X_train[expected_features]
print(" Colonnes réordonnées")

print(f"\nDimensions après alignement:")
print(f" X_train: {X_train.shape}")
print(f" X_valid: {X_valid.shape}")

## Conversion des types de données

In [None]:
print("="*60)
print("CONVERSION DES TYPES")
print("="*60)

# Identifier les colonnes object
object_cols = X_valid.select_dtypes(include=['object']).columns.tolist()
print(f"\nColonnes 'object' à convertir: {len(object_cols)}")

if len(object_cols) > 0:
 print("Exemples de valeurs:")
 for col in object_cols[:3]:
 print(f" {col}: {X_valid[col].unique()[:3]}")

# Convertir les colonnes object en numérique
if len(object_cols) > 0:
 print(f"\nConversion de {len(object_cols)} colonnes...")
 for col in object_cols:
 X_valid[col] = pd.to_numeric(X_valid[col], errors='coerce')
 X_train[col] = pd.to_numeric(X_train[col], errors='coerce')
 print(" Conversion terminée")

# Gérer les valeurs manquantes
nan_count_valid = X_valid.isna().sum().sum()
nan_count_train = X_train.isna().sum().sum()

print(f"\nValeurs NaN à traiter:")
print(f" X_valid: {nan_count_valid:,}")
print(f" X_train: {nan_count_train:,}")

if nan_count_valid > 0 or nan_count_train > 0:
 X_valid = X_valid.fillna(0)
 X_train = X_train.fillna(0)
 print(" Valeurs NaN remplacées par 0")

# Vérification finale
print(f"\nTypes de données finaux:")
print(X_valid.dtypes.value_counts())

print(f"\nVérification des NaN restants:")
print(f" X_valid: {X_valid.isna().sum().sum()}")
print(f" X_train: {X_train.isna().sum().sum()}")

## Génération des probabilités

In [None]:
print("="*60)
print("GÉNÉRATION DES PRÉDICTIONS")
print("="*60)

# Générer les probabilités
print("\nGénération des probabilités...")
proba_valid = pipeline.predict_proba(X_valid)[:, 1]

print(f"\n Probabilités générées: {len(proba_valid):,}")
print(f"\nStatistiques des probabilités:")
print(f" Min: {proba_valid.min():.4f}")
print(f" Max: {proba_valid.max():.4f}")
print(f" Moyenne: {proba_valid.mean():.4f}")
print(f" Médiane: {np.median(proba_valid):.4f}")
print(f" Q25: {np.percentile(proba_valid, 25):.4f}")
print(f" Q75: {np.percentile(proba_valid, 75):.4f}")

## Fonction de coût métier

In [None]:
def cout_metier(y_true, y_pred, cout_fn=10, cout_fp=1):
 """
 Calcule le coût métier : cout_fn × FN + cout_fp × FP
 
 Paramètres:
 -----------
 y_true : array-like
 Vraies étiquettes
 y_pred : array-like
 Prédictions
 cout_fn : float, default=10
 Coût d'un faux négatif (ne pas détecter un défaut)
 cout_fp : float, default=1
 Coût d'un faux positif (refuser à tort)
 
 Retourne:
 ---------
 cout_total : float
 Coût total
 """
 tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
 return cout_fn * fn + cout_fp * fp

print(" Fonction de coût métier définie")
print("\nFormule : Coût = 10 × FN + 1 × FP")
print(" - FN (Faux Négatif) : Ne pas détecter un défaut → Perte de 10 unités")
print(" - FP (Faux Positif) : Refuser à tort → Perte de 1 unité")

## Balayage des seuils

In [None]:
print("="*60)
print("OPTIMISATION DU SEUIL DE DÉCISION")
print("="*60)

# Définir la plage de seuils à tester
seuils = np.linspace(0, 1, 201)
couts = []

print(f"\nTest de {len(seuils)} seuils...")
print(f" Plage : {seuils.min():.3f} à {seuils.max():.3f}")
print(f" Pas : {seuils[1] - seuils[0]:.4f}")

# Calculer le coût pour chaque seuil
for seuil in seuils:
 y_pred = (proba_valid >= seuil).astype(int)
 cout = cout_metier(y_valid, y_pred)
 couts.append(cout)

# Trouver le seuil optimal
idx_optimal = np.argmin(couts)
seuil_optimal = seuils[idx_optimal]
cout_optimal = couts[idx_optimal]

print("\n" + "="*60)
print("RÉSULTATS DE L'OPTIMISATION")
print("="*60)

print(f"\n SEUIL OPTIMAL : {seuil_optimal:.4f}")
print(f" Coût optimal : {cout_optimal:,.0f} unités")
print(f" Position : {idx_optimal}/{len(seuils)} seuils testés")

## Comparaison avec le seuil par défaut

In [None]:
# Calculer les métriques pour le seuil par défaut (0.5)
y_pred_default = (proba_valid >= 0.5).astype(int)
cout_default = cout_metier(y_valid, y_pred_default)

# Calculer les métriques pour le seuil optimal
y_pred_optimal = (proba_valid >= seuil_optimal).astype(int)

# Matrices de confusion
cm_default = confusion_matrix(y_valid, y_pred_default)
cm_optimal = confusion_matrix(y_valid, y_pred_optimal)

# Calculer le gain
gain = cout_default - cout_optimal
gain_pct = (gain / cout_default) * 100

print("="*60)
print("COMPARAISON SEUIL PAR DÉFAUT vs SEUIL OPTIMAL")
print("="*60)

print(f"\n SEUIL PAR DÉFAUT (0.500):")
print(f" Coût total : {cout_default:,.0f} unités")
print(f" Matrice de confusion:")
print(f" TN={cm_default[0,0]:,} FP={cm_default[0,1]:,}")
print(f" FN={cm_default[1,0]:,} TP={cm_default[1,1]:,}")

print(f"\n SEUIL OPTIMAL ({seuil_optimal:.4f}):")
print(f" Coût total : {cout_optimal:,.0f} unités")
print(f" Matrice de confusion:")
print(f" TN={cm_optimal[0,0]:,} FP={cm_optimal[0,1]:,}")
print(f" FN={cm_optimal[1,0]:,} TP={cm_optimal[1,1]:,}")

print(f"\n GAIN:")
print(f" Réduction de coût : {gain:,.0f} unités")
print(f" Amélioration : {gain_pct:.2f}%")

if gain > 0:
 print(f"\n Le seuil optimal améliore significativement les performances!")
else:
 print(f"\n Le seuil par défaut est déjà performant")

## Visualisation

In [None]:
# Créer le graphique
plt.figure(figsize=(12, 6))

# Courbe du coût en fonction du seuil
plt.plot(seuils, couts, linewidth=2, label='Coût métier', color='steelblue')

# Marquer le seuil optimal
plt.axvline(seuil_optimal, color='green', linestyle='--', linewidth=2, 
 label=f'Seuil optimal ({seuil_optimal:.4f})')
plt.scatter([seuil_optimal], [cout_optimal], color='green', s=200, zorder=5,
 marker='*', edgecolors='darkgreen', linewidths=2)

# Marquer le seuil par défaut
plt.axvline(0.5, color='red', linestyle='--', linewidth=2, alpha=0.7,
 label=f'Seuil par défaut (0.500)')
plt.scatter([0.5], [cout_default], color='red', s=200, zorder=5,
 marker='X', edgecolors='darkred', linewidths=2)

plt.xlabel('Seuil de décision', fontsize=12)
plt.ylabel('Coût métier (10×FN + 1×FP)', fontsize=12)
plt.title('Optimisation du seuil de décision selon le coût métier', fontsize=14, fontweight='bold')
plt.legend(fontsize=10, loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Sauvegarder le graphique
plt.savefig(DOSSIER_ARTIFACTS / 'optimisation_seuil.png', dpi=300, bbox_inches='tight')
print("\n Graphique sauvegardé : optimisation_seuil.png")

plt.show()

## Sauvegarde du pipeline final

In [None]:
print("="*60)
print("SAUVEGARDE DU PIPELINE FINAL")
print("="*60)

# Créer un dictionnaire avec tous les paramètres de décision
parametres_decision = {
 'seuil_optimal': seuil_optimal,
 'cout_optimal': cout_optimal,
 'cout_default': cout_default,
 'gain': gain,
 'gain_pct': gain_pct,
 'cout_fn': 10,
 'cout_fp': 1,
 'n_seuils_testes': len(seuils)
}

# Sauvegarder le pipeline
joblib.dump(pipeline, DOSSIER_ARTIFACTS / 'pipeline_final.joblib')

# Sauvegarder les paramètres de décision
joblib.dump(parametres_decision, DOSSIER_ARTIFACTS / 'parametres_decision.joblib')

# Sauvegarder les noms de colonnes
joblib.dump(list(X_valid.columns), DOSSIER_ARTIFACTS / 'feature_names.joblib')

print("\n SAUVEGARDE TERMINÉE")
print(f" Dossier: {DOSSIER_ARTIFACTS}")
print(f" Pipeline final: pipeline_final.joblib")
print(f" Paramètres métier: parametres_decision.joblib")
print(f" Feature names: feature_names.joblib")

print(f"\n PARAMÈTRES SAUVEGARDÉS:")
for k, v in parametres_decision.items():
 if isinstance(v, float):
 print(f" {k}: {v:.4f}")
 else:
 print(f" {k}: {v}")

print(f"\n POUR CHARGER EN PRODUCTION:")
print(f" pipeline = joblib.load('artifacts/pipeline_final.joblib')")
print(f" params = joblib.load('artifacts/parametres_decision.joblib')")
print(f" feature_names = joblib.load('artifacts/feature_names.joblib')")
print(f" seuil = params['seuil_optimal']")

## Résumé final

In [None]:
print("="*60)
print("RÉSUMÉ FINAL")
print("="*60)

print(f"\n Fonction coût métier : 10×FN + 1×FP")
print(f" {len(seuils)} seuils testés (0.000 à 1.000)")
print(f" Seuil optimal identifié : {seuil_optimal:.4f}")
print(f" Gain vs seuil défaut : {gain_pct:.2f}%")
print(f" Pipeline final sauvegardé")

print(f"\n IMPACT BUSINESS:")
print(f" Économie de {gain:,.0f} unités par batch de {len(y_valid):,} demandes")
if len(y_valid) > 0:
 economie_par_demande = gain / len(y_valid)
 print(f" Soit {economie_par_demande:.4f} unités par demande")

print("\n" + "="*60)
print("Prochaine étape : Interprétabilité SHAP")
print("="*60)

In [None]:
#!/usr/bin/env python3
"""
Script d'extraction des figures - Notebook 05
Optimisation du seuil métier (coût business)
"""

import json
import base64
import os
from pathlib import Path

def extract_figures_from_notebook(notebook_path, output_dir):
 """
 Extrait toutes les figures du notebook d'optimisation du seuil métier
 """
 # Créer le dossier de sortie
 output_dir = Path(output_dir)
 output_dir.mkdir(parents=True, exist_ok=True)
 
 # Charger le notebook
 with open(notebook_path, 'r', encoding='utf-8') as f:
 notebook = json.load(f)
 
 figure_count = 0
 
 # Parcourir toutes les cellules
 for cell_idx, cell in enumerate(notebook['cells']):
 # Chercher les cellules avec outputs
 if cell['cell_type'] == 'code' and 'outputs' in cell:
 for output in cell['outputs']:
 # Chercher les images PNG
 if 'data' in output and 'image/png' in output['data']:
 figure_count += 1
 
 # Décoder l'image base64
 img_data = output['data']['image/png']
 img_bytes = base64.b64decode(img_data)
 
 # Déterminer le nom du fichier selon le contexte
 cell_source = ''.join(cell['source']).lower()
 
 if 'coût' in cell_source or 'cout' in cell_source or 'cost' in cell_source:
 filename = f'nb05_fig{figure_count:02d}_cout_vs_seuil.png'
 elif 'fn' in cell_source and 'fp' in cell_source:
 filename = f'nb05_fig{figure_count:02d}_fn_fp_evolution.png'
 elif 'optimal' in cell_source or 'seuil' in cell_source:
 filename = f'nb05_fig{figure_count:02d}_seuil_optimal.png'
 elif 'business' in cell_source or 'impact' in cell_source:
 filename = f'nb05_fig{figure_count:02d}_impact_business.png'
 else:
 filename = f'nb05_fig{figure_count:02d}_figure.png'
 
 # Sauvegarder l'image
 output_path = output_dir / filename
 with open(output_path, 'wb') as img_file:
 img_file.write(img_bytes)
 
 print(f" Extrait : {filename}")
 
 print(f"\n Total : {figure_count} figures extraites du Notebook 05")
 return figure_count


if __name__ == '__main__':
 # Chemins
 NOTEBOOK_PATH = 'Barre_Stephane_P7_05_optimisation_seuil.ipynb'
 OUTPUT_DIR = 'outputs/figures_p7/notebook_05'
 
 # Extraction
 print(" Extraction des figures du Notebook 05...")
 print(f" Notebook : {NOTEBOOK_PATH}")
 print(f" Sortie : {OUTPUT_DIR}\n")
 
 extract_figures_from_notebook(NOTEBOOK_PATH, OUTPUT_DIR)