Run SQL queries against the October 2024 yellow taxi trip data using PySpark SQL.

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/08 20:12:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/08 20:12:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Import the partitioned data from Parquet files.

In [3]:
parquet_dir = "./data/parquet/*"
oct_yellow_taxi_data = spark.read.parquet(parquet_dir)


In [8]:
oct_yellow_taxi_data.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-12 04:16:59|  2024-10-12 04:24:37|              2|         2.07|         1|                 N|         125|         100|           1|       10.7|  1.0|    0.5|       2.

Question: How many taxi trips were there on the 15th of October?
Consider only trips that started on the 15th of October.

In [13]:
sql_query = """
SELECT COUNT(*)
FROM oct_yellow_taxi
WHERE tpep_pickup_datetime >= '2024-10-15 00:00' AND tpep_pickup_datetime < '2024-10-16 00:00'
"""

In [14]:
# Register the DataFrame as a temporary SQL view (createOrReplaceTempView replaces the deprecated registerTempTable)
oct_yellow_taxi_data.createOrReplaceTempView('oct_yellow_taxi')
spark.sql(sql_query).show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+
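
The filter above uses a half-open interval: at or after the start of the 15th, and strictly before the start of the 16th. That keeps every pickup on the 15th regardless of its time of day and excludes the first instant of the 16th. A minimal sketch of the same boundary logic in plain Python follows; the helper name `started_on_oct_15` and the sample pickup times are hypothetical and only illustrate the comparison.

```python
from datetime import datetime

DAY_START = datetime(2024, 10, 15)  # inclusive lower bound
DAY_END = datetime(2024, 10, 16)    # exclusive upper bound


def started_on_oct_15(pickup: datetime) -> bool:
    """Return True when the pickup falls in [DAY_START, DAY_END)."""
    return DAY_START <= pickup < DAY_END


# Hypothetical pickup times around the two boundaries.
samples = {
    "first instant of the 15th": datetime(2024, 10, 15, 0, 0, 0),
    "last second of the 15th": datetime(2024, 10, 15, 23, 59, 59),
    "first instant of the 16th": datetime(2024, 10, 16, 0, 0, 0),
    "last second of the 14th": datetime(2024, 10, 14, 23, 59, 59),
}
results = {label: started_on_oct_15(ts) for label, ts in samples.items()}
for label, included in results.items():
    print(f"{label}: {included}")
```

A closed range ending at 23:59:59 would behave the same on whole seconds, but the half-open form does not depend on the timestamp resolution.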



Sanity check: count the total number of taxi rides in October 2024.

In [16]:
sql_query = """
SELECT COUNT(*)
FROM oct_yellow_taxi
"""
spark.sql(sql_query).show()

+--------+
|count(1)|
+--------+
| 3833771|
+--------+
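
One way to read the sanity check is to compare the single-day count with the monthly total. The short sketch below only reuses the two numbers printed above; treating an even spread over 31 days as the baseline is an assumption made for illustration, not something the data guarantees.

```python
trips_on_15th = 128893     # result of the query for 15 October
trips_in_month = 3833771   # result of the total count

share = trips_on_15th / trips_in_month  # fraction of all trips that started on the 15th
uniform_share = 1 / 31                  # assumed baseline: trips spread evenly over 31 days

print(f"share of trips on the 15th: {share:.2%}")
print(f"uniform daily share:        {uniform_share:.2%}")
```

The single-day share comes out at roughly 3.4% against a uniform baseline of roughly 3.2%, so the two counts are at least consistent in magnitude.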



What is the length of the longest trip in the dataset in hours?

In [22]:
# unix_timestamp() converts a timestamp to seconds since the Unix epoch (1970-01-01 00:00:00 UTC).
# Subtracting pickup from dropoff gives the trip duration in seconds; dividing by 3600 converts it to hours.

sql_query = """
WITH oct_taxi_total_time AS (
    SELECT (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600  AS total_time_hrs
    FROM oct_yellow_taxi
)
SELECT total_time_hrs 
FROM oct_taxi_total_time
ORDER BY total_time_hrs DESC 
LIMIT 5 
"""
spark.sql(sql_query).show()

+------------------+
|    total_time_hrs|
+------------------+
|162.61777777777777|
|           143.325|
|137.76055555555556|
|114.83472222222223|
| 89.89833333333333|
+------------------+
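
The seconds-to-hours conversion in the query can be checked by hand. The sketch below mirrors it with Python's `datetime` module: the helper `trip_hours` and the sample pickup/dropoff pair are hypothetical, and naive datetimes are used, so time zone and daylight-saving effects are ignored. It also converts the longest duration reported above back into days, hours, minutes, and seconds.

```python
from datetime import datetime, timedelta


def trip_hours(pickup: datetime, dropoff: datetime) -> float:
    """Trip duration in hours: seconds between the two timestamps divided by 3600."""
    return (dropoff - pickup).total_seconds() / 3600


# Hypothetical trip: 08:00 to 09:30 on the same day.
example_hours = trip_hours(
    datetime(2024, 10, 1, 8, 0, 0),
    datetime(2024, 10, 1, 9, 30, 0),
)
print(example_hours)

# Longest trip from the query output, converted back to a readable duration.
longest_hours = 162.61777777777777
longest = timedelta(seconds=round(longest_hours * 3600))
print(longest)
```

Read this way, the longest trip in the output lasts more than six days, which may point to a meter left running or a data entry error rather than an actual ride.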




What is the name of the LEAST frequent pickup location Zone?

In [23]:
data_dir = "./data/taxi_zone_lookup.csv"
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -O $data_dir

--2025-03-08 20:36:30--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.72, 54.230.209.126, 54.230.209.200, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘./data/taxi_zone_lookup.csv’


2025-03-08 20:36:31 (162 MB/s) - ‘./data/taxi_zone_lookup.csv’ saved [12331/12331]



In [27]:
# read dataframe and show first few rows
zone_data_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(data_dir)
zone_data_df.show()

# Register the zone DataFrame as a temporary SQL view (createOrReplaceTempView replaces the deprecated registerTempTable)
zone_data_df.createOrReplaceTempView('zone_data')

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [40]:
sql_query = """
SELECT COUNT(*), zone_data.Zone
FROM oct_yellow_taxi 
JOIN zone_data
ON zone_data.LocationID = oct_yellow_taxi.PULocationID
GROUP BY zone_data.Zone
ORDER BY COUNT(*) ASC
"""

In [41]:
spark.sql(sql_query).show()



+--------+--------------------+
|count(1)|                Zone|
+--------+--------------------+
|       1|Governor's Island...|
|       2|       Arden Heights|
|       2|       Rikers Island|
|       3|         Jamaica Bay|
|       3| Green-Wood Cemetery|
|       4|Charleston/Totten...|
|       4|Eltingville/Annad...|
|       4|   Rossville/Woodrow|
|       4|       Port Richmond|
|       4|       West Brighton|
|       6|        Crotona Park|
|       6|         Great Kills|
|       7|Heartland Village...|
|       7|     Mariners Harbor|
|       9|Saint George/New ...|
|       9|             Oakwood|
|      10|New Dorp/Midland ...|
|      10|       Broad Channel|
|      12|         Westerleigh|
|      12|     Pelham Bay Park|
+--------+--------------------+
only showing top 20 rows
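
Because the query above uses an inner join, a zone with no pickups at all never appears in the result, so the listed minimum is the least frequent zone among those with at least one trip. A minimal sketch of how a LEFT JOIN from the zone table would surface zero-pickup zones is shown below. It uses `sqlite3` from the Python standard library as a stand-in for Spark SQL, and the three toy zones and three toy trips are hypothetical; only the table and column names are taken from the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zone_data (LocationID INTEGER, Zone TEXT);
CREATE TABLE oct_yellow_taxi (PULocationID INTEGER);
-- Hypothetical toy data: Zone C has no pickups.
INSERT INTO zone_data VALUES (1, 'Zone A'), (2, 'Zone B'), (3, 'Zone C');
INSERT INTO oct_yellow_taxi VALUES (1), (1), (2);
""")

# Same shape as the notebook query: the inner join drops zones without trips.
inner_rows = conn.execute("""
SELECT COUNT(*), zone_data.Zone
FROM oct_yellow_taxi
JOIN zone_data ON zone_data.LocationID = oct_yellow_taxi.PULocationID
GROUP BY zone_data.Zone
ORDER BY COUNT(*) ASC
""").fetchall()

# LEFT JOIN from zone_data keeps every zone; COUNT(column) skips the NULLs
# produced for unmatched zones, so a zone without trips gets a count of 0.
left_rows = conn.execute("""
SELECT COUNT(oct_yellow_taxi.PULocationID) AS pickups, zone_data.Zone
FROM zone_data
LEFT JOIN oct_yellow_taxi ON zone_data.LocationID = oct_yellow_taxi.PULocationID
GROUP BY zone_data.Zone
ORDER BY pickups ASC, zone_data.Zone ASC
""").fetchall()
conn.close()

print(inner_rows)
print(left_rows)
```

Whether the inner or the left join is the right choice depends on how the question is read: the inner join answers "least frequent among zones that were used", the left join also reports zones that were never used.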


