In [None]:
# Import libraries
import pyspark
from pyspark.sql import SparkSession
# NOTE(review): SparkContext/SparkConf are not used by the cells shown; the session is
# built via SparkSession.builder below.
from pyspark import SparkContext, SparkConf

In [None]:
# Start (or reuse, via getOrCreate) a local Spark session running on all available cores
spark = (
    SparkSession.builder
    .appName('nyc-taxi')
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Show the installed PySpark version
installed_version = pyspark.__version__
print(installed_version)

In [None]:
%%capture

# Download the Parquet file for Yellow Taxi Trip Data from October 2024
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet -O yellow_tripdata_2025-11.parquet

**Analysis of Partitioned Parquet File Sizes**

This section repartitions the November 2025 trip data into 4 partitions, writes it out as Parquet, and examines the average size (in megabytes) of the resulting `.parquet` files to determine which value best represents a typical file size.

In [None]:
# Load the downloaded November 2025 Yellow Taxi Parquet file as a Spark DataFrame
df_yellow = (
    spark.read
    .format("parquet")
    .load('yellow_tripdata_2025-11.parquet')
)

In [None]:
# Preview the top 20 rows of the trip data (show's default row count, made explicit)
df_yellow.show(n=20)

In [None]:
import glob
import os

# Repartition the DataFrame to 4 partitions and save it as Parquet files to a specified directory.
# DataFrameWriter.parquet() returns None, so its result is not bound to a variable.
output_dir = 'yellow/2025/11/'
df_yellow \
    .repartition(4) \
    .write.mode("overwrite").parquet(output_dir)

# Average size of the .parquet part files Spark wrote (MB here means 1024 * 1024 bytes)
parquet_files = glob.glob(os.path.join(output_dir, "*.parquet"))
sizes_mb = [os.path.getsize(path) / (1024 * 1024) for path in parquet_files]

if sizes_mb:
    avg_size_mb = sum(sizes_mb) / len(sizes_mb)
    print(f"{len(sizes_mb)} .parquet files written, average size: {avg_size_mb:.2f} MB")
else:
    print(f"No .parquet files found in {output_dir}")

**Analysis of November 15th Taxi Trips**

This section focuses on counting the total number of taxi trips that commenced on the 15th of November, considering only trips with a pickup date within that specific day.

In [None]:
# Create a temporary SQL view from the DataFrame so it can be queried with Spark SQL
df_yellow.createOrReplaceTempView("yellow_tripdata_2025_11")

# Execute a Spark SQL query to count trips on November 15th, 2025, using the half-open
# pickup-time range [2025-11-15 00:00:00, 2025-11-16 00:00:00).
# DataFrame.show() returns None, so keep the query result and display it separately.
df_result = spark.sql("""
SELECT
    COUNT(*) AS trips_count
FROM
    `yellow_tripdata_2025_11`
WHERE
    tpep_pickup_datetime >= "2025-11-15 00:00:00"
    AND
    tpep_pickup_datetime < "2025-11-16 00:00:00"
""")
df_result.show()

In [None]:
from pyspark.sql.functions import col

# Filter the DataFrame to include only trips on November 15th, 2025 using PySpark API.
# Same half-open range as the Spark SQL query: [2025-11-15 00:00:00, 2025-11-16 00:00:00).
df_nov_15_pyspark = df_yellow.filter(
    (col("tpep_pickup_datetime") >= "2025-11-15 00:00:00") &
    (col("tpep_pickup_datetime") < "2025-11-16 00:00:00")
)

# Count the number of trips on November 15th
trips_count_pyspark = df_nov_15_pyspark.count()

# Print the count of trips
print(f"Number of taxi trips on November 15th (PySpark API): {trips_count_pyspark}")

**Analysis of the Longest Trip Duration**

This section identifies the maximum trip length recorded in the dataset, measuring each trip's duration in hours (dropoff time minus pickup time) to find the trip with the longest continuous travel time.

In [None]:
# Execute a Spark SQL query to find the longest trip duration in hours:
# (dropoff Unix seconds - pickup Unix seconds) / 3600, maximised over all trips.
# DataFrame.show() returns None, so keep the query result and display it separately.
df_result = spark.sql("""
SELECT
    MAX((UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime)) / 3600) AS longest_trip_hours
FROM
    `yellow_tripdata_2025_11`
""")
df_result.show()

In [None]:
# Spark functions are accessed through the F namespace so Spark's `max` does not shadow
# Python's builtin max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Calculate trip duration in seconds by subtracting pickup timestamp from dropoff timestamp
df_with_duration = df_yellow.withColumn(
    "duration_seconds",
    F.unix_timestamp(F.col("tpep_dropoff_datetime")) - F.unix_timestamp(F.col("tpep_pickup_datetime"))
)

# Convert duration from seconds to hours
df_with_duration_hours = df_with_duration.withColumn(
    "duration_hours",
    F.col("duration_seconds") / 3600
)

# Find the maximum duration in hours; the aggregate is None when there is no non-null duration
longest_trip_hours = df_with_duration_hours.select(F.max("duration_hours")).collect()[0][0]

# Print the length of the longest trip, formatted to two decimal places
if longest_trip_hours is None:
    print("Could not determine the longest trip: no trip durations available.")
else:
    print(f"The length of the longest trip in the dataset is: {longest_trip_hours:.2f} hours")

**Analysis of Spark Application Interface**

This section identifies the local port used by Spark’s built-in user interface (the Spark UI), which provides the application dashboard for monitoring job execution and cluster activity.

In [None]:
from urllib.parse import urlparse

# Retrieve the Spark UI address from the SparkContext and extract its port.
# NOTE(review): depending on the PySpark version, uiWebUrl may be None (or unavailable)
# when the UI is disabled — guarded below.
spark_ui_url = spark.sparkContext.uiWebUrl
if spark_ui_url:
    spark_ui_port = urlparse(spark_ui_url).port
    print(f"Spark UI is running on: {spark_ui_url} (port {spark_ui_port})")
else:
    print("Spark UI URL is not available (the UI may be disabled).")

**Analysis of Least Frequent Pickup Zone**

This section focuses on identifying the pickup location zone with the lowest trip frequency. Using the zone lookup data (loaded into a temporary Spark view) and the Yellow Taxi trip data from November 2025, the objective is to determine which zone name appears least often as a pickup location in the dataset.

In [None]:
%%capture

# Download the taxi zone lookup CSV file
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -O taxi_zone_lookup.csv

In [None]:
# Read the taxi zone lookup CSV file into a Spark DataFrame. The first row is the header,
# and inferSchema makes Spark infer column types from the data instead of reading every
# column as a string (so LocationID — presumably integer IDs — can be joined against
# PULocationID without an implicit string cast).
df_lookup = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('taxi_zone_lookup.csv')

In [None]:
# Expose df_lookup to Spark SQL under the view name `taxi_zone_lookup`
df_lookup.createOrReplaceTempView(name="taxi_zone_lookup")

In [None]:
# Least frequent pickup zone: join trips to the zone lookup on pickup location, count trips
# per zone, and keep the zone with the fewest trips. Zone is a secondary sort key so tied
# counts resolve deterministically instead of LIMIT 1 picking an arbitrary row.
# DataFrame.show() returns None, so keep the query result and display it separately.
df_result_sql = spark.sql("""
SELECT
    tz.Zone,
    COUNT(yt.PULocationID) AS trip_count
FROM
    `yellow_tripdata_2025_11` yt
JOIN
    `taxi_zone_lookup` tz ON yt.PULocationID = tz.LocationID
GROUP BY
    tz.Zone
ORDER BY
    trip_count ASC,
    tz.Zone ASC NULLS LAST
LIMIT 1
""")
df_result_sql.show()

In [1]:
from pyspark.sql.functions import col, count

# Join the yellow trip data with the zone lookup on PULocationID == LocationID.
# An inner join (matching the SQL query's JOIN) drops trips with no lookup row; a left join
# would instead group them under a null Zone, which could then be reported as the
# "least frequent zone".
df_joined = df_yellow.join(
    df_lookup,
    df_yellow.PULocationID == df_lookup.LocationID,
    "inner"
)

# Group the joined DataFrame by 'Zone' and count the number of trips (PULocationID) in each zone
df_zone_counts = df_joined.groupBy("Zone").agg(count("PULocationID").alias("trip_count"))

# Order by trip count ascending and take the first row (least frequent zone);
# Zone breaks ties deterministically, with any null Zone sorted last
least_frequent_zone = df_zone_counts.orderBy(
    col("trip_count").asc(),
    col("Zone").asc_nulls_last()
).first()

# first() returns None when there are no rows at all
if least_frequent_zone is not None:
    print(f"The LEAST frequent pickup location Zone is: {least_frequent_zone['Zone']} with {least_frequent_zone['trip_count']} trips.")
else:
    print("Could not determine the least frequent pickup location zone.")

NameError: name 'df_yellow' is not defined