In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GraviteAccidentFrance") \
    .getOrCreate()

In [17]:
dfUsagers = spark.read.csv("usagers-2023.csv", header=True, inferSchema=True, sep=';')
dfVehicules = spark.read.csv("vehicules-2023.csv", header=True, inferSchema=True, sep=';')
dfLieux = spark.read.csv("lieux-2023.csv", header=True, inferSchema=True, sep=';')
dfCaracteristiques = spark.read.csv("caract-2023.csv", header=True, inferSchema=True, sep=';')


In [18]:
dfUsagers.printSchema()
dfUsagers.show(3)

dfVehicules.printSchema()
dfCaracteristiques.printSchema()
dfLieux.printSchema()


root
 |-- Num_Acc: long (nullable = true)
 |-- id_usager: string (nullable = true)
 |-- id_vehicule: string (nullable = true)
 |-- num_veh: string (nullable = true)
 |-- place: double (nullable = true)
 |-- catu: integer (nullable = true)
 |-- grav: double (nullable = true)
 |-- sexe: double (nullable = true)
 |-- an_nais: integer (nullable = true)
 |-- trajet: double (nullable = true)
 |-- secu1: double (nullable = true)
 |-- secu2: double (nullable = true)
 |-- secu3: double (nullable = true)
 |-- locp: double (nullable = true)
 |-- actp: string (nullable = true)
 |-- etatp: double (nullable = true)

+------------+-----------+-----------+-------+-----+----+----+----+-------+------+-----+-----+-----+----+----+-----+
|     Num_Acc|  id_usager|id_vehicule|num_veh|place|catu|grav|sexe|an_nais|trajet|secu1|secu2|secu3|locp|actp|etatp|
+------------+-----------+-----------+-------+-----+----+----+----+-------+------+-----+-----+-----+----+----+-----+
|202300000001|203 851 184|155 680 557| 

In [19]:
# 1. Jointure usagers ↔ véhicules
df_uv = dfUsagers.join(dfVehicules, on=["Num_Acc", "id_vehicule"], how="left")

# 2. Ajout des caractéristiques d'accident
df_uvc = df_uv.join(dfCaracteristiques, on="Num_Acc", how="left")

# 3. Ajout des informations du lieu
df_final = df_uvc.join(dfLieux, on="Num_Acc", how="left")

# Affichage pour vérification
df_final.printSchema()
df_final.show(3)


root
 |-- Num_Acc: long (nullable = true)
 |-- id_vehicule: string (nullable = true)
 |-- id_usager: string (nullable = true)
 |-- num_veh: string (nullable = true)
 |-- place: double (nullable = true)
 |-- catu: integer (nullable = true)
 |-- grav: double (nullable = true)
 |-- sexe: double (nullable = true)
 |-- an_nais: integer (nullable = true)
 |-- trajet: double (nullable = true)
 |-- secu1: double (nullable = true)
 |-- secu2: double (nullable = true)
 |-- secu3: double (nullable = true)
 |-- locp: double (nullable = true)
 |-- actp: string (nullable = true)
 |-- etatp: double (nullable = true)
 |-- num_veh: string (nullable = true)
 |-- senc: double (nullable = true)
 |-- catv: double (nullable = true)
 |-- obs: double (nullable = true)
 |-- obsm: double (nullable = true)
 |-- choc: double (nullable = true)
 |-- manv: double (nullable = true)
 |-- motor: double (nullable = true)
 |-- occutc: integer (nullable = true)
 |-- jour: integer (nullable = true)
 |-- mois: integer (null

In [20]:
from pyspark.sql.functions import col, when

# Supprimer doublon de colonnes
df_clean = df_final.drop("num_veh")  # une des deux versions

# Grave : Tué (2) ou hospitalisé (3)
df_clean = df_clean.withColumn(
    "target", when(col("grav").isin(2.0, 3.0, 4.0), 1).otherwise(0)
)


# Optionnel : filtrer les lignes où certaines valeurs clés sont manquantes ou invalides
df_clean = df_clean.filter(
    (col("grav").isNotNull()) & (col("grav") != -1) &
    (col("sexe").isNotNull()) & (col("sexe") != -1) &
    (col("catv").isNotNull()) &
    (col("atm").isNotNull()) &
    (col("lum").isNotNull())
)


In [21]:
from pyspark.sql.functions import year, lit

# Création de la colonne age (en supposant les données de 2023)
df_clean = df_clean.withColumn("age", lit(2023) - col("an_nais"))


In [7]:
# Vérifions la distribution des classes
df_clean.groupBy("target").count().show()

+------+-----+
|target|count|
+------+-----+
|     1|92665|
|     0|67739|
+------+-----+



In [22]:
numeric_cols = [
    "age", "place", "catv", "atm", "lum", "circ",
    "choc", "manv", "motor", "vma"
]


In [23]:
categorical_cols = [
    "sexe", "catu", "trajet", "secu1", "agg", "dep"
]


In [10]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Indexeurs
indexers = [StringIndexer(inputCol=col, outputCol=col + "_indexed", handleInvalid="keep")
            for col in categorical_cols]

# Colonnes finales à assembler
final_features = [col + "_indexed" for col in categorical_cols] + numeric_cols

# Assembler toutes les features dans une seule colonne "features"
assembler = VectorAssembler(inputCols=final_features, outputCol="features")

# Pipeline encodage + assemblage
pipeline = Pipeline(stages=indexers + [assembler])

# Appliquer le pipeline
df_model = pipeline.fit(df_clean).transform(df_clean)

# Vérif
df_model.select("features", "target").show(3, truncate=False)


+--------------------------------------------------------------------+------+
|features                                                            |target|
+--------------------------------------------------------------------+------+
|[0.0,0.0,0.0,1.0,0.0,0.0,45.0,1.0,30.0,2.0,1.0,1.0,5.0,1.0,1.0,30.0]|1     |
|[0.0,0.0,0.0,1.0,0.0,0.0,45.0,1.0,30.0,2.0,1.0,1.0,5.0,1.0,1.0,30.0]|1     |
|[1.0,0.0,4.0,0.0,0.0,4.0,26.0,1.0,7.0,3.0,5.0,2.0,1.0,1.0,1.0,50.0] |0     |
+--------------------------------------------------------------------+------+
only showing top 3 rows



In [11]:
# df_model_clean = df_model.dropna(subset=["age"])

In [None]:
# from pyspark.sql.functions import col, sum

# # Combiner toutes les colonnes utilisées dans le VectorAssembler
# all_features = [col + "_indexed" for col in categorical_cols] + numeric_cols

# # Calculer le nombre de NULLs par colonne
# null_counts = df_model_clean.select([
#     sum(col(c).isNull().cast("int")).alias(c)
#     for c in all_features
# ])

# # Afficher les résultats
# null_counts.show(truncate=False)


In [None]:
# from pyspark.ml.classification import GBTClassifier
# from pyspark.ml.evaluation import BinaryClassificationEvaluator
# from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# # Split train/test
# train_data, test_data = df_model_clean.randomSplit([0.8, 0.2], seed=42)

# # Modèle GBT
# gbt = GBTClassifier(labelCol="target", featuresCol="features", maxIter=20)

# # Entraînement
# model = gbt.fit(train_data)

# # Prédictions
# predictions = model.transform(test_data)

# # Évaluation
# evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
# auc = evaluator.evaluate(predictions)

# print("AUC (GBT Classifier) :", round(auc, 4))


In [24]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

categorical_cols = ["sexe", "catu", "trajet", "secu1", "agg", "dep"]
numeric_cols = ["age", "place", "catv", "atm", "lum", "circ", "choc", "manv", "motor", "vma"]

# Encodage avec protection contre valeurs inconnues
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_indexed", handleInvalid="keep")
    for col in categorical_cols
]

# VectorAssembler
assembler = VectorAssembler(
    inputCols=[col + "_indexed" for col in categorical_cols] + numeric_cols,
    outputCol="features",
    handleInvalid="skip"  # encore plus safe
)

# Pipeline
pipeline = Pipeline(stages=indexers + [assembler])
df_model = pipeline.fit(df_clean).transform(df_clean)


In [25]:
df_model_clean = df_model.dropna(subset=["features"])

In [42]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

train_data, test_data = df_model_clean.randomSplit([0.8, 0.2], seed=42)

gbt = GBTClassifier(
    labelCol="target",
    featuresCol="features",
    maxIter=20,
    maxBins=256  # très important pour éviter l'erreur liée à dep
)
gbt_model = gbt.fit(train_data)

predictions = gbt_model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print("✅ AUC (GBT Classifier) :", round(auc, 4))

✅ AUC (GBT Classifier) : 0.847


In [43]:
from pyspark.sql.functions import col

# Arrondir la prédiction à 0 ou 1
predictions_rounded = predictions.withColumn("prediction_bin", col("prediction").cast("int"))

# Matrice de confusion
confusion_matrix = predictions_rounded.groupBy("target", "prediction_bin").count().orderBy("target", "prediction_bin")
confusion_matrix.show()


+------+--------------+-----+
|target|prediction_bin|count|
+------+--------------+-----+
|     0|             0| 9956|
|     0|             1| 3572|
|     1|             0| 4047|
|     1|             1|14417|
+------+--------------+-----+



In [29]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Split des données
train_data, test_data = df_model_clean.randomSplit([0.8, 0.2], seed=42)

# Modèle Random Forest
rf = RandomForestClassifier(
    labelCol="target",
    featuresCol="features",
    numTrees=100,         # tu peux ajuster
    maxBins=256,
    maxDepth=5,           # profondeur max de l’arbre
    seed=42
)

# Entraînement
rf_model = rf.fit(train_data)

# Prédiction
rf_predictions = rf_model.transform(test_data)

# Évaluation AUC
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
auc_rf = evaluator.evaluate(rf_predictions)

print("✅ AUC (Random Forest) :", round(auc_rf, 4))


✅ AUC (Random Forest) : 0.8173


In [31]:
from pyspark.sql.functions import col

# Créer une colonne 'prediction_bin' au cas où ce n’est pas encore un entier
predictions = predictions.withColumn("prediction_bin", col("prediction").cast("int"))

# Matrice de confusion
confusion_matrix = predictions.groupBy("target", "prediction_bin").count().orderBy("target", "prediction_bin")
confusion_matrix.show()


+------+--------------+-----+
|target|prediction_bin|count|
+------+--------------+-----+
|     0|             0| 9956|
|     0|             1| 3572|
|     1|             0| 4047|
|     1|             1|14417|
+------+--------------+-----+



In [32]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, when

# Étape 1 : Séparation des données
train_data, test_data = df_model_clean.randomSplit([0.8, 0.2], seed=42)

# Étape 2 : Modèle
lr = LogisticRegression(
    featuresCol="features",
    labelCol="target",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0  # 0 = L2 régularisation (Ridge)
)

# Étape 3 : Entraînement
lr_model = lr.fit(train_data)

# Étape 4 : Prédiction
lr_predictions = lr_model.transform(test_data)

# Étape 5 : AUC
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
auc_lr = evaluator.evaluate(lr_predictions)

print("✅ AUC (Logistic Regression) :", round(auc_lr, 4))

# Étape 6 : Matrice de confusion
lr_predictions = lr_predictions.withColumn("prediction_bin", col("prediction").cast("int"))
lr_predictions.groupBy("target", "prediction_bin").count().show()


✅ AUC (Logistic Regression) : 0.7659
+------+--------------+-----+
|target|prediction_bin|count|
+------+--------------+-----+
|     1|             0| 4649|
|     1|             1|13815|
|     0|             0| 8657|
|     0|             1| 4871|
+------+--------------+-----+



In [44]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def save_best_model(test_data, export_path="models/best_model"):
    """
    Sauvegarde le meilleur modèle (selon AUC) parmi gbt_model, rf_model et lr_model.

    Args:
        test_data (DataFrame): le DataFrame de test.
        export_path (str): chemin de sauvegarde du meilleur modèle.
    """
    evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")

    models = [
        ("GBT Classifier", gbt_model),
        ("Random Forest", rf_model),
        ("Logistic Regression", lr_model)
    ]

    best_auc = 0.0
    best_model = None
    best_name = ""

    for name, model in models:
        predictions = model.transform(test_data)
        auc = evaluator.evaluate(predictions)
        print(f"AUC ({name}) : {round(auc, 4)}")

        if auc > best_auc:
            best_auc = auc
            best_model = model
            best_name = name

    if best_model:
        print(f"\n✅ Meilleur modèle : {best_name} avec AUC = {round(best_auc, 4)}")
        best_model.write().overwrite().save(export_path)
        print(f"💾 Modèle sauvegardé dans : {export_path}")
    else:
        print("❌ Aucun modèle sauvegardé.")


In [45]:
save_best_model(test_data, "models/best_model")


AUC (GBT Classifier) : 0.847
AUC (Random Forest) : 0.8173
AUC (Logistic Regression) : 0.7659

✅ Meilleur modèle : GBT Classifier avec AUC = 0.847
💾 Modèle sauvegardé dans : models/best_model


In [33]:
# from pyspark.sql.functions import col, when
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import BinaryClassificationEvaluator

# # --- Étape 1 : Calculer les poids de classe ---
# n_total = df_model_clean.count()
# n_pos = df_model_clean.filter(col("target") == 1).count()
# n_neg = df_model_clean.filter(col("target") == 0).count()

# # Ratio pour renforcer les exemples positifs (graves)
# balancing_ratio = n_neg / n_pos

# # Ajouter la colonne de poids
# df_weighted = df_model_clean.withColumn(
#     "classWeightCol",
#     when(col("target") == 1, balancing_ratio).otherwise(1.0)
# )

# # --- Étape 2 : Split des données ---
# train_data, test_data = df_weighted.randomSplit([0.8, 0.2], seed=42)

# # --- Étape 3 : Définir et entraîner le modèle Random Forest ---
# rf = RandomForestClassifier(
#     labelCol="target",
#     featuresCol="features",
#     weightCol="classWeightCol",
#     numTrees=100,
#     maxBins=256
# )

# rf_model = rf.fit(train_data)

# # --- Étape 4 : Prédictions et évaluation ---
# predictions = rf_model.transform(test_data)

# evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
# auc = evaluator.evaluate(predictions)

# print("🌲 AUC (Random Forest Classifier) :", round(auc, 4))


🌲 AUC (Random Forest Classifier) : 0.8021


In [34]:
# from pyspark.sql.functions import col

# # Arrondir la prédiction (float) en binaire
# predictions_rounded = predictions.withColumn("prediction_bin", col("prediction").cast("int"))

# # Matrice de confusion
# confusion_matrix = predictions_rounded.groupBy("target", "prediction_bin").count().orderBy("target", "prediction_bin")
# confusion_matrix.show()


+------+--------------+-----+
|target|prediction_bin|count|
+------+--------------+-----+
|     0|             0|18138|
|     0|             1| 8302|
|     1|             0| 1258|
|     1|             1| 4294|
+------+--------------+-----+



In [35]:
# from pyspark.ml.classification import LogisticRegression
# from pyspark.sql.functions import col, when
# from pyspark.ml.evaluation import BinaryClassificationEvaluator

# # --- Étape 1 : Ajouter classWeightCol ---
# n_total = df_model_clean.count()
# n_pos = df_model_clean.filter(col("target") == 1).count()
# n_neg = df_model_clean.filter(col("target") == 0).count()

# balancing_ratio = n_neg / n_pos

# df_weighted = df_model_clean.withColumn(
#     "classWeightCol",
#     when(col("target") == 1, balancing_ratio).otherwise(1.0)
# )

# # --- Étape 2 : Split train/test ---
# train_data, test_data = df_weighted.randomSplit([0.8, 0.2], seed=42)

# # --- Étape 3 : Définir et entraîner le modèle Logistic Regression ---
# lr = LogisticRegression(
#     labelCol="target",
#     featuresCol="features",
#     weightCol="classWeightCol",  # important pour déséquilibre
#     maxIter=100,
#     regParam=0.0,       # pas de régularisation pour commencer
#     elasticNetParam=0.0
# )

# lr_model = lr.fit(train_data)

# # --- Étape 4 : Prédictions et évaluation ---
# predictions = lr_model.transform(test_data)

# evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
# auc = evaluator.evaluate(predictions)

# print("📈 AUC (Logistic Regression) :", round(auc, 4))


📈 AUC (Logistic Regression) : 0.7574


In [36]:
# # Matrice de confusion
# predictions_rounded = predictions.withColumn("prediction_bin", col("prediction").cast("int"))
# predictions_rounded.groupBy("target", "prediction_bin").count().orderBy("target", "prediction_bin").show()


+------+--------------+-----+
|target|prediction_bin|count|
+------+--------------+-----+
|     0|             0|18346|
|     0|             1| 8094|
|     1|             0| 1795|
|     1|             1| 3757|
+------+--------------+-----+



In [37]:
rf_model.save("rf_model_accidents")