In [1]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, dayofmonth
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

## Spark Connect Setup

In [2]:
# Connect to a Spark Connect server on localhost (no port given, so the client
# default is used) and reuse an existing session if there is one.
spark = (
    SparkSession.builder
    .remote("sc://localhost")
    .getOrCreate()
)

## Load Datasets from GCS

### FHV Dataset

In [3]:
# October 2019 FHV trips. Parquet carries its own schema, so none is supplied here.
raw_fhv = (
    spark.read
    .format("parquet")
    .load("gs://iobruno-lakehouse-raw/nyc_tlc_dataset/fhv_trip_data/fhv_tripdata_2019-10.snappy.parquet")
)

In [4]:
# Preview the raw rows (first 20, long values truncated).
raw_fhv.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

### Performance Optimization: Caching

The `fhv` DataFrame is used multiple times throughout this notebook (for counting, joins, aggregations, and writing).  
Calling `.cache()` marks the DataFrame to be persisted (in memory, spilling to disk if needed) the first time it is computed. Subsequent actions then avoid repeated reads from GCS. Because `.cache()` is lazy, the `.count()` is used to trigger the caching immediately.

In [5]:
# Normalize the raw FHV columns to snake_case and pin both datetime columns to
# TimestampType. `cast` without an alias keeps the original column name.
fhv_columns = [
    col("dispatching_base_num"),
    col("Affiliated_base_number").alias("affiliated_base_num"),
    col("PUlocationID").alias("pickup_location_id"),
    col("DOlocationID").alias("dropoff_location_id"),
    col("pickup_datetime").cast("timestamp"),
    col("dropOff_datetime").cast("timestamp").alias("dropoff_datetime"),
    col("SR_Flag").alias("sr_flag"),
]

# Cached because the later cells query `fhv` repeatedly.
fhv = raw_fhv.select(*fhv_columns).cache()

# `.cache()` is lazy; running an action materializes the cache right away.
fhv.count()

1897493

In [6]:
# Expose the cleaned FHV frame to Spark SQL as the `fhv` view.
fhv.createOrReplaceTempView(name="fhv")

### Zone Lookup Dataset

In [7]:
# Explicit schema for the taxi zone lookup CSV; every column is nullable.
zones_schema = (
    StructType()
    .add("LocationID", IntegerType(), nullable=True)
    .add("Borough", StringType(), nullable=True)
    .add("Zone", StringType(), nullable=True)
    .add("service_zone", StringType(), nullable=True)
)

In [8]:
# Read the gzipped zone-lookup CSVs using the explicit schema above instead of
# inferring one. The file's header row is skipped, and column names come from
# the schema.
zones = (
    spark.read
    .schema(zones_schema)
    .option("header", True)
    .option("inferSchema", False)
    .csv("gs://iobruno-lakehouse-raw/nyc_tlc_dataset/zone_lookup/*.csv.gz")
)

In [9]:
# Preview the zone lookup (first 20 rows, long values truncated).
zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [10]:
# Rename the zone-lookup columns to snake_case; `service_zone` already is.
# `withColumnRenamed` is a no-op when the source column is absent, so
# re-running this cell after `zones` was already renamed leaves it unchanged.
# A `select(col('LocationID'), ...)` here would instead fail on the second
# run, because the name `zones` has been rebound to the renamed frame.
# The result has the same 4 columns, in the same order, as before.
zones = (
    zones
    .withColumnRenamed('LocationID', 'location_id')
    .withColumnRenamed('Borough', 'borough')
    .withColumnRenamed('Zone', 'zone')
)

In [11]:
# Expose the renamed zone lookup to Spark SQL as the `zones` view.
zones.createOrReplaceTempView(name="zones")

## Assignment

### Question 

**FHV October 2019**  
- Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons
- Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)?  Select the answer which most closely matches
- [ ] 1 MB
- [x] 6 MB
- [ ] 25 MB
- [ ] 87 MB

### Performance Optimization: coalesce(n) vs repartition(n)

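When reducing the number of partitions, `coalesce(n)` is more efficient than `repartition(n)`.  
Coalesce avoids a full shuffle by combining existing partitions locally, while repartition always performs a full shuffle of all data across the cluster.

Note: `coalesce` may result in uneven partition sizes; use `repartition` if even distribution is critical for downstream operations. Also, `coalesce(n)` can only *reduce* the partition count. If the DataFrame has fewer than `n` partitions, it is left as-is, so `coalesce(6)` does not guarantee 6 output files. Use `repartition(n)` when an exact partition count is required, as in this assignment.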

In [12]:
# DataFrame over the `fhv` temp view (all columns, all rows).
df = spark.table("fhv")

In [13]:
# The assignment asks for the data repartitioned into exactly 6 partitions.
# `coalesce(6)` only merges existing partitions and never increases their
# number. If `df` has fewer than 6 partitions, it would write fewer than 6
# files and distort the average-file-size answer.
# `repartition(6)` costs one shuffle but always produces 6 evenly sized
# partitions, i.e. one parquet part file per non-empty partition.
(
    df.repartition(6)
    .write
    .mode("overwrite")
    .parquet("gs://iobruno-lakehouse-raw/tmp_output/")
)

### Question

**Count records**  

How many taxi trips were there on the 15th of October?  
Consider only trips that started on the 15th of October

- [ ] 108,164
- [ ] 12,856
- [ ] 452,470
- [x] 62,610

In [14]:
# Count trips whose pickup falls on 2019-10-15.
# Matching the full calendar date (not just month and day-of-month) keeps
# pickups from other years out of the count. The dataset does hold
# implausible timestamps (see the longest-trip result below).
# `to_date` uses the session time zone, the same as `month`/`dayofmonth`.
spark.sql("""
    select count(1) as num_trips
    from fhv
    where
        to_date(pickup_datetime) = date'2019-10-15'
""").take(1)

[Row(num_trips=62610)]

### Question

**Longest trip for each day**  

What is the length of the longest trip in the dataset in hours?
- [x] 631,152.50 Hours
- [ ] 243.44 Hours
- [ ] 7.68 Hours
- [ ] 3.32 Hours

### Performance Optimization: max( ) vs Window function

Using `max()` is more efficient than a window function with `dense_rank()` when finding the maximum value.  
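A window function with no `PARTITION BY` clause moves all rows into a single partition and sorts them there. Spark logs a `No Partition Defined for Window operation!` warning when this happens. In contrast, `max()` can compute in parallel across partitions and only combine the partial results.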

In [15]:
# Longest trip in hours. The inner query computes each trip's duration in
# whole seconds (timestamp -> epoch seconds); the outer query takes the
# maximum after converting to hours. A plain aggregate avoids the
# single-partition sort a window function would need.
spark.sql("""
    select
        max(duration_seconds / 3600) as max_duration_hours
    from (
        select
            cast(dropoff_datetime as long) - cast(pickup_datetime as long) as duration_seconds
        from
            fhv
    )
""").take(1)

[Row(max_duration_hours=631152.5)]

### Question 

**Least frequent pickup location zone**

Load the zone lookup data into a temp view in Spark [Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)  
Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

- [ ] East Chelsea
- [x] Jamaica Bay
- [ ] Union Sq
- [ ] Crown Heights North

### Performance Optimization: Broadcast Join

The `zones` lookup table is small (~265 rows) compared to the `fhv` dataset (1.9M rows).  When joining these tables, Spark would normally shuffle the large dataset across the cluster.  
The query uses a broadcast hint (`/*+ BROADCAST(pu) */`, where `pu` is the alias given to `zones`). This tells Spark to send the small `zones` table to all worker nodes, so the join happens locally without shuffling the large `fhv` dataset.

In [16]:
# Least frequent pickup zone.
# - The join happens before the aggregation, and the broadcast hint on `pu`
#   (the small `zones` view) lets Spark join against the large `fhv` data
#   without shuffling it.
# - Ranking happens *after* the join. Otherwise a rank-1 location ID with no
#   zone match (e.g. a NULL pickup_location_id group) would be dropped by the
#   inner join and the query would return nothing.
# - All rank-1 rows are collected, ordered by zone, so ties are visible and
#   deterministic instead of `take(1)` picking one arbitrarily.
spark.sql("""
    with trips_per_zone as (
        select /*+ BROADCAST(pu) */
            pu.location_id,
            pu.zone,
            count(1) as num_trips
        from
            fhv t
        inner join
            zones pu on t.pickup_location_id = pu.location_id
        group by
            pu.location_id,
            pu.zone
    ),

    ranked_zones as (
        select
            zone,
            num_trips,
            dense_rank() over (order by num_trips asc) as rnk
        from
            trips_per_zone
    )

    select
        zone,
        num_trips,
        rnk
    from
        ranked_zones
    where
        rnk = 1
    order by
        zone
""").collect()

[Row(zone='Jamaica Bay', num_trips=1, rnk=1)]