In [62]:
# =========================================================
# Spark MLlib - Prédiction de l'espérance de vie
# 
# =========================================================


from pyspark.sql.functions import col, when, count, isnan
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline


StatementMeta(sparkkemeans, 1, 41, Finished, Available, Finished)

In [63]:
 # Chargement du dataset depuis ADLS 
df = spark.read.csv(
    "abfss://esperance@datalakekmeans.dfs.core.windows.net/mahamathassan86/esperance_vie_spark_regression/refs/heads/main/esperance_vie.csv",
    header=True,
    inferSchema=True
)


StatementMeta(sparkkemeans, 1, 42, Finished, Available, Finished)

In [64]:

df.printSchema()


StatementMeta(sparkkemeans, 1, 43, Finished, Available, Finished)

root
 |-- Pays: string (nullable = true)
 |-- Année: integer (nullable = true)
 |-- Statut: string (nullable = true)
 |-- Espérance de vie: double (nullable = true)
 |-- Mortalité des adultes: integer (nullable = true)
 |-- Décès de nourrissons: integer (nullable = true)
 |-- Alcool: double (nullable = true)
 |-- Dépenses en pourcentage: double (nullable = true)
 |-- Hépatite B: integer (nullable = true)
 |-- Rougeole: integer (nullable = true)
 |-- IMC: double (nullable = true)
 |-- Polio: integer (nullable = true)
 |-- Dépenses totales: double (nullable = true)
 |-- Diphtérie: integer (nullable = true)
 |-- VIH: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- thinness 5-9 years: double (nullable = true)
 |-- Composition des revenus des ressources: double (nullable = true)
 |-- Scolarisation: double (nullable = true)



In [65]:
df.count()

StatementMeta(sparkkemeans, 1, 44, Finished, Available, Finished)

2938

In [66]:
df.show(5)

StatementMeta(sparkkemeans, 1, 45, Finished, Available, Finished)

+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+-----+----------------+---------+---+-----------+------------------+--------------------------------------+-------------+
|       Pays|Année|    Statut|Espérance de vie|Mortalité des adultes|Décès de nourrissons|Alcool|Dépenses en pourcentage|Hépatite B|Rougeole| IMC|Polio|Dépenses totales|Diphtérie|VIH| Population|thinness 5-9 years|Composition des revenus des ressources|Scolarisation|
+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+-----+----------------+---------+---+-----------+------------------+--------------------------------------+-------------+
|Afghanistan| 2015|Developing|            65.0|                  263|                  62|  0.01|            71.27962362|        65|    1154|19.1|    6|            8.16|       65|0.1|3.3736494E7| 

In [67]:
#  Sélection des colonnes utiles
target = "Espérance de vie"
categorical_col = "Statut"
numeric_cols = [
    "Mortalité des adultes", "Décès de nourrissons", "Alcool",
    "Dépenses en pourcentage", "Hépatite B", "Rougeole",
    "IMC", "Polio", "Dépenses totales", "Diphtérie",
    "VIH", "Population", "thinness 5-9 years",
    "Composition des revenus des ressources", "Scolarisation"
]

StatementMeta(sparkkemeans, 1, 46, Finished, Available, Finished)

In [68]:


# Vérification des valeurs manquantes dans la cible
df.select([
    count(when(col("Espérance de vie").isNull() | isnan(col("Espérance de vie")), "Espérance de vie")).alias("Valeurs_manquantes_Espérance_de_vie"),
    count("*").alias("Total_lignes")
]).show()


StatementMeta(sparkkemeans, 1, 47, Finished, Available, Finished)

+-----------------------------------+------------+
|Valeurs_manquantes_Espérance_de_vie|Total_lignes|
+-----------------------------------+------------+
|                                 10|        2938|
+-----------------------------------+------------+



In [69]:
# Supprimer les lignes avec valeurs manquantes
df = df.dropna(subset=["Espérance de vie"] + numeric_cols)

StatementMeta(sparkkemeans, 1, 48, Finished, Available, Finished)

In [70]:


# --- Vérifier les valeurs manquantes AVANT imputation ---
missing_before = df.select([
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in numeric_cols
])

# Transformer en format lisible : (colonne, nb_valeurs_manquantes)
missing_before_long = missing_before.toPandas().T.reset_index()
missing_before_long.columns = ['Colonne', 'Valeurs manquantes']

print("\n Valeurs manquantes AVANT imputation :")
print(missing_before_long.to_string(index=False))


StatementMeta(sparkkemeans, 1, 49, Finished, Available, Finished)


 Valeurs manquantes AVANT imputation :
                               Colonne  Valeurs manquantes
                 Mortalité des adultes                   0
                  Décès de nourrissons                   0
                                Alcool                   0
               Dépenses en pourcentage                   0
                            Hépatite B                   0
                              Rougeole                   0
                                   IMC                   0
                                 Polio                   0
                      Dépenses totales                   0
                             Diphtérie                   0
                                   VIH                   0
                            Population                   0
                    thinness 5-9 years                   0
Composition des revenus des ressources                   0
                         Scolarisation                   0


In [71]:
# 4️⃣ Imputation des valeurs manquantes (moyenne)
imputer = Imputer(inputCols=numeric_cols, outputCols=[c + "_imp" for c in numeric_cols]).setStrategy("median")
df = imputer.fit(df).transform(df)


StatementMeta(sparkkemeans, 1, 50, Finished, Available, Finished)

In [72]:
# --- Après imputation ---
missing_after = df.select([
    count(when(isnan(c) | col(c).isNull(), c)).alias(c + "_imp")
    for c in [c + "_imp" for c in numeric_cols]
])

missing_after_long = missing_after.toPandas().T.reset_index()
missing_after_long.columns = ['Colonne', 'Valeurs manquantes']

print("\n Valeurs manquantes APRÈS imputation :")
print(missing_after_long.to_string(index=False))

StatementMeta(sparkkemeans, 1, 51, Finished, Available, Finished)


 Valeurs manquantes APRÈS imputation :
                                       Colonne  Valeurs manquantes
                 Mortalité des adultes_imp_imp                   0
                  Décès de nourrissons_imp_imp                   0
                                Alcool_imp_imp                   0
               Dépenses en pourcentage_imp_imp                   0
                            Hépatite B_imp_imp                   0
                              Rougeole_imp_imp                   0
                                   IMC_imp_imp                   0
                                 Polio_imp_imp                   0
                      Dépenses totales_imp_imp                   0
                             Diphtérie_imp_imp                   0
                                   VIH_imp_imp                   0
                            Population_imp_imp                   0
                    thinness 5-9 years_imp_imp                   0
Composition des revenu

In [73]:
# Encodage du statut
indexer = StringIndexer(inputCol=categorical_col, outputCol="Statut_indexed")

StatementMeta(sparkkemeans, 1, 52, Finished, Available, Finished)

In [74]:
# Assembler toutes les features
features_cols = [c + "_imp" for c in numeric_cols] + ["Statut_indexed"]
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")

StatementMeta(sparkkemeans, 1, 53, Finished, Available, Finished)

In [75]:
# Standardisation
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

StatementMeta(sparkkemeans, 1, 54, Finished, Available, Finished)

In [76]:
# Split Train/Test
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

StatementMeta(sparkkemeans, 1, 55, Finished, Available, Finished)

In [77]:
# Définition des modèles
lr = LinearRegression(featuresCol="scaled_features", labelCol=target)
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol=target, seed=42)
gbt = GBTRegressor(featuresCol="scaled_features", labelCol=target, seed=42)

StatementMeta(sparkkemeans, 1, 56, Finished, Available, Finished)

In [78]:
# Pipelines
pipeline_lr = Pipeline(stages=[indexer, assembler, scaler, lr])
pipeline_rf = Pipeline(stages=[indexer, assembler, scaler, rf])
pipeline_gbt = Pipeline(stages=[indexer, assembler, scaler, gbt])

StatementMeta(sparkkemeans, 1, 57, Finished, Available, Finished)

In [79]:
# Grilles d’hyperparamètres
param_grid_lr = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.01, 0.1, 0.5])
                 .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                 .build())

param_grid_rf = (ParamGridBuilder()
                 .addGrid(rf.numTrees, [50, 100])
                 .addGrid(rf.maxDepth, [5, 10])
                 .build())

param_grid_gbt = (ParamGridBuilder()
                  .addGrid(gbt.maxDepth, [5, 10])
                  .addGrid(gbt.maxIter, [50, 100])
                  .build())

StatementMeta(sparkkemeans, 1, 58, Finished, Available, Finished)

In [80]:
# Évaluateur
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

# Cross-validation
cv_lr = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=param_grid_lr, evaluator=evaluator, numFolds=3)
cv_rf = CrossValidator(estimator=pipeline_rf, estimatorParamMaps=param_grid_rf, evaluator=evaluator, numFolds=3)
cv_gbt = CrossValidator(estimator=pipeline_gbt, estimatorParamMaps=param_grid_gbt, evaluator=evaluator, numFolds=3)


StatementMeta(sparkkemeans, 1, 59, Finished, Available, Finished)

In [81]:
# Entraînement
print(" Entraînement des modèles en cours...")
models = {
    "LinearRegression": cv_lr.fit(train_df),
    "RandomForest": cv_rf.fit(train_df),
    "GBTRegressor": cv_gbt.fit(train_df)
}
print(" Entraînement terminé.")

StatementMeta(sparkkemeans, 1, 60, Finished, Available, Finished)

 Entraînement des modèles en cours...
 Entraînement terminé.


In [82]:
#  Évaluation et comparaison
results = []
for name, model in models.items():
    preds = model.transform(test_df)
    r2 = evaluator.evaluate(preds)
    rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse").evaluate(preds)
    mae = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="mae").evaluate(preds)
    results.append((name, r2, rmse, mae))
    print(f"\n {name} → R²={r2:.4f}, RMSE={rmse:.4f}, MAE={mae:.4f}")

#  Résumé final
print("\n====================== Résumé des performances ======================")
print("| Modèle           |    R²     |   RMSE   |   MAE    |")
print("--------------------------------------------------------------------")
for name, r2, rmse, mae in results:
    print(f"| {name:15} | {r2:8.4f} | {rmse:8.4f} | {mae:8.4f} |")
print("--------------------------------------------------------------------")

StatementMeta(sparkkemeans, 1, 61, Finished, Available, Finished)


 LinearRegression → R²=0.8106, RMSE=3.9118, MAE=2.9745

 RandomForest → R²=0.9455, RMSE=2.0976, MAE=1.3380

 GBTRegressor → R²=0.9341, RMSE=2.3078, MAE=1.4803

| Modèle           |    R²     |   RMSE   |   MAE    |
--------------------------------------------------------------------
| LinearRegression |   0.8106 |   3.9118 |   2.9745 |
| RandomForest    |   0.9455 |   2.0976 |   1.3380 |
| GBTRegressor    |   0.9341 |   2.3078 |   1.4803 |
--------------------------------------------------------------------
