In [0]:
configs = {
  "fs.azure.account.auth.type.synapseadlsgen2sakshi.dfs.core.windows.net": "OAuth",
  "fs.azure.account.oauth.provider.type.synapseadlsgen2sakshi.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id.synapseadlsgen2sakshi.dfs.core.windows.net": "f6476d6a-bbeb-4c87-bb09-2596163c7513",
  "fs.azure.account.oauth2.client.secret.synapseadlsgen2sakshi.dfs.core.windows.net": ".RA8Q~PuV7vzyPtH4tc6Kku0NW5GLDc6zagD9bnz",
  "fs.azure.account.oauth2.client.endpoint.synapseadlsgen2sakshi.dfs.core.windows.net": "https://login.microsoftonline.com/7540734b-e567-46c3-9ad3-ec9fb9e50140/oauth2/token"
}

In [0]:
df = spark.read.format("csv").option("header", "true").load("/Volumes/workspace/default/dataset")

In [0]:
dbutils.fs.ls("/Volumes/workspace/default/dataset")

[FileInfo(path='dbfs:/Volumes/workspace/default/dataset/yellow-tripdata-2024-01.csv', name='yellow-tripdata-2024-01.csv', size=334631894, modificationTime=1752671900000)]

Data Exploration

In [0]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- Airport_fee: string (nullable = true)



In [0]:
df.limit(5).display()

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01 00:57:55.000000,2024-01-01 01:17:43.000000,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0,1,22.7,2.5,0
1,2024-01-01 00:03:00.000000,2024-01-01 00:09:36.000000,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0,1,18.75,2.5,0
1,2024-01-01 00:17:06.000000,2024-01-01 00:35:01.000000,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0,1,31.3,2.5,0
1,2024-01-01 00:36:38.000000,2024-01-01 00:44:56.000000,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0,1,17.0,2.5,0
1,2024-01-01 00:46:51.000000,2024-01-01 00:52:57.000000,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0,1,16.1,2.5,0


In [0]:
df.count()

2964624

In [0]:
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, round, expr

# Load raw data from Volume
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .load("/Volumes/workspace/default/dataset/yellow-tripdata-2024-01.csv")

# Drop rows with nulls in important columns
df_clean = df_bronze.dropna(subset=[
    "passenger_count", "trip_distance", "fare_amount", 
    "tpep_pickup_datetime", "tpep_dropoff_datetime"
])

# Use try_cast to handle invalid formats gracefully
df_silver = df_clean \
    .withColumn("passenger_count", expr("try_cast(passenger_count AS int)")) \
    .withColumn("trip_distance", expr("try_cast(trip_distance AS double)")) \
    .withColumn("fare_amount", expr("try_cast(fare_amount AS double)")) \
    .withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
    .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))

# Filter invalid numeric values and nulls after casting
df_silver = df_silver.filter(
    (col("passenger_count").isNotNull()) & (col("passenger_count") > 0) &
    (col("trip_distance").isNotNull()) & (col("trip_distance") > 0) &
    (col("fare_amount").isNotNull()) & (col("fare_amount") > 0)
)

# Add derived column: trip duration in minutes
df_silver = df_silver.withColumn(
    "trip_duration_minutes",
    round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0, 2)
)

# Write Silver Layer to Volumes (overwrite mode)
df_silver.write.mode("overwrite").format("parquet") \
    .save("/Volumes/workspace/default/silver/yellow_taxi")

# OPTIONAL: Preview the data
df_silver.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|        

Displaying Data


In [0]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2|2024-01-01 00:57:...| 2024-01-01 01:17:...|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|    1|    0.5|         

Describe Data

In [0]:
df.describe(["trip_distance", "fare_amount", "passenger_count"]).show()

+-------+------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|   passenger_count|
+-------+------------------+------------------+------------------+
|  count|           2964624|           2964624|           2964624|
|   mean| 3.652169178958276|18.175061916789456|1.3392808966805005|
| stddev|225.46257238220005|18.949547705905285|0.8502816924800881|
|    min|                 0|             -0.01|                 0|
|    max|                99|             99.64|                \N|
+-------+------------------+------------------+------------------+



Checking for Null values

In [0]:
from pyspark.sql.functions import col, isnan, isnull

df.select([
    col(c).isNull().alias(c + "_isnull") for c in df.columns
]).summary().show()


+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    25%|
|    50%|
|    75%|
|    max|
+-------+



Bronze Layer

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("NYC Yellow Taxi ETL").getOrCreate()

# Read raw CSV from Unity Volume (only works in CE under /Volumes path)
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .load("/Volumes/workspace/default/dataset/yellow-tripdata-2024-01.csv")

# Save as Bronze layer in Parquet format
df_bronze.write.mode("overwrite").parquet("/Volumes/workspace/default/bronze/yellow_taxi")

In [0]:
df.display("/mnt/bronze/yellow_taxi")

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01 00:57:55.000000,2024-01-01 01:17:43.000000,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1,22.7,2.5,0.0
1,2024-01-01 00:03:00.000000,2024-01-01 00:09:36.000000,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1,18.75,2.5,0.0
1,2024-01-01 00:17:06.000000,2024-01-01 00:35:01.000000,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1,31.3,2.5,0.0
1,2024-01-01 00:36:38.000000,2024-01-01 00:44:56.000000,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1,17.0,2.5,0.0
1,2024-01-01 00:46:51.000000,2024-01-01 00:52:57.000000,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1,16.1,2.5,0.0
1,2024-01-01 00:54:08.000000,2024-01-01 01:26:31.000000,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1,41.5,2.5,0.0
2,2024-01-01 00:49:44.000000,2024-01-01 01:15:47.000000,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1,64.95,0.0,1.75
1,2024-01-01 00:30:40.000000,2024-01-01 00:58:40.000000,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1,30.4,2.5,0.0
2,2024-01-01 00:26:01.000000,2024-01-01 00:54:12.000000,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1,36.0,2.5,0.0
2,2024-01-01 00:28:08.000000,2024-01-01 00:29:16.000000,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1,8.0,2.5,0.0


In [0]:
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, round

# Read raw data from Volume
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .load("/Volumes/workspace/default/dataset/yellow-tripdata-2024-01.csv")


Silver Layer Transformation

In [0]:
# Drop nulls from key columns
df_silver = df_bronze.dropna(subset=[
    "passenger_count", "trip_distance", "fare_amount", 
    "tpep_pickup_datetime", "tpep_dropoff_datetime"
])

# Filter out invalid records
df_silver = df_silver.filter(
    (col("passenger_count") > 0) &
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0)
)

# Convert string columns to timestamps and add trip duration
df_silver = df_silver.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
                     .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime")) \
                     .withColumn("trip_duration_minutes",
                         round(
                             (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0, 2
                         )
                     )


In [0]:
from pyspark.sql.functions import expr, to_timestamp, unix_timestamp, round

# Load Silver layer
df_raw = spark.read.parquet("/Volumes/workspace/default/silver/yellow_taxi")

# Safe try_cast for all relevant fields
df_casted = df_raw \
    .withColumn("VendorID", expr("try_cast(VendorID as int)")) \
    .withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
    .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime")) \
    .withColumn("passenger_count", expr("try_cast(passenger_count as int)")) \
    .withColumn("trip_distance", expr("try_cast(trip_distance as double)")) \
    .withColumn("RatecodeID", expr("try_cast(RatecodeID as int)")) \
    .withColumn("PULocationID", expr("try_cast(PULocationID as int)")) \
    .withColumn("DOLocationID", expr("try_cast(DOLocationID as int)")) \
    .withColumn("payment_type", expr("try_cast(payment_type as int)")) \
    .withColumn("fare_amount", expr("try_cast(fare_amount as double)")) \
    .withColumn("extra", expr("try_cast(extra as double)")) \
    .withColumn("mta_tax", expr("try_cast(mta_tax as double)")) \
    .withColumn("tip_amount", expr("try_cast(tip_amount as double)")) \
    .withColumn("tolls_amount", expr("try_cast(tolls_amount as double)")) \
    .withColumn("improvement_surcharge", expr("try_cast(improvement_surcharge as double)")) \
    .withColumn("total_amount", expr("try_cast(total_amount as double)")) \
    .withColumn("congestion_surcharge", expr("try_cast(congestion_surcharge as double)")) \
    .withColumn("Airport_fee", expr("try_cast(Airport_fee as double)"))

# Drop rows with NULLs in core fields
df_cleaned = df_casted.dropna(subset=[
    "passenger_count", "trip_distance", "fare_amount", 
    "tpep_pickup_datetime", "tpep_dropoff_datetime"
])

# Add derived column: trip_duration_minutes
df_final = df_cleaned.withColumn(
    "trip_duration_minutes",
    round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0, 2)
)

# Show output sample
df_final.select("trip_distance", "fare_amount", "passenger_count", "trip_duration_minutes").show(10)


+-------------+-----------+---------------+---------------------+
|trip_distance|fare_amount|passenger_count|trip_duration_minutes|
+-------------+-----------+---------------+---------------------+
|         1.72|       17.7|              1|                 19.8|
|          1.8|       10.0|              1|                  6.6|
|          4.7|       23.3|              1|                17.92|
|          1.4|       10.0|              1|                  8.3|
|          0.8|        7.9|              1|                  6.1|
|          4.7|       29.6|              1|                32.38|
|        10.82|       45.7|              2|                26.05|
|         5.44|       31.0|              1|                28.18|
|         0.04|        3.0|              1|                 1.13|
|         0.75|        7.9|              2|                 6.32|
+-------------+-----------+---------------+---------------------+
only showing top 10 rows


In [0]:
# Save cleaned Silver layer to Volumes (overwrite if already exists)
# Save the cleaned and casted Silver data
df_final.write.mode("overwrite").parquet("/Volumes/workspace/default/silver/verified_yellow_taxi")

Gold Layer

In [0]:
from pyspark.sql.functions import avg, sum, count, round

# Load Silver (verified) layer
df_silver = spark.read.parquet("/Volumes/workspace/default/silver/verified_yellow_taxi")

# 1️. Average fare & trip distance by passenger count
df_agg_passenger = df_silver.groupBy("passenger_count").agg(
    round(avg("trip_distance"), 2).alias("avg_distance"),
    round(avg("fare_amount"), 2).alias("avg_fare"),
    count("*").alias("trip_count")
)

# 2. Total revenue by payment type
df_agg_payment = df_silver.groupBy("payment_type").agg(
    round(sum("total_amount"), 2).alias("total_revenue"),
    count("*").alias("transactions")
)

# 3. Average trip duration per passenger count
df_agg_duration = df_silver.groupBy("passenger_count").agg(
    round(avg("trip_duration_minutes"), 2).alias("avg_trip_duration")
)

# Save all aggregations to Gold layer
df_agg_passenger.write.mode("overwrite").parquet("/Volumes/workspace/default/gold/fare_by_passenger")
df_agg_payment.write.mode("overwrite").parquet("/Volumes/workspace/default/gold/revenue_by_payment_type")
df_agg_duration.write.mode("overwrite").parquet("/Volumes/workspace/default/gold/trip_duration_by_passenger")

# Show previews
print("Avg Fare & Distance by Passenger Count:")
df_agg_passenger.orderBy("passenger_count").show()

print("Total Revenue by Payment Type:")
df_agg_payment.orderBy("payment_type").show()

print("Avg Trip Duration by Passenger Count:")
df_agg_duration.orderBy("passenger_count").show()


Avg Fare & Distance by Passenger Count:
+---------------+------------+--------+----------+
|passenger_count|avg_distance|avg_fare|trip_count|
+---------------+------------+--------+----------+
|              1|        3.18|   17.91|   2134627|
|              2|        3.83|   20.59|    395093|
|              3|         3.7|   20.41|     88966|
|              4|        3.99|   21.98|     49592|
|              5|        3.09|   17.59|     33306|
|              6|        2.97|   17.22|     22178|
|              7|        3.67|   50.84|         5|
|              8|        2.14|    86.8|        37|
|              9|         1.8|    11.4|         1|
+---------------+------------+--------+----------+

Total Revenue by Payment Type:
+------------+-------------+------------+
|payment_type|total_revenue|transactions|
+------------+-------------+------------+
|           1|6.389077665E7|     2273580|
|           2|   9984412.65|      417812|
|           3|    220226.38|        9806|
|           4