In [0]:
# Part 1: Data Exploration

# Expose the silver and gold Delta tables to Spark SQL as temporary views.
for view_name, delta_path in [
    ("nyc_taxi_silver", "dbfs:/nyc-taxi/silver/"),
    ("nyc_taxi_gold", "dbfs:/nyc-taxi/gold/"),
]:
    spark.read.format("delta").load(delta_path).createOrReplaceTempView(view_name)

# Exploratory queries against the silver view, executed and shown in order:
#   1.1 top 10 longest trips by distance
#   1.2 average fare amount (rounded to 2 dp) per passenger_count
#   1.3 trip count per pickup hour
#   1.4 number of NULLs in passenger_count, trip_distance and fare_amount
exploration_queries = [
    """
  SELECT *
  FROM nyc_taxi_silver
  ORDER BY trip_distance DESC
  LIMIT 10
""",
    """
  SELECT passenger_count, ROUND(AVG(fare_amount), 2) AS avg_fare
  FROM nyc_taxi_silver
  GROUP BY passenger_count
  ORDER BY passenger_count
""",
    """
  SELECT HOUR(tpep_pickup_datetime) AS pickup_hour, COUNT(*) AS trip_count
  FROM nyc_taxi_silver
  GROUP BY pickup_hour
  ORDER BY pickup_hour
""",
    """
  SELECT 
    SUM(CASE WHEN passenger_count IS NULL THEN 1 ELSE 0 END) AS null_passenger_count,
    SUM(CASE WHEN trip_distance IS NULL THEN 1 ELSE 0 END) AS null_trip_distance,
    SUM(CASE WHEN fare_amount IS NULL THEN 1 ELSE 0 END) AS null_fare_amount
  FROM nyc_taxi_silver
""",
]

for query_text in exploration_queries:
    spark.sql(query_text).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|         source_file|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+
|       2| 2023-01-01 19:40:43|  2023-01-01 20:21:48|            1.0|     62359.52|       4.0|                 N|          7

In [0]:
# Part 2: Feature Engineering

from pyspark.sql.functions import hour, dayofweek, col, unix_timestamp, round as round_, when

silver_df = spark.read.format("delta").load("dbfs:/nyc-taxi/silver/")


def _per_mile(amount_col: str):
    """Return ``amount_col / trip_distance`` rounded to 2 decimals.

    The result is NULL when ``trip_distance`` is NULL, zero or negative. The
    explicit guard matters because:
      * with Spark ANSI mode enabled, ``x / 0`` raises DIVIDE_BY_ZERO and would
        fail the whole job on any zero-distance trip;
      * with ANSI mode disabled, ``x / 0`` already yields NULL, so the guard
        only makes that behavior explicit and mode-independent.

    Args:
        amount_col: name of the monetary column to normalise by distance.

    Returns:
        A Spark Column expression (``when`` without ``otherwise`` -> NULL).
    """
    return when(
        col("trip_distance") > 0,
        round_(col(amount_col) / col("trip_distance"), 2),
    )


# 2.1: Derive new time-based and distance features.
featurized_df = (
    silver_df
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    # Spark's dayofweek convention: 1 = Sunday ... 7 = Saturday.
    .withColumn("pickup_dayofweek", dayofweek("tpep_pickup_datetime"))
    # Not clamped: a dropoff earlier than pickup yields a negative duration.
    # NOTE(review): consider filtering such rows upstream.
    .withColumn(
        "trip_duration_minutes",
        (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60,
    )
    .withColumn("fare_per_mile", _per_mile("fare_amount"))
    .withColumn("tip_per_mile", _per_mile("tip_amount"))
    # 1 when trip_distance > 10, else 0 (a NULL distance also maps to 0).
    .withColumn("long_trip", when(col("trip_distance") > 10, 1).otherwise(0))
)

# 2.2: Display schema and sample
featurized_df.printSchema()
featurized_df.select("pickup_hour", "trip_duration_minutes", "fare_per_mile", "long_trip").show(5)

root
 |-- vendorid: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: long (nullable = true)
 |-- dolocationid: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- source_file: string (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-

In [0]:
%sql
-- Part 3: Visualizations

-- 3.1 Raw trip_distance values, capped at 20, to be rendered with a
--     histogram visualization in Databricks.
SELECT s.trip_distance
FROM nyc_taxi_silver AS s
WHERE s.trip_distance <= 20;


trip_distance
0.97
1.1
2.51
1.9
1.43
1.84
1.66
11.7
2.95
3.01


Databricks visualization. Run in Databricks to view.

In [0]:
# Bind the 3.1 result explicitly rather than via Databricks' implicit `_sqldf`,
# which holds the result of the most recently executed %sql cell and therefore
# silently changes meaning if cells are run out of order.
trip_distance_df = spark.sql("""
SELECT trip_distance
FROM nyc_taxi_silver
WHERE trip_distance <= 20
""")

In [0]:
%sql
-- 3.2 Distribution of passenger_count: number of trips per passenger count,
--     listed in ascending passenger_count order.
SELECT
  s.passenger_count,
  COUNT(*) AS num_trips
FROM nyc_taxi_silver AS s
GROUP BY s.passenger_count
ORDER BY s.passenger_count;

passenger_count,num_trips
0.0,49986
1.0,2231083
2.0,447158
3.0,105231
4.0,52612
5.0,42537
6.0,28037
7.0,5
8.0,7


Databricks visualization. Run in Databricks to view.

In [0]:
# Bind the 3.2 result explicitly rather than via Databricks' implicit `_sqldf`,
# which holds the result of the most recently executed %sql cell and therefore
# silently changes meaning if cells are run out of order.
passenger_dist_df = spark.sql("""
SELECT passenger_count, COUNT(*) AS num_trips
FROM nyc_taxi_silver
GROUP BY passenger_count
ORDER BY passenger_count
""")

In [0]:
%sql
-- 3.3 Mean trip_distance (rounded to 2 decimals) for each passenger_count,
--     listed in ascending passenger_count order.
SELECT
  s.passenger_count,
  ROUND(AVG(s.trip_distance), 2) AS avg_distance
FROM nyc_taxi_silver AS s
GROUP BY s.passenger_count
ORDER BY s.passenger_count;

passenger_count,avg_distance
0.0,2.83
1.0,3.38
2.0,3.97
3.0,3.7
4.0,3.89
5.0,3.29
6.0,3.26
7.0,5.09
8.0,7.93


Databricks visualization. Run in Databricks to view.

In [0]:
# Bind the 3.3 result explicitly rather than via Databricks' implicit `_sqldf`,
# which holds the result of the most recently executed %sql cell and therefore
# silently changes meaning if cells are run out of order.
avg_distance_df = spark.sql("""
SELECT passenger_count, ROUND(AVG(trip_distance), 2) AS avg_distance
FROM nyc_taxi_silver
GROUP BY passenger_count
ORDER BY passenger_count
""")

In [0]:
# Part 4: ML Feature Preparation

# 4.1: Select the ML feature columns and drop any row with a NULL in them.
# Rows whose fare_per_mile is NULL (non-positive or missing trip_distance)
# are removed here as well.
# NOTE(review): fare_per_mile is derived from fare_amount; if fare_amount is
# the prediction target, keeping both is target leakage — confirm intent.
ml_feature_columns = [
    "trip_distance", "trip_duration_minutes", "pickup_hour", "pickup_dayofweek",
    "passenger_count", "fare_amount", "tip_amount", "fare_per_mile", "long_trip",
]
ml_df = featurized_df.select(*ml_feature_columns).dropna()

# 4.2: Save to curated ML features Delta table.
# Plain overwrite mode rejects a write whose schema differs from the existing
# table; overwriteSchema lets a re-run succeed after the feature list changes.
ml_features_path = "dbfs:/nyc-taxi/ml_features/"
(
    ml_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(ml_features_path)
)

# Register the Delta location as a table. The table is (re)created with an
# explicit LOCATION, so it is an external table; dropping it first clears any
# stale definition under the same name.
ml_features_table = "nyc_taxi_ml_features"
spark.sql(f"DROP TABLE IF EXISTS {ml_features_table}")
spark.sql(f"""
    CREATE TABLE {ml_features_table}
    USING DELTA
    LOCATION '{ml_features_path}'
""")

print(f"ML feature set saved and registered as table {ml_features_table}")

ML feature set saved and registered as table nyc_taxi_ml_features
