# Étape 1 : Initialiser Spark et charger le dataset

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [4]:
# Start (or reuse) the Spark session used throughout the notebook.
# getOrCreate() returns the session already running in this kernel when there
# is one, so a single call is enough and this cell is safe to re-run; calling it
# again (even with another appName) does not start a second session.
APP_NAME = "Vehicle_Failure_Analysis"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()


In [8]:
# Load the sensor readings: the first CSV line is a header and column types
# are inferred from the data (inferSchema costs an extra pass over the file).
CSV_PATH = "vehicle_sensor_data.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(CSV_PATH)
)


In [9]:
# Check the inferred column types, then preview the first five rows.
df.printSchema()
df.show(n=5)


root
 |-- vehicle_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- temperature_engine: double (nullable = true)
 |-- tire_pressure: double (nullable = true)
 |-- battery_voltage: double (nullable = true)
 |-- speed_kmh: integer (nullable = true)
 |-- brake_status: integer (nullable = true)
 |-- failure_detected: integer (nullable = true)

+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+
|vehicle_id|          timestamp|temperature_engine|tire_pressure|battery_voltage|speed_kmh|brake_status|failure_detected|
+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+
|      V001|2024-01-01 08:00:00|              85.0|          2.5|           12.3|       50|           0|               0|
|      V001|2024-01-01 08:05:00|              92.5|          2.3|           12.1|       55|           0|               0|
|      V001|2024-01-

# Étape 2 : Analyse descriptive

In [10]:
# count / mean / stddev / min / max for the numeric and string columns.
summary_stats = df.describe()
summary_stats.show()

+-------+----------+------------------+------------------+-------------------+----------------+------------------+------------------+
|summary|vehicle_id|temperature_engine|     tire_pressure|    battery_voltage|       speed_kmh|      brake_status|  failure_detected|
+-------+----------+------------------+------------------+-------------------+----------------+------------------+------------------+
|  count|        45|                45|                45|                 45|              45|                45|                45|
|   mean|      NULL| 99.54444444444445| 2.031111111111111| 11.900000000000002|            56.2|0.7111111111111111|0.7111111111111111|
| stddev|      NULL|11.633836925802322|0.4182334179847324|0.44466534914165806|7.33174914018104|0.4583677673015523|0.4583677673015523|
|    min|      V001|              75.0|               1.3|               11.1|              42|                 0|                 0|
|    max|      V009|             120.0|               2.9|    

In [11]:
# Summary statistics for engine temperature.
# Spark aggregates are called through the `F` namespace: importing `min`/`max`
# by bare name (`from pyspark.sql.functions import min, max`) would shadow
# Python's built-in min()/max() for every later cell in this kernel.
# mean, stddev and count are still imported by bare name because later cells
# call them directly (e.g. count() for missing values, mean() for imputation).
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, stddev, count

df.select(
    F.mean("temperature_engine").alias("Mean_Temperature"),
    F.stddev("temperature_engine").alias("Std_Temperature"),
    F.min("temperature_engine").alias("Min_Temperature"),
    F.max("temperature_engine").alias("Max_Temperature"),
).show()


+-----------------+------------------+---------------+---------------+
| Mean_Temperature|   Std_Temperature|Min_Temperature|Max_Temperature|
+-----------------+------------------+---------------+---------------+
|99.54444444444445|11.633836925802322|           75.0|          120.0|
+-----------------+------------------+---------------+---------------+



# Étape 3 : Détection et traitement des valeurs manquantes

In [12]:
# Fraction of missing values per column.
# In float/double columns a NaN is counted as missing too: Spark's isNull()
# does not match NaN, whereas fillna()/dropna() treat NaN like null in numeric
# columns, so counting only nulls would under-report what gets imputed/dropped.
from pyspark.sql.functions import when, isnan
from pyspark.sql.types import DoubleType, FloatType


def _missing_condition(field):
    """Return a boolean Column that is true when `field`'s value is missing.

    A value is missing when it is null, or NaN for float/double columns.
    """
    column = col(field.name)
    if isinstance(field.dataType, (DoubleType, FloatType)):
        # isnan(null) is false, so the null check is still needed.
        return column.isNull() | isnan(column)
    return column.isNull()


missing_values = df.select(
    [
        (count(when(_missing_condition(field), 1)) / count("*")).alias(field.name)
        for field in df.schema.fields
    ]
)
missing_values.show()


+----------+---------+------------------+-------------+---------------+---------+------------+----------------+
|vehicle_id|timestamp|temperature_engine|tire_pressure|battery_voltage|speed_kmh|brake_status|failure_detected|
+----------+---------+------------------+-------------+---------------+---------+------------+----------------+
|       0.0|      0.0|               0.0|          0.0|            0.0|      0.0|         0.0|             0.0|
+----------+---------+------------------+-------------+---------------+---------+------------+----------------+



In [13]:
# Missing-value handling:
#   1. drop rows where more than half of the columns are missing;
#   2. impute the remaining missing engine temperatures with the column mean.
# Dropping first means imputed values are not counted as "present" when the
# threshold is applied, and the mean comes only from rows that are kept.
import math

from pyspark.sql.functions import isnan

MIN_PRESENT_RATIO = 0.5  # keep rows with at least 50 % of columns present

# dropna(thresh=k) keeps rows with at least k non-null, non-NaN values.
# ceil() keeps "drop rows with > 50 % missing" correct for odd column counts
# (int() truncates: 9 columns -> 4, which would keep rows with 5 missing values).
min_non_null = math.ceil(len(df.columns) * MIN_PRESENT_RATIO)
df = df.dropna(thresh=min_non_null)

# avg() propagates NaN, so average over non-NaN readings only
# (isnan(null) is false, and mean() ignores nulls).
temp_mean = (
    df.where(~isnan(col("temperature_engine")))
    .select(mean("temperature_engine"))
    .first()[0]
)
if temp_mean is not None:  # column entirely missing: nothing sensible to impute
    df = df.fillna({"temperature_engine": temp_mean})
df

DataFrame[vehicle_id: string, timestamp: timestamp, temperature_engine: double, tire_pressure: double, battery_voltage: double, speed_kmh: int, brake_status: int, failure_detected: int]

# Étape 4 : Détection et gestion des valeurs aberrantes

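### On détecte les valeurs aberrantes via la méthode de l’IQR (Interquartile Range, écart interquartile) :

Une valeur est considérée comme aberrante si elle sort de l’intervalle $[Q_1 - 1.5\,\mathrm{IQR},\ Q_3 + 1.5\,\mathrm{IQR}]$, avec $\mathrm{IQR} = Q_3 - Q_1$.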

In [17]:
# Flag engine-temperature outliers with the IQR rule (Tukey's fences):
# a reading is an outlier when it lies outside [Q1 - k*IQR, Q3 + k*IQR].
IQR_MULTIPLIER = 1.5
# approxQuantile precision; 0.0 would compute exact quantiles (more expensive).
QUANTILE_RELATIVE_ERROR = 0.01

quartiles = df.approxQuantile("temperature_engine", [0.25, 0.75], QUANTILE_RELATIVE_ERROR)
if len(quartiles) != 2:
    # approxQuantile returns an empty list when the column has no usable values.
    raise ValueError("Cannot compute IQR: 'temperature_engine' has no non-null values")
Q1, Q3 = quartiles
IQR = Q3 - Q1
lower_bound = Q1 - IQR_MULTIPLIER * IQR
upper_bound = Q3 + IQR_MULTIPLIER * IQR

# A null temperature makes the comparison null, so such rows get flag 0.
is_outlier = (col("temperature_engine") < lower_bound) | (col("temperature_engine") > upper_bound)
df = df.withColumn("outlier_temperature", when(is_outlier, 1).otherwise(0))
df.show()


+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+
|vehicle_id|          timestamp|temperature_engine|tire_pressure|battery_voltage|speed_kmh|brake_status|failure_detected|outlier_temperature|temperature_variation|
+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+
|      V001|2024-01-01 08:00:00|              85.0|          2.5|           12.3|       50|           0|               0|                  0|                 NULL|
|      V001|2024-01-01 08:05:00|              92.5|          2.3|           12.1|       55|           0|               0|                  0|                  7.5|
|      V001|2024-01-01 08:10:00|             110.2|          1.9|           11.8|       60|           1|               1|                  0|   17.700000000000003|
|      V001|2024

In [19]:
# Remove temperature outliers: keep only readings inside [lower_bound, upper_bound],
# both bounds inclusive. Rows whose temperature is null are dropped too, because
# the comparison evaluates to null. After this filter every remaining row has
# outlier_temperature == 0.
# NOTE(review): in a failure-detection dataset, extreme temperatures may be the
# very signal of interest — confirm that discarding them (rather than keeping
# them flagged) is intended.
df = df.where(col("temperature_engine").between(lower_bound, upper_bound))
df

DataFrame[vehicle_id: string, timestamp: timestamp, temperature_engine: double, tire_pressure: double, battery_voltage: double, speed_kmh: int, brake_status: int, failure_detected: int, outlier_temperature: int, temperature_variation: double]

# Étape 5 : Feature Engineering

In [20]:
# Feature: change in engine temperature since the previous reading of the same
# vehicle (readings ordered by timestamp within each vehicle_id). A vehicle's
# first reading has no predecessor, so its variation is NULL.
# NOTE(review): the "previous reading" is the previous row still present in df,
# so any rows removed by earlier filters are skipped over.
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

window_spec = Window.partitionBy("vehicle_id").orderBy("timestamp")
previous_temperature = lag("temperature_engine", 1).over(window_spec)

df = df.withColumn("temperature_variation", col("temperature_engine") - previous_temperature)
df.show()


+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+
|vehicle_id|          timestamp|temperature_engine|tire_pressure|battery_voltage|speed_kmh|brake_status|failure_detected|outlier_temperature|temperature_variation|
+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+
|      V001|2024-01-01 08:00:00|              85.0|          2.5|           12.3|       50|           0|               0|                  0|                 NULL|
|      V001|2024-01-01 08:05:00|              92.5|          2.3|           12.1|       55|           0|               0|                  0|                  7.5|
|      V001|2024-01-01 08:10:00|             110.2|          1.9|           11.8|       60|           1|               1|                  0|   17.700000000000003|
|      V001|2024

In [21]:
# Hand-tuned heuristic risk score (higher = riskier), combining:
#   - engine temperature, divided by TEMP_SCALE;
#   - tire under-inflation, as 1 - tire_pressure / TIRE_PRESSURE_REF;
#   - brake_status (presumably a 0/1 flag — confirm).
# The weights are not learned; tune them here.
# NOTE(review): in the describe() output, brake_status and failure_detected have
# identical mean and stddev, so brake_status may duplicate the target label —
# check for target leakage before using risk_score as a model feature.
RISK_WEIGHT_TEMPERATURE = 2
RISK_WEIGHT_TIRE = 2
RISK_WEIGHT_BRAKE = 3
TEMP_SCALE = 100
TIRE_PRESSURE_REF = 3

df = df.withColumn(
    "risk_score",
    (col("temperature_engine") / TEMP_SCALE) * RISK_WEIGHT_TEMPERATURE
    + (1 - col("tire_pressure") / TIRE_PRESSURE_REF) * RISK_WEIGHT_TIRE
    + col("brake_status") * RISK_WEIGHT_BRAKE,
)

df.show()


+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+------------------+
|vehicle_id|          timestamp|temperature_engine|tire_pressure|battery_voltage|speed_kmh|brake_status|failure_detected|outlier_temperature|temperature_variation|        risk_score|
+----------+-------------------+------------------+-------------+---------------+---------+------------+----------------+-------------------+---------------------+------------------+
|      V001|2024-01-01 08:00:00|              85.0|          2.5|           12.3|       50|           0|               0|                  0|                 NULL| 2.033333333333333|
|      V001|2024-01-01 08:05:00|              92.5|          2.3|           12.1|       55|           0|               0|                  0|                  7.5| 2.316666666666667|
|      V001|2024-01-01 08:10:00|             110.2|          1.9|           11.8|    