In [0]:
# Inferential Statistics vs Big Data Analytics
## NYC Yellow Taxi Trips (2022–2024)

from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import ttest_ind


### Charger le dataset SAMPLE & EDA

### 

In [0]:
# Path of the sample CSV (the file name suggests a 1% hybrid-stratified sample).
sample_path = "/Volumes/workspace/default/filestore/yellowtaxisample1pct_hybrid_stratified.csv"

# Load the sample into a pandas DataFrame.
df_sample = pd.read_csv(filepath_or_buffer=sample_path)

# Cell output: (number of rows, number of columns).
df_sample.shape


In [0]:
# Preview the first five rows of the sample.
df_sample.head(n=5)

In [0]:
# Number of missing values in each column of the sample.
df_sample.isna().sum()

Analyse des valeurs existantes

In [0]:
# Columns whose value distributions are inspected below.
cols_focus = [
    "passenger_count",
    "RatecodeID",
    "store_and_fwd_flag",
    "congestion_surcharge",
    "airport_fee",
]

# Print the frequency table of each selected column.
# The loop variable is deliberately NOT named `col`: that name is the Spark
# column function brought in by `from pyspark.sql.functions import *`, and
# rebinding it to a string makes the later `col(c)` calls in the Spark cells
# fail with "'str' object is not callable".
for column_name in cols_focus:
    print(f"\n* {column_name}")
    print(df_sample[column_name].value_counts())

In [0]:
# Count sample rows whose pickup value compares greater than the dropoff value.
pickup_after_dropoff = df_sample["tpep_pickup_datetime"].gt(df_sample["tpep_dropoff_datetime"])
pickup_after_dropoff.sum()


### Nettoyage Sample dataset

In [0]:
# Directory that holds the full population as parquet files.
pop_path = "/Volumes/workspace/default/filestore/Data_as_paquets/"

# Read the whole directory into a single Spark DataFrame.
df_pop = spark.read.format("parquet").load(pop_path)

# Show the inferred schema, then the total number of rows as the cell output.
df_pop.printSchema()
df_pop.count()



### **📌 Description des colonnes – NYC Yellow Taxi**
### 
**VendorID**
Identifiant du fournisseur de taxi **(ex : entreprise ou système de dispatch)**.

**tpep_pickup_datetime**
Date et heure de début de la course **(prise en charge du client)**.

**tpep_dropoff_datetime**
Date et heure de fin de la course.

**passenger_count**
Nombre de passagers dans le taxi.

**trip_distance**
Distance de la course en miles.

**RatecodeID**
Code tarifaire appliqué à la course **(tarif standard, aéroport, hors ville, etc.)**.

**store_and_fwd_flag**
Indique si les données de la course ont été stockées temporairement avant transmission
**(Y = oui, N = non)**.

**PULocationID**
Identifiant de la zone de prise en charge **(Pickup Location)**.

**DOLocationID**
Identifiant de la zone de dépose **(Dropoff Location)**.

**payment_type**
Méthode de paiement utilisée :

1 → Carte

2 → Cash

Autres → non standard

**fare_amount**
Montant de base de la course **(hors taxes et suppléments)**.

**extra**
Suppléments **(heures de nuit, heures de pointe, etc.)**.

**mta_tax**
Taxe MTA **(taxe fixe de transport à NYC)**.

**tip_amount**
Montant du pourboire.

**tolls_amount**
Frais de péage.

**improvement_surcharge**
Supplément réglementaire pour amélioration du service.

**congestion_surcharge**
Supplément lié à la congestion dans certaines zones.

**airport_fee**
Frais supplémentaires pour les trajets vers/depuis les aéroports.

**total_amount**
Montant total payé par le client **(tous frais inclus)**.

In [0]:
# List the entries of the population directory and read each one as parquet.
files = dbutils.fs.ls(pop_path)
df_list = [spark.read.parquet(entry.path) for entry in files]

# Stack all parts by column name; a column missing from a part is filled
# with NULLs (allowMissingColumns=True).
df_pop = df_list[0]
for part in df_list[1:]:
    df_pop = df_pop.unionByName(part, allowMissingColumns=True)

# Cell output: total number of rows after the union.
df_pop.count()


In [0]:
# Build one aggregate per column: the number of rows where that column is NULL.
null_count_exprs = []
for name in df_pop.columns:
    null_count_exprs.append(count(when(col(name).isNull(), name)).alias(name))

null_summary = df_pop.select(null_count_exprs)

# Collect the single result row as a {column name: NULL count} dictionary.
null_counts = null_summary.first().asDict()

# Keep only the names of the columns that contain at least one NULL.
cols_with_nulls = [name for name, n_null in null_counts.items() if n_null > 0]

### les colonnes avec des NULL

In [0]:
# Display the NULL counts, restricted to the columns that have any.
null_summary.select(*cols_with_nulls).show(truncate=False)


In [0]:
# from pyspark.sql.functions import col

# # Lister toutes les colonnes qui correspondent à airport_fee (case-insensitive)
# airport_cols = [c for c in df_pop.columns if c.lower() == "airport_fee"]

# # Si plusieurs variantes existent
# if len(airport_cols) > 1:
#     # Garder la première et supprimer les autres après fusion
#     base_col = airport_cols[0]
    
#     df_pop = df_pop.withColumn("airport_fee", col(base_col))
    
#     for c in airport_cols:
#         if c != base_col:
#             df_pop = df_pop.drop(c)

# elif len(airport_cols) == 1 and airport_cols[0] != "airport_fee":
#     # Une seule colonne mais mal nommée
#     df_pop = df_pop.withColumnRenamed(airport_cols[0], "airport_fee")


In [0]:
# if "Airport_fee" in df_pop.columns:
#     df_pop = df_pop.withColumnRenamed("Airport_fee", "airport_fee")


In [0]:
from pyspark.sql.functions import col

# Count population trips whose pickup timestamp is later than the dropoff one.
pickup_after_dropoff = col("tpep_pickup_datetime") > col("tpep_dropoff_datetime")
invalid_count = df_pop.where(pickup_after_dropoff).count()

print(f"Nombre de courses avec date dÃ©but > date fin : {invalid_count}")



In [0]:
# Rename the capitalized `Airport_fee` column to `airport_fee`.
from pyspark.sql.functions import col

df_pop = df_pop.withColumnRenamed(existing="Airport_fee", new="airport_fee")


In [0]:
# Cast the identifier and count columns to explicit integer types.
from pyspark.sql.types import IntegerType, DoubleType, LongType

# VendorID is the only column cast to a long.
df_pop = df_pop.withColumn("VendorID", col("VendorID").cast(LongType()))

# The remaining columns are cast to a plain integer, in this order.
for int_column in (
    "passenger_count",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "payment_type",
):
    df_pop = df_pop.withColumn(int_column, col(int_column).cast(IntegerType()))


In [0]:
from pyspark.sql.functions import when

# Rows where pickup is later than dropoff get their two timestamps swapped;
# every other row (including rows where the comparison is NULL) is kept as is.
timestamps_reversed = col("tpep_pickup_datetime") > col("tpep_dropoff_datetime")

corrected_pickup = when(
    timestamps_reversed, col("tpep_dropoff_datetime")
).otherwise(col("tpep_pickup_datetime"))

corrected_dropoff = when(
    timestamps_reversed, col("tpep_pickup_datetime")
).otherwise(col("tpep_dropoff_datetime"))

# Write the corrected values to temporary columns first, because both
# expressions read the original pair of columns.
df_pop_clean = df_pop.withColumn("pickup_tmp", corrected_pickup)
df_pop_clean = df_pop_clean.withColumn("dropoff_tmp", corrected_dropoff)

# Replace the original columns by the corrected ones.
df_pop_clean = df_pop_clean.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")
df_pop_clean = df_pop_clean.withColumnRenamed("pickup_tmp", "tpep_pickup_datetime")
df_pop_clean = df_pop_clean.withColumnRenamed("dropoff_tmp", "tpep_dropoff_datetime")


In [0]:
# Fill the missing values with default values.
from pyspark.sql.functions import when

# Start from df_pop_clean, NOT df_pop: rebuilding from df_pop would throw away
# the pickup/dropoff swap applied in the previous cell, and the check that
# follows would still find trips with pickup > dropoff.
#
# Defaults applied to NULLs:
#   store_and_fwd_flag   -> "N"
#   congestion_surcharge -> 0
#   airport_fee          -> 0
#   passenger_count      -> 1
#   RatecodeID           -> 1
df_pop_clean = df_pop_clean.fillna({
    "store_and_fwd_flag": "N",
    "congestion_surcharge": 0,
    "airport_fee": 0,
    "passenger_count": 1,
    "RatecodeID": 1,
})


In [0]:
# Sanity check: number of cleaned rows that still have pickup after dropoff.
still_reversed = df_pop_clean.where(
    col("tpep_pickup_datetime") > col("tpep_dropoff_datetime")
)
still_reversed.count()


In [0]:
# Show, for every column of the cleaned population, how many NULLs remain.
remaining_null_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df_pop_clean.columns
]
df_pop_clean.select(*remaining_null_exprs).show()


In [0]:
# Add the trip duration in minutes and keep only strictly positive durations.
from pyspark.sql.functions import unix_timestamp

dropoff_seconds = unix_timestamp("tpep_dropoff_datetime")
pickup_seconds = unix_timestamp("tpep_pickup_datetime")

# Difference of the two epoch-second values, converted to minutes.
df_pop_clean = df_pop_clean.withColumn(
    "trip_duration_min", (dropoff_seconds - pickup_seconds) / 60
)
df_pop_clean = df_pop_clean.filter(col("trip_duration_min") > 0)


In [0]:
# 1. Mean fare of a trip (population, df_pop)
from pyspark.sql.functions import avg

mean_fare = avg("fare_amount").alias("avg_fare_pop")
df_pop.agg(mean_fare).show()


In [0]:
# 2. Mean trip distance (population, df_pop)
mean_distance = avg("trip_distance").alias("avg_distance_pop")
df_pop.agg(mean_distance).show()


In [0]:
# 3. Mean trip duration in minutes (population, df_pop)
from pyspark.sql.functions import unix_timestamp

# Dropoff minus pickup in epoch seconds, converted to minutes.
duration_min = (
    unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")
) / 60

df_pop_duration = df_pop.withColumn("trip_duration_min", duration_min)
df_pop_duration.agg(avg("trip_duration_min").alias("avg_duration_pop")).show()


In [0]:
# 4. Share of trips with a strictly positive tip
from pyspark.sql.functions import when, count

# count() ignores NULLs, so only the rows matching the condition are counted.
tipped_trips = count(when(col("tip_amount") > 0, True))
all_trips = count("*")

tip_pop = df_pop.select((tipped_trips / all_trips).alias("tip_rate_pop"))
tip_pop.show()

In [0]:
# 5. Peak hours: number of trips per pickup hour, ordered by hour
from pyspark.sql.functions import hour

pickup_hour = hour("tpep_pickup_datetime").alias("hour")
trips_per_hour = df_pop.groupBy(pickup_hour).count()
trips_per_hour.orderBy("hour").show()


In [0]:
# Pickup zones ranked by mean fare, highest first; the top 10 are displayed.
from pyspark.sql.functions import avg

mean_fare_by_pickup = df_pop.groupBy("PULocationID").agg(
    avg("fare_amount").alias("avg_fare")
)
prix_par_zone_pickup_pop = mean_fare_by_pickup.orderBy(col("avg_fare").desc())

prix_par_zone_pickup_pop.show(n=10)

In [0]:
# Dropoff zones ranked by mean fare, highest first; the top 10 are displayed.
mean_fare_by_dropoff = df_pop.groupBy("DOLocationID").agg(
    avg("fare_amount").alias("avg_fare")
)
prix_par_zone_dropoff_pop = mean_fare_by_dropoff.orderBy(col("avg_fare").desc())

prix_par_zone_dropoff_pop.show(n=10)

In [0]:

from pyspark.sql.functions import hour

# Distribution of trips by pickup hour: count and share of all trips.

# Total number of trips, used as the denominator of `proportion`.
# It is computed here because no earlier cell defines `total_pop`; without
# this line the cell fails with a NameError on a fresh kernel.
total_pop = df_pop.count()

hour_pop = (
    df_pop
    .withColumn("hour", hour("tpep_pickup_datetime"))
    .groupBy("hour")
    .count()
    .withColumn("proportion", col("count") / total_pop)
    .orderBy("hour")
)

hour_pop.show()

In [0]:
# Distribution of trips by day of the week: count and share of all trips.
from pyspark.sql.functions import date_format

# Total number of trips, used as the denominator of `proportion`.
# It is computed here so the cell does not depend on a `total_pop` variable
# that no earlier cell defines (NameError on a fresh kernel).
total_pop = df_pop.count()

day_pop = (
    df_pop
    # "EEEE" formats the pickup timestamp as the full day name.
    .withColumn("day_of_week", date_format("tpep_pickup_datetime", "EEEE"))
    .groupBy("day_of_week")
    .count()
    .withColumn("proportion", col("count") / total_pop)
)

day_pop.show()

In [0]:
from pyspark.sql.functions import weekofyear, col
# Tag every trip with the week of the year of its pickup timestamp.
pickup_week = weekofyear(col("tpep_pickup_datetime"))
df_pop_week = df_pop.withColumn("week", pickup_week)


In [0]:
# Number of trips per week of the year (population), ordered by week.
# The result is bound to `week_dist_pop`, the name that is displayed below;
# the previous spelling `eek_dist_pop` left that name undefined.
week_dist_pop = (
    df_pop_week
    .groupBy("week")
    .count()
    .orderBy("week")
)
week_dist_pop.show(10)


In [0]:
# Share of trips per week of the year, largest share first.
# The weekly counts only carry `week` and `count`, so the `proportion` column
# has to be derived before it can be used for ordering. The counts are
# recomputed from df_pop_week so this cell does not rely on another cell's
# intermediate result.
total_trips_week = df_pop_week.count()

week_prop_pop = (
    df_pop_week
    .groupBy("week")
    .count()
    .withColumn("proportion", col("count") / total_trips_week)
)
week_prop_pop.orderBy(col("proportion").desc()).show()


In [0]:
# Number of trips per pickup zone in the cleaned population, busiest first.
trips_by_pickup_zone = df_pop_clean.groupBy("PULocationID").count()
geo_pop = trips_by_pickup_zone.orderBy(col("count").desc())
geo_pop.show(n=10)


In [0]:
# Summary statistics of the main numeric columns of the cleaned population.
summary_columns = [
    "fare_amount",
    "trip_distance",
    "tip_amount",
    "passenger_count",
    "total_amount",
]
df_pop_clean.select(*summary_columns).describe().show()


In [0]:
# A trip is flagged as an outlier when it exceeds at least one threshold:
# fare_amount > 200, trip_distance > 50 or trip_duration_min > 180.
fare_too_high = col("fare_amount") > 200
distance_too_long = col("trip_distance") > 50
duration_too_long = col("trip_duration_min") > 180

outliers_pop = df_pop_clean.filter(fare_too_high | distance_too_long | duration_too_long)

# Cell output: number of flagged trips.
outliers_pop.count()


In [0]:
# Very expensive trips (fare_amount > 200)
from pyspark.sql.functions import avg, col

# Keep only the population trips above the fare threshold.
df_expensive_pop = df_pop.where(col("fare_amount") > 200)

# Mean fare among those trips.
mean_fare_expensive = avg("fare_amount").alias("mean_fare_expensive_pop")
df_expensive_pop.agg(mean_fare_expensive).show()

In [0]:
# Very long trips (trip_distance > 30)
# Keep only the population trips above the distance threshold.
df_long_pop = df_pop.where(col("trip_distance") > 30)

# Mean distance among those trips.
mean_distance_long = avg("trip_distance").alias("mean_distance_long_pop")
df_long_pop.agg(mean_distance_long).show()

In [0]:
#8. Ratio tip / fare par type de paiement
from pyspark.sql.functions import sum
# Ratio of total tips to total fares for each payment type (cleaned population).
total_tips = sum("tip_amount")
total_fares = sum("fare_amount")

ratio_pop = df_pop_clean.groupBy("payment_type").agg(
    (total_tips / total_fares).alias("tip_fare_ratio")
)
ratio_pop.show()
