In [37]:
import pyspark
from pyspark.sql import SparkSession

In [38]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [39]:

!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet

--2026-02-25 14:14:19--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.160.94.99, 18.160.94.225, 18.160.94.135, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.160.94.99|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 71134255 (68M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2025-11.parquet.1’


2026-02-25 14:14:19 (94.0 MB/s) - ‘yellow_tripdata_2025-11.parquet.1’ saved [71134255/71134255]



In [40]:
df = spark.read.parquet("yellow_tripdata_2025-11.parquet")

In [42]:
import pandas as pd

In [47]:
df_head = df.limit(1001)

df_head.write.mode("overwrite").parquet("head.parquet")

In [44]:
import pyarrow.parquet as pq

In [48]:
df_pandas = pd.read_parquet("head.parquet")

In [49]:
df_pandas.dtypes

VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag                  str
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
cbd_congestion_fee              float64
dtype: object

In [50]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True), StructField('cbd_congestion_fee', DoubleT

In [51]:
from pyspark.sql import types

In [52]:
df = df.repartition(4)

In [53]:
df.write.parquet('yellow/2025/11')

                                                                                

In [60]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'cbd_congestion_fee']

In [74]:
df.createOrReplaceTempView("yellow_2025_11")

In [None]:
spark.sql("""
SELECT
     count(1)
FROM
    yellow_2025_11
WHERE
    tpep_pickup_datetime >= '2025-11-15' AND
    tpep_pickup_datetime < '2025-11-16'
""").show()

In [67]:
spark.sql("""
SELECT
  (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600.0 AS duration_hours
FROM yellow_2025_11
ORDER BY duration_hours DESC
LIMIT 10
""").show(truncate=False)

[Stage 29:>                                                         (0 + 4) / 4]

+--------------+
|duration_hours|
+--------------+
|90.646667     |
|76.948333     |
|76.213889     |
|69.288611     |
|67.080556     |
|63.368333     |
|56.382222     |
|48.147778     |
|47.478333     |
|45.444167     |
+--------------+



                                                                                

In [68]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2026-02-25 14:51:11--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.160.94.135, 18.160.94.10, 18.160.94.225, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.160.94.135|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2026-02-25 14:51:11 (154 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [72]:
df_zones = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [70]:
df_zones.createOrReplaceTempView("taxi_zone_lookup")

In [87]:
spark.sql("""
SELECT
    z.zone,
    count(1) AS num_trips
FROM yellow_2025_11 y
JOIN taxi_zone_lookup z
    ON y.PULocationID = z.LocationID
GROUP BY z.zone
ORDER BY num_trips ASC
LIMIT 10
""").show(truncate=False)

+---------------------------------------------+---------+
|zone                                         |num_trips|
+---------------------------------------------+---------+
|Eltingville/Annadale/Prince's Bay            |1        |
|Governor's Island/Ellis Island/Liberty Island|1        |
|Arden Heights                                |1        |
|Port Richmond                                |3        |
|Rikers Island                                |4        |
|Rossville/Woodrow                            |4        |
|Great Kills                                  |4        |
|Green-Wood Cemetery                          |4        |
|Jamaica Bay                                  |5        |
|Westerleigh                                  |12       |
+---------------------------------------------+---------+

