In [1]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LassoCV
from itertools import combinations
import warnings
warnings.filterwarnings('ignore')

class LassoDefaultModel:
    def __init__(self, data, target_col='delta_logit_td', max_vars=4, max_lags=4):
        """
        Classe pour modélisation Lasso avec combinaisons de variables
        
        Paramètres:
        -----------
        data : DataFrame
            Données préparées avec delta_logit_td et variables macro
        target_col : str
            Variable cible (delta logit du taux de défaut)
        max_vars : int
            Nombre maximum de variables par combinaison
        max_lags : int
            Nombre maximum de retards pour Yt
        """
        self.data = data.copy()
        self.target_col = target_col
        self.max_vars = max_vars
        self.max_lags = max_lags
        self.best_models = {}
        self.feature_combinations = []
        
    def prepare_lagged_variables(self, y_lag_name='logit_td'):
        """Prépare les variables retardées de Yt"""
        for lag in range(1, self.max_lags + 1):
            self.data[f'{y_lag_name}_lag{lag}'] = self.data[y_lag_name].shift(lag)
        
        # Différences des retards de Yt (pour stationnarité)
        for lag in range(1, self.max_lags + 1):
            self.data[f'delta_{y_lag_name}_lag{lag}'] = self.data[f'{y_lag_name}_lag{lag}'].diff()
        
        return self.data.dropna()
    
    def get_feature_combinations(self, macro_vars, include_y_lags=True):
        """Génère toutes les combinaisons de variables possibles"""
        all_features = macro_vars.copy()
        
        if include_y_lags:
            # Ajouter les retards de Yt
            y_lag_features = [f'logit_td_lag{lag}' for lag in range(1, self.max_lags + 1)]
            y_lag_features += [f'delta_logit_td_lag{lag}' for lag in range(1, self.max_lags + 1)]
            all_features.extend(y_lag_features)
        
        # Générer toutes les combinaisons de 1 à max_vars variables
        self.feature_combinations = []
        for k in range(1, self.max_vars + 1):
            for combo in combinations(all_features, k):
                self.feature_combinations.append(list(combo))
        
        print(f"Nombre de combinaisons générées: {len(self.feature_combinations)}")
        return self.feature_combinations
    
    def train_lasso_models(self, macro_vars, test_size=0.2, include_y_lags=True):
        """Entraîne des modèles Lasso pour chaque combinaison"""
        # Préparation des données
        df_prepared = self.prepare_lagged_variables()
        
        # Séparation train/test
        split_idx = int(len(df_prepared) * (1 - test_size))
        train_data = df_prepared.iloc[:split_idx]
        test_data = df_prepared.iloc[split_idx:]
        
        # Génération des combinaisons
        combinations_list = self.get_feature_combinations(macro_vars, include_y_lags)
        
        results = []
        
        for i, feature_combo in enumerate(combinations_list):
            try:
                # Vérifier que toutes les features existent dans les données
                missing_features = [f for f in feature_combo if f not in train_data.columns]
                if missing_features:
                    continue
                
                X_train = train_data[feature_combo]
                y_train = train_data[self.target_col]
                X_test = test_data[feature_combo]
                y_test = test_data[self.target_col]
                
                # Modèle Lasso avec validation croisée
                lasso_model = LassoCV(cv=5, random_state=42, max_iter=10000)
                lasso_model.fit(X_train, y_train)
                
                # Évaluation
                train_score = lasso_model.score(X_train, y_train)
                test_score = lasso_model.score(X_test, y_test)
                
                # Nombre de variables sélectionnées (coefficients non nuls)
                n_selected_vars = np.sum(lasso_model.coef_ != 0)
                
                results.append({
                    'model_id': i,
                    'features': feature_combo,
                    'model': lasso_model,
                    'train_score': train_score,
                    'test_score': test_score,
                    'n_selected_vars': n_selected_vars,
                    'alpha': lasso_model.alpha_,
                    'coefficients': lasso_model.coef_
                })
                
            except Exception as e:
                continue
        
        # Trier par performance sur test set
        results.sort(key=lambda x: x['test_score'], reverse=True)
        self.best_models = results
        
        return results
    
    def get_top_models(self, top_n=10):
        """Retourne les top N modèles"""
        return self.best_models[:top_n]

class DefaultRateProjector:
    def __init__(self, initial_logit_td):
        """
        Classe pour les projections et reconstruction des taux de défaut
        
        Paramètres:
        -----------
        initial_logit_td : float
            Dernière valeur observée de logit_td
        """
        self.initial_logit_td = initial_logit_td
        self.current_logit_td = initial_logit_td
        
    def reset(self):
        """Réinitialise pour de nouvelles projections"""
        self.current_logit_td = self.initial_logit_td
    
    def logit_to_default_rate(self, logit_val):
        """Convertit logit en taux de défaut"""
        return np.exp(logit_val) / (1 + np.exp(logit_val))
    
    def default_rate_to_logit(self, td):
        """Convertit taux de défaut en logit"""
        return np.log(td / (1 - td))
    
    def project_with_model(self, model, feature_names, future_macro_data, 
                          n_periods=6, initial_lags=None):
        """
        Fait des projections avec un modèle Lasso
        
        Paramètres:
        -----------
        model : modèle Lasso entraîné
        feature_names : list
            Noms des features utilisées dans le modèle
        future_macro_data : DataFrame
            Valeurs futures des variables macro
        n_periods : int
            Nombre de périodes à projeter
        initial_lags : dict
            Valeurs initiales des retards {lag1: val1, lag2: val2, ...}
        """
        projections = []
        self.reset()
        
        # Initialisation des retards
        lags_history = {
            'logit_td_lag1': initial_lags.get('lag1', self.initial_logit_td) if initial_lags else self.initial_logit_td,
            'logit_td_lag2': initial_lags.get('lag2', self.initial_logit_td) if initial_lags else self.initial_logit_td,
        }
        
        for period in range(n_periods):
            # Préparer les features pour cette période
            features_dict = {}
            
            for feature in feature_names:
                if feature in future_macro_data.columns:
                    # Variable macro - prendre la valeur future
                    features_dict[feature] = future_macro_data[feature].iloc[period]
                elif feature in lags_history:
                    # Retard de Yt - prendre de l'historique
                    features_dict[feature] = lags_history[feature]
                elif feature.startswith('delta_logit_td_lag'):
                    # Différence de retard - à calculer
                    lag_num = int(feature.split('lag')[-1])
                    parent_feature = f'logit_td_lag{lag_num}'
                    if lag_num == 1:
                        delta_val = lags_history.get('logit_td_lag1', 0) - lags_history.get('logit_td_lag2', 0)
                    else:
                        delta_val = 0  # Simplification
                    features_dict[feature] = delta_val
            
            # Créer le vecteur de features dans le bon ordre
            feature_vector = np.array([[features_dict.get(f, 0) for f in feature_names]])
            
            # Faire la prédiction du delta logit
            delta_logit_pred = model.predict(feature_vector)[0]
            
            # Mettre à jour le logit courant
            new_logit_td = self.current_logit_td + delta_logit_pred
            
            # Convertir en taux de défaut
            default_rate_pred = self.logit_to_default_rate(new_logit_td)
            
            # Stocker la projection
            projections.append({
                'period': period + 1,
                'delta_logit_pred': delta_logit_pred,
                'logit_td_pred': new_logit_td,
                'default_rate_pred': default_rate_pred,
                'features_used': feature_names
            })
            
            # Mettre à jour l'historique pour la période suivante
            self.current_logit_td = new_logit_td
            lags_history['logit_td_lag2'] = lags_history['logit_td_lag1']
            lags_history['logit_td_lag1'] = new_logit_td
        
        return pd.DataFrame(projections)

def comprehensive_lasso_analysis(data, macro_vars, target_col='delta_logit_td', 
                               logit_col='logit_td', top_models=5, n_periods=6):
    """
    Analyse complète Lasso avec projections
    
    Paramètres:
    -----------
    data : DataFrame avec données historiques
    macro_vars : list de variables macroéconomiques
    target_col : colonne cible (delta logit)
    logit_col : colonne logit du taux de défaut
    top_models : nombre de meilleurs modèles à considérer
    n_periods : nombre de périodes de projection
    """
    
    # 1. Initialisation du modèle Lasso
    lasso_model = LassoDefaultModel(data, target_col=target_col, max_vars=4)
    
    # 2. Entraînement des modèles
    print("Entraînement des modèles Lasso...")
    models_results = lasso_model.train_lasso_models(macro_vars)
    
    # 3. Sélection des meilleurs modèles
    top_models_list = lasso_model.get_top_models(top_n=top_models)
    
    print(f"\nTop {top_models} modèles sélectionnés:")
    for i, model_info in enumerate(top_models_list):
        print(f"Modèle {i+1}: R² test = {model_info['test_score']:.3f}, "
              f"Variables: {model_info['features']}")
    
    # 4. Préparation des données futures (exemple)
    future_macro = pd.DataFrame({
        var: [np.random.normal(0, 1) for _ in range(n_periods)] for var in macro_vars
    })
    
    # 5. Initialisation du projecteur
    last_logit = data[logit_col].iloc[-1]
    projector = DefaultRateProjector(last_logit)
    
    # 6. Projections avec chaque meilleur modèle
    all_projections = {}
    
    for i, model_info in enumerate(top_models_list):
        model = model_info['model']
        features = model_info['features']
        
        projections = projector.project_with_model(
            model=model,
            feature_names=features,
            future_macro_data=future_macro,
            n_periods=n_periods
        )
        
        all_projections[f'model_{i+1}'] = {
            'projections': projections,
            'features': features,
            'test_score': model_info['test_score']
        }
    
    return {
        'lasso_analyzer': lasso_model,
        'projector': projector,
        'all_projections': all_projections,
        'top_models': top_models_list
    }



In [2]:
# Exemple d'utilisation
if __name__ == "__main__":
    # Données d'exemple
    np.random.seed(42)
    n_obs = 100
    
    sample_data = pd.DataFrame({
        'logit_td': np.random.normal(-2, 0.5, n_obs),
        'gdp_growth': np.random.normal(2, 1, n_obs),
        'unemployment': np.random.normal(8, 2, n_obs),
        'inflation': np.random.normal(2, 0.5, n_obs),
        'interest_rate': np.random.normal(3, 1, n_obs)
    })
    
    # Calcul du delta logit
    sample_data['delta_logit_td'] = sample_data['logit_td'].diff()
    
    macro_variables = ['gdp_growth', 'unemployment', 'inflation', 'interest_rate']
    
    # Lancement de l'analyse
    results = comprehensive_lasso_analysis(
        data=sample_data,
        macro_vars=macro_variables,
        top_models=3,
        n_periods=6
    )
    
    # Affichage des résultats
    for model_name, model_data in results['all_projections'].items():
        print(f"\n{model_name} (R²: {model_data['test_score']:.3f}):")
        print(model_data['projections'][['period', 'default_rate_pred']].head())

Entraînement des modèles Lasso...
Nombre de combinaisons générées: 793

Top 3 modèles sélectionnés:
Modèle 1: R² test = 0.530, Variables: ['unemployment', 'interest_rate', 'logit_td_lag2', 'delta_logit_td_lag1']
Modèle 2: R² test = 0.530, Variables: ['gdp_growth', 'unemployment', 'interest_rate', 'logit_td_lag1']
Modèle 3: R² test = 0.527, Variables: ['unemployment', 'logit_td_lag1', 'logit_td_lag4', 'delta_logit_td_lag4']

model_1 (R²: 0.530):
   period  default_rate_pred
0       1           0.097998
1       2           0.094897
2       3           0.108375
3       4           0.095749
4       5           0.104236

model_2 (R²: 0.530):
   period  default_rate_pred
0       1           0.099777
1       2           0.093455
2       3           0.118699
3       4           0.098368
4       5           0.111913

model_3 (R²: 0.527):
   period  default_rate_pred
0       1           0.107205
1       2           0.106549
2       3           0.108803
3       4           0.106588
4       5     