## Question 1

In [3]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session running on all available cores of this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Taxi zone lookup table. No schema or inferSchema option is given,
# so columns get Spark's CSV default types.
df = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [4]:
# Persist the zone lookup as parquet. 'overwrite' keeps this cell re-runnable:
# the default save mode raises an error once the 'zones' directory already exists.
df.write.mode('overwrite').parquet('zones')

In [5]:
# Spark version this notebook was executed with (shown as the cell output).
installed_spark_version = spark.version
installed_spark_version

'3.3.2'

## Question 2

In [8]:
from pyspark.sql import types

In [9]:
# Explicit, all-nullable schema for the October 2019 FHV trip CSV, so that
# timestamps and location IDs are typed at read time.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [10]:
# FHV trips for October 2019. Column names and types come from `schema`;
# the header line is not read as data.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

In [11]:
# Spread the trips over 6 partitions so the parquet write that follows
# produces one part file per (non-empty) partition.
df = df.repartition(numPartitions=6)

In [12]:
# Persist the repartitioned trips as parquet. 'overwrite' keeps this cell re-runnable:
# the default save mode raises an error once the output directory already exists.
df.write.mode('overwrite').parquet('fhv/2019/10/')

In [13]:
# Preview the first rows of the trip data (long cell values are truncated).
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B01437|2019-10-02 20:29:31|2019-10-02 20:52:31|         264|         173|   null|                B01437|
|              B00900|2019-10-01 01:32:17|2019-10-01 01:35:28|         264|         258|   null|                B00900|
|              B02285|2019-10-02 19:55:00|2019-10-02 21:07:00|         264|         264|   null|                B02285|
|              B03051|2019-10-01 08:20:51|2019-10-01 08:33:39|         264|         157|   null|                B03051|
|              B01626|2019-10-02 23:54:09|2019-10-02 23:58:49|         264|         130|   null|                B01626|
|              B00821|2019-10-01 10:34:5

## Question 3

In [19]:
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date

In [20]:
# Calendar date of each pickup, derived from the pickup timestamp; used by the day filter below.
df = df.withColumn('pickup_date', to_date(col('pickup_datetime')))

In [21]:
# Trips whose pickup date is 15 October 2019; the string literal is compared
# against the `pickup_date` column added above.
picked_up_oct_15 = col("pickup_date") == "2019-10-15"
oct_15_trips = df.where(picked_up_oct_15)

# Number of taxi trips on October 15th.
trip_count_oct_15 = oct_15_trips.count()

In [22]:
# Display the Question 3 answer: number of trips picked up on 2019-10-15.
trip_count_oct_15

62610

## Question 4

In [24]:
# Longest trip in hours: subtract the pickup from the drop-off timestamp as
# epoch seconds, convert to hours, and take the maximum over all trips.
trip_seconds = df['dropOff_datetime'].cast('long') - df['pickup_datetime'].cast('long')
(
    df
    .withColumn('duration_hours', trip_seconds / 3600)
    .agg({"duration_hours": "max"})
    .collect()
)

[Row(max(duration_hours)=631152.5)]

## Question 6

In [25]:
# Schema for the taxi zone lookup. LocationID is an integer so it can be
# joined against the integer PULocationID of the trip data.
schema_zones = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    ]
])

In [26]:
# Zone lookup typed with `schema_zones`. This reads the same lookup file as
# Question 1 ('taxi+_zone_lookup.csv'), so a fresh environment needs only one copy
# of it rather than a second, differently named 'taxi_zone_lookup.csv'.
df_zones = (
    spark.read
    .option("header", "true")
    .schema(schema_zones)
    .csv('taxi+_zone_lookup.csv')
)

In [27]:
# Preview the first rows of the typed zone lookup (long cell values are truncated).
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [32]:
# Least frequent pickup zone: attach zone names to trips (inner join on the
# pickup location ID), count trips per zone name, and take the smallest count.
# NOTE(review): if several zones tie on the minimum count, which one `first()`
# returns is not pinned by this sort; add a secondary sort key if that matters.
trips_with_zones = df.join(df_zones, df["PULocationID"] == df_zones["LocationID"])
trips_per_zone = trips_with_zones.groupBy("Zone").count()
trips_per_zone.orderBy(col("count")).first()["Zone"]

'Jamaica Bay'