In [2]:
# import libraries
from pyspark.sql import SparkSession, types, functions as F

In [3]:
# build (or reuse) a local SparkSession that uses all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# while the session is running, the Spark UI is reachable at localhost:4040

In [4]:
# show the version of the running Spark session
# (a bare expression on the last line of a cell is displayed as its output)
spark.version

'3.5.0'

In [5]:
# explicit schema for the FHV trip records: every column is nullable;
# pickup/drop-off are timestamps, location IDs are integers, the rest are strings
fhv_schema = (
    types.StructType()
    .add("dispatching_base_num", types.StringType(), True)
    .add("pickup_datetime", types.TimestampType(), True)
    .add("dropOff_datetime", types.TimestampType(), True)
    .add("PUlocationID", types.IntegerType(), True)
    .add("DOlocationID", types.IntegerType(), True)
    .add("SR_Flag", types.StringType(), True)
    .add("Affiliated_base_number", types.StringType(), True)
)

In [6]:
# load the gzipped FHV trip CSV, using its first line as the header and
# applying the explicit schema instead of letting Spark infer column types
df_fhv = spark.read.csv(
    'data/fhv_tripdata_2019-10.csv.gz',
    schema=fhv_schema,
    header=True,
)

In [11]:
# preview the first 20 trip records (long cell values are truncated)
df_fhv.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [7]:
# split the trips into 6 partitions and save them as parquet,
# replacing any output left over from a previous run
(
    df_fhv
    .repartition(6)
    .write
    .mode('overwrite')
    .parquet('data/fhv_2019-10/')
)

In [9]:
# report the average size of the parquet part files written to 'data/fhv_2019-10/'
import glob
import os

# part-file names embed a random UUID that changes on every write, so discover
# them with a glob instead of hard-coding a name from one particular run
part_files = sorted(glob.glob('data/fhv_2019-10/part-*.parquet'))
if not part_files:
    raise FileNotFoundError("no parquet part files found in 'data/fhv_2019-10/'")

# average size in bytes, converted to whole MiB (>> 20 divides by 2**20 and floors)
size = (sum(os.path.getsize(path) for path in part_files) // len(part_files)) >> 20
print(f"The size of each partition is {size} MB")

The size of each partition is 6 MB


In [10]:
# expose 'df_fhv' to Spark SQL as the temporary view 'fhv_trips'
# (createOrReplaceTempView supersedes registerTempTable, deprecated since Spark 2.0,
# and is safe to re-run because it replaces an existing view of the same name)
df_fhv.createOrReplaceTempView('fhv_trips')



In [16]:
# count the trips whose pickup timestamp falls on 15 October 2019
oct_15_trips_sql = """
SELECT count(*) as num_trips_oct_15
FROM fhv_trips
WHERE to_date(pickup_datetime) = '2019-10-15'
"""
spark.sql(oct_15_trips_sql).show()

+----------------+
|num_trips_oct_15|
+----------------+
|           62610|
+----------------+



In [19]:
# find the longest trip in hours: drop-off minus pickup in epoch seconds,
# divided by 3600, then the maximum over all trips
longest_trip_sql = """
WITH trip_length AS (          
    SELECT
        (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/(60*60) as length_hours
    FROM fhv_trips
)
SELECT max(length_hours) as longest_trip_hours
FROM trip_length
"""
spark.sql(longest_trip_sql).show()

+------------------+
|longest_trip_hours|
+------------------+
|          631152.5|
+------------------+



In [22]:
# load the taxi zone lookup table, using the first CSV line as column names
# (no schema is supplied, so every column is read as a string)
df_zones = spark.read.csv(
    'data/taxi_zone_lookup.csv',
    header=True,
)

In [23]:
# preview the first 20 rows of the zone lookup table
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [25]:
# attach zone details to each trip by matching its pickup location ID
# against the lookup table (inner join: unmatched trips are dropped)
df_result = df_fhv.join(
    df_zones,
    on=df_fhv["PUlocationID"] == df_zones["LocationID"],
    how="inner",
)

In [26]:
# preview the first 20 rows of the joined trips + zones dataframe
df_result.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+-------+---------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|LocationID|Borough|           Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+-------+---------------+------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|       264|Unknown|             NV|         N/A|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|       264|Unknown|             NV|         N/A|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|       264|Unknown|             NV|      

In [27]:
# expose 'df_result' to Spark SQL as the temporary view 'fhv_trips_zones'
# (createOrReplaceTempView supersedes registerTempTable, deprecated since Spark 2.0,
# and is safe to re-run because it replaces an existing view of the same name)
df_result.createOrReplaceTempView('fhv_trips_zones')



In [32]:
# rank pickup zones from least to most frequent; the first row of the
# output is the least frequent pickup zone among the joined trips
pickup_zone_frequency_sql = """        
SELECT
    zone as pickup_zone,
    count(*) as frequency
FROM fhv_trips_zones
GROUP BY 1
ORDER BY 2
"""
spark.sql(pickup_zone_frequency_sql).show()

+--------------------+---------+
|         pickup_zone|frequency|
+--------------------+---------+
|         Jamaica Bay|        1|
|Governor's Island...|        2|
| Green-Wood Cemetery|        5|
|       Broad Channel|        8|
|     Highbridge Park|       14|
|        Battery Park|       15|
|Saint Michaels Ce...|       23|
|Breezy Point/Fort...|       25|
|Marine Park/Floyd...|       26|
|        Astoria Park|       29|
|    Inwood Hill Park|       39|
|       Willets Point|       47|
|Forest Park/Highl...|       53|
|  Brooklyn Navy Yard|       57|
|        Crotona Park|       62|
|        Country Club|       77|
|     Freshkills Park|       89|
|       Prospect Park|       98|
|     Columbia Street|      105|
|  South Williamsburg|      110|
+--------------------+---------+
only showing top 20 rows

