In [7]:
!pip install pyspark




In [10]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-07 04:20:19--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.154.99.225, 18.154.99.47, 18.154.99.177, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.154.99.225|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-07 04:20:20 (138 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [24]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types

# Configure the application name first, then obtain the session for it.
session_builder = SparkSession.builder.appName("YellowTaxi")
spark = session_builder.getOrCreate()

# Report the Spark version the session is running on.
print(spark.version)


3.5.5


In [44]:
# Load the downloaded trip records, then inspect their schema and a sample.
trips_parquet_path = "yellow_tripdata_2024-10.parquet"
df_yellow = spark.read.parquet(trips_parquet_path)

df_yellow.printSchema()
df_yellow.show(n=10)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------

In [25]:
# Redistribute the trips across 4 partitions; the write below then produces
# one parquet part file per partition.
df_yellow = df_yellow.repartition(numPartitions=4)
# "overwrite" replaces any existing output directory instead of failing.
df_yellow.write.parquet("output/yellow_tripdata", mode="overwrite")

In [31]:
# List the written output directory with human-readable sizes to inspect the
# parquet part files.
!ls -lh output/yellow_tripdata

total 93M
-rw-r--r-- 1 root root 24M Mar  7 04:37 part-00000-0a606d33-92df-434c-8ed0-99f32af1b85d-c000.snappy.parquet
-rw-r--r-- 1 root root 24M Mar  7 04:37 part-00001-0a606d33-92df-434c-8ed0-99f32af1b85d-c000.snappy.parquet
-rw-r--r-- 1 root root 24M Mar  7 04:37 part-00002-0a606d33-92df-434c-8ed0-99f32af1b85d-c000.snappy.parquet
-rw-r--r-- 1 root root 24M Mar  7 04:37 part-00003-0a606d33-92df-434c-8ed0-99f32af1b85d-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Mar  7 04:37 _SUCCESS


In [33]:
# Expose the trips DataFrame to Spark SQL as the view `trips_data`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites an existing view of the same name.
df_yellow.createOrReplaceTempView('trips_data')

# Count the trips whose pickup timestamp falls on 2024-10-15.
spark.sql("""
Select count(*) from trips_data where date(tpep_pickup_datetime) == '2024-10-15'
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [35]:
# Longest trip in the dataset: the difference between dropoff and pickup in
# epoch seconds, divided by 3600 to express it in hours; only the largest
# value is kept.
longest_trip_query = """
select (UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime)) / 3600 AS trip_duration_hours
from trips_data
order by trip_duration_hours desc
limit 1
"""
longest_trip = spark.sql(longest_trip_query)
longest_trip.show()

+-------------------+
|trip_duration_hours|
+-------------------+
| 162.61777777777777|
+-------------------+



In [36]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-07 04:57:48--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.154.99.47, 18.154.99.220, 18.154.99.225, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.154.99.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv.1’


2025-03-07 04:57:48 (119 MB/s) - ‘taxi_zone_lookup.csv.1’ saved [12331/12331]



In [40]:
# Read the zone lookup CSV: the first row supplies the column names and the
# column types are inferred from the data.
zone_lookup_path = 'taxi_zone_lookup.csv'
df_lookup = spark.read.csv(
    path=zone_lookup_path,
    header=True,
    inferSchema=True,
)
df_lookup.show(n=10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



In [43]:
# Expose the lookup table to Spark SQL as the view `zone_lookup` so it can be
# joined to `trips_data`.
df_lookup.createOrReplaceTempView("zone_lookup")

# Find the pickup zone with the fewest trips.
# - The join is an inner join, so only zones that appear as a pickup location
#   in `trips_data` are counted.
# - `zone asc` is a tie-breaker: with `limit 1`, ordering by the count alone
#   would pick an arbitrary zone when several share the lowest count.
# - truncate=False prints the full zone name; the default show() cuts long
#   values to 20 characters.
spark.sql("""
select zone
from zone_lookup z
join trips_data t
on z.LocationID = t.PULocationID
group by zone
order by count(*) asc, zone asc
limit 1
""").show(truncate=False)


+--------------------+
|                zone|
+--------------------+
|Governor's Island...|
+--------------------+

