In [43]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

from pyspark.sql import types

In [None]:
# Get (or reuse) the notebook's SparkSession, running against the
# `local[*]` master under the application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

#### Data Preparation

In [None]:
# Explicit schema for the FHVHV trip CSV so every column is read with the
# type declared here (all columns nullable).
fhvhv_schema = types.StructType([
    types.StructField("Hvfhs_license_num", types.StringType(), True),
    types.StructField("Dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.IntegerType(), True),
])

# Month of trip data converted by this cell.
year = 2021
month = 2

# Both paths are derived from year/month so the CSV that is read and the
# parquet partition that is written always refer to the same month.
input_path = f'fhvhv_tripdata_{year}-{month:02d}.csv'
output_path = f'data/pq/fhvhv/{year}/{month:02d}/'

df_fhvhv = (
    spark.read
    .option("header", "true")
    .schema(fhvhv_schema)
    .csv(input_path)
)

# 'overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as output_path already exists from a previous run.
(
    df_fhvhv
    .repartition(24)
    .write
    .mode('overwrite')
    .parquet(output_path)
)

# From here on df_fhvhv is backed by the parquet files (every year/month
# directory matched by the glob), not by the CSV.
df_fhvhv = spark.read.parquet('data/pq/fhvhv/*/*')

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to call again on re-run.
df_fhvhv.createOrReplaceTempView('fhvhv_trips_data')

In [None]:
# Explicit schema for the taxi zone lookup CSV (all columns nullable).
taxi_zone_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])

# NOTE: input_path / output_path are rebound here; after this cell they
# point at the zone lookup files, not at the trip data.
input_path = 'taxi+_zone_lookup.csv'
output_path = 'data/taxi_zone/'

df_taxi_zone = (
    spark.read
    .option("header", "true")
    .schema(taxi_zone_schema)
    .csv(input_path)
)

# 'overwrite' keeps the cell re-runnable: with the default save mode the
# write raises as soon as output_path already exists from a previous run.
(
    df_taxi_zone
    .write
    .mode('overwrite')
    .parquet(output_path)
)

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to call again on re-run.
df_taxi_zone.createOrReplaceTempView('taxi_zone_data')

#### Question 3

In [14]:
# Question 3: number of trips whose pickup timestamp falls on 2021-02-15.
trips_on_feb_15_sql = """
SELECT
    COUNT(*)
FROM
    fhvhv_trips_data
WHERE
    CAST(pickup_datetime AS DATE)='2021-02-15'
"""

trips_on_feb_15 = spark.sql(trips_on_feb_15_sql)
trips_on_feb_15.show()

[Stage 8:>                                                          (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

#### Question 4

In [35]:
# Question 4: pickup date of the single trip with the largest
# dropoff - pickup difference (compared via unix_timestamp).
longest_trip_sql = """
SELECT
    CAST(pickup_datetime AS DATE)
FROM
    fhvhv_trips_data
ORDER BY
    unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime) DESC
LIMIT
    1
"""

longest_trip = spark.sql(longest_trip_sql)
longest_trip.show()

[Stage 22:>                                                         (0 + 4) / 4]

+---------------+
|pickup_datetime|
+---------------+
|     2021-02-11|
+---------------+



                                                                                

#### Question 5

In [41]:
# Question 5: trip count per dispatching base, most frequent base first.
# show() prints only the first 20 rows of the ranking.
trips_per_base_sql = """
SELECT
    Dispatching_base_num, COUNT(*)
FROM
    fhvhv_trips_data
GROUP BY Dispatching_base_num
ORDER BY COUNT(*) DESC
"""

trips_per_base = spark.sql(trips_per_base_sql)
trips_per_base.show()

+--------------------+--------+
|Dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
|              B02869|  429720|
|              B02887|  322331|
|              B02871|  312364|
|              B02864|  311603|
|              B02866|  311089|
|              B02878|  305185|
|              B02682|  303255|
|              B02617|  274510|
|              B02883|  251617|
|              B02884|  244963|
|              B02882|  232173|
|              B02876|  215693|
|              B02879|  210137|
|              B02867|  200530|
|              B02877|  198938|
+--------------------+--------+
only showing top 20 rows



#### Question 6

In [71]:
# Question 6: trip count per (pickup zone id, dropoff zone id) pair, most
# common pair first. show() prints only the first 20 rows.
#
# The two location ids are added as tie-breakers: ordering by COUNT(*)
# alone leaves pairs with equal counts in an arbitrary order, so the
# printed ranking could change from run to run.
spark.sql("""
SELECT
    ftd.PULocationID, ftd.DOLocationID, COUNT(*)
FROM
    fhvhv_trips_data ftd
GROUP BY ftd.PULocationID, ftd.DOLocationID
ORDER BY COUNT(*) DESC, ftd.PULocationID, ftd.DOLocationID
""").show()



+------------+------------+--------+
|PULocationID|DOLocationID|count(1)|
+------------+------------+--------+
|          76|          76|   45041|
|          26|          26|   37329|
|          39|          39|   28026|
|          61|          61|   25976|
|          14|          14|   17934|
|           7|           7|   14688|
|         129|         129|   14688|
|          42|          42|   14481|
|          37|          37|   14424|
|          89|          89|   13976|
|         216|         216|   13716|
|          35|          35|   12829|
|         132|         265|   12542|
|         188|          61|   11814|
|          95|          95|   11548|
|          36|          37|   11491|
|          37|          36|   11487|
|          61|         188|   11462|
|          61|         225|   11342|
|         188|         188|   11308|
+------------+------------+--------+
only showing top 20 rows



                                                                                