#       NYC Yellow Taxi — PySpark ETL Project

End-to-end ETL pipeline based on NYC Yellow Taxi data.
Implements ingestion, cleaning, feature engineering, aggregations and Delta Lake outputs organized into Bronze, Silver and Gold layers.

In [0]:
import pyspark.sql.functions as sf

### Setup — schemas and paths

In [0]:
# Make sure the target schemas for the silver and gold tables exist.
# IF NOT EXISTS keeps this cell safe to re-run.
silver_schema_ddl = "CREATE SCHEMA IF NOT EXISTS workspace.nyt_silver"
gold_schema_ddl = "CREATE SCHEMA IF NOT EXISTS workspace.nyt_gold"

spark.sql(silver_schema_ddl)
spark.sql(gold_schema_ddl)

DataFrame[]

In [0]:
# Storage locations used by the pipeline, all under the same volume root.
WORKSPACE_VOLUMES = "/Volumes/workspace"

# Raw monthly trip files are read from here.
input_path = f"{WORKSPACE_VOLUMES}/nyt/bronze"
# Silver / gold locations (the visible cells write via saveAsTable and do not read these).
output_path_silver = f"{WORKSPACE_VOLUMES}/nyt_silver"
output_path_gold = f"{WORKSPACE_VOLUMES}/nyt_gold"

#### Bronze Layer — Data Ingestion

- Load raw monthly taxi trip files (csv.gz) from the bronze volume.
- Infer schema and parse timestamps.
- No transformations are applied at this stage.
- Data is stored exactly as received.

In [0]:
# Bronze ingestion: read every gzipped CSV matching the 2019-0* glob, taking
# column names from the header row and letting Spark infer the column types.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{input_path}/yellow_tripdata_2019-0*.csv.gz")
)

df.printSchema()

# count() is an action that scans the whole input, so run it once and reuse the
# result (previously it was called twice and the first result was discarded).
raw_row_count = df.count()
print("Row count:", raw_row_count)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

Row count: 14687167


#### Silver Layer — Data Cleaning & Feature Engineering

Produces a cleaned, analytics-ready dataset.
Key operations:
- Calculate trip duration in minutes from pickup/dropoff timestamps.
- Extract pickup hour and pickup day of week.
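- Compute average speed (`speed_mph`) as trip distance divided by trip duration in hours.
- Apply data-quality filters: duration in (0, 240] minutes, distance > 0, fare > 0, and speed strictly between 1 and 120.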
- Save cleaned dataset as a Delta table in the silver layer.

In [0]:
# Silver feature engineering: derive trip duration, pickup-time parts and average speed.

# Elapsed time between pickup and dropoff in whole seconds.
trip_seconds = sf.unix_timestamp('tpep_dropoff_datetime') - sf.unix_timestamp('tpep_pickup_datetime')

df = df.withColumn('trip_duration_min', sf.round(trip_seconds / 60, 2))
df = df.withColumn('pickup_hour', sf.hour('tpep_pickup_datetime'))
# dayofweek() numbers days 1 (Sunday) through 7 (Saturday).
df = df.withColumn('pickup_dow', sf.dayofweek('tpep_pickup_datetime'))

# Zero-duration trips would divide by zero. With ANSI mode enabled that raises
# DIVIDE_BY_ZERO instead of yielding NULL, so guard the division explicitly.
# when() without otherwise() returns NULL for the guarded rows, which matches the
# non-ANSI result and is removed by the later `speed_mph > 1` quality filter.
df = df.withColumn(
    'speed_mph',
    sf.when(
        sf.col('trip_duration_min') != 0,
        sf.col('trip_distance') / (sf.col('trip_duration_min') / 60),
    ),
)

In [0]:
# Data-quality rules: keep only trips whose duration, distance, fare and derived
# speed all fall inside the accepted ranges.
duration_min = sf.col('trip_duration_min')
speed = sf.col('speed_mph')

has_valid_duration = (duration_min > 0) & (duration_min <= 240)
has_valid_distance_and_fare = (sf.col('trip_distance') > 0) & (sf.col('fare_amount') > 0)
has_plausible_speed = (speed > 1) & (speed < 120)

df_clean = df.where(has_valid_duration & has_valid_distance_and_fare & has_plausible_speed)

print("Row count:", df_clean.count())

Row count: 14511778


In [0]:
# Persist the cleaned trips as a Delta table, replacing any previous contents.
(
    df_clean.write
    .format("delta")
    .mode('overwrite')
    .saveAsTable("workspace.nyt_silver.trips_clean")
)

#### Gold Layer — Aggregations & Business Metrics

Generates curated, aggregated tables for analytics dashboards.

Metrics:
- Trips by hour: trip count, avg duration, avg distance, avg total amount.
- Trips by day of week: same metrics aggregated by DOW.
- Payment statistics: trip count, avg total amount, avg tip by payment type.
- Speed metrics: avg/min/max speed per pickup hour.
- Trip length buckets: short/medium/long distance segmentation.

All aggregated outputs are stored as Delta tables in the gold layer.

In [0]:
def _trip_summary(trips, group_col):
    """Return trip count and rounded average duration, distance and total amount per `group_col`, sorted by it."""
    return (
        trips.groupBy(group_col)
        .agg(
            sf.count('*').alias('trip_count'),
            sf.round(sf.avg('trip_duration_min'), 2).alias('avg_duration'),
            sf.round(sf.avg('trip_distance'), 2).alias('avg_distance'),
            sf.round(sf.avg('total_amount'), 2).alias('avg_total_amount'),
        )
        .orderBy(group_col)
    )


# Same summary metrics, grouped once by pickup hour and once by pickup day of week.
trips_by_hour = _trip_summary(df_clean, 'pickup_hour')

trips_by_dow = _trip_summary(df_clean, 'pickup_dow')

# Trip volume, average total amount and average tip per payment type (unsorted).
payment_stats = (
    df_clean.groupBy('payment_type')
    .agg(
        sf.count('*').alias('trip_count'),
        sf.round(sf.avg('total_amount'), 2).alias('avg_amount'),
        sf.round(sf.avg('tip_amount'), 2).alias('avg_tip_amount'),
    )
)

# Average / minimum / maximum speed per pickup hour, each rounded to 2 decimals.
speed_by_hour = (
    df_clean.groupBy('pickup_hour')
    .agg(
        sf.count('*').alias('trip_count'),
        sf.round(sf.avg('speed_mph'), 2).alias('avg_speed_mph'),
        sf.round(sf.min('speed_mph'), 2).alias('min_speed_mph'),
        sf.round(sf.max('speed_mph'), 2).alias('max_speed_mph'),
    )
    .orderBy('pickup_hour')
)

+-----------+----------+------------+------------+----------------+
|pickup_hour|trip_count|avg_duration|avg_distance|avg_total_amount|
+-----------+----------+------------+------------+----------------+
|          0|    398639|       12.31|        3.38|           17.69|
|          1|    283547|       11.52|        3.16|           16.69|
|          2|    205084|       11.05|        3.03|           16.07|
|          3|    145133|       11.14|         3.3|           16.75|
|          4|    112168|       11.85|        4.17|            19.3|
|          5|    135530|       11.63|        4.51|           20.54|
|          6|    322145|       11.19|        3.37|           16.68|
|          7|    560717|       12.63|        2.86|           16.01|
|          8|    696590|       13.85|        2.56|           16.05|
|          9|    691696|       13.92|        2.52|           16.12|
|         10|    680481|       13.61|        2.57|           16.08|
|         11|    704387|       13.47|         2.

In [0]:
# Expose the cleaned trips to Spark SQL under the view name 'trips';
# every query below reads from this view.
df_clean.createOrReplaceTempView('trips')

# Per pickup hour: trip count plus average duration, distance and total amount
# (each rounded to 2 decimals), ordered by hour.
# NOTE(review): same metrics as the DataFrame-API `trips_by_hour`; this SQL variant
# is not written to a table in the cells visible here.
trips_by_hour_sql = spark.sql('''
                SELECT pickup_hour, 
                    COUNT(*) AS trip_count, 
                    ROUND(AVG(trip_duration_min), 2) AS avg_duration, 
                    ROUND(AVG(trip_distance), 2) AS avg_distance, 
                    ROUND(AVG(total_amount), 2) AS avg_total_amount 
                FROM trips 
                GROUP BY pickup_hour 
                ORDER BY pickup_hour ASC
''')

# Per pickup day of week: trip count plus average duration, distance and total
# amount (each rounded to 2 decimals), ordered by day-of-week number.
# NOTE(review): same metrics as the DataFrame-API `trips_by_dow`; this SQL variant
# is not written to a table in the cells visible here.
trips_by_dow_sql = spark.sql('''
                SELECT pickup_dow, 
                    COUNT(*) AS trip_count, 
                    ROUND(AVG(trip_duration_min), 2) AS avg_duration, 
                    ROUND(AVG(trip_distance), 2) AS avg_distance, 
                    ROUND(AVG(total_amount), 2) AS avg_total_amount 
                FROM trips 
                GROUP BY pickup_dow
                ORDER BY pickup_dow ASC
''')

# The 10 pickup hours with the lowest average speed, slowest first.
# Ordering uses the rounded average (the `avg_speed_mph` alias), so hours whose
# rounded averages tie have no defined relative order.
slowest_hours_sql = spark.sql('''
                SELECT pickup_hour,
                    COUNT(*) AS trip_count,
                    ROUND(AVG(speed_mph) , 2) AS avg_speed_mph
                FROM trips
                GROUP BY pickup_hour
                ORDER BY avg_speed_mph ASC
                LIMIT 10
''')

# Segment trips by distance and summarise each segment:
#   trip_distance < 2            -> 'short'
#   2 <= trip_distance <= 6      -> 'medium' (BETWEEN is inclusive on both ends)
#   anything else                -> 'long'
# The query groups by the SELECT alias `trip_length_bucket` rather than repeating
# the CASE expression. No ORDER BY is applied, so bucket row order is unspecified.
trip_length_buckets_sql = spark.sql('''
                SELECT CASE 
                        WHEN trip_distance < 2 THEN 'short'
                        WHEN trip_distance BETWEEN 2 AND 6 THEN 'medium'
                        ELSE 'long'
                           END AS trip_length_bucket,
                    COUNT(*) AS trip_count, 
                    ROUND(AVG(trip_duration_min), 2) AS avg_duration, 
                    ROUND(AVG(total_amount), 2) AS avg_total_amount,
                    ROUND(AVG(speed_mph) , 2) AS avg_speed_mph
                FROM trips
                GROUP BY trip_length_bucket
''')

In [0]:

# Gold outputs keyed by table name; each is saved as a Delta table in
# workspace.nyt_gold, replacing any previous contents. Dict order is the write order.
gold_tables = {
    'trips_by_hour': trips_by_hour,
    'trips_by_dow': trips_by_dow,
    'payment_stats': payment_stats,
    'speed_by_hour': speed_by_hour,
    'slowest_hours': slowest_hours_sql,
    'trip_length_buckets': trip_length_buckets_sql,
}

for table_name, table_df in gold_tables.items():
    table_df.write.mode("overwrite").format("delta").saveAsTable(f'workspace.nyt_gold.{table_name}')