In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
import os
os.environ["JAVA_HOME"] = "/Users/schwinger42/miniforge3/envs/de-zoomcamp/lib/jvm"
!echo $JAVA_HOME
!$JAVA_HOME/bin/java -version

/Users/schwinger42/miniforge3/envs/de-zoomcamp/lib/jvm
openjdk version "23.0.2" 2025-01-21
OpenJDK Runtime Environment Zulu23.32+11-CA (build 23.0.2+7)
OpenJDK 64-Bit Server VM Zulu23.32+11-CA (build 23.0.2+7, mixed mode, sharing)


In [4]:
# Stop whatever session is currently active so that re-running this cell builds a
# session from the settings below instead of silently reusing the old one.
# NOTE(review): stopping the session may not restart the underlying JVM, so JVM-level
# settings (driver memory, extra Java options) might only apply after a kernel
# restart — confirm.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

spark_settings = {
    # JVM / Hadoop security switches.
    # NOTE(review): the security-manager flag is presumably needed because the JVM
    # in use is Java 23 — confirm.
    "spark.driver.extraJavaOptions": "-Djava.security.manager=allow",
    "spark.hadoop.hadoop.security.authentication": "simple",
    "spark.hadoop.hadoop.security.authorization": "false",
    # Memory sizing.
    "spark.driver.memory": "10g",
    "spark.executor.memory": "2g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "2g",
    # Number of partitions produced by shuffles (joins, aggregations).
    "spark.sql.shuffle.partitions": "20",
    # Garbage-collector names reported in the event-log GC metrics.
    "spark.eventLog.gcMetrics.youngGenerationGarbageCollectors": "G1 Young Generation",
    "spark.eventLog.gcMetrics.oldGenerationGarbageCollectors": "G1 Old Generation,G1 Concurrent GC",
}

# Local mode using every available core, under the application name "module5_taxi".
session_builder = SparkSession.builder.master("local[*]").appName("module5_taxi")
for setting, value in spark_settings.items():
    session_builder = session_builder.config(setting, value)

spark = session_builder.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 04:33:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/07 04:33:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# Report the running Spark version together with the JAVA_HOME this kernel exports
version_report = f"PySpark {spark.version} is running with Java at {os.environ.get('JAVA_HOME')}"
print(version_report)

PySpark 3.5.5 is running with Java at /Users/schwinger42/miniforge3/envs/de-zoomcamp/lib/jvm


In [6]:
# download files from
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-07 04:33:22--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.117, 52.85.39.65, 52.85.39.97, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.117|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-07 04:33:29 (11.6 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]

--2025-03-07 04:33:29--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.65, 52.85.39.97, 52.85.39.153, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_look

In [7]:
# Load the October 2024 yellow-taxi trips, redistribute the rows over 4 partitions,
# and persist the result as parquet, replacing any output left by a previous run.
df = spark.read.parquet("yellow_tripdata_2024-10.parquet")
df_repartitioned = df.repartition(numPartitions=4)
df_repartitioned.write.parquet(
    "yellow_taxi_repartitioned.parquet",
    mode="overwrite",
)

                                                                                

In [8]:
# Calculate average parquet file size
def average_parquet_size_mb(directory):
    """Return the mean size in MB of the ``.parquet`` files directly inside `directory`.

    Only entries whose name ends with ``.parquet`` are counted; any other file in the
    directory is ignored. One MB is taken as 1024 * 1024 bytes.

    Returns ``None`` when `directory` does not exist or contains no such file, rather
    than failing with a ZeroDivisionError on the empty case.
    """
    if not os.path.isdir(directory):
        return None
    part_sizes = [
        os.path.getsize(os.path.join(directory, name))
        for name in os.listdir(directory)
        if name.endswith(".parquet")
    ]
    if not part_sizes:
        return None
    return (sum(part_sizes) / len(part_sizes)) / (1024 * 1024)


avg_size_mb = average_parquet_size_mb("yellow_taxi_repartitioned.parquet")
if avg_size_mb is None:
    print("No parquet files found in yellow_taxi_repartitioned.parquet")
else:
    print(f"Average parquet file size: {avg_size_mb:.2f} MB")

Average parquet file size: 22.40 MB


In [11]:
# Print the column names, types and nullability of the repartitioned trips DataFrame
df_repartitioned.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [12]:
# Preview the first 5 rows of the repartitioned trips DataFrame
df_repartitioned.show(5)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-10-07 16:40:43|  2024-10-07 18:10:56|              1|         14.8|        99|                 N|         127|         225|           1|       47.5|  0.0|    0.5|       0.

In [13]:
# Count the trips picked up on October 15th: the pickup timestamp is reduced to a
# calendar date and compared with 2024-10-15.
from pyspark.sql import functions as F

pickup_date = F.to_date("tpep_pickup_datetime")
trips_oct15 = df_repartitioned.where(pickup_date == "2024-10-15").count()
print(f"Number of trips on October 15th: {trips_oct15}")

Number of trips on October 15th: 128893


In [16]:
# Find the longest trip, measured in hours between pickup and dropoff.
# Both timestamps are converted to epoch seconds; their difference divided by 3600
# gives the duration in hours.
pickup_seconds = F.unix_timestamp(F.col("tpep_pickup_datetime"))
dropoff_seconds = F.unix_timestamp(F.col("tpep_dropoff_datetime"))
trip_duration_hours = (dropoff_seconds - pickup_seconds) / 3600

longest_trip = (
    df_repartitioned
    .select(trip_duration_hours.alias("trip_duration_hours"))
    .agg(F.max("trip_duration_hours").alias("max_duration"))
    .collect()[0]["max_duration"]
)
print(f"Longest trip duration in hours: {longest_trip:.2f} hours")

Longest trip duration in hours: 162.62 hours


In [18]:
# Prepare the inputs for finding the least frequent pickup zone:
#   zones_df     - taxi_zone_lookup.csv loaded into a Spark DataFrame (first line is the header)
#   pickup_count - number of trips per pickup location id
zones_df = spark.read.option("header", True).csv("taxi_zone_lookup.csv")
pickup_count = df_repartitioned.groupBy("PULocationID").count()

In [20]:
# Inspect the zone lookup schema (no schema was supplied when reading the CSV)
zones_df.printSchema()

root
 |-- locationid: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [19]:
# Inspect the schema of the per-location pickup counts
pickup_count.printSchema()

root
 |-- PULocationID: integer (nullable = true)
 |-- count: long (nullable = false)



In [21]:

# Attach zone names to the per-location pickup counts and report the zone with the
# fewest pickups.
# The join is an inner join, so a location id that never appears as a pickup is not
# part of the ranking.
# Sorting by "count" alone leaves the order of tied zones unspecified; "Zone" is used
# as a secondary sort key so the reported zone is the same on every run.
zone_frequencies = pickup_count.join(
    zones_df, pickup_count.PULocationID == zones_df.locationid
).orderBy("count", "Zone")
least_frequent = zone_frequencies.select("Zone", "count").first()
if least_frequent is None:
    # first() returns None for an empty DataFrame, i.e. no pickup id matched the lookup table
    print("No pickup zone matched the zone lookup table")
else:
    print(f"Least frequent zone: {least_frequent['Zone']} with {least_frequent['count']} pickups")

Least frequent zone: Governor's Island/Ellis Island/Liberty Island with 1 pickups


In [22]:
# Shut down the Spark session now that the analysis is finished
spark.stop()