In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [5]:
spark = SparkSession.builder.master("local").appName("test").getOrCreate()

### Question 1

In [6]:
spark.version

'3.3.2'

In [7]:
import pandas as pd
pd.DataFrame.iteritems = pd.DataFrame.items

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [8]:
df = pd.read_csv('fhv_tripdata_2019-10.csv.gz', compression='gzip')

In [9]:
df.columns

Index(['dispatching_base_num', 'pickup_datetime', 'dropOff_datetime',
       'PUlocationID', 'DOlocationID', 'SR_Flag', 'Affiliated_base_number'],
      dtype='object')

In [12]:
from pprint import pprint
pprint(df.dtypes)

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object


In [13]:
df.head(n=10)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014
5,B00021,2019-10-01 00:00:48,2019-10-01 00:07:12,129.0,129.0,,B00021
6,B00021,2019-10-01 00:47:23,2019-10-01 00:53:25,57.0,57.0,,B00021
7,B00021,2019-10-01 00:10:06,2019-10-01 00:19:50,173.0,173.0,,B00021
8,B00021,2019-10-01 00:51:37,2019-10-01 01:06:14,226.0,226.0,,B00021
9,B00021,2019-10-01 00:28:23,2019-10-01 00:34:33,56.0,56.0,,B00021


In [21]:
fhv_schema = types.StructType(
    [
        types.StructField("dispatching_base_num", types.StringType(), True),
        types.StructField("pickup_datetime", types.TimestampType(), True),
        types.StructField("dropOff_datetime", types.TimestampType(), True),
        types.StructField("PULocationID", types.IntegerType(), True),
        types.StructField("DOlocationID", types.IntegerType(), True),
        types.StructField("SR_Flag", types.StringType(), True),
        types.StructField("Affiliated_base_number", types.StringType(), True),
    ]
)

In [23]:
fhv_df = spark.read.option("header", "true").schema(fhv_schema).csv('fhv_tripdata_2019-10.csv.gz')

In [24]:
fhv_df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

### Question 2

In [25]:
fhv_df.repartition(6).write.parquet('data/pq/')

                                                                                

In [26]:
fhv_data = spark.read.parquet('data/pq/*')

In [40]:
pprint(fhv_data.schema)

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])


In [27]:
fhv_data.registerTempTable('fhv')



### Question 3

In [56]:
spark.sql(
    """
SELECT
    date_trunc('day', pickup_datetime) AS pickup_day,
    COUNT(1) AS trip_records
FROM 
    fhv
WHERE 
    date_trunc('day', pickup_datetime) = DATE '2019-10-15'
GROUP BY
    date_trunc('day', pickup_datetime);
"""
).show()

[Stage 20:>                                                         (0 + 1) / 1]

+-------------------+------------+
|         pickup_day|trip_records|
+-------------------+------------+
|2019-10-15 00:00:00|       62610|
+-------------------+------------+



                                                                                

### Question 4

In [82]:
spark.sql(
    """
SELECT
    pickup_datetime, 
    dropOff_datetime,
    DATEDIFF(HOUR, pickup_datetime, dropOff_datetime) AS total_trip_distance
    
FROM 
    fhv
ORDER BY total_trip_distance DESC
LIMIT 1
"""
).show()

[Stage 42:>                                                         (0 + 1) / 1]

+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropOff_datetime|total_trip_distance|
+-------------------+-------------------+-------------------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|             631152|
+-------------------+-------------------+-------------------+



                                                                                

In [86]:
df_zones = pd.read_csv('taxi_zone_lookup.csv')
df_zones.head(n=10)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
5,6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
6,7,Queens,Astoria,Boro Zone
7,8,Queens,Astoria Park,Boro Zone
8,9,Queens,Auburndale,Boro Zone
9,10,Queens,Baisley Park,Boro Zone


In [88]:
df_zones.dtypes

LocationID       int64
Borough         object
Zone            object
service_zone    object
dtype: object

In [89]:
zones_schema = types.StructType(
    [
        types.StructField("LocationID", types.IntegerType(), True),
        types.StructField("Borough", types.StringType(), True),
        types.StructField("Zone", types.StringType(), True),
        types.StructField("service_zone", types.StringType(), True),
    ]
)

In [90]:
df_zones = spark.read.option("header", "true").schema(zones_schema).csv("taxi_zone_lookup.csv")

In [91]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [92]:
df_zones.write.parquet('data/pq/zones/')

In [93]:
zones_data = spark.read.parquet('data/pq/zones/')

In [99]:
zones_data.registerTempTable('zones')



### Question 6

In [103]:
spark.sql(
    """

SELECT Zone AS pickup_zone,
    COUNT(1) AS zone_records
FROM 
    fhv
JOIN
    zones
ON
    fhv.PULocationID = zones.LocationID

GROUP BY
    Zone
ORDER BY zone_records
LIMIT 1
"""
).show()

+-----------+------------+
|pickup_zone|zone_records|
+-----------+------------+
|Jamaica Bay|           1|
+-----------+------------+

