In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("NYC Taxi Trip ETL") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/05 01:05:11 WARN Utils: Your hostname, codespaces-c87dc6, resolves to a loopback address: 127.0.0.1; using 10.0.4.242 instead (on interface eth0)
25/09/05 01:05:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/05 01:05:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the yellow trip data from parquet
trip_df = spark.read.parquet("data/raw/yellow_tripdata_2023-01.parquet")

                                                                                

In [4]:
# Show the schema and sample rows
trip_df.printSchema()
trip_df.show(5)

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

                                                                                

In [5]:
# Load the zone lookup table
zone_df = spark.read.option("header", "true").csv("data/raw/taxi_zone_lookup.csv")

zone_df.printSchema()
zone_df.show(5)

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


In [6]:
# Rename columns to avoid confusion
zone_df = zone_df.withColumnRenamed("LocationID", "pickup_location_id") \
                 .withColumnRenamed("Borough", "pickup_borough") \
                 .withColumnRenamed("Zone", "pickup_zone")

In [7]:
# Join pickup location
trip_df = trip_df.join(zone_df, trip_df.PULocationID == zone_df.pickup_location_id, "left")

trip_df.select("PULocationID", "pickup_zone", "pickup_borough", "tpep_pickup_datetime").show(5)

+------------+-----------------+--------------+--------------------+
|PULocationID|      pickup_zone|pickup_borough|tpep_pickup_datetime|
+------------+-----------------+--------------+--------------------+
|         161|   Midtown Center|     Manhattan| 2023-01-01 00:32:10|
|          43|     Central Park|     Manhattan| 2023-01-01 00:55:08|
|          48|     Clinton East|     Manhattan| 2023-01-01 00:25:04|
|         138|LaGuardia Airport|        Queens| 2023-01-01 00:03:48|
|         107|         Gramercy|     Manhattan| 2023-01-01 00:10:29|
+------------+-----------------+--------------+--------------------+
only showing top 5 rows


In [8]:
trip_df.write.mode("overwrite").parquet("output/parquet/yellow_tripdata_enriched")

                                                                                

In [9]:
df = spark.read.parquet("output/parquet/yellow_tripdata_enriched")
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+--------------+-----------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|pickup_location_id|pickup_borough|      pickup_zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+--------------+-----------------+------

In [10]:
from pyspark.sql.functions import col, unix_timestamp, round

# Calculate trip duration in minutes
trip_df = trip_df.withColumn(
    "trip_duration_minutes",
    round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60, 2)
)

trip_df.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_duration_minutes").show(5)

+--------------------+---------------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_duration_minutes|
+--------------------+---------------------+---------------------+
| 2023-01-01 00:32:10|  2023-01-01 00:40:36|                 8.43|
| 2023-01-01 00:55:08|  2023-01-01 01:01:27|                 6.32|
| 2023-01-01 00:25:04|  2023-01-01 00:37:49|                12.75|
| 2023-01-01 00:03:48|  2023-01-01 00:13:25|                 9.62|
| 2023-01-01 00:10:29|  2023-01-01 00:21:19|                10.83|
+--------------------+---------------------+---------------------+
only showing top 5 rows


In [11]:
# Filter out bad rows
trip_df_cleaned = trip_df.filter(
    (col("tpep_pickup_datetime").isNotNull()) &
    (col("tpep_dropoff_datetime").isNotNull()) &
    (col("trip_duration_minutes") > 1) &
    (col("trip_duration_minutes") < 180) &
    (col("fare_amount") > 0) &
    (col("trip_distance") > 0)
)

trip_df_cleaned.select("trip_duration_minutes", "fare_amount", "trip_distance").show(5)

+---------------------+-----------+-------------+
|trip_duration_minutes|fare_amount|trip_distance|
+---------------------+-----------+-------------+
|                 8.43|        9.3|         0.97|
|                 6.32|        7.9|          1.1|
|                12.75|       14.9|         2.51|
|                 9.62|       12.1|          1.9|
|                10.83|       11.4|         1.43|
+---------------------+-----------+-------------+
only showing top 5 rows


In [12]:
from pyspark.sql.functions import year, month, dayofmonth, hour

trip_df_cleaned = trip_df_cleaned.withColumn("pickup_year", year("tpep_pickup_datetime")) \
                                 .withColumn("pickup_month", month("tpep_pickup_datetime")) \
                                 .withColumn("pickup_day", dayofmonth("tpep_pickup_datetime")) \
                                 .withColumn("pickup_hour", hour("tpep_pickup_datetime"))

trip_df_cleaned.select("pickup_year", "pickup_month", "pickup_day", "pickup_hour").show(5)

+-----------+------------+----------+-----------+
|pickup_year|pickup_month|pickup_day|pickup_hour|
+-----------+------------+----------+-----------+
|       2023|           1|         1|          0|
|       2023|           1|         1|          0|
|       2023|           1|         1|          0|
|       2023|           1|         1|          0|
|       2023|           1|         1|          0|
+-----------+------------+----------+-----------+
only showing top 5 rows


In [13]:
trip_df_cleaned.write.mode("overwrite").parquet("output/cleaned/yellow_tripdata_cleaned")

25/09/05 01:05:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [14]:
top_pickup_zones = trip_df_cleaned.groupBy("pickup_zone") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

top_pickup_zones.show()



+--------------------+------+
|         pickup_zone| count|
+--------------------+------+
|         JFK Airport|152822|
|Upper East Side S...|145786|
|Upper East Side N...|136259|
|      Midtown Center|133025|
|Penn Station/Madi...|107407|
|        Midtown East|103540|
| Lincoln Square East| 98463|
|Times Sq/Theatre ...| 96761|
|   LaGuardia Airport| 87724|
|         Murray Hill| 86601|
+--------------------+------+



                                                                                

In [15]:
top_pickup_zones.write.mode("overwrite").csv("output/summaries/top_10_pickup_zones", header=True)

                                                                                

In [16]:
avg_fare_by_borough = trip_df_cleaned.groupBy("pickup_borough") \
    .agg({"fare_amount": "avg"}) \
    .withColumnRenamed("avg(fare_amount)", "avg_fare") \
    .orderBy(col("avg_fare").desc())

avg_fare_by_borough.show()



+--------------+------------------+
|pickup_borough|          avg_fare|
+--------------+------------------+
|           EWR| 87.60538461538462|
|           N/A| 66.04131736526946|
| Staten Island|58.477109004739475|
|        Queens| 52.32474880674893|
|         Bronx| 30.31594920221405|
|       Unknown| 27.12036107418224|
|      Brooklyn|26.334792620865493|
|     Manhattan|14.904216290651584|
+--------------+------------------+



                                                                                

In [17]:
avg_fare_by_borough.write.mode("overwrite").csv("output/summaries/avg_fare_by_borough", header=True)

                                                                                

In [18]:
trips_by_hour = trip_df_cleaned.groupBy("pickup_hour") \
    .count() \
    .orderBy("pickup_hour")

trips_by_hour.show()



+-----------+------+
|pickup_hour| count|
+-----------+------+
|          0| 82276|
|          1| 57754|
|          2| 40431|
|          3| 26143|
|          4| 16614|
|          5| 16946|
|          6| 42299|
|          7| 84563|
|          8|114053|
|          9|127969|
|         10|140214|
|         11|150526|
|         12|165696|
|         13|174163|
|         14|186726|
|         15|191373|
|         16|190672|
|         17|204378|
|         18|210840|
|         19|188333|
+-----------+------+
only showing top 20 rows


                                                                                

In [19]:
trips_by_hour.write.mode("overwrite").csv("output/summaries/trips_by_hour", header=True)

                                                                                