In [1]:
import pyspark
from pyspark.sql import SparkSession

# Shell escape: print the version banner of the locally installed Spark distribution.
!spark-shell --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.5
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.26
Branch HEAD
Compiled by user ubuntu on 2025-02-23T20:30:46Z
Revision 7c29c664cdc9321205a98a14858aaf8daaa19db2
Url https://github.com/apache/spark
Type --help for more information.


In [2]:
# Start (or reuse) a Spark session running locally with master "local[1]".
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('test-spark')
    .getOrCreate()
)

print(f'The PySpark {spark.version} version is running...')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/08 23:32:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


The PySpark 3.5.5 version is running...


## 2. Yellow October 2024

Read the October 2024 Yellow taxi trip data into a Spark DataFrame.

Repartition the DataFrame to 4 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

* 6MB
* 25MB
* 75MB
* 100MB


In [4]:
# Load the October 2024 yellow-taxi trips from the local parquet file.
trips_path = "yellow_tripdata_2024-10.parquet"
df = spark.read.parquet(trips_path)

# Print the column types, then preview the first five rows.
df.printSchema()
df.show(n=5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------

In [5]:
# Redistribute the trips into 4 partitions and write them out as parquet,
# replacing any output left over from a previous run.
(
    df.repartition(4)
    .write
    .mode("overwrite")
    .parquet("yellow_tripdata_output")
)

                                                                                

In [6]:
import os

# Gather the size in bytes of every file in the output directory whose name
# ends with ".parquet"; any other files in that directory are ignored.
output_dir = "yellow_tripdata_output"
parquet_files = []
file_sizes = []
for entry in os.listdir(output_dir):
    if not entry.endswith(".parquet"):
        continue
    parquet_files.append(entry)
    file_sizes.append(os.path.getsize(f"{output_dir}/{entry}"))

# Mean file size, converted from bytes to megabytes (1024 * 1024 bytes each).
bytes_per_mb = 1024 * 1024
average_size_mb = sum(file_sizes) / len(file_sizes) / bytes_per_mb
print(f"Average Parquet File Size: {average_size_mb:.2f} MB")


Average Parquet File Size: 23.79 MB


## 3. Count records
How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

* 85,567
* 105,567
* 125,567
* 145,567


In [8]:
from pyspark.sql.functions import col, to_date

# Reduce the pickup timestamp to its calendar date and keep only the trips
# that started on 15 October 2024.
pickup_day = to_date(col("tpep_pickup_datetime"))
trips_on_15th = df.filter(pickup_day == "2024-10-15")
trip_count = trips_on_15th.count()

print(f"Total trips on 2024-10-15: {trip_count}")


Total trips on 2024-10-15: 128893


## 4. Longest trip
What is the length of the longest trip in the dataset in hours?

* 122
* 142
* 162
* 182

In [10]:
from pyspark.sql.functions import col, unix_timestamp

# Trip duration in hours: (dropoff - pickup) in epoch seconds, divided by 3600.
pickup_secs = unix_timestamp(col("tpep_pickup_datetime"))
dropoff_secs = unix_timestamp(col("tpep_dropoff_datetime"))
df_with_duration = df.withColumn(
    "trip_duration_hours",
    (dropoff_secs - pickup_secs) / 3600,
)

# The aggregate yields a single row whose only column is the maximum duration.
longest_row = df_with_duration.agg({"trip_duration_hours": "max"}).first()
max_trip_duration = longest_row[0]

print(f"Longest trip duration: {max_trip_duration:.2f} hours")


Longest trip duration: 162.62 hours


## 6. Least frequent pickup location zone
Load the zone lookup data into a temp view in Spark:

```bash
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
```

Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

* Governor's Island/Ellis Island/Liberty Island
* Arden Heights
* Rikers Island
* Jamaica Bay


In [11]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv 

--2025-03-08 23:45:49--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2759:2c00:b:20a5:b140:21, 2600:9000:2759:9400:b:20a5:b140:21, 2600:9000:2759:be00:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2759:2c00:b:20a5:b140:21|:443... connected.
200 OKequest sent, awaiting response... 
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-08 23:45:49 (3,83 GB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [12]:
# Read the zone lookup CSV, treating its first row as column names. No schema
# is inferred, so every column (including LocationID) is loaded as a string.
df_zones = spark.read.csv("taxi_zone_lookup.csv", header=True)

# Expose the lookup table to Spark SQL under the name "zones".
df_zones.createOrReplaceTempView("zones")

# Print the column types, then preview the first five rows.
df_zones.printSchema()
df_zones.show(n=5)

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [15]:
from pyspark.sql.functions import col

# Number of trips per pickup location ID.
pickup_counts = df.groupBy("PULocationID").count()

# Attach the zone name to each pickup count and sort ascending by count.
pickup_counts_with_zone = (
    pickup_counts
    .join(
        df_zones,
        # LocationID comes from the CSV as a string while PULocationID is an
        # integer; cast explicitly instead of relying on implicit coercion.
        pickup_counts.PULocationID == df_zones.LocationID.cast("int"),
        "left",
    )
    .select("Zone", "count")
    # Secondary sort on Zone: without it, zones tied on the lowest count
    # come back in arbitrary order and the answer can change between runs.
    .orderBy(col("count").asc(), col("Zone").asc())
)

# The first row of the sorted result is the least frequent pickup zone.
least_frequent_zone = pickup_counts_with_zone.first()["Zone"]
print(f"Least frequent pickup location zone: {least_frequent_zone}")


Least frequent pickup location zone: Governor's Island/Ellis Island/Liberty Island
