In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Local Spark session using every available core ("local[*]").
# getOrCreate() hands back the already-running session if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1

In [2]:
# Display the version string of the Spark runtime behind this session.
spark.version

'3.3.2'

Installed Spark version is 3.3.2.

## Question 2

In [3]:
from pyspark.sql import types

# Explicit schema for the FHVHV trip CSV. Supplying it gives the pickup/dropoff
# columns a timestamp type and the location IDs an integer type, instead of
# every column being read as a string. All fields are nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [31]:
# Load the June 2021 FHVHV trips using the explicit `schema`; the first line of
# the CSV is a header row.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv.gz')
)

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [5]:
# Shuffle the trips into 12 partitions so the subsequent Parquet write emits
# (up to) 12 part files.
df = df.repartition(numPartitions=12)

In [6]:
# Persist the repartitioned trips as Parquet, replacing any previous output at this path.
df.write.mode('overwrite').parquet('fhvhv/2021/06/')

Each of the 12 Parquet part files is approximately 24,200 KB (~24 MB).

## Question 3

In [8]:
# Expose the trips to Spark SQL under the name `fhv_june`.
# registerTempTable has been deprecated since Spark 2.0 (it emits a FutureWarning);
# createOrReplaceTempView is the supported equivalent.
df.createOrReplaceTempView('fhv_june')

In [16]:
df_result = spark.sql("""
SELECT
    date_trunc('day', pickup_datetime) AS day,
    count(1)
FROM
    fhv_june
GROUP BY
    1
ORDER BY
    1
""").show()

+-------------------+--------+
|                day|count(1)|
+-------------------+--------+
|2021-06-01 00:00:00|  417375|
|2021-06-02 00:00:00|  457339|
|2021-06-03 00:00:00|  521408|
|2021-06-04 00:00:00|  538917|
|2021-06-05 00:00:00|  604903|
|2021-06-06 00:00:00|  522753|
|2021-06-07 00:00:00|  425771|
|2021-06-08 00:00:00|  462554|
|2021-06-09 00:00:00|  483353|
|2021-06-10 00:00:00|  504108|
|2021-06-11 00:00:00|  549286|
|2021-06-12 00:00:00|  591339|
|2021-06-13 00:00:00|  509039|
|2021-06-14 00:00:00|  426672|
|2021-06-15 00:00:00|  452470|
|2021-06-16 00:00:00|  479776|
|2021-06-17 00:00:00|  497133|
|2021-06-18 00:00:00|  540056|
|2021-06-19 00:00:00|  601189|
|2021-06-20 00:00:00|  491630|
+-------------------+--------+
only showing top 20 rows



There were 452,470 trips with a pickup on June 15.

## Question 4

In [25]:
df_result = spark.sql("""
SELECT
    dropoff_datetime - pickup_datetime AS duration
FROM
    fhv_june
ORDER BY
    1 DESC
""").show()

+--------------------+
|            duration|
+--------------------+
|INTERVAL '2 18:52...|
|INTERVAL '1 01:32...|
|INTERVAL '0 19:58...|
|INTERVAL '0 18:11...|
|INTERVAL '0 16:28...|
|INTERVAL '0 14:16...|
|INTERVAL '0 13:54...|
|INTERVAL '0 11:40...|
|INTERVAL '0 11:21...|
|INTERVAL '0 10:59...|
|INTERVAL '0 10:16...|
|INTERVAL '0 09:57...|
|INTERVAL '0 09:57...|
|INTERVAL '0 09:38...|
|INTERVAL '0 09:37...|
|INTERVAL '0 09:28...|
|INTERVAL '0 09:28...|
|INTERVAL '0 09:24...|
|INTERVAL '0 09:23...|
|INTERVAL '0 09:22...|
+--------------------+
only showing top 20 rows



In [21]:
# Longest trip duration in hours, computed from the data rather than hand-copied
# from the truncated INTERVAL output (which hid the seconds component).
longest_seconds = spark.sql("""
SELECT
    max(unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) AS seconds
FROM
    fhv_june
""").first()['seconds']
longest_hours = round(longest_seconds / 3600, 2)
print('The longest trip was {} hours.'.format(longest_hours))

The longest trip was 66.87 hours.


## Question 5

The Spark User Interface runs on local port 4040 by default (http://localhost:4040).

## Question 6

In [29]:
# Taxi zone lookup table (LocationID, Borough, Zone, service_zone).
# No schema is supplied, so every column is read as a string.
df_zone = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [35]:
df_zone = df_zone.withColumn("LocationID", df_zone.LocationID.cast(types.IntegerType()))

df_zone.schema

StructType([StructField('LocationID', IntegerType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [37]:
df_result = spark.sql("""
SELECT
    PULocationID,
    count(1) AS count
FROM
    fhv_june
GROUP BY
    1
ORDER BY
    2 DESC
""").show()

+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
|          79|221244|
|         132|188867|
|          37|187929|
|          76|186780|
|         231|164344|
|         138|161596|
|         234|158937|
|         249|154698|
|           7|152493|
|         148|151020|
|          68|147673|
|          42|146402|
|         255|143683|
|         181|143594|
|         225|141427|
|          48|139611|
|         246|139431|
|          17|138428|
|         170|137879|
+------------+------+
only showing top 20 rows



Location ID 61 is the most frequent pickup location. Per the lookup table, this corresponds to the zone 'Crown Heights North'. However, a zone may contain more than one LocationID, so to get accurate counts we should join the tables and group by zone.

In [43]:
# Attach zone metadata to every trip by matching its pickup location.
# Inner join: trips whose PULocationID has no row in the lookup table are dropped.
df_join = df.join(
    df_zone,
    on=df.PULocationID == df_zone.LocationID,
    how='inner',
)

In [44]:
# Persist the joined trips as Parquet.
# Written to a sibling directory instead of 'fhvhv/2021/06/zone/'. Nesting it
# inside the trips dataset means an overwrite of 'fhvhv/2021/06/' deletes this
# output, and reading 'fhvhv/2021/06/' back would encounter a non-partition
# subdirectory mixed in with the trip files.
df_join.write.parquet('fhvhv_zone/2021/06/', mode='overwrite')

In [45]:
# Expose the joined trips to Spark SQL under the name `fhv_june_zone`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0 and emits a FutureWarning.
df_join.createOrReplaceTempView('fhv_june_zone')



In [46]:
df_result = spark.sql("""
SELECT
    Zone,
    count(1) AS count
FROM
    fhv_june_zone
GROUP BY
    1
ORDER BY
    2 DESC
""").show()

+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows



This confirms Crown Heights North as the most frequent pickup zone.