In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import LongType, DoubleType, StringType, TimestampType

import os

folder = "/Volumes/workspace/trips/yellow_taxi"
files = dbutils.fs.ls(folder)

dfs = []

for f in files:
    if f.path.endswith(".parquet"):
        tmp = spark.read.parquet(f.path)
        # Harmonise column types and names across files so that the union
        # below sees one consistent schema.
        tmp = (
            tmp.withColumn("VendorID", col("VendorID").cast(LongType()))
               .withColumn("passenger_count", col("passenger_count").cast(DoubleType()))
               .withColumn("PULocationID", col("PULocationID").cast(LongType()))
               .withColumn("DOLocationID", col("DOLocationID").cast(LongType()))
               # Some files spell the column "Airport_fee": normalise to lower case.
               .withColumnRenamed("Airport_fee", "airport_fee")
        )
        dfs.append(tmp)

# reduce() on an empty list raises an opaque TypeError; fail with a clear message instead.
if not dfs:
    raise FileNotFoundError(f"No .parquet files found in {folder}")

# Combine all DataFrames
from functools import reduce
from pyspark.sql import DataFrame

# allowMissingColumns=True: a file lacking a column that other files have is
# null-filled for that column instead of making the whole union fail.
df = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    dfs,
)
df.printSchema()
df.show(5)


In [0]:
# Persist the combined data as Parquet with a single, uniform schema
# (any previous output at this path is replaced).
df.write.format("parquet").mode("overwrite").save("/Volumes/workspace/trips/yellow_taxi_clean")

Relecture du jeu de données unifié : on obtient un objet de type `pyspark.sql.dataframe.DataFrame`.

In [0]:
# Reload the harmonised Parquet dataset and preview a few rows.
df = spark.read.format("parquet").load("/Volumes/workspace/trips/yellow_taxi_clean")
df.show(5)

# Exploratory Data Analysis (EDA)

In [0]:
# List of column names in the dataset.
df.schema.names

## Comptage des valeurs null

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# For each column, count NULL values (isNull -> 0/1, then summed).
# pyspark's `sum` is aliased so it does not shadow Python's built-in sum()
# for every following cell of the notebook.
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

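Certaines colonnes (`passenger_count`, `RatecodeID`, `store_and_fwd_flag`, `congestion_surcharge`, `airport_fee`) contiennent exactement le même nombre de valeurs nulles.

Cela suggère que **ces valeurs manquantes se trouvent sur les mêmes lignes**. C'est une hypothèse à confirmer : on peut par exemple filtrer les lignes où `passenger_count` est nul, puis recompter les nulls des autres colonnes.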

In [0]:
# Total number of rows in the dataset.
n_rows = df.count()
n_rows

## Statistiques descriptives (numériques)

In [0]:
# Descriptive statistics (describe(): count, mean, stddev, min, max) for the main numeric columns.
numeric_cols = ["trip_distance", "fare_amount", "total_amount", "tip_amount", "passenger_count"]
df.select(*numeric_cols).describe().show()

## Analyse des variables catégorielles

In [0]:
# payment_type: number of trips per value, most frequent first
df.groupBy("payment_type").count().orderBy(col("count").desc()).show()

In [0]:
# RatecodeID: rate type applied to the trip; frequency of each code, most frequent first
df.groupBy("RatecodeID").count().orderBy(col("count").desc()).show()

In [0]:
# store_and_fwd_flag: whether the record was stored and sent later; frequency of each value
df.groupBy("store_and_fwd_flag").count().orderBy(col("count").desc()).show()

## Analyse temporelle
Création des features temporelles

In [0]:
from pyspark.sql.functions import hour, dayofweek

# Temporal features derived from the pickup timestamp.
# dayofweek() uses Spark's convention: 1 = Sunday ... 7 = Saturday.
df = (
    df
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    .withColumn("pickup_day", dayofweek("tpep_pickup_datetime"))
)


In [0]:
# Peak hours: the 10 pickup hours with the most trips
df.groupBy("pickup_hour").count().sort(col("count").desc()).show(10)

## Durée du trajet

In [0]:
from pyspark.sql.functions import unix_timestamp

# Trip duration in minutes: (dropoff - pickup) in epoch seconds, divided by 60.
pickup_seconds = unix_timestamp("tpep_pickup_datetime")
dropoff_seconds = unix_timestamp("tpep_dropoff_datetime")
df = df.withColumn("trip_duration_min", (dropoff_seconds - pickup_seconds) / 60)


In [0]:
# Distribution summary of the computed trip duration (minutes).
df.describe("trip_duration_min").show()

## Vérification cohérence financière
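`total_amount` ≈ somme des composantes (`fare_amount`, `extra`, `mta_tax`, `tip_amount`, `tolls_amount`, `improvement_surcharge`, `congestion_surcharge`, `airport_fee`)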

In [0]:
# Sanity check: on average, total_amount should match the sum of its components.
# All surcharges are included. NULL components are counted as 0, so a row with
# a missing surcharge still contributes to the component average instead of
# being dropped from it while remaining in avg(total_amount).
df.selectExpr(
    "avg(total_amount) AS avg_total_amount",
    "avg(fare_amount + coalesce(extra, 0) + coalesce(mta_tax, 0) + coalesce(tip_amount, 0)"
    " + coalesce(tolls_amount, 0) + coalesce(improvement_surcharge, 0)"
    " + coalesce(congestion_surcharge, 0) + coalesce(airport_fee, 0)) AS avg_sum_of_components",
).show()


## Fixer un seuil pour les valeurs très élevées
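Interprétation :
- 99 % des valeurs sont considérées comme normales ;
- le 1 % le plus extrême est traité comme anomalie potentielle.

Seuil usuel : le 99e percentile, calculé ici de façon approchée avec `approxQuantile`.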


In [0]:
# Approximate 99th percentile of fare_amount (relativeError = 0.01).
seuil_fare_amount = df.approxQuantile("fare_amount", probabilities=[0.99], relativeError=0.01)[0]
seuil_fare_amount

In [0]:
# Approximate 99th percentile of trip_distance (relativeError = 0.01).
seuil_trip_distance = df.approxQuantile("trip_distance", probabilities=[0.99], relativeError=0.01)[0]
seuil_trip_distance

In [0]:
# Approximate 99th percentile of total_amount (relativeError = 0.01).
seuil_total_amount = df.approxQuantile("total_amount", probabilities=[0.99], relativeError=0.01)[0]
seuil_total_amount

In [0]:
# Approximate 99th percentile of mta_tax (relativeError = 0.01).
seuil_mta_tax = df.approxQuantile("mta_tax", probabilities=[0.99], relativeError=0.01)[0]
seuil_mta_tax

In [0]:
# Approximate 99th percentile of passenger_count (relativeError = 0.01; NULLs are ignored).
seuil_passenger_count = df.approxQuantile("passenger_count", probabilities=[0.99], relativeError=0.01)[0]
seuil_passenger_count

In [0]:
# Rows whose value sits exactly on one of the 99th-percentile thresholds.
on_threshold = (
    (col("passenger_count") == seuil_passenger_count)
    | (col("trip_distance") == seuil_trip_distance)
    | (col("fare_amount") == seuil_fare_amount)
    | (col("total_amount") == seuil_total_amount)
)
df.filter(on_threshold).show()

## Vérifier les lignes contenant des anomalies

In [0]:
from pyspark.sql.functions import col, coalesce, lit

# A row matches only when it fails EVERY check below at the same time
# (the groups are combined with `&`, not `|`).
#
# Comparisons with NULL yield NULL in Spark, so the combined expression can be
# NULL. filter(cond) and filter(~cond) would then BOTH drop the row, and it
# would disappear from df_anomalies and df_clean alike. coalesce(..., False)
# makes the condition strictly True/False so the two filters partition the data.
all_anomalies_condition = coalesce(
    (
        (
            col("passenger_count").isNull()
            | (col("passenger_count") == 0)
            | (col("passenger_count") >= seuil_passenger_count)
        )
        & ((col("trip_distance") <= 0) | (col("trip_distance") >= seuil_trip_distance))
        & ((col("fare_amount") <= 0) | (col("fare_amount") >= seuil_fare_amount))
        & (col("tip_amount") < 0)
        & ((col("total_amount") <= 0) | (col("total_amount") >= seuil_total_amount))
        & col("RatecodeID").isNull()
        & col("store_and_fwd_flag").isNull()
        & col("congestion_surcharge").isNull()
        & col("airport_fee").isNull()
    ),
    lit(False),
)


In [0]:
# Rows matching the combined anomaly condition, and how many there are.
df_anomalies = df.where(all_anomalies_condition)
df_anomalies.count()

In [0]:
# Preview the first anomalous rows.
df_anomalies.show(n=20)

## Nettoyage des anomalies

In [0]:
# Starting point of the cleaning: every row not matched by the anomaly condition.
df_clean = df.where(~all_anomalies_condition)

### Durée du trajet

In [0]:
from pyspark.sql.functions import col, when, abs
# NOTE: importing `abs` here shadows Python's built-in abs() for the rest of the notebook.

# Keep strictly positive durations: negative AND zero-length trips are dropped,
# as are rows whose duration is NULL.
df_clean = df_clean.where(col("trip_duration_min") > 0)


In [0]:
# Flag (without removing) trips longer than the approximate 99th percentile of duration.
duration_99 = df_clean.approxQuantile("trip_duration_min", [0.99], 0.01)[0]
is_extreme_duration = col("trip_duration_min") > duration_99
df_clean = df_clean.withColumn("flag_duration_extreme", is_extreme_duration)

### Distance du trajet

In [0]:
# Flag zero-distance trips, then drop the ones that were nevertheless charged a
# positive fare (zero distance with a non-positive fare is kept, and flagged).
zero_distance = col("trip_distance") == 0
df_clean = (
    df_clean
    .withColumn("flag_distance_zero", zero_distance)
    .where(~(zero_distance & (col("fare_amount") > 0)))
)

### Fare amount

In [0]:
# Drop negative fares (a NULL fare is dropped as well).
df_clean = df_clean.filter(col("fare_amount") >= 0)
df_clean = df_clean.withColumn(
    "flag_fare_extreme",
    col("fare_amount") >= seuil_fare_amount
)

# Winsorisation: cap fare_amount at its 99th percentile.
# total_amount is lowered by the amount trimmed from the fare so the row stays
# internally consistent. Otherwise the later |total_amount - sum of components| < 0.5
# check discards every capped row, and flag_fare_extreme would be meaningless.
# total_amount must be adjusted BEFORE fare_amount is overwritten.
fare_excess = when(
    col("fare_amount") > seuil_fare_amount,
    col("fare_amount") - seuil_fare_amount,
).otherwise(0.0)
df_clean = (
    df_clean
    .withColumn("total_amount", col("total_amount") - fare_excess)
    .withColumn(
        "fare_amount",
        when(col("fare_amount") >= seuil_fare_amount, seuil_fare_amount).otherwise(col("fare_amount")),
    )
)


### Tip amount

In [0]:
# Drop negative tips (a NULL tip is dropped as well).
df_clean = df_clean.filter(col("tip_amount") >= 0)

# Cap tips at the fare. total_amount is lowered by the same amount, BEFORE
# tip_amount is overwritten, so the row still adds up for the later
# total-consistency check instead of being discarded by it.
tip_over_fare = when(
    col("tip_amount") > col("fare_amount"),
    col("tip_amount") - col("fare_amount"),
).otherwise(0.0)
df_clean = (
    df_clean
    .withColumn("total_amount", col("total_amount") - tip_over_fare)
    .withColumn(
        "tip_amount",
        when(col("tip_amount") > col("fare_amount"), col("fare_amount"))
        .otherwise(col("tip_amount")),
    )
)

# Flag extreme tips (approximate 99th percentile of the corrected tips)
tip_99 = df_clean.approxQuantile("tip_amount", [0.99], 0.01)[0]

df_clean = df_clean.withColumn(
    "flag_tip_extreme",
    col("tip_amount") >= tip_99
)

# Winsorisation, again keeping total_amount consistent with the capped tip.
tip_excess = when(col("tip_amount") > tip_99, col("tip_amount") - tip_99).otherwise(0.0)
df_clean = (
    df_clean
    .withColumn("total_amount", col("total_amount") - tip_excess)
    .withColumn("tip_amount", when(col("tip_amount") >= tip_99, tip_99).otherwise(col("tip_amount")))
)

### Total amount

In [0]:
# Drop negative totals (a NULL total is dropped as well).
df_clean = df_clean.filter(col("total_amount") >= 0)

# Recompute the total from its components (NULL components count as 0).
# abs is aliased locally so this cell does not depend on another cell having
# shadowed Python's built-in abs() with the Spark function.
from pyspark.sql.functions import coalesce, lit, abs as spark_abs

df_clean = df_clean.withColumn(
    "total_recalculated",
    col("fare_amount")
    + coalesce(col("extra"), lit(0))
    + coalesce(col("mta_tax"), lit(0))
    + coalesce(col("tip_amount"), lit(0))
    + coalesce(col("tolls_amount"), lit(0))
    + coalesce(col("improvement_surcharge"), lit(0))
    + coalesce(col("congestion_surcharge"), lit(0))
    # airport_fee is also billed within total_amount; leaving it out made every
    # trip charged an airport fee fail the check below.
    + coalesce(col("airport_fee"), lit(0))
)

# Drop inconsistent rows: reported and recomputed totals differ by $0.50 or more.
df_clean = df_clean.filter(
    spark_abs(col("total_amount") - col("total_recalculated")) < 0.5
)

### Montants annexes

In [0]:
financial_cols = [
    "mta_tax",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "airport_fee"
]

for c in financial_cols:

    # Drop negative amounts but KEEP NULLs. congestion_surcharge and airport_fee
    # have NULLs that the missing-value section imputes further down. A plain
    # `col(c) >= 0` is NULL for them, and filter() treats NULL as False, so those
    # rows were being deleted before they could be imputed.
    df_clean = df_clean.filter(col(c).isNull() | (col(c) >= 0))

    # Approximate 99th percentile (NULLs are ignored by approxQuantile)
    q99 = df_clean.approxQuantile(c, [0.99], 0.01)[0]

    # Flag extreme values (NULL where the amount itself is NULL)
    df_clean = df_clean.withColumn(
        f"flag_{c}_extreme",
        col(c) > q99
    )

    # Winsorisation (capping); NULL amounts are left untouched.
    # NOTE: total_amount is not adjusted here.
    df_clean = df_clean.withColumn(
        c,
        when(col(c) > q99, q99).otherwise(col(c))
    )


### Passenger count

In [0]:
# Flag rare passenger counts (0, or 7 to 9); rows are kept.
rare_passenger_counts = [0, 7, 8, 9]
df_clean = df_clean.withColumn(
    "flag_passenger_anomaly",
    col("passenger_count").isin(rare_passenger_counts),
)

### RatecodeID

In [0]:
# Flag RatecodeID 99 as a rare code (flag only; rows are kept).
is_rare_ratecode = col("RatecodeID") == 99
df_clean = df_clean.withColumn("flag_ratecode_rare", is_rare_ratecode)

### Résumé final

In [0]:
# Row counts before and after the cleaning steps.
n_before = df.count()
print("Avant nettoyage :", n_before)
n_after = df_clean.count()
print("Après nettoyage :", n_after)

## NETTOYAGE DES VALEURS MANQUANTES

### airport_fee
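Logique : si `airport_fee` est nul (null en Spark, et non NaN) et que la zone de prise en charge (`PULocationID`) n'est pas un aéroport (zones 132 et 138), alors la valeur est mise à 0.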

In [0]:
# airport_fee is NULL and the pickup zone is not 132 or 138 -> set it to 0.
# NULLs in zones 132/138 are left unchanged by this step.
not_airport_pickup = ~col("PULocationID").isin([132, 138])
df_clean = df_clean.withColumn(
    "airport_fee",
    when(col("airport_fee").isNull() & not_airport_pickup, lit(0)).otherwise(col("airport_fee")),
)

### passenger_count
Logique : imputation par la médiane selon PULocationID

Contrairement à pandas (`groupby(...).transform("median")`), PySpark n'offre pas d'équivalent direct. On procède donc en deux étapes : calcul de la médiane par zone, puis jointure.

In [0]:
# Step 1: median passenger_count per pickup zone (the aggregate ignores NULLs).
median_passenger_by_zone = (
    df_clean
    .groupBy("PULocationID")
    .agg({"passenger_count": "median"})
    .withColumnRenamed("median(passenger_count)", "median_passenger")
)

# Step 2: attach each zone's median and use it only where passenger_count is NULL.
with_zone_median = df_clean.join(median_passenger_by_zone, on="PULocationID", how="left")
df_clean = with_zone_median.withColumn(
    "passenger_count",
    coalesce(col("passenger_count"), col("median_passenger")),
).drop("median_passenger")

### RatecodeID
Logique : imputation par le mode selon PULocationID

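L'API DataFrame de Spark n'offre pas de fonction `mode()` avant la version 3.4. On calcule donc le mode manuellement : comptage par zone, puis `row_number`.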

### Calcul du mode par zone et imputation

In [0]:
# Step 1: most frequent (mode) RatecodeID per pickup zone
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Ties on the count are broken by the smallest RatecodeID, so the chosen mode
# is deterministic across runs.
w = Window.partitionBy("PULocationID").orderBy(col("count").desc(), col("RatecodeID").asc())

ratecode_mode = (
    df_clean
    # NULL must not be a candidate. groupBy() treats NULL as its own group, so a
    # zone where NULL is the most frequent value would get a NULL "mode", and
    # nothing would be imputed there.
    .filter(col("RatecodeID").isNotNull())
    .groupBy("PULocationID", "RatecodeID")
    .count()
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") == 1)
    .select("PULocationID", col("RatecodeID").alias("mode_ratecode"))
)

# Step 2: join + imputation (only where RatecodeID is NULL)
df_clean = (
    df_clean
    .join(ratecode_mode, on="PULocationID", how="left")
    .withColumn(
        "RatecodeID",
        when(
            col("RatecodeID").isNull(),
            col("mode_ratecode")
        ).otherwise(col("RatecodeID"))
    )
    .drop("mode_ratecode")
)
# Any remaining NULL (zone with no known RatecodeID) -> default value 1
df_clean = df_clean.fillna({"RatecodeID": 1})

### store_and_fwd_flag

In [0]:
# Missing store_and_fwd_flag defaults to "N".
df_clean = df_clean.na.fill({"store_and_fwd_flag": "N"})

### Montants annexes

In [0]:
# Remaining NULL surcharges are filled with 0.
money_cols = ["congestion_surcharge", "airport_fee"]
fill_values = dict.fromkeys(money_cols, 0)
df_clean = df_clean.na.fill(fill_values)

### Vérification finale

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Final check: number of NULL values remaining in each column.
# pyspark's `sum` is aliased so Python's built-in sum() is not shadowed.
null_counts = [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_clean.columns]
df_clean.select(null_counts).show()