# Projet de Machine Learning  UMONS 2024-2025

### Thème : Prédiction du score de Macron aux 2nd Tour des éléctions 2022

----

# Configuration et Installation des Dépendances

----

In [120]:
from sys import modules as sys_modules
from os.path import join
# chemin vers le dossier contenant toute les données à utiliser
if 'google.colab' in sys_modules :
  from google.colab import drive
  drive.mount('/content/drive')
  data_path = "/content/drive/MyDrive/Colab_Notebooks/ProjetML/src/datasets"
else:
  data_path = "datasets"


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [121]:
if 'google.colab' in sys_modules:
      bib = ["openpyxl",
             "xlrd",
             "optuna",
             "lightgbm",
             "xgboost"
            ]
      for b in bib:
        %pip install {b}



## a. Import des Bibliothèques/dépendances


In [122]:
# @title manipulation des vecteurs
import pandas as pd
import numpy as np


In [123]:
# @title création des graphiques
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as mtick
from tqdm import tqdm

In [124]:
# @title prétraitement des données en masse
from sklearn.preprocessing import OneHotEncoder, RobustScaler , FunctionTransformer
from sklearn.compose import ColumnTransformer , make_column_transformer
from sklearn.compose import make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer


In [125]:
# @title selection des features
from collections import defaultdict
from sklearn.feature_selection import SelectFromModel
from functools import lru_cache
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LassoCV

In [126]:
# @title sélection du meilleur modèle
import optuna
from optuna.pruners import MedianPruner
from optuna.exceptions import TrialPruned
from sklearn.model_selection import train_test_split, KFold, cross_val_score , RepeatedKFold
from typing import Dict, Any, Tuple , List
from sklearn.base import clone
from sklearn.model_selection import GridSearchCV

In [127]:
# @title fonction de score et evaluation
from scipy import stats
from sklearn.metrics import mean_squared_error, r2_score ,mean_absolute_error
from sklearn.metrics import confusion_matrix , ConfusionMatrixDisplay

In [128]:
# @title initialisation des modèles
from sklearn.linear_model import ElasticNet , Lasso
from xgboost import XGBRegressor


In [129]:
# @title options sytèmes
from sys import modules as sys_modules
import os
import sys
import joblib
from joblib import Parallel , delayed
import json
import warnings
import os
import glob
import re
from IPython.display import Markdown, display




## b. Configuration


In [130]:
#Pour ignorer les warnings
warnings.filterwarnings('ignore')

# configuration des graphiques
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 8)
sns.set_palette('Set2')

# Pour une meilleure lisibilité dans le notebook
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.float_format', '{:.3f}'.format)

print("chargement des bibliothèques terminé")

chargement des bibliothèques terminé


## c. Constantes

In [131]:
# @title Paramètres globaux
CORRTHRESHOLD = 60 # @param {"type":"integer"}
RANDOM_STATE = 42 # @param {"type":"integer"}
IDs= [] # @param {type:"raw"}

## d. Fonctions utilitaires

In [132]:
# @title Utilitaires

def sep(lg=90):
    """Affiche une ligne de séparation"""
    print("\n" + "-"*lg + "\n")

def sub(l1,l2):
  """Retourne la liste des éléments de l1 qui ne sont pas dans l2"""
  return [x for x in l1 if x not in l2]

def get_columns_above_missing_threshold(df, threshold:int=CORRTHRESHOLD):
    """
    Identifie les colonnes ayant un pourcentage de valeurs manquantes supérieur au seuil spécifié.

    Args:
        df (pandas.DataFrame): Le DataFrame à analyser
        threshold (float): Le seuil en pourcentage (entre 0 et 100) au-delà duquel une colonne est considérée
                          comme ayant trop de valeurs manquantes. Par défaut : 50

    Returns:
        list: Liste des noms de colonnes dont le pourcentage de valeurs manquantes dépasse le seuil,
              triée par pourcentage décroissant
    """
    # Vérification que le seuil est valide
    if not 0 <= threshold <= 100:
        raise ValueError("Le seuil doit être compris entre 0 et 100")

    # Calcul du pourcentage de valeurs manquantes par colonne
    missing_percentages = (df.isnull().sum() / len(df)) * 100

    # Sélection des colonnes dépassant le seuil
    columns_above_threshold = missing_percentages[missing_percentages > threshold]

    # Tri par pourcentage décroissant
    columns_above_threshold = columns_above_threshold.sort_values(ascending=False)

    # Création d'un DataFrame avec les colonnes et leurs pourcentages
    """result_df = pd.DataFrame({
        'Colonne': columns_above_threshold.index,
        'Pourcentage de valeurs manquantes': columns_above_threshold.values
    })"""

    return  columns_above_threshold.index.to_list() , pd.DataFrame(columns_above_threshold)

def visualise(df):
  print(f"forme : {df.shape}")

def all_columns(df, res=True):
  all_col = df.columns.tolist()
  print(f"les colonnes sont : \n{df.columns}")
  print(f" il y'a {len(all_col)} colonnes dans le dataframe")
  sep()
  if res:
    return all_col
  else:
    return None

def get_analyse(data , id , col_to_drop , res =True):

    df= data.copy()
    all_col = df.columns.tolist()
    col_to_keep = sub(all_col,col_to_drop)
    to_holes,_ = get_columns_above_missing_threshold(df[col_to_keep])
    col_to_keep2 = sub(col_to_keep,to_holes)
    IDs.append(id)
    if res:
      return col_to_keep2 , to_holes
    else:
      return None

def write_markdown_conclusion(id_colonne, colonnes_manquantes, colonnes_supprimer, colonnes_conserver):
    """
    Génère une section Markdown pour la conclusion de l'analyse d'un DataFrame.

    Args:
        id_colonne (str): Nom de la colonne d'identification.
        colonnes_manquantes (list): Liste des colonnes avec un fort taux de valeurs manquantes.
        colonnes_supprimer (list): Liste des colonnes à supprimer.
        colonnes_conserver (list): Liste des colonnes à conserver.
    """

    markdown_text = f"""
#### Conclusion

* _Identifiant_ : ``'{id_colonne}'``

* _Colonnes avec plus de {CORRTHRESHOLD}% de valeurs manquantes_ : ``{colonnes_manquantes}``

* _Colonnes à supprimer_ : ``{colonnes_supprimer}``

* _Colonnes à conserver_ : ``{colonnes_conserver}``

----
"""
    display(Markdown(markdown_text))

def display_correlation_matrix(df , save = False,name=None , table=False , target=None, threshold=0.5):
    """
    Affiche la matrice de corrélation entre les colonnes numériques du DataFrame et la colonne cible.

    Args:
    ----
    df : pd.DataFrame
        Le DataFrame à analyser.
    target : str
        Le nom de la colonne cible.
    threshold : float
        Le seuil de corrélation au-delà duquel les colonnes sont considérées comme corrélées.
    """
    df = df.copy()
    t_in = (target in df.columns)

    num_cols = df.select_dtypes(include=[np.number]).columns
    # Calculer la matrice de corrélation

    if len(num_cols) == 0:
        raise ValueError("Aucune colonne numérique trouvée dans le DataFrame.")

    if (target is not None) and t_in:
        num_cols = [target] + [col for col in num_cols if col != target]

    corr_matrix = df[num_cols].corr()

    # Masquer la moitié inférieure
    upper = corr_matrix.where(np.tril(np.ones(corr_matrix.shape), k=1).astype(bool))

    # filter les correlations dépassant le seuil
    filtered_corr = upper[
                          (upper.abs() > threshold) &
                          (upper != 1.0)
                          ].dropna(how='all',axis=0).dropna(axis=1, how='all').sort_values(by=target, ascending=False) if t_in else upper[ (upper.abs() > threshold) & (upper != 1.0)].dropna(how='all',axis=0).dropna(axis=1, how='all').sort_values(ascending=False)

    if t_in:
        corr_with_target=  corr_matrix[target].drop(target).sort_values(ascending=False)


        if table :
            filt_title = f"Tableau de corrélation entre les colonnes dépassnt  {threshold} : \n"
            display(Markdown(filt_title))
            display(filtered_corr)
            display(Markdown("-"*50))
            with_ta_title = f"Tableau de corrélation entre les colonnes et la colonne {target} : \n"
            display(Markdown(with_ta_title))
            display(corr_with_target)

        # Afficher la matrice de corrélation
        fig , (ax1 , ax2) = plt.subplots( 1 , 2 , figsize=(12, 8))
        sns.heatmap( filtered_corr , ax= ax1 , annot=True, fmt=".2f", cmap='coolwarm', square=True)
        ax1.set_title(f"Matrice de corrélation (seuil : {threshold})")
        sns.heatmap( corr_with_target , ax= ax2 , annot=True, fmt=".2f", cmap='coolwarm', square=True)
        ax2.set_title(f"Matrice de corrélation avec {target}")
        plt.legend(loc='upper right', fontsize=10)
        plt.tight_layout()

        if save:
            if name is None:
                plt.savefig("corr_matrix.png")
            else:
                plt.savefig(f"corr_matrix_{name}.png")

        plt.show()
    else:
        if table :
            display(Markdown(f"La colonne cible '{target}' n'existe pas dans le DataFrame."))
            display(Markdown("-"*50))
            filt_title = f"Tableau de corrélation entre les colonnes dépassnt  {threshold} : \n"
            display(Markdown(filt_title))
            display(filtered_corr)

        # Afficher la matrice de corrélation
        plt.figsize=(12, 8)
        sns.heatmap( filtered_corr  , annot=True, fmt=".2f", cmap='coolwarm', square=True)
        plt.title(f"Matrice de corrélation (seuil : {threshold})")
        plt.tight_layout()

        if save:
            if name is None:
                plt.savefig("corr_matrix.png")
            else:
                plt.savefig(f"corr_matrix_{name}.png")

        plt.show()



# 1. EXPLORATION DES DONNEES

----


Le but ici c'est d'essayer de comprendre  les données , c'est pouvoir repondre aux questions :
* Quelles sont les données visiblement non-pertinentes ?
* Detecter les outliers ?
* Vérifier le taux de valeurs manquantes
* regrouper les informations en indices synthétiques

----

## 1.1 Chargement des données

In [None]:

print("debut du chargement des données ... .. ... ..")

# Chargement des données de d'entrainement et de test
result_train = pd.read_csv( os.path.join(data_path,"results_train.csv") , sep = ',',encoding='utf-8')
result_test = pd.read_csv( os.path.join(data_path,"results_test.csv") , sep = ',',encoding='utf-8')

res_train_df = result_train.copy()
res_test_df = result_test.copy()

#-------------------------
# Données additionnelles
#------------------------

#  Niveau de vie
niveau_vie = pd.read_excel(os.path.join(data_path, "Niveau_de_vie_2013_a_la_commune.xlsx"))
niveau_vie_df = niveau_vie.copy()

#  Communes de France
communes_france = pd.read_csv(os.path.join(data_path, "communes-france-2022.csv"), sep=',', encoding='utf-8')
communes_df = communes_france.copy()

#  Données d'âge
age_insee = pd.read_excel(os.path.join(data_path, "age-insee-2020.xlsx"))
age_df = age_insee.copy()

# Données diverses INSEE
insee_divers = pd.read_excel(os.path.join(data_path, "MDB-INSEE-V2.xls"))
insee_divers_df = insee_divers.copy()

print("chargement des données terminé !! ")

debut du chargement des données ... .. ... ..


Optimisation des hyperparamètres:  30%|███       | 3/10 [1:58:28<4:36:27, 2369.58s/itération, Best RMSE=21.6]


## 1.2 Pré-Analyse  et Pré-Selection(visuelle) des Features (colonnes)

In [None]:
# @title colonne cible
target = '% Voix/Ins' # @param {type:"string"}

### results_train

In [None]:
# @title visualisation
visualise(res_train_df)

res_train_df.head(3)

In [None]:
# @title affichage des colonnes
all_col_train = all_columns(res_train_df)

In [None]:
# présélection
# colonnes clairement non informatives à supprimer
col_to_drop_train = ['Unnamed: 27' ,'Unnamed: 26' , 'Unnamed: 28' ,
                'Unnamed: 29' , 'Unnamed: 30','Unnamed: 31' ,
                'Unnamed: 32' ,'Prénom','Sexe','Nom','N°Panneau','Libellé de la commune']

id_train = 'CodeINSEE'

col_to_keep1 , to_holes_train = get_analyse(res_train_df,id_train,col_to_drop_train)

In [None]:
# @title conclusion
write_markdown_conclusion(id_train, to_holes_train, col_to_drop_train, col_to_keep1)

### niveau de vie

In [None]:
# @title visualisation
visualise(niveau_vie_df)

niveau_vie_df.head(3)

In [None]:
all_col_niveau_vie = all_columns(niveau_vie_df)


In [None]:
# @title présélection
# colonnes clairement non informatives à supprimer
col_to_drop_niveau = []
id_niveau = 'Code Commune'

col_to_keep2 , to_holes_niveau = get_analyse(niveau_vie_df,id_niveau,col_to_drop_niveau)

In [None]:
# @title conclusion
write_markdown_conclusion(id_niveau, to_holes_niveau, col_to_drop_niveau, col_to_keep2)

### communes de frances

In [None]:
# @title visualisation
visualise(communes_df)

communes_df.head(3)

In [None]:
# @title all colonnes
all_col_commune = all_columns(communes_df)

A première vu , on n'a pas besoin des
* url vers les sites internet des communes  c'est à dire ``url_wikipedia`` , ``url_ville``
* ``typecom`` et ``typecom_texte`` sont des colonnes constantes : on est toujours censé avoir à faire à des communes
* pas besoin de tous les type de noms de la communes , un seul suffira `nom_standard` , mais il est aussi renseigné dans `Niveau de vie`
* unamed ici represente un index donc inutile aussi
* on peut regrouper ` 'altitude_moyenne'`, `'altitude_minimale'`,
       `'altitude_maximale'` en  un ou deux indices d'altitude , idem pour les ``latitude...`` et ``longitute...``
* supprimer les infos d'identification (x_code , x_nom) sur les départements et régions , leurs codes sont déjà fourni dans ``MDB-insee-divers`` ce dernier ayant plus d'échantillons

In [None]:
# @title présélection
# colonnes jugées non informatives à supprimer

col_to_drop_commune = ['url_wikipedia','url_villedereve',
                       'typecom','typecom_texte',
                       'nom_standard','nom_a','nom_de',
                       'nom_sans_pronom','gentile',
                       'nom_sans_accent','nom_standard_majuscule',
                       'superficie_hectare','Unnamed: 0',
                       'academie_nom','codes postaux',
                       'longitude_centre','latitude_centre',
                       'reg_nom','dep_nom','canton_nom','epci_nom']
id_commune = 'code_insee'

col_to_keep3 , to_holes_commune = get_analyse(communes_df,id_commune,col_to_drop_commune)

In [None]:
# @title conclusion
write_markdown_conclusion(id_commune, to_holes_commune, col_to_drop_commune, col_to_keep3)

### age-insee

#### première analyse visuelle

In [None]:
# @title visualisation
visualise(age_df)

age_df.head(3)

In [None]:
# @title Présélection
# visuellement
col_to_drop_age = ['NOM']
id_age = 'INSEE'
all_col_age = age_df.columns.tolist()
col_to_keep4 , to_holes_age = get_analyse(age_df,id_age,col_to_drop_age)


In [None]:
col_to_drop_age += to_holes_age
print(col_to_drop_age)


A defaut d'avoir toutes les tranches d'âge pour chaque sexe , on regroupe les nombres d'hommes et femmes de chaque tranche pour avoir moins de variables:
* Sur le plan social  :
    * `% Mineurs` (0 - 17),
    * `% Adultes`(18-54) ,
    * `% Seniors`(55-79) ,
    * `% Tres_seniors`(80+)
* sur le plan économique :
    *`% Travailleurs` ceux qui ont l'age de potentiellement travailler
    * `% Retraites`  ceux qu'on a jugés ne plus pouvoir travailler car ont plus de `64` ans

#### Regroupement des âges

In [None]:
age_groups = age_df[age_df.columns[:5]]

# population total
col_genre = sub(sub(all_col_age,col_to_drop_age),age_df.columns[:5].tolist())
col_hom = col_genre[10:]
age_groups['Population'] = age_df[col_genre].sum(axis=1)


In [None]:
#--------------------------------
# Regroupement par cycle de vie
#-------------------------------

# les mineurs
age_groups['% Mineurs'] = (age_df['F0-2'] + age_df['F3-5'] +age_df['F6-10']+age_df['F11-17'] + age_df['H0-2'] + age_df['H3-5'] + age_df['H0-2']+age_df['H6-10']+age_df['F11-17'] ) / age_groups['Population'] * 100

# les adultes
age_groups['% Adultes'] = (age_df['F18-24'] + age_df['F25-39'] + age_df['F40-54']+age_df['H18-24']+age_df['H25-39'] + age_df['H40-54']) / age_groups['Population'] * 100


# les agés
age_groups['% Seniors'] = (age_df['F55-64'] + age_df['F65-79'] + age_df['H55-64'] + age_df['H65-79']) / age_groups['Population'] * 100


# les très agés
age_groups['% Tres_Seniors'] = (age_df['F80+'] + age_df['H80+']) / age_groups['Population'] * 100

# les retraités
age_groups['% Retraites'] = (age_df['F65-79'] + age_df['H65-79'] + age_df['F80+'] + age_df['H80+']) / age_groups['Population'] * 100

# travailleurs potentiels
age_groups['% Travailleurs'] = (age_groups['% Adultes'] + age_groups['% Seniors'] + age_groups['% Tres_Seniors'] - age_groups['% Retraites']) / age_groups['Population'] * 100

# ratio hommes/femmes
age_groups['rH/F'] = ( age_df[col_hom].sum(axis=1)  / age_df[col_genre[:10]].sum(axis=1) )

age_groups.to_csv("age_groups.csv",index=False)
age_groups.head(3)


#### mise à jour et conclusion

In [None]:
col_to_drop_age = []
colt_to_keep4 = sub(all_col_age,col_to_drop_age)


### MDB-INSEE-Divers

#### première analyse visuelle

In [None]:
# @title visualisation
visualise(insee_divers_df)

insee_divers_df.head(3)

In [None]:
# @title affichage des colonnes
all_col_insee_divers = all_columns(insee_divers_df)

#### regroupement

In [None]:
# @title présélection
id_mdb = 'CODGEO'
# colonnes jugées non informatives ou trop difficile à manipuler
col_to_drop_mdb = ['Nb Résidences Principales','Nb Log Vacants' ,
                   'Nb Résidences Secondaires',
                   'Score démographique' , 'Score Ménages' ,
                   'Evolution Pop %','Score Fiscal',
                   'Score Evasion Client','Score Synergie Médicale',
                   'Population','Nb Logement Secondaires',
                   'Capacité Camping', 'Capacité Hotel',
                   'Dynamique Démographique BV', 'Capacité Fisc',
                   'CP' , 'Urbanité Ruralité' , 'SEG Environnement Démographique Obsolète']
# les colonnes qui on été regroupées
HF = ['Nb Homme' , 'Nb Femme']

mM = ['Nb Mineurs' , 'Nb Majeurs']

Nb_entreprises = ['Nb Entreprises Secteur Commerce' ,
                  'Nb Entreprises Secteur Construction',
                  'Nb Entreprises Secteur Industrie' ,
                  'Nb Entreprises Secteur Services']

Nb_medecins = ['Nb Omnipraticiens BV' ,
             'Nb Infirmiers Libéraux BV',
             'Nb dentistes Libéraux BV',
             'Nb pharmaciens Libéraux BV']

Indice_creation_eco = ['Nb Création Enteprises' ,
                    'Nb Création Industrielles',
                    'Nb Création Construction',
                    'Nb Création Commerces',
                    'Nb création Services']

Indice_fiscal = ['Moyenne Revenus Fiscaux Départementaux',
                 'Moyenne Revenus Fiscaux Régionaux']

Indice_social =['Nb Education, santé, action sociale',
                'Nb Santé, action sociale',
                'Nb institution de Education, santé, action sociale, administration']

Indice_services = ['Nb Services personnels et domestiques',
                   'Nb Industries des biens intermédiaires',
                   'Nb de Commerce',
                   'Nb de Services aux particuliers']

Indice_salaire_global = ['Dep Moyenne Salaires Horaires',
                         'Reg Moyenne Salaires Horaires',]

indice_salaire_csp = ['Dep Moyenne Salaires Cadre Horaires',
                      'Reg Moyenne Salaires Cadre Horaires',
                      'Dep Moyenne Salaires Prof Intermédiaire Horaires',
                      'Reg Moyenne Salaires Prof Intermédiaire Horaires',
                      'Dep Moyenne Salaires Employé Horaires',
                      'Reg Moyenne Salaires Employé Horaires',
                      'Dep Moyenne Salaires Ouvrié Horaires',
                      'Reg Moyenne Salaires Ouvrié Horaires']

dict_indice_salaire = {
    'indice_salaire_global': Indice_salaire_global,
    **{'indice_salaire_'+str(i) : indice_salaire_csp[i:i+2] for i in range(0, len(indice_salaire_csp), 2)}
}

col_to_drop_mdb += HF + mM + Nb_entreprises + Nb_medecins + Indice_creation_eco + Indice_fiscal + Indice_salaire_global + indice_salaire_csp + Indice_services + Indice_social

col_to_keep5 = sub(all_col_insee_divers,col_to_drop_mdb)

# ...existing code...

# Dictionnaire de correspondance pour les nouveaux noms
nouveaux_noms = {
    'indice_salaire_0': 'indice_salaire_cadre',
    'indice_salaire_2': 'indice_salaire_prof',
    'indice_salaire_4': 'indice_salaire_employé',
    'indice_salaire_6': 'indice_salaire_ouvrié'
}

for old_key, new_key in nouveaux_noms.items():
    if old_key in dict_indice_salaire:
        dict_indice_salaire[new_key] = dict_indice_salaire.pop(old_key)

# Vérification
for k, v in dict_indice_salaire.items():
    print(f"{k} : {v}")
# ...existing code...

In [None]:
mdb_df = insee_divers_df[col_to_keep5].copy()

# nombre d'entreprises
mdb_df['Nb Entreprises'] = insee_divers_df[Nb_entreprises].sum(axis=1)

# nombre de médecins
mdb_df['Nb Medecins'] = insee_divers_df[Nb_medecins].sum(axis=1)
mdb_df.head(3)

# ratio du nombre d'hommes par rapport au femmes
mdb_df['rH/F'] = insee_divers_df[HF[0]] / insee_divers_df[HF[1]]

# ratio du nombre de mineurs par par rapport au majeurs
mdb_df['rm/M'] = insee_divers_df[mM[0]] / insee_divers_df[mM[1]]

# Indice salariax
for k , v in dict_indice_salaire.items():
    mdb_df[k] = insee_divers_df[v[0]]/insee_divers_df[v[1]]

# indice social
mdb_df['indice_social'] = insee_divers_df[Indice_social].sum(axis=1)

# indice services
mdb_df['indice_services'] = insee_divers_df[Indice_services].sum(axis=1)

mdb_df.to_csv("MDB_groupe.csv",index=False)
mdb_df.head(3)

* pas besoin du `Nb Résidences Principales` et `Nb Résidences Secondaires` , `Nb log Vacants`  car leur somme = `Nb Logement`
* on peut regrouper les colonnes `Moyenne Revenus Fiscaux Départementaux` à `Moyenne Revenus Fiscaux Régionaux` en `indice fiscal` par  Indice_fiscal_dep $= \frac{Moyenne Revenus Fiscaux Départementaux}{Moyenne Revenus Fiscaux Régionaux}$

* regrouper  `Dep Moyenne Salaires Horaires` à `Reg Moyenne Salaires Horaires`  en indice salariaux -> utiliser la méthodes ACP apprament qui permet de les réduire à 2 indices synthétiques `CP1` et `CP2`

## 1.3 Fusion des sources fournies et nettoyage

### resultat de la préanalyse

In [None]:
# @title colonnes à supprimer , avec trop de valeurs manquantes et à conserver

cols_to_drop = col_to_drop_train + col_to_drop_niveau + col_to_drop_commune + col_to_drop_age + col_to_drop_mdb
print(f"{len(cols_to_drop)} colonnes(s) à supprimer")

cols_to_keep = col_to_keep1 + col_to_keep2 + col_to_keep3 + col_to_keep4 + col_to_keep5
print(f"{len(cols_to_keep)} colonne(s) à conserver y compris les identifiants")


### fusion et extration des données

In [None]:
def safe_feature_selection(X, selected_features):
    """
    Retourne les colonnes de X correspondant aux selected_features valides.
    """
    return X[[feat for feat in selected_features if feat in X.columns]]

merge_list = [(niveau_vie_df , id_niveau),
              (communes_df , id_commune),
              (age_groups , id_age),
              (mdb_df , id_mdb)]

#fonction de fussion
def prepare_datasets(train_data,
                     test_data,
                     merge_list ,
                     base_id = id_train,
                     cols_to_drop = cols_to_drop,
                     verbose=False):
    """
    Fonction pour préparer et fusionner les datasets pour la modélisation
    """
    #Extraire les données de Macron
    train_features = train_data[train_data['Nom']=='MACRON'].copy()
    test_features = test_data.copy()

    # harmoniser les colonnes d'identification
    train_features['CodeINSEE'] = train_features['CodeINSEE'].astype(str).str.zfill(5)
    test_features['CodeINSEE'] = test_features['CodeINSEE'].astype(str).str.zfill(5)

    for i , (df , id_col) in enumerate(merge_list):
        # Vérifier si la colonne d'identification est présente dans le DataFrame
        if id_col not in df.columns:
            raise ValueError(f"La colonne d'identification '{id_col}' n'est pas présente dans le DataFrame à l'index {i}.")

        # Assurer que la colonne d'identification est au format string avec padding
        df[id_col] = df[id_col].astype(str).str.zfill(5)

        # nombre d'échantillons avant la fusion
        n_samples_before = train_features.shape[0]

        # Fusionner les DataFrames sur la colonne d'identification
        train_features = pd.merge(train_features, df, left_on=base_id, right_on=id_col, how='left')

        test_features = pd.merge(test_features, df, left_on=base_id, right_on=id_col, how='left')

        # nombre d'échantillons après la fusion
        n_samples_after = train_features.shape[0]

        assert n_samples_before == n_samples_after, f"Erreur de fusion : le nombre d'échantillons a changé après la fusion avec {id_col}."

        # supprimer l'identifiant
        if id_col != base_id:
            train_features = train_features.drop(columns=[id_col], axis=1, errors='ignore')
            test_features = test_features.drop(columns=[id_col], axis=1, errors='ignore')
    #-----------------------------------------------------------------
    # Supprimer les colonnes non informatives
    train_features = train_features.drop(columns=cols_to_drop, axis=1, errors='ignore')
    test_features = test_features.drop(columns=cols_to_drop, axis=1, errors='ignore')
    #-----------------------------------------------------------------
    # Supprimer les colonnes d'identification
    train_features = train_features.drop(columns=IDs, axis=1 , errors='ignore')

    if verbose :
        missing1 = ((train_data.isnull().sum()/ train_data.shape[0]).sum()/train_data.shape[1] )* 100
        missing2 = ((train_features.isnull().sum()/ train_features.shape[0]).sum()/train_features.shape[1] )* 100
        #-------------------------------------------------------------
        print(f" {train_features.shape[1]} colonnes vs {train_data.shape[1]} avant la fusion\n")
        print(f" {len(cols_to_drop) + len(merge_list) + 1} colonnes supprimées pendant la fusion \n")
        print(f" \n{train_features.shape[0]} lignes vs {train_data.shape[0]} avant la fusion\n")
        print(f" \n{missing2}% de valeurs manquantes vs {missing1}% avant la fusion")


    return train_features, test_features


In [None]:
train_data , test_data = prepare_datasets(res_train_df, res_test_df,
                                          merge_list,
                                          base_id = id_train,
                                          cols_to_drop = cols_to_drop,
                                          verbose=True)

print(f"forme du train_data : {train_data.shape}")
print(f"forme du test_data : {test_data.shape}")
test_data.head(3)

In [None]:
X = train_data.drop(columns=[target], axis=1,errors='ignore')
y = train_data[target]

print(X.info())
print(f"forme des prédicteurs : {X.shape}")
print(f"forme de la cible : {y.shape}")


# 2. Prétraitement , Pipeline et Selection automatique de features

----

## 2.1 Création du pipelines de prétraitement

In [None]:
def get_preprocessor(data,features=None):
    """
    Crée un préprocesseur complet avec gestion des valeurs infinies, manquantes et aberrantes
    """
    X=data.copy()
    # Remplacer les valeurs infinies par NaN
    X = X.replace([np.inf, -np.inf], np.nan)

    if features is not None:
        X = X[features]

    numeric_features = X.select_dtypes(include=['int64', 'float64',np.number]).columns
    categorical_features = X.select_dtypes(include=['object', 'category','bool']).columns

    # conversion des colonnes catégorielles en type str pour éviter les type mixte
    X[categorical_features] = X[categorical_features].astype(str)


    # Pipeline pour les variables numériques
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())
    ] ,   memory='cache_directory')

    # Pipeline pour les variables catégorielles
    categorical_transformer = Pipeline(
        steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder( sparse_output=False, handle_unknown='ignore'))
        ])

    # Combinaison des transformateurs
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ],
        remainder = 'drop',
        verbose_feature_names_out=False,
        n_jobs=-1)

    return preprocessor


## 2.2 Selection automatique des features

### 2.2.1 Selection grossière automatique avec xgboost


In [None]:
def remove_quasi_constant_features(df, threshold=0.01):
    """
    Supprime les colonnes avec une variance inférieure à un seuil donné : elle sont quasi contantes"""

    # Traitement des variables numériques
    numerical_df = df.select_dtypes(include=[np.number, 'bool'])
    variances = numerical_df.var()
    low_variance_cols = variances[variances < threshold].index.tolist()

    # Traitement des variables catégorielles (en regardant le ratio du mode)
    categorical_df = df.select_dtypes(include=['object', 'category'])
    cat_low_variance = []

    for col in categorical_df.columns:
        # Calcul du ratio de la valeur la plus fréquente
        value_counts = categorical_df[col].value_counts(normalize=True)
        if len(value_counts) > 0 and value_counts.iloc[0] > 1 - threshold:
            cat_low_variance.append(col)

    # Combinaison des résultats
    all_low_variance = low_variance_cols + cat_low_variance

    keep = sub(df.columns.tolist(), all_low_variance)

    return all_low_variance, keep

drop , keep = remove_quasi_constant_features(X)
print(f"nombre de colonnes jugées quasi constantes : {len(drop)}")


In [None]:

def fast_feature_selection(x_data, y_data, n_features=20,sort=False):
    """
    Selection automatique des ``n_features`` meilleures features avec  XGBoost
    Args:
        X (pd.DataFrame): DataFrame contenant les caractéristiques
        y (pd.Series): Série contenant la cible
        n_features (int): Nombre de features/colonnes à sélectionner
        sort (bool): Si True, trie les features sélectionnées par importance décroissante
    Returns:
        list: Liste des noms des features sélectionnées
    """
    # Créer une copie de X pour éviter les modifications sur l'original
    X = x_data.copy()
    # Supprimer les colonnes avec une variance inférieure à 0.01
    drop, keep = remove_quasi_constant_features(X)

    X = X[keep]
    # Remplacer les valeurs infinies par NaN
    X = X.replace([np.inf, -np.inf], np.nan)

    # identification des colonnes numériques et catégorielles
    num_col = X.select_dtypes(include=[np.number,'float64','int64']).columns
    cat_col = X.select_dtypes(include=['object', 'category','bool']).columns

    # transformer tous les colonnes catégorielles en type str
    # pour éviter les types mixtes
    X[cat_col] = X[cat_col].astype(str)

    # Préprocessing light
    preprocessor = make_column_transformer(
        (SimpleImputer(strategy='median'), num_col),
        (Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
            ])              ,            cat_col ),
        remainder='drop',
        verbose_feature_names_out=False,
        n_jobs=-1
    )

    # Modèle unique mais robuste
    model = Pipeline([
        ('preprocessor', preprocessor),
        ('selector', SelectFromModel(
            XGBRegressor(
                tree_method='hist',
                n_estimators=50,
                max_depth=6),
            max_features=n_features
        )),
        ('estimator', XGBRegressor())
    ])

    model.fit(X, y_data)

    transformed_features_names = model.named_steps['preprocessor'].get_feature_names_out(input_features=keep)

    if sort:

        importances = model.named_steps['estimator'].feature_importances_

        feature_importance = list(zip(transformed_features_names, importances))

        feature_importance.sort(key=lambda x: x[1], reverse=True)

        selected_features = [name for name, _ in feature_importance[:n_features]]

    selected_indices = model.named_steps['selector'].get_support(indices=True)

    selected_features = [transformed_features_names[i] for i in selected_indices]

    return list(selected_features)


In [None]:
def combined_feature_selection(X_data, y, n_features=20):
    X = X_data.copy()
    drop, keep = remove_quasi_constant_features(X)
    X = X[keep]
    X = X.replace([np.inf, -np.inf], np.nan)

    num_col = X.select_dtypes(include=[np.number, 'float64','int64']).columns
    cat_col = X.select_dtypes(include=['object', 'category', 'bool']).columns

    X[cat_col] = X[cat_col].astype(str)

    num_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())
    ])

    cat_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = make_column_transformer(
        (num_pipeline, num_col),
        (cat_pipeline, cat_col),
        remainder='drop',
        verbose_feature_names_out=False,
        n_jobs=-1
    )

    # Apply preprocessing
    X_transformed = preprocessor.fit_transform(X)
    feature_names = preprocessor.get_feature_names_out(input_features=keep)

    # Define models
    xgb = XGBRegressor(tree_method='hist',
                       n_estimators=50,
                       max_depth=6,
                       max_features=n_features,
                       random_state=RANDOM_STATE)
    lasso = LassoCV(cv=5)
    rf = RandomForestRegressor(n_estimators=50,
                               max_depth=6,
                               max_features=n_features,
                               random_state=RANDOM_STATE)

    # Fit models
    xgb.fit(X_transformed, y)
    lasso.fit(X_transformed, y)
    rf.fit(X_transformed, y)

    # Get importances or coefficients
    xgb_importance = xgb.feature_importances_
    lasso_coef = np.abs(lasso.coef_)
    rf_importance = rf.feature_importances_

    # Normalize
    def normalize(arr):
        return arr / np.max(arr) if np.max(arr) != 0 else arr

    # scores nomalisés
    scores_norm = [
        normalize(xgb_importance),
        normalize(lasso_coef) ,
        normalize(rf_importance)
    ]
    scores = np.vstack(scores_norm)
    scores = np.median(scores, axis=0)

    # Select top features
    top_indices = np.argsort(scores)[::-1][:n_features]
    selected_features = [feature_names[i] for i in top_indices]

    return selected_features



In [None]:

selected_features = fast_feature_selection(X, y, n_features=40)
with open("40_best_features.json" , 'w') as file:
    json.dump(selected_features , file)


In [None]:
for f in selected_features :
    print(f)

### 2.2.2 Selection plus fine pour un modèle donné

In [None]:

def compute_rmse(pipeline , X_sub, y,cv=None,
                 message="Début du calcul...",
                 error="une erreur s'est produite")->float:
    """
    Function to compute cross-validated RMSE
    """
    if cv is None:
        cv = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    if message is not None:
        print(message)
    try :
        pipeline.fit(X_sub, y)
        # Calculer le RMSE avec validation croisée
        scores = cross_val_score(pipeline, X_sub, y,
                                scoring='neg_root_mean_squared_error',
                                cv=cv,n_jobs=-1)
        return -scores.mean()
    except Exception as e:
        if error is not None:
            print(error)
            print(f"Erreur : {e}")
        # Si une erreur se produit, on retourne une valeur élevée pour forcer la suppression de la feature
        return float('inf')

def make_pipeline(model , X,features=None):

    preprocessor = get_preprocessor(X , features)
    return Pipeline([
        ('preprocessor', preprocessor),
        ('model', model)
    ])


def backward_stepwise(model, X, y,
                       initial_features:List[str]=None,
                       min_features:int=3,
                       rtol:float=0.005,
                       max_degrad:float=0.005,
                       cv=None,
                       logg:bool=True)->Tuple[List[str] , float]:
    """
    Backward stepwise feature selection algorithm.

    Parameters:
    - model: The predictive model (must implement fit and predict).
    - X: DataFrame of predictors.
    - y: Target variable.
    - initial_features: Initial subset of features to test. If None, uses all features.
    - min_features: Minimum number of features to keep.
    - max_degrad: Maximum allowed degradation in RMSE to consider a feature removable.
    - rtol: Relative tolerance for improvement in RMSE. If improvement < rtol, stops early.

    Returns:
    - best_features: List of selected features.
    - best_score: Best RMSE score achieved.
    """


    all_features = X.columns.tolist()
    if initial_features is None:
        current_features = all_features.copy()
    else:
        current_features = [f for f in initial_features if f in all_features]

    if min_features is None:
        min_features = max(5, int(0.1 * len(current_features)))

    if cv is None:
        cv = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)

    pipeline = make_pipeline(model, X ,current_features)

    # Compute initial RMSE
    best_score = compute_rmse(pipeline,
                              X[current_features], y,cv=cv , message="calcul du score initial..." if logg else None,
                              error="Erreur lors de l'évaluation initiale" if logg else None)

    if logg:
        print("début des suppressions...\nInitial RMSE: {best_score:.4f}")
    features_removed = []
    improve = True
    while improve and len(current_features) > min_features:
        worst_feature = None
        current_rmse = best_score
        # essai de suppression de chaque feature
        for feature in current_features:
            trial_features = [f for f in current_features if f != feature]
            trial_pipeline = make_pipeline(model, X,trial_features)
            # Compute RMSE without the feature
            trial_rmse = compute_rmse(trial_pipeline,
                                      X[trial_features], y,cv=cv,message=f"Test sans '{feature}'"if logg else None,error=f"Erreur en testant sans {feature} "if logg else None)

            relative_change = (best_score-trial_rmse) / best_score
            # If removing the feature improves RMSE beyond rtol
            if relative_change >= rtol and trial_rmse < current_rmse:
                    current_rmse = trial_rmse
                    worst_feature = feature
            elif relative_change < -max_degrad:
                if logg:
                    print(f" suppression de {feature} rejetée : dégradation de {-relative_change*100:.2f} %")

        # Remove the worst feature if found
        if worst_feature is not None:
            current_features.remove(worst_feature)
            features_removed.append(worst_feature)
            best_score = current_rmse
            improve = True
            if logg:
                print(f" '{worst_feature}' supprimée  , nouvelle RMSE: {best_score:.4f},  {len(current_features)} features restante")
        else:
            improve = False

    if logg:
        print(f"selction terminée {len(current_features)} Features conservées. RMSE final: {best_score:.4f}")
    return current_features, best_score



# 3. MODELISATION

### Separation X_train , y_train ,....

In [None]:
train_data , test_data = prepare_datasets(res_train_df, res_test_df,
                                          merge_list,
                                          base_id = id_train,
                                          cols_to_drop = cols_to_drop,
                                          verbose=True)

In [None]:
X = train_data.drop(columns=[target], axis=1,errors='ignore')
y = train_data[target]

#selection des 20 meilleures features
selected_features = fast_feature_selection(X, y, n_features=20)

In [None]:
X_train , X_test , y_train , y_test = train_test_split(safe_feature_selection(X,selected_features),y, test_size=0.2, random_state=RANDOM_STATE)

### Fonction d'optimisation des hyperparamètres et des features pour un moèle


In [None]:
def quick_gridsearch(model, param_grid, X, y, cv=3, scoring='neg_root_mean_squared_error', verbose=0):
    """
    Effectue une GridSearchCV rapide pour évaluer rapidement un modèle de régression.

    Returns:
    - best_estimator_: le modèle entraîné avec les meilleurs paramètres
    - best_params_: dictionnaire des meilleurs paramètres
    - best_score_: meilleur score trouvé (négatif car RMSE)
    - cv_results_: résultats détaillés
    """
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring=scoring,
        cv=cv,
        n_jobs=-1,
        verbose=verbose,
        return_train_score=True
    )
    grid_search.fit(X, y)

    return grid_search.best_estimator_, grid_search.best_params_, grid_search.best_score_, grid_search.cv_results_


In [None]:
class TQDMCallback:
    """
    Callback to update tqdm progress bar during hyperparameter tuning.
    """
    def __init__(self, total):
        self.pbar = tqdm(total=total, desc="Optimisation des hyperparamètres", unit="itération")

    def __call__(self, study, trial):
        self.pbar.update(1)
        self.pbar.set_postfix({"Best RMSE": study.best_value})

#Fonction d'optimisation des hyperparamètres et des features pour un modèle
def optimise_model(model, params, X, y,
                   n_trials: int = 3,
                   model_name: str = '',
                   cv:int=3,
                   selected_feature_fn = None
                   ) -> Tuple[Dict, List[str], optuna.study.Study]:
    """
    Optimise les hyperparamètres et selectionne les features optimales pour le modèle

    Args
    ---
    model : (fit_Object)
      instance du modèle choisit , ici ce sera soit Lasso , ElesticNet ,  XGboost , LightGBM
    params :
      disctionnaire permettant de definir l'espace des hyperparamètres à optimiser
    X : (pd.DataFrame)
      dataframes des predicteurs
    y : (pd.DataFrame)
      variables cible
    n_trials : (int)
      nombre d'essais pour l'optimisation des hyperparamètres

    Returns
    -------
    Tuple[Dict, List[str], optuna.study.Study]:
      tuple contenant
    -best_params : (dict)
      dictionnaire des meilleurs hyperparamètres
    -best_features : (list)
      liste des features sélectionnées
    -study : (optuna.study.Study)
      instance de l'étude d'optimisation
    """
    if cv is None:
      cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=RANDOM_STATE )

    def objective(trial):
        # Définir les espaces de recherche pour chaque hyperparamètre
        param_grid = {}
        for name, conf in params.items():
            if conf['type'] == 'int':
                if isinstance(conf['range'], (list, tuple)) and len(conf['range']) == 2:
                    param_grid[name] = trial.suggest_int(name, conf['range'][0], conf['range'][1])
                else:
                    print(f"Erreur: conf['range'] n'est pas une liste ou un tuple valide pour {name} (type int)")
                    continue  # Passe à l'hyperparamètre suivant
            elif conf['type'] == 'float':
                if isinstance(conf['range'], (list, tuple)) and len(conf['range']) == 2:
                    param_grid[name] = trial.suggest_float(name, conf['range'][0], conf['range'][1])
                else:
                    print(f"Erreur: conf['range'] n'est pas une liste ou un tuple valide pour {name} (type float)")
                    continue  # Passe à l'hyperparamètre suivant
            elif conf['type'] == 'categorical':
                if isinstance(conf['range'], (list, tuple)):
                    param_grid[name] = trial.suggest_categorical(name, conf['range'])
                else:
                    print(f"Erreur: conf['range'] n'est pas une liste ou un tuple valide pour {name} (type categorical)")
                    continue  # Passe à l'hyperparamètre suivant
            elif conf['type'] == 'log':
                if isinstance(conf['range'], (list, tuple)) and len(conf['range']) == 2:
                    param_grid[name] = trial.suggest_float(name, conf['range'][0], conf['range'][1], log=True)
                else:
                    print(f"Erreur: conf['range'] n'est pas une liste ou un tuple valide pour {name} (type log)")
                    continue  # Passe à l'hyperparamètre suivant

        # Appliquer les paramètres au modèle
        model_trial = clone(model).set_params(**param_grid)

        try:
            # Selection des features
            if selected_feature_fn is not None:
                selected_features = selected_feature_fn(X, y)
            else:
              selected_features = backward_stepwise(model_trial, X, y, cv=cv, logg=False)[0]

            # sélection sécurisée des colonnes
            X_selected = safe_feature_selection(X, selected_features)

            # si aucune colonne valide , utiliser toutes le colonnes
            if X_selected.shape[1] == 0:
                X_selected = X.copy()
                selected_features = X.columns.tolist()

            prepro = get_preprocessor(X_selected)
            pipeline = Pipeline([
                ('preprocessor', prepro),
                ('model', model_trial)
            ])

            trial.set_user_attr('selected_features', selected_features)

            # Effectuer la validation croisée
            scores = cross_val_score(pipeline,
                                      X_selected, y,
                                      cv=cv, scoring='neg_root_mean_squared_error',
                                      n_jobs=-1)

            score = -np.mean(scores)
            trial.report(score, step=0)
            if trial.should_prune():
                raise TrialPruned()
            print(f"[Trial {trial.number}] RMSE: {score:.4f} | Params: {param_grid}")
            return score

        except Exception as e:
            print(f"Trial {trial.number} Erreur : {e}")
            # on retourne une valeur élevée pour forcer la suppression de la feature
            trial.set_user_attr('error', str(e))
            trial.set_user_attr('selected_features', X.columns.tolist())
            return float('inf')

    study = optuna.create_study(direction='minimize',
                                  pruner=MedianPruner())
    study.optimize(objective,
                   n_trials=n_trials,
                   n_jobs=-1,
                   callbacks=[TQDMCallback(n_trials)])

    joblib.dump(study, f'{model_name}_study.pkl')

    best_params = study.best_params
    best_features = study.best_trial.user_attrs['selected_features']

    return best_params, best_features, study



def optimise_then_select(model, param_space, X, y,
                           n_trials=10, cv=None,
                           min_features=5,
                           rtol=0.01, logg=False,
                           model_name=None):
  best_params , _ , _ = optimise_model(model = model,
                                       params=  param_space ,
                                       X=X , y=y ,
                                       selected_feature_fn=None,
                                       n_trials=n_trials ,
                                       model_name=model_name ,
                                       cv=cv,
                                       )
  model = model.set_params(**best_params)

  best_features = backward_stepwise(model=model,
                                    X=X,
                                    y=y,
                                    min_features=min_features,
                                    rtol=rtol,
                                    logg=False)[0]
  return model , best_features , best_params

## Initialisation des modèles et recherche d'hyperparamètres


In [None]:
# 1. Définir les modèles et leurs espaces de recherche
models_grids = {
    "Lasso": (             # modèle imposé
        Lasso(random_state=RANDOM_STATE),
        {
            'alpha': {'type': 'log', 'range': (1e-4, 10)},
            'max_iter': {'type': 'int', 'range': (1000, 5000)},
            'fit_intercept': {'type': 'categorical', 'range': [True, False]}
        }
    ),
    "ElasticNet": (       #modèle au choix
        ElasticNet(random_state=RANDOM_STATE),
        {
            'alpha': {'type': 'log', 'range': (1e-4, 10)},
            'l1_ratio': {'type': 'float', 'range': (0.1, 1)},
            'max_iter': {'type': 'int', 'range': (1000, 5000)},
            'fit_intercept': {'type': 'categorical', 'range': [True, False]}
        }
    ),
    "XGBoost": (      # modèle au choix 2
        XGBRegressor(random_state=RANDOM_STATE),
        {
            'n_estimators': {'type': 'int', 'range': (100, 1000)},
            'max_depth': {'type': 'int', 'range': (3, 10)},
            'learning_rate': {'type': 'float', 'range': (0.01, 0.3)},
            'subsample': {'type': 'float', 'range': (0.6, 1.0)},
            'colsample_bytree': {'type': 'float', 'range': (0.6, 1.0)},
            'gamma': {'type': 'float', 'range': (0, 5)},
            'reg_alpha': {'type': 'float', 'range': (0, 1)},
            'reg_lambda': {'type': 'float', 'range': (0, 1)}
        }
    )
}

# 2. Boucle d'optimisation + sélection des best features
models_info = {}

for name, (model, space) in models_grids.items():
    print(f"\n🔍 Traitement du modèle : {name}")

    final_model, selected_features, best_params = optimise_then_select(
        model=model,
        param_space=space,
        X=X_train,
        y=y_train,
        n_trials=200,
        min_features=5,
        rtol=0.01,
        logg=False,
        model_name = name
    )

    models_info[name] = {
        "model": final_model,
        "features": selected_features,
        "params": best_params
    }


## Comparaison des modèles et selection du meilleur modèle

###   Comparaison

In [None]:
def compare_grid_models(models_grids: Dict[str, Tuple], X_train: pd.DataFrame, y_train: pd.Series, cv: int = 3) -> Tuple[pd.DataFrame, Dict[str, Pipeline]]:
    """
    Compare plusieurs modèles de régression à l'aide de GridSearchCV rapide.

    Args
    ----
    - models_grids : dict où chaque clé est le nom du modèle et la valeur est un tuple (modèle sklearn, grille d'hyperparamètres)
    - X_train : données d'entraînement
    - cv : nombre de folds pour la validation croisée

    Returns:
    - results_df: DataFrame des scores RMSE moyens
    - best_models: dictionnaire des pipelines optimaux entraînés
    """
    results = []
    best_models = {}

    for name, (model, grid) in models_grids.items():
        print(f"\n Recherche GridSearch pour {name}...")

        # Prétraitement et pipeline
        preprocessor = get_preprocessor(X_train)
        pipeline = Pipeline([
            ("preprocessor", preprocessor),
            ("model", model)
        ])

        # Adapter la grille avec le préfixe "model__"
        param_grid = {'model__' + k: v for k, v in grid.items()}

        best_pipeline, best_params, best_score, _ = quick_gridsearch(
            pipeline,
            param_grid=param_grid,
            X=X_train,
            y=y_train,
            cv=cv,
            scoring='neg_root_mean_squared_error',
        )

        results.append({
            "Model": name,
            "Best RMSE (CV)": -best_score,
            "Best Params": best_params
        })
        best_models[name] = best_pipeline

    results_df = pd.DataFrame(results).sort_values(by="Best RMSE (CV)")
    return results_df, best_models



results_df, pipelines = compare_grid_models(models_grids, X_train, y_train)
print(results_df)



### Selection automatique du meilleur Modèle

In [None]:
# selection du meilleur modèle

best_model_name = results_df.iloc[0]['Model'] # nom du meilleur modèle

print(f"le meilleur modèle est {best_model_name}")
best_pipeline = best_models[best_model_name] #extraction de la meilleur  pipeline
best_features = models_info[best_model_name]['features'] #extraction des features optimal
best_model = models_info[best_model_name]['model'] #extraction du meilleur modèle
best_params = models_info[best_model_name]['params'] #extraction des meilleurs paramètres

# entrainement du meilleur modèle
final_best_model = best_pipeline.fit(X_train[best_features], y_train)
final_best_features = best_features

# mise à jour de models_info:
for name  in models_info:
  models_info[name]["is_best"] = (name == best_model_name)
for name , pipe in pipelines:
  models_info[name]["pipeline"] = pipe

##  Quelques Analyses graphiques des modèles

In [None]:
def analyze_model_predictions(y_true, y_pred, model_name=""):
    """
    Génère des graphiques d'analyse des performances d'un modèle de régression :
    - Résidus vs Prédictions
    - Histogramme des résidus
    - QQ-plot
    - Vraies vs Prédictions
    - Affichage des métriques

    Args:
    - y_true: valeurs réelles
    - y_pred: prédictions du modèle
    - model_name: nom du modèle (affiché sur les titres)
    """
    residuals = y_true - y_pred
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)

    print(f"\n📊 Évaluation du {model_name}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE : {mae:.4f}")
    print(f"R²  : {r2:.4f}")

    plt.figure(figsize=(15, 10))

    # Résidus vs Prédictions
    plt.subplot(2, 2, 1)
    sns.scatterplot(x=y_pred, y=residuals)
    plt.axhline(0, color='red', linestyle='--')
    plt.xlabel("Prédictions")
    plt.ylabel("Résidus")
    plt.title("Résidus vs Prédictions")

    # Histogramme des résidus
    plt.subplot(2, 2, 2)
    sns.histplot(residuals, kde=True)
    plt.title("Distribution des résidus")

    # QQ-plot
    plt.subplot(2, 2, 3)
    stats.probplot(residuals, dist="norm", plot=plt)
    plt.title("QQ-Plot des résidus")

    # Vraies vs Prédictions
    plt.subplot(2, 2, 4)
    sns.scatterplot(x=y_true, y=y_pred)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
    plt.xlabel("Vraies valeurs")
    plt.ylabel("Prédictions")
    plt.title("Vraies vs Prédictions")

    plt.suptitle(f"Analyse du {model_name}", fontsize=16)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.savefig(f"graphiques d'analyse du modèle {model_name}")
    plt.show()

#-------------------------------------------------------------------------------
def plot_feature_importance(model, feature_names, model_name=""):
    """
    Trace l'importance des features pour un modèle donné.
    """
    if hasattr(model, "coef_"):
        importances = model.coef_
    elif hasattr(model, "feature_importances_"):
        importances = model.feature_importances_
    else:
        print(f"Aucune importance de feature disponible pour {model_name}")
        return

    sorted_idx = np.argsort(importances)[::-1]
    plt.figure(figsize=(10, 6))
    sns.barplot(x=np.abs(importances)[sorted_idx], y=np.array(feature_names)[sorted_idx])
    plt.title(f"Importance des variables - {model_name}")
    plt.xlabel("Importance absolue")
    plt.ylabel("Variables")
    plt.tight_layout()
    plt.savefig(f"Figures/importances_{model_name}.png")
    plt.close()

#----------------------------------------------------------------
def analyze_all_models(models_info , X_test, y_test):
    """
    Applique l'analyse des prédictions à tous les modèles comparés.
    """
    for model_name, info in models_info.items():
        features = info.get("features" , X_test.columns.tolist())
        pipeline = info.get("pipeline")
        x_val = safe_feature_selection(X_test, features) if features else X_test

        y_pred = pipeline.predict(X_test[features])
        analyze_model_predictions(y_test, y_pred, model_name)

#----------------------------------------------------------------------

os.makedirs("Figures",exist_ok=True)

# tracer les graphiques pour les analyses des trois modèles
analyze_all_models(models_info , X_test, y_test)


# graphique de l'importance des features pour le meilleur modèle
plot_feature_importance(final_best_model , final_best_features, model_name=best_model_name)

# 4. Soumission sur Kaggle

In [None]:
# Soumission sur Kaggle

# copier le fichier de test déja merge
data_test = test_data.copy()

# selectionner les features optimaux pour le best_model
data_test_submit = data_test[final_best_features]

# Calcul des prédictions sur le dataset de test final
y_pred_test = final_best_model.predict(data_test_submit)

# Ajout des prédictions au dataframe
data_test['Prediction'] = y_pred_test

submission = data_test[['CodeINSEE', 'Prediction']]

# sauvegarder la liste des features
with open(f"final_best_features_with_{best_model_name}.json" , 'w') as file:
    json.dump(final_best_features , file)

# sauvegarder les prédictions
submission.to_csv('results_test_predicted.csv', index=False)
print(f"\nFichier de soumission 'results_test_predicted.csv' généré avec succès \nForme {submission.shape}  ")

submission.head(5)