# Data Engineering project on trip data 2024 IPSLGIT32026

## Spark Test

In [0]:
# Smoke test: read the January 2024 trip file from the source volume and
# render it, which exercises both the Spark session and volume access.
df = (
    spark.read
    .format("parquet")
    .load("/Volumes/workspace/ipsldic3/mainvol/yellostaxidataset20024/yellow_tripdata_2024-01.parquet")
)
display(df)

## Config

### Directory config

In [0]:
# Unity Catalog coordinates (catalog, schema, volume) used to build every path below.
CATALOG, SCHEMA, VOLUME = "workspace", "ipsldic3", "capstoneipsl"


### Create Volume if not exists

In [0]:
# Idempotent: the IF NOT EXISTS clause makes re-running this cell harmless.
spark.sql("CREATE VOLUME IF NOT EXISTS " + ".".join((CATALOG, SCHEMA, VOLUME)))


### Directories for Medallion Architecture

In [0]:
# 2. Path layout: one folder per Medallion layer, plus a checkpoint folder,
#    all under <volume root>/yellow_taxi.
VOLUME_ROOT = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
BRONZE = f"{VOLUME_ROOT}/yellow_taxi/bronze"
SILVER = f"{VOLUME_ROOT}/yellow_taxi/silver"
GOLD = f"{VOLUME_ROOT}/yellow_taxi/gold"
CHECKPOINTS = f"{VOLUME_ROOT}/yellow_taxi/_checkpoints"

# 3. Create the directories (dbutils.fs.mkdirs also works on Volume paths).
#    Nothing is deleted here; existing content is left in place.
for _layer_dir in (BRONZE, SILVER, GOLD, CHECKPOINTS):
    dbutils.fs.mkdirs(_layer_dir)

print(f"Created Volume directories at: {VOLUME_ROOT}")

## Ingestion

### Ingestion initiale des données brutes
Chargement de la couche BRONZE depuis un fichier externe ou local

In [0]:
# 4. Stage the sample data (February 2024 Yellow Taxi) in the Bronze raw folder.
import subprocess

# Landing folder for untouched source files.
RAW_PATH = f"{BRONZE}/raw"
dbutils.fs.mkdirs(RAW_PATH)

# The file is copied from another volume. To fetch it from the public CDN
# instead, point file_url at
#   https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet
# and replace the copy below with: ["wget", "-q", "-O", local_path, file_url]
file_url = "/Volumes/workspace/ipsldic3/mainvol/yellostaxidataset20024/yellow_tripdata_2024-02.parquet"
local_path = f"{RAW_PATH}/yellow_2024_02.parquet"

# Commands are passed as argument lists (no shell=True) so the paths reach
# cp/ls verbatim: no word splitting on spaces and no shell interpretation of
# special characters. check=True raises CalledProcessError on a non-zero exit.
subprocess.run(["cp", file_url, local_path], check=True)
subprocess.run(["ls", "-lh", RAW_PATH], check=True)

In [0]:
# Show what is currently staged in the Bronze raw folder.
_raw_listing = dbutils.fs.ls(RAW_PATH)
display(_raw_listing)

### Load the data into a DataFrame

In [0]:
# COMMAND ----------
# 5. Read the staged Parquet file into `df_raw`, the starting DataFrame that
#    the later cells reuse.
df_raw = spark.read.format("parquet").load(f"{BRONZE}/raw/yellow_2024_02.parquet")
print("Bootstrap complete. df_raw loaded from Volume.")

In [0]:
# Peek at five rows, then print the column names and Spark types.
_preview = df_raw.limit(5)
display(_preview)
df_raw.printSchema()

### Analyse statistique

In [0]:
# Step 1: start from the raw DataFrame loaded during bootstrap
df = df_raw

# Step 2: size of the dataset, then summary statistics for the distance and
# money columns
print("rows:", df.count())
print("columns:", len(df.columns))
df.describe("trip_distance", "fare_amount", "total_amount").show()

### Audit de qualite

In [0]:
# Data dictionary: one row per column with
#   column, type, null_count, example (a non-null value as text), total_count.
from pyspark.sql import types as T
from pyspark.sql import functions as F

_fields = df.schema.fields

# A single aggregation computes the row count, every column's null count and
# one non-null example per column. The previous per-column loop launched two
# Spark jobs per column, i.e. 2 x N full passes over the data.
# Aliases are positional (index-based) so they can never clash with a real
# column name.
_audit = df.agg(
    F.count(F.lit(1)).alias("__total"),
    *[
        F.count(F.when(F.col(f.name).isNull(), 1)).alias(f"__nulls_{i}")
        for i, f in enumerate(_fields)
    ],
    *[
        # first(..., ignorenulls=True) yields an arbitrary non-null value, or
        # None when the column is entirely null.
        F.first(F.col(f.name), ignorenulls=True).alias(f"__example_{i}")
        for i, f in enumerate(_fields)
    ],
).first()

total_count = _audit["__total"]
rows = [
    (
        f.name,
        f.dataType.simpleString(),
        _audit[f"__nulls_{i}"],
        str(_audit[f"__example_{i}"]),
        total_count,
    )
    for i, f in enumerate(_fields)
]

dict_df = spark.createDataFrame(rows, ["column", "type", "null_count", "example","total_count"])
display(dict_df.orderBy("column"))

### Analyse des plans d'execution

In [0]:
df = df_raw

# Grouping by payment_type forces a shuffle, which makes the plan worth reading.
q = df.groupBy("payment_type").agg(
    F.count("*").alias("trips"),
    F.avg("total_amount").alias("avg_total"),
)

# extended=True prints the logical plans as well as the physical plan
q.explain(extended=True)
q.show()

### Analyse des temps de trajets

In [0]:
# Derive date, hour, duration and payment-flag columns from the raw trips
from pyspark.sql import functions as F

df = df_raw

# Trip length in minutes: difference of the two epoch-second timestamps / 60
_duration_min = (
    F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")
) / 60.0

# 1 when payment_type equals 2, otherwise 0
# NOTE(review): code 2 is presumably "cash" given the column name - confirm
# against the data dictionary.
_cash_flag = (F.col("payment_type") == 2).cast("int")

df_feat = df.withColumn("trip_date", F.to_date("tpep_pickup_datetime"))
df_feat = df_feat.withColumn("trip_hour", F.hour("tpep_pickup_datetime"))
df_feat = df_feat.withColumn("trip_duration_min", _duration_min)
df_feat = df_feat.withColumn("is_cash", _cash_flag)

# Quick sanity check on the derived and key numeric columns
_qc_cols = ["trip_date", "trip_hour", "trip_duration_min", "trip_distance", "total_amount"]
df_feat.select(*_qc_cols).describe().show()
display(df_feat.limit(10))

In [0]:
# Analytics: expose the feature DataFrame to SQL under the name "yellow"
df_feat.createOrReplaceTempView("yellow")

# Both queries are defined first (spark.sql is lazy) and rendered afterwards.

# Daily KPIs: trip count, mean distance and mean total per calendar day
daily = spark.sql("""
SELECT
  trip_date,
  COUNT(*) AS trips,
  AVG(trip_distance) AS avg_distance,
  AVG(total_amount) AS avg_total
FROM yellow
GROUP BY trip_date
ORDER BY trip_date
""")

# Hourly KPIs: trip count and mean total per pickup hour
hourly = spark.sql("""
SELECT
  trip_hour,
  COUNT(*) AS trips,
  AVG(total_amount) AS avg_total
FROM yellow
GROUP BY trip_hour
ORDER BY trip_hour
""")

display(daily)
display(hourly)

## Traitement des données — Architecture Medallion

#### Couche BRONZE (Raw)
Chargement des donnees au format Delta

In [0]:
df = df_raw

# Persist the raw rows unchanged as a Delta table (replaces any previous run)
bronze_path = f"{BRONZE}/delta"
df.write.save(bronze_path, format="delta", mode="overwrite")

# Read the table back and report how many rows were written
bronze = spark.read.load(bronze_path, format="delta")
print("bronze rows:", bronze.count())

#### Couche Silver (Cleaned)
nettoyage et standardisation

In [0]:
### Silver goals
# - Keep only rows with strictly positive distance, duration and total
#   (zero values are dropped as well as negative ones)
# - Add the standard derived columns: trip_date, trip_hour, trip_duration_min
bronze = spark.read.load(bronze_path, format="delta")

# Derived columns
silver = bronze.withColumn("trip_date", F.to_date("tpep_pickup_datetime"))
silver = silver.withColumn("trip_hour", F.hour("tpep_pickup_datetime"))
silver = silver.withColumn(
    "trip_duration_min",
    (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")) / 60.0,
)

# Validity filters, then drop rows missing any of the key columns
silver = (
    silver
    .where("trip_distance > 0")
    .where("trip_duration_min > 0")
    .where("total_amount > 0")
    .na.drop(subset=["trip_date","trip_distance","total_amount"])
)

silver_path = f"{SILVER}/delta"
silver.write.save(silver_path, format="delta", mode="overwrite")

#### Couche Gold (Curated)
Donnees agrégées pour l'analyse

In [0]:
# Build the analytics-ready daily KPI table from the Silver layer.
silver = spark.read.load(silver_path, format="delta")

# One row per trip_date: trip count, mean distance, mean and summed total
gold_daily = silver.groupBy("trip_date").agg(
    F.count("*").alias("trips"),
    F.avg("trip_distance").alias("avg_distance"),
    F.avg("total_amount").alias("avg_total"),
    F.sum("total_amount").alias("sum_total"),
)

gold_daily_path = f"{GOLD}/daily"
gold_daily.write.save(gold_daily_path, format="delta", mode="overwrite")

# Re-read from storage so the display shows what was actually persisted
display(spark.read.load(gold_daily_path, format="delta").orderBy("trip_date"))

#### New files arrive
How to add new data

In [0]:
# Scenario: new monthly files arrive and some trips may overlap with data that
# was already processed (reprocessing). Rows are deduplicated on a synthetic
# key so that the same trip is stored only once.
from pyspark.sql import functions as F

silver = spark.read.format("delta").load(f"{SILVER}/delta")

# Synthetic trip key: SHA-256 over pickup/dropoff timestamps, pickup/dropoff
# location ids and the total amount (example; adjust to your schema).
# In practice, prefer a stable unique key if the dataset provides one.
silver_keyed = (
    silver
    .withColumn(
        "trip_key",
        F.sha2(F.concat_ws("||",
            F.col("tpep_pickup_datetime").cast("string"),
            F.col("tpep_dropoff_datetime").cast("string"),
            F.col("PULocationID").cast("string"),
            F.col("DOLocationID").cast("string"),
            F.col("total_amount").cast("string")
        ), 256)
    )
    # Actually perform the deduplication: keep one row per key. Computing the
    # key alone removes nothing, and a MERGE that matches on trip_key needs
    # each key to appear at most once in its source.
    .dropDuplicates(["trip_key"])
)

target_path = f"{SILVER}/dedup"
silver_keyed.write.format("delta").mode("overwrite").save(target_path)


#### Simulating new data

In [0]:
# Simulated incremental load. Babacar to add more explanation during class.
# The code below is wrapped in a string literal, so it does NOT execute.
# As written, it would:
#   1. take 1000 rows of silver_keyed as a stand-in for a new monthly batch,
#   2. register the Delta files at target_path as the table silver_dedup,
#   3. MERGE the batch into that table on trip_key (update matches, insert
#      the rest).
'''
updates = silver_keyed.limit(1000)  # replace with real incremental month read

updates.createOrReplaceTempView("updates")
spark.sql(f"CREATE TABLE IF NOT EXISTS silver_dedup USING DELTA LOCATION '{target_path}'")

spark.sql("""
MERGE INTO silver_dedup t
USING updates s
ON t.trip_key = s.trip_key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
'''

## Auto Loader Ingestion (File-Based Incremental Ingestion)

Auto Loader provides a Structured Streaming source `cloudFiles` that incrementally processes new files as they arrive, optionally processing existing files, and is designed to scale to very large file volumes.


In [0]:
# Locations used by the Auto Loader cells
input_path = f"{BRONZE}/incoming_parquet"         # landing folder: drop new files here
schema_path = f"{BRONZE}/_schemas/yellow"         # passed as cloudFiles.schemaLocation
checkpoint = f"{CHECKPOINTS}/autoloader_yellow"   # streaming checkpoint location

# Only the landing and schema folders are created here.
for _needed_dir in (input_path, schema_path):
    dbutils.fs.mkdirs(_needed_dir)

In [0]:
# Delta output location for the Auto Loader stream
bronze_stream_path = "/".join((BRONZE, "autoloader_delta"))

In [0]:
# Drop one monthly file into the Auto Loader landing folder (input_path) so
# there is something to pick up.
import subprocess

file_name = "yellow_tripdata_2024-03.parquet"

# The file is copied from another volume. To fetch it from the public CDN
# instead, point file_url at
#   https://d37ci6vzurychx.cloudfront.net/trip-data/<file_name>
# and replace the copy below with: ["wget", "-q", "-O", local_path, file_url]
file_url = f"/Volumes/workspace/ipsldic3/mainvol/yellostaxidataset20024/{file_name}"
local_path = f"{input_path}/{file_name}"

# Commands are passed as argument lists (no shell=True) so the paths reach
# cp/ls verbatim: no word splitting on spaces and no shell interpretation of
# special characters. check=True raises CalledProcessError on a non-zero exit.
subprocess.run(["cp", file_url, local_path], check=True)
subprocess.run(["ls", "-lh", input_path], check=True)

In [0]:
# Streaming read with Auto Loader (Parquet); needs another cluster type.
# The code below is wrapped in a string literal, so it does NOT execute.
# As written, it would read input_path through the cloudFiles source (schema
# tracked at schema_path) and append the stream to a Delta table at
# bronze_stream_path, using `checkpoint` as the checkpoint location.
'''stream_df = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", schema_path)
  .load(input_path)
)

# Write stream to Bronze Delta

q = (stream_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint)
  .start(bronze_stream_path)
)
'''

## Performance Engineering (Partitioning, Caching, File Layout)

**Tasks:**
- Partition Gold tables by date
- Compare query time before/after caching
- Inspect physical plans

In [0]:
import time

silver = spark.read.format("delta").load(f"{SILVER}/delta")

# Partitioned write (example): one folder per trip_date, so a filter on
# trip_date only has to read the matching partition.
gold_by_date_path = f"{GOLD}/by_date"
(silver.write
  .format("delta")
  .mode("overwrite")
  .partitionBy("trip_date")
  .save(gold_by_date_path))

# Benchmark a single-day lookup.
# The Silver layer is built from the 2024-02 file, so the probe date must fall
# in February; a January date would target a partition outside the loaded
# month and the timing would not reflect a real partition read.
bench_filter = "trip_date = '2024-02-15'"

# perf_counter is monotonic, which makes it the right clock for elapsed time.
t0 = time.perf_counter()
spark.read.format("delta").load(gold_by_date_path).filter(bench_filter).count()
print("secs:", time.perf_counter() - t0)

# Cache benchmark (not supported on serverless)
# dfp = spark.read.format("delta").load(gold_by_date_path).cache()
# dfp.count()
# t1 = time.perf_counter()
# dfp.filter(bench_filter).count()
# print("cached secs:", time.perf_counter() - t1)

## Module 10 — Structured Streaming (Near-Real-Time KPIs)

We compute streaming aggregates (trips/hour) and write to a Delta sink.

In [0]:
# Streaming hourly KPIs. The code below is wrapped in a string literal, so it
# does NOT execute.
# As written, it would read Parquet files arriving under BRONZE/incoming_parquet
# through the cloudFiles source, truncate the pickup timestamp to the hour,
# aggregate trip count and mean total per hour, and write the result in
# "complete" output mode to a Delta table under GOLD/stream_hourly_kpi.
''' Bench not supported
from pyspark.sql import functions as F

stream_src = f"{BRONZE}/incoming_parquet"
schema_path = f"{BRONZE}/_schemas/yellow_stream_kpi"
checkpoint = f"{CHECKPOINTS}/kpi_hourly"

# Read arriving Parquet files as a stream
sdf = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", schema_path)
  .load(stream_src)
)

# Feature columns
sdf2 = (sdf
  .withColumn("pickup_ts", F.col("tpep_pickup_datetime"))
  .withColumn("trip_hour_ts", F.date_trunc("hour", F.col("pickup_ts")))
)

hourly_kpi = (sdf2.groupBy("trip_hour_ts")
  .agg(F.count("*").alias("trips"),
       F.avg("total_amount").alias("avg_total"))
)

out_path = f"{GOLD}/stream_hourly_kpi"

q = (hourly_kpi.writeStream
  .format("delta")
  .outputMode("complete")  # aggregates often use complete mode
  .option("checkpointLocation", checkpoint)
  .start(out_path)
)
'''

## MLlib 1: Feature Engineering + ML Pipelines
Goal: Create a reusable feature pipeline that outputs a `features` vector.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql import functions as F

silver = spark.read.load(f"{SILVER}/delta", format="delta")

# Modelling base: only the columns the pipeline needs, complete rows only,
# with strictly positive distance, duration and total.
base = (
    silver
    .select(
        "trip_distance", "passenger_count", "PULocationID", "DOLocationID",
        "payment_type", "trip_duration_min", "total_amount", "trip_date",
    )
    .na.drop()
    .where("trip_distance > 0 AND trip_duration_min > 0 AND total_amount > 0")
)

cat_cols = ["payment_type", "PULocationID", "DOLocationID"]
num_cols = ["trip_distance", "passenger_count", "trip_duration_min"]

# Each categorical column is indexed (<col>_idx) and then one-hot encoded
# (<col>_ohe). handleInvalid="keep" makes the indexer tolerate values it did
# not see while fitting instead of raising.
indexers = []
encoders = []
for _cat in cat_cols:
    indexers.append(StringIndexer(inputCol=_cat, outputCol=f"{_cat}_idx", handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=f"{_cat}_idx", outputCol=f"{_cat}_ohe"))

# Final vector: encoded categoricals first, then the numeric columns
assembler = VectorAssembler(
    inputCols=[f"{_cat}_ohe" for _cat in cat_cols] + num_cols,
    outputCol="features",
)

pipe = Pipeline(stages=[*indexers, *encoders, assembler])
feat_model = pipe.fit(base)
featured = feat_model.transform(base)

display(featured.select("features", "total_amount", "trip_date").limit(5))

## MLlib 2: Modeling (Regression + Classification) + Evaluation

**Two tasks:**
1. Regression: predict `total_amount`
2. Classification: predict whether `tip_amount > 0`

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

silver = spark.read.load(f"{SILVER}/delta", format="delta")

# Binary classification label: 1.0 when a tip was left, otherwise 0.0
_tip_label = (F.col("tip_amount") > 0).cast("double")

# Modelling base: feature columns plus both targets (total_amount, tip_amount),
# complete rows only, strictly positive distance, duration and total.
df_ml = (
    silver
    .select(
        "trip_distance", "passenger_count", "PULocationID", "DOLocationID",
        "payment_type", "trip_duration_min", "total_amount", "tip_amount", "trip_date",
    )
    .na.drop()
    .where("trip_distance > 0 AND trip_duration_min > 0 AND total_amount > 0")
    .withColumn("label_tip", _tip_label)
)

In [0]:
# Same feature pipeline as in the MLlib 1 module, fitted on df_ml
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

cat_cols = ["payment_type", "PULocationID", "DOLocationID"]
num_cols = ["trip_distance", "passenger_count", "trip_duration_min"]

# Index then one-hot encode every categorical column
indexers = []
encoders = []
for _cat in cat_cols:
    indexers.append(StringIndexer(inputCol=_cat, outputCol=f"{_cat}_idx", handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=f"{_cat}_idx", outputCol=f"{_cat}_ohe"))

assembler = VectorAssembler(
    inputCols=[f"{_cat}_ohe" for _cat in cat_cols] + num_cols,
    outputCol="features",
)

pipe = Pipeline(stages=[*indexers, *encoders, assembler])
pipe_model = pipe.fit(df_ml)
feat = pipe_model.transform(df_ml)

# Time-aware split: sort the distinct trip dates and cut at the 80% position,
# so the test set holds the most recent dates rather than a random sample.
_date_rows = feat.select("trip_date").distinct().orderBy("trip_date").collect()
dates = [row[0] for row in _date_rows]
cut = dates[int(len(dates)*0.8)]

# Dates strictly before the cut train the models; the cut date onwards is test.
train = feat.where(F.col("trip_date") < F.lit(cut))
test = feat.where(F.col("trip_date") >= F.lit(cut))

In [0]:
### Regression: predict total_amount with gradient-boosted trees
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Small model (5 iterations, depth 6)
gbt = GBTRegressor(featuresCol="features", labelCol="total_amount", maxIter=5, maxDepth=6)
gbt_model = gbt.fit(train)
pred = gbt_model.transform(test)

# One evaluator; the metric is supplied per call through the params override.
_reg_eval = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction")
rmse = _reg_eval.evaluate(pred, {_reg_eval.metricName: "rmse"})
mae = _reg_eval.evaluate(pred, {_reg_eval.metricName: "mae"})
print("Regression RMSE:", rmse, "MAE:", mae)

In [0]:
### Classification: tip vs. no tip (label_tip) with logistic regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(featuresCol="features", labelCol="label_tip", maxIter=30, regParam=0.01)
lr_model = lr.fit(train)
pred2 = lr_model.transform(test)

# Area under the ROC curve, computed on the held-out test rows
_auc_eval = BinaryClassificationEvaluator(
    labelCol="label_tip",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = _auc_eval.evaluate(pred2)
print("Classification AUC:", auc)