In [48]:
from pyspark.sql import SparkSession

In [49]:
# Build (or reuse) the Hive-enabled Spark session used by the whole notebook.
# NOTE(review): the warehouse directory is an absolute, user-specific path;
# consider moving it to a configurable constant before sharing the notebook.
spark = (
    SparkSession.builder
    .appName("SmartLogitrack")
    .config("spark.sql.warehouse.dir", "/home/khadija/spark-warehouse")
    .enableHiveSupport()
    .getOrCreate()
)
spark

In [50]:

# Load the raw trip records from the local Parquet file and preview the first rows.
raw_reader = spark.read.format("parquet")
df = raw_reader.load('data/dataset.parquet')
df.show(n=5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

# Analyse Exploratoire des données

In [51]:
# Display the schema (column types)

In [52]:
# Print each column's name, type and nullability for the raw DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



### Statistiques descriptives

In [53]:
# Show basic summary statistics for the numeric and string columns of the raw DataFrame.
df.describe().show()



+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-------------------+-------------------+
|summary|          VendorID|   passenger_count|    trip_distance|       RatecodeID|store_and_fwd_flag|     PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee| cbd_congestion_fee|
+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+-------

                                                                                

In [54]:
# Summary statistics (including quartiles) for the fare, tip and total amounts.
money_columns = ["fare_amount", "tip_amount", "total_amount"]
df.select(money_columns).summary().show()

[Stage 7:>                                                          (0 + 1) / 1]

+-------+------------------+------------------+------------------+
|summary|       fare_amount|        tip_amount|      total_amount|
+-------+------------------+------------------+------------------+
|  count|           3475226|           3475226|           3475226|
|   mean| 17.08180276045484|2.9598127862758044|25.611291697280986|
| stddev|463.47291781729996| 3.779681153612477| 463.6584784502166|
|    min|            -900.0|             -86.0|            -901.0|
|    25%|               8.6|               0.0|              15.2|
|    50%|             12.12|              2.45|             19.95|
|    75%|              19.5|              3.93|             27.78|
|    max|         863372.12|             400.0|         863380.37|
+-------+------------------+------------------+------------------+



                                                                                

In [55]:
# Range and spread of the trip distance only.
distance_stats = ("min", "max", "mean", "stddev")
df.select("trip_distance").summary(*distance_stats).show()

+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|    min|              0.0|
|    max|        276423.57|
|   mean|5.855126178843539|
| stddev|564.6015996346273|
+-------+-----------------+



### Vérification des valeurs manquantes

In [56]:
from pyspark.sql.functions import col, sum, when, isnan,count
# Count NULLs per column: when() produces a non-null literal only for NULL cells,
# and count() ignores the NULLs produced for every other cell.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|         540149|            0|    540149|            540149|           0|    

                                                                                

### Distribution des variables catégorielles

In [57]:
# Row count per category, first for the payment type, then for the store-and-forward flag.
for categorical_column in ("payment_type", "store_and_fwd_flag"):
    df.groupBy(categorical_column).count().show()


                                                                                

+------------+-------+
|payment_type|  count|
+------------+-------+
|           5|      1|
|           1|2444393|
|           3|  23773|
|           2| 390429|
|           4|  76481|
|           0| 540149|
+------------+-------+





+------------------+-------+
|store_and_fwd_flag|  count|
+------------------+-------+
|                 Y|   7646|
|                 N|2927431|
|              NULL| 540149|
+------------------+-------+



                                                                                

### Détection des valeurs aberrantes

In [58]:
# Trips with a non-positive distance.
distance_bas = df.filter(col("trip_distance") <= 0)
nb_distance_bas = distance_bas.count()
print(f"Nombre de trajets avec distance <= 0 : {nb_distance_bas}")

# Trips with a very long distance (more than 200 miles).
distance_high = df.filter(col("trip_distance") > 200)
nb_distance_high = distance_high.count()
print(f"Nombre de trajets avec distance > 200 miles : {nb_distance_high}")


Nombre de trajets avec distance <= 0 : 90893
Nombre de trajets avec distance > 200 miles : 122


In [59]:
# Preview a few distance values from each group of outliers, each under its own title.
outlier_previews = (
    ("Aperçu des trajets avec distance <= 0", distance_bas),
    ("Aperçu des trajets avec distance > 200", distance_high),
)
for title, outliers in outlier_previews:
    print(title)
    outliers.select("trip_distance").show(5)

Aperçu des trajets avec distance <= 0
+-------------+
|trip_distance|
+-------------+
|          0.0|
|          0.0|
|          0.0|
|          0.0|
|          0.0|
+-------------+
only showing top 5 rows
Aperçu des trajets avec distance > 200
+-------------+
|trip_distance|
+-------------+
|       206.45|
|      1472.37|
|        265.9|
|      1847.61|
|      4020.04|
+-------------+
only showing top 5 rows


In [93]:
# Trips whose passenger count is zero or negative.
has_no_passenger = col("passenger_count") <= 0
passenger_invalid = df.filter(has_no_passenger)
nb_passenger_invalid = passenger_invalid.count()
print(f"Nombre de trajets avec passagers ≤ 0 : {nb_passenger_invalid}")


Nombre de trajets avec passagers ≤ 0 : 24656


## Ingestion des données brutes (Bronze)

In [65]:
# Create the Bronze database if it does not already exist
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")



DataFrame[]

In [64]:
# Persist the raw DataFrame as the Bronze table, stored as Parquet and
# replacing any previous version of the table.
df.write.saveAsTable("bronze.bronze_taxi", format="parquet", mode="overwrite")


26/01/09 15:06:35 WARN HiveExternalCatalog: Hive incompatible types found: timestamp_ntz, timestamp_ntz. Persisting data source table `spark_catalog`.`bronze`.`bronze_taxi` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [67]:
# Check that the bronze database exists, then that the bronze_taxi table was created in it.
for catalog_query in ("SHOW DATABASES", "SHOW TABLES IN bronze"):
    spark.sql(catalog_query).show()



+---------+
|namespace|
+---------+
|   bronze|
|  default|
+---------+

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|   bronze|bronze_taxi|      false|
+---------+-----------+-----------+



# Nettoyage & Feature Engineering

##  Zone Silver

In [118]:
# Start the Silver stage from the persisted Bronze table and preview it.
silver_df = spark.read.table("bronze.bronze_taxi")
silver_df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [119]:
# Print each column's name, type and nullability for the DataFrame loaded from Bronze.
silver_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



### Nettoyage

#### Supprimer les doublons

In [120]:

# Remove fully identical rows and report the row count before and after.
# The de-duplicated frame is assigned back to silver_df so that duplicates
# are really removed from the Silver data, not just counted; the displayed
# tuple is still (rows before, rows after de-duplication).
rows_before_dedup = silver_df.count()
silver_df = silver_df.dropDuplicates()
rows_before_dedup, silver_df.count()

                                                                                

(3475226, 3475226)

#### Suppression des valeurs manquantes

In [121]:

# Drop every row that has a missing value in any column
# (how="any" is the default behaviour, spelled out here), then preview the result.
silver_df = silver_df.dropna(how="any")
silver_df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [122]:
from pyspark.sql.functions import col, sum, when, isnan,count
# Re-count NULLs per column after the cleaning step; every count is expected to be 0.
remaining_null_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in silver_df.columns
]
silver_df.select(remaining_null_exprs).show()



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|    

                                                                                

#### Filtrage des trajets aberrants

In [123]:
# Keep only trips whose distance is strictly positive and at most 200 miles.
is_plausible_distance = (col("trip_distance") > 0) & (col("trip_distance") <= 200)
silver_df = silver_df.filter(is_plausible_distance)

silver_df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [124]:
# Sanity check: list any remaining trip with an out-of-range distance (expected: no rows).
is_invalid_distance = (col("trip_distance") <= 0) | (col("trip_distance") > 200)
silver_df.filter(is_invalid_distance).show()




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+----

                                                                                

In [127]:
 # Discard trips with an invalid passenger count (zero or negative).
has_passenger = col("passenger_count") > 0
silver_df = silver_df.filter(has_passenger)

In [126]:

# Add the trip duration in minutes: difference between the drop-off and
# pick-up times expressed in seconds, divided by 60.
from pyspark.sql.functions import col, unix_timestamp

dropoff_seconds = unix_timestamp(col("tpep_dropoff_datetime"))
pickup_seconds = unix_timestamp(col("tpep_pickup_datetime"))
silver_df = silver_df.withColumn("trip_duree", (dropoff_seconds - pickup_seconds) / 60)

silver_df.select("trip_duree").show(5)

+----------+
|trip_duree|
+----------+
+----------+

