In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import (
    GradientBoostingRegressor, HistGradientBoostingRegressor,
    RandomForestRegressor, ExtraTreesRegressor,
    StackingRegressor, VotingRegressor, BaggingRegressor,
    IsolationForest
)
from sklearn.linear_model import (
    Ridge, HuberRegressor, QuantileRegressor, 
    BayesianRidge, ElasticNet
)
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import (
    cross_val_score, cross_val_predict, KFold,
    train_test_split, GridSearchCV, RandomizedSearchCV
)
from sklearn.metrics import (
    mean_squared_error, r2_score, mean_absolute_error,
    make_scorer
)
from sklearn.preprocessing import (
    RobustScaler, StandardScaler, QuantileTransformer,
    PowerTransformer
)
from sklearn.feature_selection import mutual_info_regression, RFECV
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, RegressorMixin, clone
from sklearn.calibration import calibration_curve
from scipy.stats import iqr, norm
from scipy.optimize import minimize_scalar
import joblib
import warnings
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import json

warnings.filterwarnings('ignore')

In [None]:
file_path = #HIDDEN
combined_data = pd.read_excel(file_path)

In [None]:
QUALITES = #HIDDEN
OUTPUT_DIR = #HIDDEN
os.makedirs(OUTPUT_DIR, exist_ok=True)

FEATURES_POOL = [
    #HIDDEN
]

CATEGORICAL_FEATURES = {
    #HIDDEN
}

ALL_CATEGORICAL = #HIDDEN

SEUILS_PRODUCTION = {
    'rmse_max': #HIDDEN,
    'mae_max': #HIDDEN,
    'erreur_max_acceptable': #HIDDEN,
    'pct_within_5_min': #HIDDEN,
    'confidence_threshold': #HIDDEN,
}

CIBLES = {
    #HIDDEN
}

TOLERANCE_TERRAIN = #HIDDEN

In [None]:
def rmse_score(y_true, y_pred):
    return -np.sqrt(mean_squared_error(y_true, y_pred))

def max_error_score(y_true, y_pred):
    return -np.max(np.abs(y_true - y_pred))

def p95_error_score(y_true, y_pred):
    return -np.percentile(np.abs(y_true - y_pred), 95)

def robust_score(y_true, y_pred):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    p95 = np.percentile(np.abs(y_true - y_pred), 95)
    max_err = np.max(np.abs(y_true - y_pred))
    return -(0.5 * rmse + 0.3 * p95 + 0.2 * max_err)

rmse_scorer = make_scorer(rmse_score, greater_is_better=True)
robust_scorer = make_scorer(robust_score, greater_is_better=True)

In [None]:
class CategoricalEncoder:
    """Encodeur robuste pour variables cat√©gorielles avec Target Encoding."""
    
    def __init__(self, smoothing: float = 10.0):
        self.smoothing = smoothing
        self.encodings = {}
        self.global_means = {}
        self.category_counts = {}
        self.is_fitted = False
    
    def fit(self, X: pd.DataFrame, y: pd.Series, categorical_cols: List[str]):
        self.categorical_cols = [c for c in categorical_cols if c in X.columns]
        
        if not self.categorical_cols:
            self.is_fitted = True
            return self
        
        global_mean = y.mean()
        
        for col in self.categorical_cols:
            self.global_means[col] = global_mean
            self.encodings[col] = {}
            self.category_counts[col] = {}
            
            df_temp = pd.DataFrame({'cat': X[col], 'target': y})
            stats = df_temp.groupby('cat')['target'].agg(['mean', 'count'])
            
            for category, row in stats.iterrows():
                cat_mean = row['mean']
                cat_count = row['count']
                smoothed_mean = (
                    (cat_count * cat_mean + self.smoothing * global_mean) / 
                    (cat_count + self.smoothing)
                )
                self.encodings[col][category] = smoothed_mean
                self.category_counts[col][category] = cat_count
        
        self.is_fitted = True
        return self
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X_out = X.copy()
        
        if not self.is_fitted or not self.categorical_cols:
            return X_out
        
        for col in self.categorical_cols:
            if col not in X_out.columns:
                continue
            
            encoded_col = f'{col}_encoded'
            X_out[encoded_col] = X_out[col].map(self.encodings[col])
            X_out[encoded_col] = X_out[encoded_col].fillna(self.global_means[col])
            X_out[f'{col}_unknown'] = (~X_out[col].isin(self.encodings[col])).astype(int)
            
            total_count = sum(self.category_counts[col].values())
            rare_threshold = total_count * 0.05
            rare_categories = [
                cat for cat, count in self.category_counts[col].items() 
                if count < rare_threshold
            ]
            X_out[f'{col}_rare'] = X_out[col].isin(rare_categories).astype(int)
            X_out = X_out.drop(columns=[col])
        
        return X_out
    
    def fit_transform(self, X: pd.DataFrame, y: pd.Series, categorical_cols: List[str]) -> pd.DataFrame:
        return self.fit(X, y, categorical_cols).transform(X)
    
    def get_category_stats(self, col: str) -> pd.DataFrame:
        if col not in self.encodings:
            return pd.DataFrame()
        
        return pd.DataFrame({
            'category': list(self.encodings[col].keys()),
            'encoded_value': list(self.encodings[col].values()),
            'count': [self.category_counts[col].get(k, 0) for k in self.encodings[col].keys()]
        }).sort_values('encoded_value', ascending=False)



In [None]:
class RobustFeatureEngineer:
    """
    Handles feature engineering with a focus on robustness against outliers.
    Computes statistics based on Median/IQR rather than Mean/Std.
    """
    
    def __init__(self):
        self.feature_stats = {}
        self.is_fitted = False
    
    def fit(self, X: pd.DataFrame, y: pd.Series = None):
        """
        Compute robust statistics (Median, IQR, Quartiles) for numeric columns.
        These are stored to ensure consistent scaling during inference.
        """
        for col in X.columns:
            # Skip categorical/object types to prevent type errors
            if X[col].dtype in ['object', 'category']:
                continue
                
            # Storing stats for Robust Scaler logic later
            self.feature_stats[col] = {
                'median': X[col].median(),
                'iqr': iqr(X[col].dropna()),  # Dropna strictly required for scipy iqr
                'q1': X[col].quantile(0.25),
                'q3': X[col].quantile(0.75),
                'min': X[col].min(),
                'max': X[col].max()
            }
        self.is_fitted = True
        return self
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X_out = X.copy()
        
        # Interaction features
        interactions = [
                #HIDDEN
        ]
        
        for f1, f2 in interactions:
            if f1 in X_out.columns and f2 in X_out.columns:
                # Ensure we only multiply numeric features
                if X_out[f1].dtype not in ['object', 'category'] and X_out[f2].dtype not in ['object', 'category']:
                    X_out[f'{f1}_x_{f2}'] = X_out[f1] * X_out[f2]
        
        # Gap analysis (ratios and diffs)
        gap_pairs = #HIDDEN
        
        for gd, gg in gap_pairs:
            if gd in X_out.columns and gg in X_out.columns:
                prefix = gd[:5]
                # Basic differential
                X_out[f'{prefix}_diff'] = X_out[gd] - X_out[gg]
                
                # Ratio with epsilon (1e-6) to prevent DivisionByZero errors
                X_out[f'{prefix}_ratio'] = X_out[gd] / (X_out[gg] + 1e-6)
                
                # Aggregations to capture range and center
                X_out[f'{prefix}_mean'] = (X_out[gd] + X_out[gg]) / 2
                X_out[f'{prefix}_max'] = np.maximum(X_out[gd], X_out[gg])
                X_out[f'{prefix}_min'] = np.minimum(X_out[gd], X_out[gg])
        
        # Trend analysis
        # Checks if all required hidden columns exist before computing trends
        if all(f'#HIDDEN' in X_out.columns for i in [1, 2, 3]):
            # Calculate instant delta
            X_out['#HIDDEN'] = X_out['#HIDDEN'] - X_out['#HIDDEN']
            # Calculate a simple moving average (window=3)
            X_out['#HIDDEN'] = (
                X_out['#HIDDEN'] + X_out['#HIDDEN'] + X_out['#HIDDEN']
            ) / 3
        
        #  Physics-based deatures
        if '#HIDDEN' in X_out.columns:
            # Safe division for rate calculation
            if '#HIDDEN' in X_out.columns:
                X_out['#HIDDEN'] = X_out['#HIDDEN'] / (X_out['#HIDDEN'] + 1e-6)
            
            # Non-linear transformation (square)
            X_out['#HIDDEN'] = X_out['#HIDDEN'] ** 2
            
            # Interaction term if specific column exists
            if '#HIDDEN' in X_out.columns:
                X_out['#HIDDEN'] = X_out['#HIDDEN'] * X_out['#HIDDEN']
        
        # Outlier detection
        if self.is_fitted:
            for col in ['#HIDDEN', '#HIDDEN', '#HIDDEN', '#HIDDEN']:
                if col in X_out.columns and col in self.feature_stats:
                    stats = self.feature_stats[col]
                    
                    # Apply robust Z-Score: (x - median) / IQR
                    if stats['iqr'] > 0:
                        X_out[f'{col}_zscore_robust'] = (
                            (X_out[col] - stats['median']) / stats['iqr']
                        )
                    
                    # Flag outliers
                    X_out[f'{col}_outlier_flag'] = (
                        (X_out[col] < stats['q1'] - 1.5 * stats['iqr']) |
                        (X_out[col] > stats['q3'] + 1.5 * stats['iqr'])
                    ).astype(int)
        
        # Polynomial features
        for col in ['#HIDDEN', '#HIDDEN']:
            if col in X_out.columns:
                X_out[f'{col}_sq'] = X_out[col] ** 2
        
        return X_out
    
    def fit_transform(self, X: pd.DataFrame, y: pd.Series = None) -> pd.DataFrame:
        return self.fit(X, y).transform(X)

In [None]:
class StrictOutOfDomainDetector:
    """
    Detects if new observations are outside the training domain.
    Combines strict quantile bounding with isolation forest for robust rejection.
    """
    
    def __init__(
        self, 
        contamination: float = 0.05,
        quantile_low: float = 0.05,
        quantile_high: float = 0.95,
        strict_mode: bool = False,
        n_features_tolerance: int = 0
    ):
        self.contamination = contamination
        self.quantile_low = quantile_low
        self.quantile_high = quantile_high
        self.strict_mode = strict_mode
        self.n_features_tolerance = n_features_tolerance
        
        # isolation forest handles multivariate anomalies
        self.isolation_forest = IsolationForest(
            contamination=contamination,
            random_state=42,
            n_jobs=-1
        )
        # robust scaler minimizes the effect of outliers on distance calculations
        self.scaler = RobustScaler()
        self.train_centroid = None
        self.train_std = None
        self.feature_bounds = {}
        self.feature_names = []
    
    def fit(self, X: pd.DataFrame):
        """Learns the statistical boundaries and fits the anomaly detector."""
        self.feature_names = list(X.columns)
        
        X_scaled = self.scaler.fit_transform(X)
        self.isolation_forest.fit(X_scaled)
        
        # compute centroid on scaled space for distance-based rejection later
        self.train_centroid = np.median(X_scaled, axis=0)
        self.train_std = np.std(X_scaled, axis=0)
        
        # calculate hard acceptance boundaries per feature (raw scale)
        print(f"\n   üìè feature bounds (Q{self.quantile_low*100:.0f}% - Q{self.quantile_high*100:.0f}%):")
        for col in X.columns:
            q_low = X[col].quantile(self.quantile_low)
            q_high = X[col].quantile(self.quantile_high)
            self.feature_bounds[col] = {
                'min': q_low,
                'max': q_high,
                'median': X[col].median(),
                'std': X[col].std()
            }
            print(f"      ‚Ä¢ {col}: [{q_low:.4f}, {q_high:.4f}]")
        
        return self
    
    def check_bounds(self, X: pd.DataFrame) -> Tuple[np.ndarray, List[List[str]]]:
        """Checks if features are strictly within the learned quantiles."""
        n_samples = len(X)
        n_violations = np.zeros(n_samples)
        features_out_of_bounds = [[] for _ in range(n_samples)]
        
        for col in X.columns:
            if col in self.feature_bounds:
                bounds = self.feature_bounds[col]
                
                # vectorised check for efficiency
                too_low = X[col] < bounds['min']
                too_high = X[col] > bounds['max']
                violations = too_low | too_high
                
                n_violations += violations.astype(int)
                
                # collect verbose error messages for audit logs
                for i in range(n_samples):
                    # robust indexing check (handles both series and numpy arrays)
                    if violations.iloc[i] if hasattr(violations, 'iloc') else violations[i]:
                        val = X[col].iloc[i] if hasattr(X[col], 'iloc') else X[col][i]
                        if too_low.iloc[i] if hasattr(too_low, 'iloc') else too_low[i]:
                            features_out_of_bounds[i].append(f"{col}={val:.4f} < {bounds['min']:.4f}")
                        else:
                            features_out_of_bounds[i].append(f"{col}={val:.4f} > {bounds['max']:.4f}")
        
        return n_violations, features_out_of_bounds
    
    def predict(self, X: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, List[List[str]]]:
        """
        returns:
            - confidence: composite score (0-1)
            - should_reject_strict: boolean flag based on hard bounds
            - features_out_of_bounds: details on violations
        """
        X_scaled = self.scaler.transform(X)
        
        # strict check on individual features first
        n_violations, features_out_of_bounds = self.check_bounds(X)
        should_reject_strict = n_violations > self.n_features_tolerance
        
        # isolation forest score (normalized to 0-1)
        if_scores = self.isolation_forest.decision_function(X_scaled)
        if_min, if_max = if_scores.min(), if_scores.max()
        if if_max - if_min > 1e-10:
            if_confidence = (if_scores - if_min) / (if_max - if_min)
        else:
            if_confidence = np.ones(len(X))
        
        # geometric distance from training centroid
        distances = np.sqrt(np.sum((X_scaled - self.train_centroid) ** 2, axis=1))
        max_dist = np.percentile(distances, 99) # clip extreme outliers
        dist_confidence = 1 - np.clip(distances / max_dist, 0, 1)
        
        # bound violation penalty
        max_violations = len(self.feature_bounds)
        bound_confidence = 1 - np.clip(n_violations / max(max_violations, 1), 0, 1)
        
        # weighted ensemble of the three metrics
        # heavy weight on hard bounds (0.4) vs statistical outliers (0.3 each)
        confidence = 0.3 * if_confidence + 0.3 * dist_confidence + 0.4 * bound_confidence
        
        # force zero confidence if strict mode criteria are met
        if self.strict_mode:
            confidence[should_reject_strict] = 0.0
        
        return confidence, should_reject_strict, features_out_of_bounds
    
    def get_rejection_mask(self, X: pd.DataFrame, threshold: float = 0.5) -> Tuple[np.ndarray, List[List[str]]]:
        """Returns the final boolean decision mask."""
        confidence, should_reject_strict, features_out = self.predict(X)
        
        # if strict mode, reject on either strict flag or low confidence score
        if self.strict_mode:
            should_reject = should_reject_strict | (confidence < threshold)
        else:
            should_reject = confidence < threshold
        
        return should_reject, features_out

In [None]:
class QuantileEnsembleRegressor(BaseEstimator, RegressorMixin):
    """
    Ensemble wrapper that predicts the median and confidence intervals.
    Uses HistGradientBoosting for efficient quantile regression on large datasets.
    """
    
    def __init__(self, n_estimators: int = 100, quantiles: List[float] = [0.1, 0.5, 0.9]):
        self.n_estimators = n_estimators # note: currently unused, reserved for future bagging implementation
        self.quantiles = quantiles
        self.models = {}
        self.base_model = None
    
    def fit(self, X, y):
        # training the primary median estimator with standard regularization
        # used as the baseline for the main prediction
        self.base_model = HistGradientBoostingRegressor(
            loss='quantile',
            quantile=0.5,
            max_iter=200,
            max_depth=6,
            min_samples_leaf=20,
            learning_rate=0.05,
            l2_regularization=1.0,
            early_stopping=True,
            validation_fraction=0.1,
            n_iter_no_change=15,
            random_state=42
        )
        self.base_model.fit(X, y)
        
        for q in self.quantiles:
            # training specific quantile estimators (including bounds)
            # using slightly higher regularization (l2=1.5) to ensure smoother bounds and reduce crossing
            model = HistGradientBoostingRegressor(
                loss='quantile',
                quantile=q,
                max_iter=150, # slightly fewer iterations to prevent overfitting extreme quantiles
                max_depth=5,
                min_samples_leaf=25,
                learning_rate=0.05,
                l2_regularization=1.5,
                early_stopping=True,
                validation_fraction=0.1,
                n_iter_no_change=10,
                random_state=42
            )
            model.fit(X, y)
            self.models[q] = model
        
        return self
    
    def predict(self, X) -> np.ndarray:
        # defaults to the median prediction (q=0.5) for standard regression usage
        return self.models[0.5].predict(X)
    
    def predict_quantiles(self, X) -> Dict[float, np.ndarray]:
        # returns dictionary mapping quantile levels to their predictions
        return {q: model.predict(X) for q, model in self.models.items()}
    
    def predict_with_uncertainty(self, X) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        # helper to unpack median, lower bound, and upper bound
        preds = self.predict_quantiles(X)
        median = preds[0.5]
        # dynamically find widest interval requested (e.g. 10th and 90th percentiles)
        lower = preds[min(self.quantiles)]
        upper = preds[max(self.quantiles)]
        return median, lower, upper
    
    def get_uncertainty(self, X) -> np.ndarray:
        # returns the magnitude of the confidence interval (width)
        # useful for filtering out low-confidence predictions
        _, lower, upper = self.predict_with_uncertainty(X)
        return upper - lower

In [None]:
class RobustStackingRegressor(BaseEstimator, RegressorMixin):
    """
    Stacking ensemble that combines diverse base models via a meta-learner.
    Includes a post-training calibration step to correct systematic bias.
    """
    
    def __init__(self):
        self.base_models = {}
        self.meta_model = None
        self.scaler = RobustScaler()
        self.calibration_params = {}
    
    def _create_base_models(self) -> Dict:
        # mixing tree-based (non-linear) and linear models to maximize ensemble diversity
        # huber helps with outliers, while boosting captures complex patterns
        return {
            'hgb_1': HistGradientBoostingRegressor(
                max_iter=200, max_depth=5, min_samples_leaf=20,
                learning_rate=0.05, l2_regularization=1.0,
                early_stopping=True, validation_fraction=0.1,
                random_state=42
            ),
            'hgb_2': HistGradientBoostingRegressor(
                max_iter=150, max_depth=7, min_samples_leaf=15,
                learning_rate=0.03, l2_regularization=2.0,
                early_stopping=True, validation_fraction=0.1,
                random_state=43
            ),
            'rf': RandomForestRegressor(
                n_estimators=200, max_depth=8, min_samples_leaf=10,
                max_features=0.5, max_samples=0.8,
                random_state=42, n_jobs=-1
            ),
            'et': ExtraTreesRegressor(
                n_estimators=200, max_depth=8, min_samples_leaf=10,
                max_features=0.5, bootstrap=True,
                random_state=42, n_jobs=-1
            ),
            'huber': Pipeline([
                ('scaler', RobustScaler()),
                ('model', HuberRegressor(epsilon=1.35, alpha=0.001, max_iter=500))
            ]),
            'ridge': Pipeline([
                ('scaler', RobustScaler()),
                ('model', Ridge(alpha=10.0))
            ]),
            'bayesian': Pipeline([
                ('scaler', RobustScaler()),
                ('model', BayesianRidge())
            ]),
        }
    
    def fit(self, X, y):
        self.base_models = self._create_base_models()
        
        n_samples = len(X)
        n_models = len(self.base_models)
        # storage for out-of-fold predictions to train the meta-learner without leakage
        oof_predictions = np.zeros((n_samples, n_models))
        
        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        
        print("   Entra√Ænement des mod√®les de base...")
        for i, (name, model) in enumerate(self.base_models.items()):
            print(f"      ‚Ä¢ {name}...", end=" ")
            
            oof_pred = np.zeros(n_samples)
            
            # generating oof predictions via cross-validation
            for train_idx, val_idx in kf.split(X):
                # safe indexing for both pandas dfs and numpy arrays
                X_train_fold = X.iloc[train_idx] if hasattr(X, 'iloc') else X[train_idx]
                y_train_fold = y.iloc[train_idx] if hasattr(y, 'iloc') else y[train_idx]
                X_val_fold = X.iloc[val_idx] if hasattr(X, 'iloc') else X[val_idx]
                
                model_clone = clone(model)
                model_clone.fit(X_train_fold, y_train_fold)
                oof_pred[val_idx] = model_clone.predict(X_val_fold)
            
            oof_predictions[:, i] = oof_pred
            rmse = np.sqrt(mean_squared_error(y, oof_pred))
            print(f"RMSE CV = {rmse:.3f}")
        
        # retrain all base models on the full dataset for deployment
        for name, model in self.base_models.items():
            model.fit(X, y)
        
        print("   Entra√Ænement du meta-learner...")
        # ridge regression ensures stable coefficients even if base models are correlated
        self.meta_model = Ridge(alpha=1.0)
        self.meta_model.fit(oof_predictions, y)
        
        meta_predictions = self.meta_model.predict(oof_predictions)
        self._calibrate(y, meta_predictions)
        
        print(f"   Poids du meta-learner: {dict(zip(self.base_models.keys(), self.meta_model.coef_))}")
        
        return self
    
    def _calibrate(self, y_true, y_pred):
        # computes global bias to shift predictions if the ensemble systematically over/undershoots
        residuals = y_true - y_pred
        self.calibration_params = {
            'bias': np.mean(residuals),
            'scale': 1.0
        }
    
    def predict(self, X) -> np.ndarray:
        # get predictions from all base models
        base_preds = np.column_stack([
            model.predict(X) for model in self.base_models.values()
        ])
        # combine using meta-learner weights
        raw_pred = self.meta_model.predict(base_preds)
        # apply final calibration shift
        calibrated_pred = raw_pred + self.calibration_params['bias']
        return calibrated_pred
    
    def predict_all_base(self, X) -> Dict[str, np.ndarray]:
        # helper to inspect individual model contributions
        return {name: model.predict(X) for name, model in self.base_models.items()}

In [None]:
class ProductionD43Model(BaseEstimator, RegressorMixin):
    """
    Top-level wrapper for the production pipeline.
    Enforces strict domain checks to prevent unreliable predictions on anomalous data.
    """
    
    def __init__(
        self, 
        rejection_threshold: float = 0.5, 
        qualite: str = None,
        strict_mode: bool = True,
        quantile_low: float = 0.05,
        quantile_high: float = 0.95,
        n_features_tolerance: int = 0
    ):
        self.rejection_threshold = rejection_threshold
        self.qualite = qualite
        self.strict_mode = strict_mode
        self.quantile_low = quantile_low
        self.quantile_high = quantile_high
        self.n_features_tolerance = n_features_tolerance
        
        # initializing pipeline components
        # smoothing=10 prevents overfitting on rare categories
        self.categorical_encoder = CategoricalEncoder(smoothing=10.0)
        self.feature_engineer = RobustFeatureEngineer()
        
        # the safety net: detects if input data is statistically similar to training data
        self.ood_detector = StrictOutOfDomainDetector(
            contamination=0.05,
            quantile_low=quantile_low,
            quantile_high=quantile_high,
            strict_mode=strict_mode,
            n_features_tolerance=n_features_tolerance
        )
        
        # hybrid approach: stacking for accuracy, quantile ensemble for uncertainty estimation
        self.stacking_model = RobustStackingRegressor()
        self.quantile_model = QuantileEnsembleRegressor(quantiles=[0.1, 0.5, 0.9])
        
        self.feature_selector = None
        self.selected_features = None
        self.categorical_cols = []
        self.training_stats = {}
    
    def fit(self, X: pd.DataFrame, y: pd.Series):
        print(f"   ‚Ä¢ Mode strict: {self.strict_mode}")
        print(f"   ‚Ä¢ Quantiles: [{self.quantile_low*100:.0f}%, {self.quantile_high*100:.0f}%]")
        print(f"   ‚Ä¢ Tol√©rance features hors bornes: {self.n_features_tolerance}")
        
        # filter categorical columns based on the specific quality config
        self.categorical_cols = CATEGORICAL_FEATURES.get(self.qualite, [])
        self.categorical_cols = [c for c in self.categorical_cols if c in X.columns]
        
        if self.categorical_cols:
            print(f"\n0Ô∏è‚É£ Features cat√©gorielles d√©tect√©es: {self.categorical_cols}")
        
        print("\n1Ô∏è‚É£ Encodage des features cat√©gorielles...")
        if self.categorical_cols:
            X_encoded = self.categorical_encoder.fit_transform(X, y, self.categorical_cols)
            # logging stats to monitor target encoding distribution
            for col in self.categorical_cols:
                stats = self.categorical_encoder.get_category_stats(col)
                print(f"\n   üìä Encodage {col}:")
                print(stats.to_string(index=False))
        else:
            X_encoded = X.copy()
            print("   Aucune feature cat√©gorielle")
        
        print("\n2Ô∏è‚É£ Feature Engineering...")
        # generating domain-specific interactions and ratios
        X_eng = self.feature_engineer.fit_transform(X_encoded, y)
        print(f"   {len(X_eng.columns)} features g√©n√©r√©es")
        
        print("\n3Ô∏è‚É£ S√©lection de features...")
        # crucial step: reduce dimensionality to prevent the stacking ensemble from overfitting
        self.selected_features = self._select_features(X_eng, y)
        X_selected = X_eng[self.selected_features]
        print(f"   {len(self.selected_features)} features s√©lectionn√©es")
        
        print("\n4Ô∏è‚É£ Entra√Ænement du d√©tecteur hors-domaine (STRICT)...")
        # fitting the safety net on the final feature space
        self.ood_detector.fit(X_selected)
        
        print("\n5Ô∏è‚É£ Entra√Ænement de l'ensemble stacking...")
        self.stacking_model.fit(X_selected, y)
        
        print("\n6Ô∏è‚É£ Entra√Ænement du mod√®le quantile...")
        self.quantile_model.fit(X_selected, y)
        
        # persist training metadata for reproducibility and audit trails
        self.training_stats = {
            'y_mean': float(y.mean()),
            'y_std': float(y.std()),
            'y_min': float(y.min()),
            'y_max': float(y.max()),
            'n_samples': len(y),
            'categorical_cols': self.categorical_cols,
            'n_features_total': len(X_eng.columns),
            'n_features_selected': len(self.selected_features),
            'strict_mode': self.strict_mode,
            'quantile_low': self.quantile_low,
            'quantile_high': self.quantile_high,
            'n_features_tolerance': self.n_features_tolerance,
            'feature_bounds': self.ood_detector.feature_bounds
        }
        
        return self
    
    def _select_features(self, X: pd.DataFrame, y: pd.Series, max_features: int = 15) -> List[str]:
        # two-stage selection process: fast filter (mutual info) -> wrapper (forward selection)
        
        # 1. Mutual Information (fast) to filter out noise
        mi_scores = mutual_info_regression(X, y, random_state=42)
        mi_df = pd.DataFrame({'feature': X.columns, 'mi': mi_scores})
        mi_df = mi_df.sort_values('mi', ascending=False)
        
        # keep top 70% features to reduce search space for the expensive wrapper method
        threshold = mi_df['mi'].quantile(0.3)
        preselected = mi_df[mi_df['mi'] > threshold]['feature'].tolist()
        
        # 2. Forward Selection (slow) optimizing for RMSE
        selected = []
        best_rmse = np.inf
        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        
        available = preselected.copy()
        
        while available and len(selected) < max_features:
            scores = {}
            for feat in available:
                candidate = selected + [feat]
                
                # using a lightweight model for speed during selection loop
                model = HistGradientBoostingRegressor(
                    max_iter=100, max_depth=5, min_samples_leaf=15,
                    learning_rate=0.1, l2_regularization=1.0,
                    random_state=42
                )
                
                cv_rmse = -cross_val_score(
                    model, X[candidate], y, cv=kf,
                    scoring='neg_root_mean_squared_error'
                ).mean()
                scores[feat] = cv_rmse
            
            # greedy selection of the best feature
            best_feat = min(scores, key=scores.get)
            
            # early stopping if performance improvement is negligible (< 0.5%)
            if scores[best_feat] < best_rmse * 0.995:
                selected.append(best_feat)
                available.remove(best_feat)
                best_rmse = scores[best_feat]
                print(f"      + {best_feat:<30} RMSE={best_rmse:.4f}")
            else:
                break
        
        return selected
    
    def _prepare_features(self, X: pd.DataFrame) -> pd.DataFrame:
        # ensures inference pipeline matches training transformations exactly
        if self.categorical_cols:
            X_encoded = self.categorical_encoder.transform(X)
        else:
            X_encoded = X.copy()
        
        X_eng = self.feature_engineer.transform(X_encoded)
        X_selected = X_eng[self.selected_features]
        
        return X_selected
    
    def predict(self, X: pd.DataFrame, return_details: bool = False):
        """
        Orchestrates prediction, uncertainty estimation, and domain rejection.
        """
        X_selected = self._prepare_features(X)
        
        pred_stacking = self.stacking_model.predict(X_selected)
        pred_median, pred_lower, pred_upper = self.quantile_model.predict_with_uncertainty(X_selected)
        
        # weighted ensemble: giving slightly more weight to the stacker (60%) for accuracy,
        # but keeping median (40%) to stabilize predictions against outliers
        predictions = 0.6 * pred_stacking + 0.4 * pred_median
        uncertainty = pred_upper - pred_lower
        
        # check if data is out-of-domain (returns flags and strict bounds violations)
        ood_confidence, should_reject_strict, features_out_of_bounds = self.ood_detector.predict(X_selected)
        
        # normalize uncertainty to a confidence score based on training std dev
        uncertainty_confidence = 1 - np.clip(uncertainty / (self.training_stats['y_std'] * 2), 0, 1)
        
        # combined confidence score
        confidence = 0.5 * ood_confidence + 0.5 * uncertainty_confidence
        
        # strict mode logic: if ANY feature is out of bounds, reject immediately
        if self.strict_mode:
            should_reject = should_reject_strict | (confidence < self.rejection_threshold)
        else:
            should_reject = confidence < self.rejection_threshold
        
        if return_details:
            return {
                'predictions': predictions,
                'lower_bound': pred_lower,
                'upper_bound': pred_upper,
                'uncertainty': uncertainty,
                'confidence': confidence,
                'should_reject': should_reject,
                'should_reject_strict': should_reject_strict,
                'features_out_of_bounds': features_out_of_bounds,
                'pred_stacking': pred_stacking,
                'pred_quantile': pred_median
            }
        
        return predictions
    
    def predict_safe(self, X: pd.DataFrame, fallback_value: float = None) -> Tuple[np.ndarray, np.ndarray, List[List[str]]]:
        """
        Production-ready wrapper that handles rejections gracefully by substituting fallback values.
        """
        details = self.predict(X, return_details=True)
        
        predictions = details['predictions'].copy()
        is_reliable = ~details['should_reject']
        features_out = details['features_out_of_bounds']
        
        if fallback_value is not None:
            predictions[~is_reliable] = fallback_value
        
        return predictions, is_reliable, features_out

In [None]:
import os
import joblib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from typing import Dict, List, Tuple, Optional
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from scipy import stats as scipy_stats

def evaluer_modele_production(model, X_test, y_test, qualite: str) -> Dict:
    """
    Computes production metrics, distinguishing between all predictions 
    and 'reliable' predictions (those not rejected by the OOD detector).
    """
    
    # extract full prediction details including uncertainty and rejection flags
    details = model.predict(X_test, return_details=True)
    predictions = details['predictions']
    confidence = details['confidence']
    should_reject = details['should_reject']
    # fallback to general rejection if strict flag is missing
    should_reject_strict = details.get('should_reject_strict', should_reject)
    features_out = details.get('features_out_of_bounds', [])
    
    y_test_arr = y_test.values if hasattr(y_test, 'values') else np.array(y_test)
    residuals = y_test_arr - predictions
    abs_errors = np.abs(residuals)
    
    # standard regression metrics on the full dataset
    metrics = {
        'rmse': np.sqrt(mean_squared_error(y_test_arr, predictions)),
        'mae': mean_absolute_error(y_test_arr, predictions),
        'r2': r2_score(y_test_arr, predictions),
        'median_ae': np.median(abs_errors),
        'p90_ae': np.percentile(abs_errors, 90),
        'p95_ae': np.percentile(abs_errors, 95),
        'p99_ae': np.percentile(abs_errors, 99),
        'max_ae': np.max(abs_errors),
        'biais': np.mean(residuals),
    }
    
    # granular accuracy check: what % of predictions are within X units
    for threshold in [1, 2, 3, 5, 10]:
        metrics[f'pct_within_{threshold}'] = np.mean(abs_errors <= threshold) * 100
    
    n_total = len(y_test_arr)
    n_rejected_strict = np.sum(should_reject_strict)
    n_rejected_total = np.sum(should_reject)
    
    print(f"Rejet√©es (strict bounds): {n_rejected_strict} ({n_rejected_strict/n_total*100:.1f}%)")
    print(f" Rejet√©es (total): {n_rejected_total} ({n_rejected_total/n_total*100:.1f}%)")
    
    # logging specific reasons for rejection (audit trail)
    if n_rejected_strict > 0:
        print(f"\n D√©tail des rejets stricts (features hors bornes):")
        rejected_indices = np.where(should_reject_strict)[0]
        for idx in rejected_indices[:10]:
            print(f"      ‚Ä¢ Obs {idx}: {features_out[idx]}")
        if len(rejected_indices) > 10:
            print(f"      ... et {len(rejected_indices) - 10} autres")
    
    # compute metrics specifically for the accepted (reliable) subset
    # this shows the potential performance of the model in a live environment
    if np.sum(~should_reject) > 0:
        reliable_residuals = y_test_arr[~should_reject] - predictions[~should_reject]
        reliable_abs_errors = np.abs(reliable_residuals)
        
        metrics['reliable_rmse'] = np.sqrt(np.mean(reliable_residuals ** 2))
        metrics['reliable_mae'] = np.mean(reliable_abs_errors)
        metrics['reliable_max_ae'] = np.max(reliable_abs_errors)
        metrics['reliable_pct'] = np.mean(~should_reject) * 100
        metrics['rejected_strict_pct'] = n_rejected_strict / n_total * 100
        metrics['rejected_total_pct'] = n_rejected_total / n_total * 100
    
    print(f"""
    M√âTRIQUES GLOBALES:
    ‚Ä¢ RMSE          : {metrics['rmse']:.3f}
    ‚Ä¢ MAE           : {metrics['mae']:.3f}
    ‚Ä¢ R¬≤            : {metrics['r2']:.4f}
    ‚Ä¢ Biais         : {metrics['biais']:.3f}
    
    DISTRIBUTION DES ERREURS:
    ‚Ä¢ M√©diane       : {metrics['median_ae']:.2f}
    ‚Ä¢ P90           : {metrics['p90_ae']:.2f}
    ‚Ä¢ P95           : {metrics['p95_ae']:.2f}
    ‚Ä¢ Max           : {metrics['max_ae']:.2f}
    
    POURCENTAGES:
    ‚Ä¢ Erreur ‚â§ 1    : {metrics['pct_within_1']:.1f}%
    ‚Ä¢ Erreur ‚â§ 2    : {metrics['pct_within_2']:.1f}%
    ‚Ä¢ Erreur ‚â§ 3    : {metrics['pct_within_3']:.1f}%
    ‚Ä¢ Erreur ‚â§ 5    : {metrics['pct_within_5']:.1f}%
    ‚Ä¢ Erreur ‚â§ 10   : {metrics['pct_within_10']:.1f}%
    """)
    
    if 'reliable_rmse' in metrics:
        print(f"""
    PR√âDICTIONS FIABLES (non rejet√©es):
    ‚Ä¢ Taux fiable   : {metrics['reliable_pct']:.1f}%
    ‚Ä¢ Rejet strict  : {metrics['rejected_strict_pct']:.1f}% (features hors bornes)
    ‚Ä¢ RMSE fiable   : {metrics['reliable_rmse']:.3f}
    ‚Ä¢ MAE fiable    : {metrics['reliable_mae']:.3f}
    ‚Ä¢ Max Err fiable: {metrics['reliable_max_ae']:.2f}
        """)
    
    return metrics


def diagnostic_production(y_true, details, qualite: str, save_dir: str = None, 
                          tolerance_modele: float = 5.0):
    """
    Generates a 5-panel dashboard to visualize model reliability.
    Focuses on classification of 'Flags' (alerts) using TP/FP/FN/TN logic.
    """
    
    predictions = details['predictions']
    confidence = details['confidence']
    should_reject = details['should_reject']
    should_reject_strict = details.get('should_reject_strict', should_reject)
    uncertainty = details['uncertainty']
    
    y_true_arr = y_true.values if hasattr(y_true, 'values') else np.array(y_true)
    y_pred_arr = predictions
    residuals = y_true_arr - predictions
    abs_errors = np.abs(residuals)
    
    # setting up target values and physical tolerances
    cible = CIBLES_D43.get(qualite, 300)
    tolerance_terrain = TOLERANCE_TERRAIN # physical limit on the field
    tol_opt = tolerance_modele # optimized model threshold
    
    # split data into accepted (safe to predict) and rejected (OOD)
    mask_accepted = ~should_reject
    mask_rejected = should_reject
    n_accepted = np.sum(mask_accepted)
    n_rejected = np.sum(mask_rejected)
    
    y_true_accepted = y_true_arr[mask_accepted]
    y_pred_accepted = y_pred_arr[mask_accepted]
    y_true_rejected = y_true_arr[mask_rejected] if n_rejected > 0 else np.array([])
    y_pred_rejected = y_pred_arr[mask_rejected] if n_rejected > 0 else np.array([])
    
    
    
    # defining ground truth flags (what actually happened)
    flags_reels_accepted = (y_true_accepted < cible - tolerance_terrain) | (y_true_accepted > cible + tolerance_terrain)
    # defining predicted flags (what the model warned about)
    flags_pred_accepted = (y_pred_accepted < cible - tol_opt) | (y_pred_accepted > cible + tol_opt)
    
    # logic masks for confusion matrix visualization
    mask_TP = flags_reels_accepted & flags_pred_accepted
    mask_TN = ~flags_reels_accepted & ~flags_pred_accepted
    mask_FP = ~flags_reels_accepted & flags_pred_accepted
    mask_FN = flags_reels_accepted & ~flags_pred_accepted
    
    fig = plt.figure(figsize=(20, 14))
    gs = fig.add_gridspec(2, 3, hspace=0.3, wspace=0.3)
    fig.suptitle(f'Diagnostic Production (STRICT) - {qualite}\n'
                 f'Cible={cible}, Tol. terrain=¬±{tolerance_terrain}, Tol. mod√®le=¬±{tol_opt}', 
                 fontsize=16, fontweight='bold')
    
    # Panel 1: Main Scatter Plot
    ax1 = fig.add_subplot(gs[0, :2])
    
    # rejected points in gray background
    if n_rejected > 0:
        ax1.scatter(y_true_rejected, y_pred_rejected, 
                   c='gray', marker='o', s=40, alpha=0.3, 
                   label=f'REJET√âES (n={n_rejected})', zorder=1)
    
    # plotting classification results (FN are large red X's as they are critical errors)
    for mask, label, color, marker, size, zorder in [
        (mask_TN, f'TN (n={np.sum(mask_TN)})', 'lightgreen', 'o', 40, 2),
        (mask_TP, f'TP (n={np.sum(mask_TP)})', 'darkgreen', 'o', 60, 3),
        (mask_FP, f'FP (n={np.sum(mask_FP)})', 'orange', 's', 80, 4),
        (mask_FN, f'FN (n={np.sum(mask_FN)})', 'red', 'X', 150, 5),
    ]:
        if np.sum(mask) > 0:
            ax1.scatter(y_true_accepted[mask], y_pred_accepted[mask], 
                       c=color, marker=marker, s=size, 
                       label=label, alpha=0.7, edgecolors='black', linewidths=0.5,
                       zorder=zorder)
    
    # identity line and tolerance bands
    all_y = np.concatenate([y_true_arr, y_pred_arr])
    min_val, max_val = all_y.min() - 5, all_y.max() + 5
    ax1.plot([min_val, max_val], [min_val, max_val], 'k--', linewidth=1, alpha=0.5)
    
    ax1.axvline(cible - tolerance_terrain, color='red', linestyle='--', linewidth=2, alpha=0.5, label=f'Bornes terrain ¬±{tolerance_terrain}')
    ax1.axvline(cible + tolerance_terrain, color='red', linestyle='--', linewidth=2, alpha=0.5)
    ax1.axhline(cible - tol_opt, color='blue', linestyle=':', linewidth=2, alpha=0.5, label=f'Bornes mod√®le ¬±{tol_opt}')
    ax1.axhline(cible + tol_opt, color='blue', linestyle=':', linewidth=2, alpha=0.5)
    
    # visualising the "safe zone"
    ax1.axhspan(cible - tol_opt, cible + tol_opt, alpha=0.1, color='blue')
    ax1.axvspan(cible - tolerance_terrain, cible + tolerance_terrain, alpha=0.1, color='green')
    
    ax1.set_xlabel('Y R√©el (D43)', fontsize=12)
    ax1.set_ylabel('Y Pr√©dit', fontsize=12)
    ax1.set_title(f'Pr√©dictions avec Classification Flags\n'
                  f'(Gris=Rejet√©es, Vert=OK, Rouge=FN manqu√©s, Orange=FP fausses alertes)', 
                  fontsize=12, fontweight='bold')
    ax1.legend(loc='upper left', fontsize=9)
    ax1.grid(True, alpha=0.3)
    
    #Panel 2: Confusion Matrix
    ax2 = fig.add_subplot(gs[0, 2])
    
    cm = np.array([[np.sum(mask_TN), np.sum(mask_FP)],
                   [np.sum(mask_FN), np.sum(mask_TP)]])
    
    # hardcoded colors: Green for TP/TN, Orange/Red for FP/FN
    cm_colors = np.array([['#90EE90', '#FFA500'],
                          ['#FF4444', '#228B22']])
    
    for i in range(2):
        for j in range(2):
            ax2.add_patch(plt.Rectangle((j, 1-i), 1, 1, fill=True, 
                                        color=cm_colors[i, j], alpha=0.7))
            ax2.text(j + 0.5, 1.5 - i, f'{cm[i, j]}', 
                    ha='center', va='center', fontsize=24, fontweight='bold')
    
    ax2.set_xlim(0, 2)
    ax2.set_ylim(0, 2)
    ax2.set_xticks([0.5, 1.5])
    ax2.set_yticks([0.5, 1.5])
    ax2.set_xticklabels(['Pr√©dit OK', 'Pr√©dit FLAG'], fontsize=11)
    ax2.set_yticklabels(['R√©el FLAG', 'R√©el OK'], fontsize=11)
    ax2.set_title(f'Matrice de Confusion (Flags)\n'
                  f'Sur {n_accepted} pr√©dictions accept√©es', fontsize=12, fontweight='bold')
    
    # manual calculation of precision/recall for annotation
    precision = np.sum(mask_TP) / (np.sum(mask_TP) + np.sum(mask_FP)) if (np.sum(mask_TP) + np.sum(mask_FP)) > 0 else 0
    recall = np.sum(mask_TP) / (np.sum(mask_TP) + np.sum(mask_FN)) if (np.sum(mask_TP) + np.sum(mask_FN)) > 0 else 0
    ax2.text(1, -0.15, f'Precision: {precision:.1%} | Recall: {recall:.1%}', 
             ha='center', va='top', fontsize=10, transform=ax2.transAxes)
    
    #Panel 3: Error Distribution
    ax3 = fig.add_subplot(gs[1, 0])
    
    accepted_mask = ~should_reject
    rejected_bounds = should_reject_strict
    
    if np.sum(accepted_mask) > 0:
        ax3.hist(abs_errors[accepted_mask], bins=30, alpha=0.7, 
                label=f'Accept√©es (n={np.sum(accepted_mask)})', color='green', density=True)
    if np.sum(rejected_bounds) > 0:
        # showing that rejected items usually have higher errors
        ax3.hist(abs_errors[rejected_bounds], bins=20, alpha=0.7, 
                label=f'Rejet bounds (n={np.sum(rejected_bounds)})', color='red', density=True)
    
    ax3.axvline(x=5, color='black', linestyle='--', linewidth=2, label='Seuil 5')
    ax3.set_xlabel('Erreur Absolue', fontsize=11)
    ax3.set_ylabel('Densit√©', fontsize=11)
    ax3.set_title('Distribution des Erreurs\npar Type de Rejection', fontsize=12, fontweight='bold')
    ax3.legend(fontsize=9)
    ax3.grid(True, alpha=0.3)
    
    if np.sum(accepted_mask) > 0:
        mae_acc = abs_errors[accepted_mask].mean()
        mae_rej = abs_errors[rejected_bounds].mean() if np.sum(rejected_bounds) > 0 else 0
        ax3.text(0.95, 0.95, f'MAE accept√©es: {mae_acc:.2f}\nMAE rejet√©es: {mae_rej:.2f}',
                transform=ax3.transAxes, ha='right', va='top', fontsize=10,
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    #Panel 4: Error vs Uncertainty (Calibration Check)
    ax4 = fig.add_subplot(gs[1, 1])
    
    colors_scatter = np.where(should_reject_strict, 'red', 
                              np.where(should_reject, 'orange', 'green'))
    
    # validates if higher uncertainty actually correlates with higher error
    scatter = ax4.scatter(uncertainty, abs_errors, c=colors_scatter, alpha=0.6, 
                          edgecolors='black', linewidths=0.3, s=50)
    ax4.axhline(y=5, color='red', linestyle='--', linewidth=2, label='Erreur = 5')
    
    # trend line to visualize the correlation
    z = np.polyfit(uncertainty[~should_reject], abs_errors[~should_reject], 1)
    p = np.poly1d(z)
    x_line = np.linspace(uncertainty.min(), uncertainty.max(), 100)
    ax4.plot(x_line, p(x_line), 'b--', alpha=0.5, label=f'Tendance (pente={z[0]:.2f})')
    
    ax4.set_xlabel('Incertitude (largeur intervalle)', fontsize=11)
    ax4.set_ylabel('Erreur Absolue', fontsize=11)
    ax4.set_title('Erreur vs Incertitude\n(Vert=OK, Rouge=Hors bornes)', fontsize=12, fontweight='bold')
    ax4.legend(fontsize=9)
    ax4.grid(True, alpha=0.3)
    
    # Panel 5: Error vs Confidence Score
    ax5 = fig.add_subplot(gs[1, 2])
    
    ax5.scatter(confidence[accepted_mask], abs_errors[accepted_mask], 
               c='green', alpha=0.5, s=50, label=f'Accept√©es (n={np.sum(accepted_mask)})')
    if np.sum(rejected_bounds) > 0:
        ax5.scatter(confidence[rejected_bounds], abs_errors[rejected_bounds], 
                   c='red', alpha=0.7, marker='X', s=100, label=f'Hors bornes (n={np.sum(rejected_bounds)})')
    
    ax5.axhline(y=5, color='black', linestyle='--', alpha=0.5, label='Erreur = 5')
    
    # shading the low confidence zone
    ax5.axvspan(0, 0.25, alpha=0.1, color='red', label='Zone low conf')
    
    ax5.set_xlabel('Confidence Score', fontsize=11)
    ax5.set_ylabel('Erreur Absolue', fontsize=11)
    ax5.set_title('Erreur vs Confidence\n(Zone rouge = low confidence)', fontsize=12, fontweight='bold')
    ax5.legend(fontsize=9, loc='upper right')
    ax5.grid(True, alpha=0.3)
    
    corr = np.corrcoef(confidence[accepted_mask], abs_errors[accepted_mask])[0, 1]
    ax5.text(0.05, 0.95, f'Corr: {corr:.3f}', transform=ax5.transAxes, 
             ha='left', va='top', fontsize=10,
             bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    
    if save_dir:
        filepath = os.path.join(save_dir, f'diagnostic_production_strict_{qualite}.png')
        plt.savefig(filepath, dpi=150, bbox_inches='tight')
    
    plt.show()
    
    return fig


def analyser_rejets_stricts(X, y_true, details, qualite, save_dir=None):
    """
    Analyzes observations rejected due to out-of-bounds features.
    Helps understand which features are driving the strict rejection.
    """
    
    y_true_arr = y_true.values if hasattr(y_true, 'values') else np.array(y_true)
    predictions = details['predictions']
    should_reject_strict = details.get('should_reject_strict', details['should_reject'])
    features_out = details.get('features_out_of_bounds', [])
    
    n_total = len(y_true_arr)
    n_rejected = np.sum(should_reject_strict)
    
    print(f"Total: {n_total}")
    print(f"Rejet√©es (strict): {n_rejected} ({n_rejected/n_total*100:.1f}%)")
    
    if n_rejected == 0:
        print("\n Aucune observation rejet√©e")
        return {'n_rejected': 0}
    
    # aggregating violation counts per feature
    feature_violation_count = {}
    for feats in features_out:
        for feat_info in feats:
            feat_name = feat_info.split('=')[0]
            feature_violation_count[feat_name] = feature_violation_count.get(feat_name, 0) + 1
    
    print(f"\nüìã Features hors bornes (top 10):")
    sorted_features = sorted(feature_violation_count.items(), key=lambda x: x[1], reverse=True)
    for feat, count in sorted_features[:10]:
        pct = count / n_rejected * 100
        print(f"   ‚Ä¢ {feat}: {count} fois ({pct:.1f}%)")
    
    # comparing error rates to justify the rejection
    # usually rejected items have much higher RMSE, validating the strategy
    abs_errors_rejected = np.abs(y_true_arr[should_reject_strict] - predictions[should_reject_strict])
    abs_errors_accepted = np.abs(y_true_arr[~should_reject_strict] - predictions[~should_reject_strict])
    
    print(f"\nüìà Comparaison erreurs:")
    print(f"   ‚Ä¢ Erreur moyenne (rejet√©es): {abs_errors_rejected.mean():.2f}")
    print(f"   ‚Ä¢ Erreur moyenne (accept√©es): {abs_errors_accepted.mean():.2f}")
    print(f"   ‚Ä¢ % erreur >= 5 (rejet√©es): {np.mean(abs_errors_rejected >= 5)*100:.1f}%")
    print(f"   ‚Ä¢ % erreur >= 5 (accept√©es): {np.mean(abs_errors_accepted >= 5)*100:.1f}%")
    
    return {
        'n_rejected': n_rejected,
        'pct_rejected': n_rejected / n_total * 100,
        'feature_violations': feature_violation_count,
    }


def analyser_faux_negatifs(
    X: pd.DataFrame,
    y_true: np.ndarray,
    y_pred: np.ndarray,
    mask_accepted: np.ndarray,
    qualite: str,
    tolerance_modele: float,
    X_engineered: pd.DataFrame = None,
    selected_features: List[str] = None,
    save_dir: str = None,
) -> Dict:
    """
    Deep dive into False Negatives (Missed Flags).
    Uses statistical tests to see if FN samples have distinct feature distributions.
    """
    
    cible = CIBLES_D43.get(qualite, 300)
    tolerance_terrain = TOLERANCE_TERRAIN
    
    # analyze only within accepted data (since rejected data is already handled)
    y_true_acc = y_true[mask_accepted]
    y_pred_acc = y_pred[mask_accepted]
    X_acc = X[mask_accepted].copy()
    
    if X_engineered is not None:
        X_eng_acc = X_engineered[mask_accepted].copy()
    else:
        X_eng_acc = None
    
    # logic definitions for FN/TP/TN
    flags_reels = (y_true_acc < cible - tolerance_terrain) | (y_true_acc > cible + tolerance_terrain)
    flags_pred = (y_pred_acc < cible - tolerance_modele) | (y_pred_acc > cible + tolerance_modele)
    
    mask_FN = flags_reels & ~flags_pred  # Real flag missed by model
    mask_TP = flags_reels & flags_pred   # Real flag caught
    mask_TN = ~flags_reels & ~flags_pred # Correctly identified as normal
    mask_autres = ~mask_FN  # Everything else
    
    n_FN = np.sum(mask_FN)
    n_TP = np.sum(mask_TP)
    n_total_flags = np.sum(flags_reels)
    
    
    print(f"Total flags r√©els: {n_total_flags}")
    print(f"TP (d√©tect√©s): {n_TP}")
    print(f"FN (manqu√©s): {n_FN}")
    
    if n_FN == 0:
        print("\n‚úÖ Aucun Faux N√©gatif ! Tous les flags ont √©t√© d√©tect√©s.")
        return {'n_FN': 0, 'message': 'Aucun FN'}
    
    # listing specific cases for manual review
    
    fn_indices = np.where(mask_FN)[0]
    
    print(f"\n{'#':<4} {'Y_r√©el':<10} {'Y_pr√©d':<10} {'Erreur':<10} {'√âcart/cible':<12} {'Direction'}")
    
    fn_details = []
    for i, idx in enumerate(fn_indices):
        y_r = y_true_acc[idx]
        y_p = y_pred_acc[idx]
        err = abs(y_r - y_p)
        ecart_cible = y_r - cible
        direction = "HAUT ‚Üë" if y_r > cible else "BAS ‚Üì"
        
        fn_details.append({
            'idx': idx,
            'y_true': y_r,
            'y_pred': y_p,
            'erreur': err,
            'ecart_cible': ecart_cible,
            'direction': 'haut' if y_r > cible else 'bas'
        })
        
        print(f"{i+1:<4} {y_r:<10.1f} {y_p:<10.1f} {err:<10.1f} {ecart_cible:<+12.1f} {direction}")
    
    # Statistical Comparison: FN vs The Rest
    
    features_originales = [f for f in FEATURES_POOL if f in X_acc.columns]
    
    print(f"{'Feature':<25} {'M√©d. FN':<12} {'M√©d. Autres':<12} {'Diff %':<10} {'p-value':<10} {'Signif.'}")
    
    significant_features = []
    feature_stats = {}
    
    # Mann-Whitney U test to check if FN samples come from a different distribution
    for feature in features_originales:
        if X_acc[feature].dtype in ['object', 'category']:
            continue
        
        fn_values = X_acc.loc[mask_FN, feature].dropna()
        autres_values = X_acc.loc[mask_autres, feature].dropna()
        
        if len(fn_values) >= 2 and len(autres_values) >= 5:
            try:
                stat, p_value = scipy_stats.mannwhitneyu(fn_values, autres_values, alternative='two-sided')
                
                median_fn = fn_values.median()
                median_autres = autres_values.median()
                diff_pct = ((median_fn - median_autres) / (abs(median_autres) + 1e-10)) * 100
                
                feature_stats[feature] = {
                    'median_fn': median_fn,
                    'median_autres': median_autres,
                    'mean_fn': fn_values.mean(),
                    'mean_autres': autres_values.mean(),
                    'std_fn': fn_values.std(),
                    'std_autres': autres_values.std(),
                    'diff_pct': diff_pct,
                    'p_value': p_value,
                }
                
                # permissive p-value threshold (0.1) due to small sample size of FNs
                sig = "‚ö†Ô∏è OUI" if p_value < 0.1 else "Non"
                if p_value < 0.1:
                    significant_features.append((feature, p_value, diff_pct, 'original'))
                
                print(f"{feature:<25} {median_fn:<12.4f} {median_autres:<12.4f} {diff_pct:<+10.1f} {p_value:<10.4f} {sig}")
            except Exception as e:
                pass
    
    # similar analysis for engineered features to see if synthetic features capture the issue
    if X_eng_acc is not None and selected_features is not None:
        print(f"{'Feature':<30} {'M√©d. FN':<12} {'M√©d. Autres':<12} {'Diff %':<10} {'p-value':<10} {'Signif.'}")
        print("-" * 95)
        
        for feature in selected_features:
            if feature not in X_eng_acc.columns:
                continue
            if X_eng_acc[feature].dtype in ['object', 'category']:
                continue
            
            fn_values = X_eng_acc.loc[mask_FN, feature].dropna()
            autres_values = X_eng_acc.loc[mask_autres, feature].dropna()
            
            if len(fn_values) >= 2 and len(autres_values) >= 5:
                try:
                    stat, p_value = scipy_stats.mannwhitneyu(fn_values, autres_values, alternative='two-sided')
                    
                    median_fn = fn_values.median()
                    median_autres = autres_values.median()
                    diff_pct = ((median_fn - median_autres) / (abs(median_autres) + 1e-10)) * 100
                    
                    feature_stats[f"eng_{feature}"] = {
                        'median_fn': median_fn,
                        'median_autres': median_autres,
                        'diff_pct': diff_pct,
                        'p_value': p_value,
                    }
                    
                    sig = "‚ö†Ô∏è OUI" if p_value < 0.1 else "Non"
                    if p_value < 0.1:
                        significant_features.append((feature, p_value, diff_pct, 'engineered'))
                    
                    print(f"{feature:<30} {median_fn:<12.4f} {median_autres:<12.4f} {diff_pct:<+10.1f} {p_value:<10.4f} {sig}")
                except:
                    pass
    
    # comparing TP vs FN to understand why some flags are caught and others missed
    if n_TP > 0:
        print("   ‚Üí Pourquoi certains flags sont d√©tect√©s et d'autres non?")
        print(f"\n{'Feature':<25} {'M√©d. FN':<12} {'M√©d. TP':<12} {'Diff %':<10}")
        print("-" * 65)
        
        for feature in features_originales[:10]:
            if X_acc[feature].dtype in ['object', 'category']:
                continue
            
            fn_values = X_acc.loc[mask_FN, feature].dropna()
            tp_values = X_acc.loc[mask_TP, feature].dropna()
            
            if len(fn_values) >= 1 and len(tp_values) >= 1:
                median_fn = fn_values.median()
                median_tp = tp_values.median()
                diff_pct = ((median_fn - median_tp) / (abs(median_tp) + 1e-10)) * 100
                
                print(f"{feature:<25} {median_fn:<12.4f} {median_tp:<12.4f} {diff_pct:<+10.1f}")

    #Plotting distributions for significant features
    features_to_plot = [f for f, p, d, t in significant_features if t == 'original'][:4]
    if len(features_to_plot) < 4:
        remaining = [f for f in features_originales if f not in features_to_plot][:4-len(features_to_plot)]
        features_to_plot.extend(remaining)
    
    if len(features_to_plot) > 0:
        n_features = min(len(features_to_plot), 4)
        
        fig, axes = plt.subplots(2, n_features, figsize=(5*n_features, 10))
        fig.suptitle(f'Analyse des Faux N√©gatifs - {qualite}\n'
                     f'(FN={n_FN} flags manqu√©s sur {n_total_flags} flags r√©els)', 
                     fontsize=14, fontweight='bold')
        
        if n_features == 1:
            axes = axes.reshape(2, 1)
        
        for i, feature in enumerate(features_to_plot[:n_features]):
            # Row 1: Histograms
            ax1 = axes[0, i]
            
            fn_vals = X_acc.loc[mask_FN, feature].dropna()
            autres_vals = X_acc.loc[mask_autres, feature].dropna()
            tp_vals = X_acc.loc[mask_TP, feature].dropna() if n_TP > 0 else np.array([])
            
            bins = 20
            ax1.hist(autres_vals, bins=bins, alpha=0.5, color='gray', 
                     label=f'Autres (n={len(autres_vals)})', density=True)
            if len(tp_vals) > 0:
                ax1.hist(tp_vals, bins=bins, alpha=0.7, color='green', 
                         label=f'TP d√©tect√©s (n={len(tp_vals)})', density=True)
            ax1.hist(fn_vals, bins=bins, alpha=0.8, color='red', 
                     label=f'FN manqu√©s (n={len(fn_vals)})', density=True)
            
            ax1.axvline(autres_vals.median(), color='gray', linestyle='--', linewidth=2)
            ax1.axvline(fn_vals.median(), color='red', linestyle='--', linewidth=2)
            if len(tp_vals) > 0:
                ax1.axvline(tp_vals.median(), color='green', linestyle='--', linewidth=2)
            
            p_val = feature_stats.get(feature, {}).get('p_value', 1.0)
            title = f'{feature}'
            if p_val < 0.1:
                title += f'\n‚ö†Ô∏è p={p_val:.3f}'
            ax1.set_title(title, fontsize=10, fontweight='bold' if p_val < 0.1 else 'normal')
            ax1.legend(fontsize=7)
            ax1.set_xlabel(feature)
            ax1.set_ylabel('Densit√©')
            
            # Row 2: Boxplots
            ax2 = axes[1, i]
            
            data_boxplot = []
            labels_boxplot = []
            colors_boxplot = []
            
            if len(autres_vals) > 0:
                data_boxplot.append(autres_vals)
                labels_boxplot.append(f'Autres\n(n={len(autres_vals)})')
                colors_boxplot.append('lightgray')
            if len(tp_vals) > 0:
                data_boxplot.append(tp_vals)
                labels_boxplot.append(f'TP\n(n={len(tp_vals)})')
                colors_boxplot.append('lightgreen')
            if len(fn_vals) > 0:
                data_boxplot.append(fn_vals)
                labels_boxplot.append(f'FN\n(n={len(fn_vals)})')
                colors_boxplot.append('lightcoral')
            
            bp = ax2.boxplot(data_boxplot, labels=labels_boxplot, patch_artist=True)
            for patch, color in zip(bp['boxes'], colors_boxplot):
                patch.set_facecolor(color)
            
            # highlighting specific FN points on the boxplot
            if len(fn_vals) > 0:
                x_fn = [len(data_boxplot)] * len(fn_vals)
                ax2.scatter(x_fn, fn_vals, c='red', s=100, marker='X', zorder=5, 
                           edgecolors='black', linewidths=1)
            
            ax2.set_ylabel(feature)
            ax2.set_title(f'Distribution {feature}', fontsize=10)
            ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        
        if save_dir:
            filepath = os.path.join(save_dir, f'analyse_FN_{qualite}.png')
            plt.savefig(filepath, dpi=150, bbox_inches='tight')
            print(f"\nüíæ Analyse FN sauvegard√©e: {filepath}")
        
        plt.show()
    
    # Zoomed plots for FN analysis
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    fig.suptitle(f'Zoom sur les Faux N√©gatifs - {qualite}', fontsize=14, fontweight='bold')
    
    ax1 = axes[0]
    ax1.scatter(y_true_acc[mask_TN], y_pred_acc[mask_TN], c='lightgray', alpha=0.5, 
                label=f'TN (n={np.sum(mask_TN)})', s=30)
    if n_TP > 0:
        ax1.scatter(y_true_acc[mask_TP], y_pred_acc[mask_TP], c='green', alpha=0.7, 
                    label=f'TP (n={n_TP})', s=60)
    ax1.scatter(y_true_acc[mask_FN], y_pred_acc[mask_FN], c='red', marker='X', s=200, 
                label=f'FN (n={n_FN})', edgecolors='black', linewidths=1.5, zorder=10)
    
    min_val, max_val = y_true_acc.min() - 5, y_true_acc.max() + 5
    ax1.plot([min_val, max_val], [min_val, max_val], 'k--', alpha=0.5)
    ax1.axvline(cible - tolerance_terrain, color='red', linestyle='--', alpha=0.5, label='Bornes terrain')
    ax1.axvline(cible + tolerance_terrain, color='red', linestyle='--', alpha=0.5)
    ax1.axhline(cible - tolerance_modele, color='blue', linestyle=':', alpha=0.5, label='Bornes mod√®le')
    ax1.axhline(cible + tolerance_modele, color='blue', linestyle=':', alpha=0.5)
    
    ax1.set_xlabel('Y R√©el (D43)', fontsize=11)
    ax1.set_ylabel('Y Pr√©dit', fontsize=11)
    ax1.set_title('Vue globale avec FN en rouge', fontsize=12)
    ax1.legend(loc='upper left', fontsize=9)
    ax1.grid(True, alpha=0.3)
    
    # comparing absolute errors by category
    ax2 = axes[1]
    
    erreurs = np.abs(y_true_acc - y_pred_acc)
    
    categories = ['TN', 'TP', 'FN']
    masks = [mask_TN, mask_TP, mask_FN]
    colors = ['lightgray', 'lightgreen', 'lightcoral']
    
    data_err = [erreurs[m] for m in masks if np.sum(m) > 0]
    labels_err = [f'{cat}\n(n={np.sum(m)})' for cat, m in zip(categories, masks) if np.sum(m) > 0]
    colors_err = [c for c, m in zip(colors, masks) if np.sum(m) > 0]
    
    bp = ax2.boxplot(data_err, labels=labels_err, patch_artist=True)
    for patch, color in zip(bp['boxes'], colors_err):
        patch.set_facecolor(color)
    
    ax2.axhline(y=5, color='orange', linestyle='--', linewidth=2, label='Seuil erreur = 5')
    ax2.set_ylabel('Erreur Absolue', fontsize=11)
    ax2.set_title('Distribution des erreurs par cat√©gorie', fontsize=12)
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    
    if save_dir:
        filepath = os.path.join(save_dir, f'zoom_FN_{qualite}.png')
        plt.savefig(filepath, dpi=150, bbox_inches='tight')
        print(f"üíæ Zoom FN sauvegard√©: {filepath}")
    
    plt.show()
    
    # Hypothesis Generation   
    fn_haut = sum(1 for d in fn_details if d['direction'] == 'haut')
    fn_bas = sum(1 for d in fn_details if d['direction'] == 'bas')
    
    print(f"\nüìç Direction des flags manqu√©s:")
    print(f"FN au-dessus de la cible (D43 > {cible + tolerance_terrain}): {fn_haut}")
    print(f"FN en-dessous de la cible (D43 < {cible - tolerance_terrain}): {fn_bas}")
    
    erreur_moy_fn = np.mean([d['erreur'] for d in fn_details])
    print(f"\nüìè Erreur moyenne des FN: {erreur_moy_fn:.2f}")
    
    if significant_features:
        print(f"\nüéØ Features significativement diff√©rentes pour les FN:")
        for feat, p_val, diff, feat_type in sorted(significant_features, key=lambda x: x[1])[:5]:
            direction = "PLUS √âLEV√âE" if diff > 0 else "PLUS BASSE"
            print(f"   ‚Ä¢ {feat} ({feat_type}): {direction} de {abs(diff):.1f}% (p={p_val:.3f})")
    
    return {
        'n_FN': n_FN,
        'n_TP': n_TP,
        'fn_details': fn_details,
        'feature_stats': feature_stats,
        'significant_features': significant_features,
        'erreur_moyenne_fn': erreur_moy_fn,
        'fn_direction': {'haut': fn_haut, 'bas': fn_bas},
    }


def optimiser_tolerance_modele(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    mask_accepted: np.ndarray,
    qualite: str,
    tolerances_test: List[float] = None,
) -> Tuple[float, Dict]:
    """
    Finds the optimal model tolerance using Lexicographical Optimization.
    Strategy: 
    1. Minimize FN (Safety first - don't miss alerts)
    2. Then Minimize FP (Cost efficiency - don't raise false alarms)
    """
    
    if tolerances_test is None:
        tolerances_test = list(range(3, 21))
    
    cible = CIBLES_D43.get(qualite, 300)
    tolerance_terrain = TOLERANCE_TERRAIN
    
    y_true_acc = y_true[mask_accepted]
    y_pred_acc = y_pred[mask_accepted]
    
    # Ground truth flags
    flags_reels = (y_true_acc < cible - tolerance_terrain) | (y_true_acc > cible + tolerance_terrain)
    n_flags_reels = np.sum(flags_reels)
    
    print(f"\nüéØ OPTIMISATION DE LA TOL√âRANCE MOD√àLE (Lexicographique)")
    print(f"   Strat√©gie: 1) Minimiser FN, 2) Puis minimiser FP")
    print(f"   Cible: {cible}, Tol√©rance terrain: ¬±{tolerance_terrain}")
    print(f"   Flags r√©els: {n_flags_reels} sur {len(y_true_acc)} accept√©es")
    print(f"\n{'Tol.':<6} {'FN':<6} {'FP':<6} {'TP':<6} {'Recall':<10} {'Precision':<10}")
    print("-" * 60)
    
    resultats = []
    
    
    
    for tol in tolerances_test:
        flags_pred = (y_pred_acc < cible - tol) | (y_pred_acc > cible + tol)
        
        TP = np.sum(flags_reels & flags_pred)
        TN = np.sum(~flags_reels & ~flags_pred)
        FP = np.sum(~flags_reels & flags_pred)
        FN = np.sum(flags_reels & ~flags_pred)
        
        precision = TP / (TP + FP) if (TP + FP) > 0 else 0
        recall = TP / (TP + FN) if (TP + FN) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        resultats.append({
            'tolerance': tol,
            'TP': TP, 'TN': TN, 'FP': FP, 'FN': FN,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'borne_basse': cible - tol,
            'borne_haute': cible + tol,
        })
        
        print(f"¬± {tol:<4} {FN:<6} {FP:<6} {TP:<6} {recall:<10.1%} {precision:<10.1%}")
    
    df_results = pd.DataFrame(resultats)
    
    # Step 1: Find minimum FN achievable
    fn_min = df_results['FN'].min()

    # Step 2: Filter results that achieve this minimum FN
    df_fn_min = df_results[df_results['FN'] == fn_min]
    
    # Step 3: Among those, find the one with minimum FP
    fp_min_dans_fn_min = df_fn_min['FP'].min()
    df_optimal = df_fn_min[df_fn_min['FP'] == fp_min_dans_fn_min]

    # Step 4: Tie-breaker, cap tolerance at 15 and pick largest valid tolerance
    df_optimal = df_optimal[df_optimal['tolerance'] <= 15]
    best_idx = df_optimal['tolerance'].idxmax()
    best = df_results.loc[best_idx].to_dict()
    
    print(f"‚úÖ OPTIMISATION LEXICOGRAPHIQUE:")
    print(f"   1. FN minimum trouv√©: {fn_min}")
    print(f"   2. Parmi FN={fn_min}, FP minimum: {fp_min_dans_fn_min}")
    print(f"   3. Tol√©rance s√©lectionn√©e: ¬± {best['tolerance']}")
    print(f"\n TOL√âRANCE OPTIMALE: ¬± {best['tolerance']}")
    print(f"   Bornes: [{best['borne_basse']}, {best['borne_haute']}]")
    print(f"   FN: {best['FN']} | FP: {best['FP']}")
    print(f"   Recall: {best['recall']:.1%} | Precision: {best['precision']:.1%}")
    
    return float(best['tolerance']), {
        'all_results': df_results.to_dict('records'),
        'best': best,
        'cible': cible,
        'tolerance_terrain': tolerance_terrain,
        'n_flags_reels': n_flags_reels,
        'fn_min': int(fn_min),
        'fp_min_given_fn_min': int(fp_min_dans_fn_min),
        'optimization_strategy': 'lexicographic: min(FN) then min(FP)',
    }


def analyser_detection_flags_avec_rejection(
    y_true, y_pred, confidence, should_reject, should_reject_strict,
    features_out_of_bounds, qualite, confidence_threshold=0.5,
    tolerances_modele=[5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
    save_dir=None, 
    optimiser_auto: bool = True,
):
    """
    Wrapper that applies rejection logic BEFORE optimizing the detection tolerance.
    """
    
    y_true_arr = y_true.values if hasattr(y_true, 'values') else np.array(y_true)
    y_pred_arr = np.array(y_pred)
    
    cible = CIBLES_D43.get(qualite, 300)
    tolerance_terrain = TOLERANCE_TERRAIN
    
    mask_accepted = ~should_reject
    n_total = len(y_true_arr)
    n_accepted = np.sum(mask_accepted)
    n_rejected = np.sum(should_reject)
    n_rejected_strict = np.sum(should_reject_strict)
    
    print("\n" + "=" * 80)
    print(f"üö® ANALYSE D√âTECTION FLAGS - REJECTION STRICTE - {qualite}")
    print("=" * 80)
    
    print(f"\nüõ°Ô∏è SYST√àME DE REJECTION (STRICT):")
    print(f"   ‚Ä¢ Total: {n_total}")
    print(f"   ‚Ä¢ Accept√©es: {n_accepted} ({n_accepted/n_total*100:.1f}%)")
    print(f"   ‚Ä¢ Rejet√©es: {n_rejected} ({n_rejected/n_total*100:.1f}%)")
    
    if n_accepted == 0:
        print("\n‚ùå Toutes rejet√©es!")
        return {'error': 'Toutes rejet√©es', 'tolerance_optimale': 10}
    
    y_true_accepted = y_true_arr[mask_accepted]
    y_pred_accepted = y_pred_arr[mask_accepted]
    
    abs_errors_accepted = np.abs(y_true_accepted - y_pred_accepted)
    rmse_accepted = np.sqrt(np.mean((y_true_accepted - y_pred_accepted) ** 2))
    
    print(f"\nüìä M√âTRIQUES ACCEPT√âES ({n_accepted}):")
    print(f"   ‚Ä¢ RMSE: {rmse_accepted:.3f}")
    
    flags_reels_accepted = (y_true_accepted < cible - tolerance_terrain) | (y_true_accepted > cible + tolerance_terrain)
    n_flags_reels = np.sum(flags_reels_accepted)
    
    print(f"\nüìã Flags terrain: {n_flags_reels} ({n_flags_reels/n_accepted*100:.1f}%)")
    
    if optimiser_auto:
        # runs the lexicographical optimization
        tolerance_optimale, optim_results = optimiser_tolerance_modele(
            y_true=y_true_arr,
            y_pred=y_pred_arr,
            mask_accepted=mask_accepted,
            qualite=qualite,
            tolerances_test=list(range(3, 21)),
        )
        best = optim_results['best']
    else:
        # fallback manual testing (legacy mode)
        print(f"\n{'Tol.':<6} {'FN':<6} {'FP':<6} {'Recall':<10} {'Precision':<10} {'F1':<10}")
        print("-" * 65)
        
        results_par_tolerance = []
        for tol in tolerances_modele:
            flags_pred = (y_pred_accepted < cible - tol) | (y_pred_accepted > cible + tol)
            
            TP = np.sum(flags_reels_accepted & flags_pred)
            FP = np.sum(~flags_reels_accepted & flags_pred)
            FN = np.sum(flags_reels_accepted & ~flags_pred)
            
            precision = TP / (TP + FP) if (TP + FP) > 0 else 0
            recall = TP / (TP + FN) if (TP + FN) > 0 else 0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
            
            results_par_tolerance.append({
                'tolerance': tol, 'precision': precision, 'recall': recall, 
                'f1': f1, 'FN': FN, 'FP': FP
            })
            
            print(f"¬± {tol:<4} {FN:<6} {FP:<6} {recall:<10.3f} {precision:<10.3f} {f1:<10.3f}")
        
        df_results = pd.DataFrame(results_par_tolerance)
        best = (
            df_results
            .sort_values(by=["FN", "FP", "f1"], ascending=[True, True, False])
            .iloc[0]
            .to_dict()
        )
        
        tolerance_optimale = best['tolerance']
        optim_results = {'all_results': results_par_tolerance, 'best': best}

    print(f"\n" + "=" * 60)
    print(f"üìå TOL√âRANCE OPTIMALE S√âLECTIONN√âE: ¬± {tolerance_optimale}")
    print(f"   ‚Ä¢ Bornes d'alerte: [{cible - tolerance_optimale}, {cible + tolerance_optimale}]")
    print(f"   ‚Ä¢ FN (flags manqu√©s): {best['FN']}")
    print(f"   ‚Ä¢ FP (fausses alertes): {best['FP']}")
    print("=" * 60)
    
    return {
        'qualite': qualite,
        'n_accepted': n_accepted,
        'tolerance_optimale': int(tolerance_optimale),
        'best': best,
        'optimization': optim_results,
    }


def entrainer_modele_production(
    qual: str, 
    combined_data: pd.DataFrame, 
    confidence_threshold: float = 0.25,
    strict_mode: bool = True,
    quantile_low: float = 0.05,
    quantile_high: float = 0.95,
    n_features_tolerance: int = 0
) -> Optional[Dict]:
    """
    Main orchestration function: 
    Prepares data -> Trains Model -> Runs all Diagnostics -> Saves Artifacts.
    """
    
    output_path = os.path.join(OUTPUT_DIR, f"model_production_{qual}.joblib")
    
    print("\nüìä Pr√©paration des donn√©es...")
    
    data = combined_data[combined_data['#HIDDEN'] == qual].copy()
    data['#HIDDEN'] = pd.to_datetime(data['#HIDDEN'], errors='coerce')
    
    # ensure numeric types for pool features
    for col in FEATURES_POOL:
        if col in data.columns:
            data[col] = pd.to_numeric(data[col], errors='coerce')
    data['#HIDDEN'] = pd.to_numeric(data['#HIDDEN'], errors='coerce')
    
    features_num_dispo = [f for f in FEATURES_POOL if f in data.columns]
    features_cat_dispo = [f for f in CATEGORICAL_FEATURES.get(qual, []) if f in data.columns]
    features_dispo = features_num_dispo + features_cat_dispo
    
    print(f"   ‚Ä¢ Features: {len(features_num_dispo)} num + {len(features_cat_dispo)} cat")
    
    cols_needed = features_dispo + ['#HIDDEN', '#HIDDEN', '#HIDDEN']
    cols_needed = [c for c in cols_needed if c in data.columns]

    data_clean = data.dropna(subset=cols_needed)

    # applying date filter for specific quality codes (business requirement)
    data_filtered = data_clean[
        (data_clean['#HIDDEN'] != '#HIDDEN') |
        (data_clean['#HIDDEN'] >= '#HIDDEN')
    ].copy()
    
    print(f"   ‚Ä¢ Donn√©es: {len(data_filtered)} lignes")
    
    if len(data_filtered) < 100:
        print(f"‚ùå Pas assez de donn√©es")
        return None
    
    X = data_filtered[features_dispo].copy()
    y = data_filtered['#HIDDEN'].copy()
    
    # stratified split based on target quartiles to ensure test set representativity
    y_quartiles = pd.qcut(y, q=4, labels=False, duplicates='drop')
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y_quartiles
    )
    
    print(f"   ‚Ä¢ Train: {len(X_train)} | Test: {len(X_test)}")
    
    # initializing and training the strict production model
    model = ProductionD43Model(
        rejection_threshold=confidence_threshold, 
        qualite=qual,
        strict_mode=strict_mode,
        quantile_low=quantile_low,
        quantile_high=quantile_high,
        n_features_tolerance=n_features_tolerance
    )
    model.fit(X_train, y_train)
    
    # inference on test set
    details = model.predict(X_test, return_details=True)
    y_pred = details['predictions']
    confidence = details['confidence']
    should_reject = details['should_reject']
    should_reject_strict = details['should_reject_strict']
    features_out = details['features_out_of_bounds']
    
    # evaluation logic
    y_test_arr = y_test.values if hasattr(y_test, 'values') else np.array(y_test)
    mask_accepted = ~should_reject
    n_total = len(y_test_arr)
    n_accepted = np.sum(mask_accepted)
    n_rejected_strict = np.sum(should_reject_strict)
    
    print("\n" + "=" * 70)
    print(f"üìä √âVALUATION AVEC REJECTION STRICTE")
    print("=" * 70)
    
    print(f"\nüõ°Ô∏è REJECTION:")
    print(f"   ‚Ä¢ Total: {n_total}")
    print(f"   ‚Ä¢ Accept√©es: {n_accepted} ({n_accepted/n_total*100:.1f}%)")
    print(f"   ‚Ä¢ Rejet√©es (bounds): {n_rejected_strict} ({n_rejected_strict/n_total*100:.1f}%)")
    
    if n_accepted > 0:
        y_true_accepted = y_test_arr[mask_accepted]
        y_pred_accepted = y_pred[mask_accepted]
        abs_errors_accepted = np.abs(y_true_accepted - y_pred_accepted)
        
        rmse_accepted = np.sqrt(np.mean((y_true_accepted - y_pred_accepted) ** 2))
        mae_accepted = np.mean(abs_errors_accepted)
        pct_within_5 = np.mean(abs_errors_accepted <= 5) * 100
        
        print(f"\nüìà M√âTRIQUES ACCEPT√âES ({n_accepted}):")
        print(f"   ‚Ä¢ RMSE: {rmse_accepted:.3f}")
        print(f"   ‚Ä¢ MAE: {mae_accepted:.3f}")
        print(f"   ‚Ä¢ % err ‚â§ 5: {pct_within_5:.1f}%")
        
        # quick check on how much value the rejection adds
        rmse_global = np.sqrt(np.mean((y_test_arr - y_pred) ** 2))
        print(f"\nüìä COMPARAISON:")
        print(f"   ‚Ä¢ RMSE global: {rmse_global:.3f}")
        print(f"   ‚Ä¢ RMSE accept√©: {rmse_accepted:.3f}")
        print(f"   ‚Ä¢ Am√©lioration: {rmse_global - rmse_accepted:.3f}")
    
    # run full diagnostics suite
    flag_results = analyser_detection_flags_avec_rejection(
        y_true=y_test,
        y_pred=y_pred,
        confidence=confidence,
        should_reject=should_reject,
        should_reject_strict=should_reject_strict,
        features_out_of_bounds=features_out,
        qualite=qual,
        confidence_threshold=confidence_threshold,
        save_dir=OUTPUT_DIR,
        optimiser_auto=True,
    )
    
    tolerance_optimale = flag_results.get('tolerance_optimale', 5)
    
    diagnostic_production(y_test_arr, details, qual, save_dir=OUTPUT_DIR, 
                          tolerance_modele=float(tolerance_optimale))
    
    analyser_rejets_stricts(X_test, y_test, details, qual, save_dir=OUTPUT_DIR)
    
    # prepare engineered features for deep dive analysis on false negatives
    X_test_engineered = model._prepare_features(X_test)
    fn_analysis = analyser_faux_negatifs(
        X=X_test,
        y_true=y_test_arr,
        y_pred=y_pred,
        mask_accepted=mask_accepted,
        qualite=qual,
        tolerance_modele=float(tolerance_optimale),
        X_engineered=X_test_engineered,
        selected_features=model.selected_features,
        save_dir=OUTPUT_DIR,
    )
    
    
    model_package = {
        'model': model,
        'qualite': qual,
        'features_pool': features_dispo,
        'selected_features': model.selected_features,
        'training_stats': model.training_stats,
        'rejection_config': {
            'strict_mode': strict_mode,
            'quantile_low': quantile_low,
            'quantile_high': quantile_high,
            'n_features_tolerance': n_features_tolerance,
            'confidence_threshold': confidence_threshold,
        },
        'metrics': {
            'rmse_accepted': rmse_accepted if n_accepted > 0 else None,
            'mae_accepted': mae_accepted if n_accepted > 0 else None,
            'pct_within_5': pct_within_5 if n_accepted > 0 else None,
            'pct_rejected': (n_total - n_accepted) / n_total * 100,
        },
        'flag_detection': {
            'cible': flag_results.get('cible', CIBLES_D43.get(qual, 300)),
            'tolerance_terrain': TOLERANCE_TERRAIN,
            'tolerance_optimale': flag_results.get('tolerance_optimale', 10),
            'borne_basse': flag_results.get('borne_basse'),
            'borne_haute': flag_results.get('borne_haute'),
            'FN': flag_results.get('best', {}).get('FN'),
            'FP': flag_results.get('best', {}).get('FP'),
        },
        'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
    
    joblib.dump(model_package, output_path)
    print(f"   ‚úÖ Sauvegard√©: {output_path}")
    
    return {
        'qualite': qual,
        'rmse': rmse_accepted if n_accepted > 0 else None,
        'pct_rejected': (n_total - n_accepted) / n_total * 100,
        'pct_within_5': pct_within_5 if n_accepted > 0 else None,
    }

In [None]:
if __name__ == "__main__":
    
    # adjusted to target ~10-20% rejection rate based on recent validation results
    CONFIDENCE_THRESHOLD = 0.25
    STRICT_MODE = False # disabled strict mode to allow for tolerance logic below
    
    # widened bounds (2nd-98th percentile) to keep more data while filtering extreme anomalies
    QUANTILE_LOW = 0.02
    QUANTILE_HIGH = 0.98
    
    # tolerance of 1 prevents rejecting a sample just because of a single noisy sensor
    N_FEATURES_TOLERANCE = 1
    
    print(f"""
    ‚öôÔ∏è Configuration:
    ‚Ä¢ Mode strict: {STRICT_MODE}
    ‚Ä¢ Quantiles: [{QUANTILE_LOW*100:.0f}%, {QUANTILE_HIGH*100:.0f}%]
    ‚Ä¢ Tol√©rance features hors bornes: {N_FEATURES_TOLERANCE}
    ‚Ä¢ Seuil confidence: {CONFIDENCE_THRESHOLD}
    
    R√àGLE DE REJECTION:
    ‚Ä¢ Si {N_FEATURES_TOLERANCE + 1}+ feature(s) hors [{QUANTILE_LOW*100:.0f}%, {QUANTILE_HIGH*100:.0f}%] ‚Üí PAS DE PR√âDICTION
    """)
    
    all_results = []
    
    if 'combined_data' in dir():
        for qual in QUALITES:
            # running the full pipeline: training, calibration, and diagnostic generation
            result = entrainer_modele_production(
                qual, 
                combined_data,
                confidence_threshold=CONFIDENCE_THRESHOLD,
                strict_mode=STRICT_MODE,
                quantile_low=QUANTILE_LOW,
                quantile_high=QUANTILE_HIGH,
                n_features_tolerance=N_FEATURES_TOLERANCE
            )
            if result:
                all_results.append(result)
        
        if all_results:
            # simple summary table to compare rejection rates and rmse across targets
            df = pd.DataFrame(all_results)
            print(df.to_string(index=False))
            
    else:
        print("\n Variable non d√©finie.")