In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

## Q1

In [3]:
spark

In [5]:
df = spark.read.parquet('code/data/fhvhv/')

In [6]:
df \
    .repartition(24) \
    .write.parquet('code/report/fhvhv/')

## Q2

In [7]:
!ls -lh code/report/fhvhv

total 511M
-rw-r--r-- 1 tanaw 197609   0 Jun 10 18:38 _SUCCESS
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00000-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00001-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00002-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00003-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00004-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00005-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00006-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-00007-18c51aad-6606-40f2-a0b8-19cc133c5d2b-c000.snappy.parquet
-rw-r--r-- 1 tanaw 197609 22M Jun 10 18:38 part-0

## Q3

In [24]:
from pyspark.sql import functions as F

In [34]:
df_pd = df.withColumn('pickup_date', F.to_date(df.pickup_datetime))

In [35]:
df_pd.head()

Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', originating_base_num='B02764', request_datetime=datetime.datetime(2021, 2, 1, 6, 59), on_scene_datetime=datetime.datetime(2021, 2, 1, 7, 10, 19), pickup_datetime=datetime.datetime(2021, 2, 1, 7, 10, 40), dropoff_datetime=datetime.datetime(2021, 2, 1, 7, 21, 9), PULocationID=35, DOLocationID=39, trip_miles=2.06, trip_time=629, base_passenger_fare=17.14, tolls=0.0, bcf=0.51, sales_tax=1.52, congestion_surcharge=0.0, airport_fee=None, tips=0.0, driver_pay=9.79, shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N', pickup_date=datetime.date(2021, 2, 1))

In [36]:
df_pd.registerTempTable("fhvhv_table")

In [41]:
df_hm_trip = spark.sql("""
SELECT
    COUNT(1)
FROM
    fhvhv_table
WHERE
    pickup_date="2021-02-15"
""")
df_hm_trip.show()

+--------+
|count(1)|
+--------+
|  425928|
+--------+



In [50]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter('pickup_date = "2021-02-15"') \
    .count()

425928

## Q4

In [58]:
df \
    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
    .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-02-11|        75540|
| 2021-02-17|        57221|
| 2021-02-20|        44039|
| 2021-02-04|        40653|
| 2021-02-26|        35010|
+-----------+-------------+



In [62]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    Max(CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60 AS duration
FROM
    fhvhv_table
GROUP BY 
    1
ORDER BY
    2 desc
LIMIT 10;
""").show()

+-----------+-----------------+
|pickup_date|         duration|
+-----------+-----------------+
| 2021-02-11|           1259.0|
| 2021-02-17|953.6833333333333|
| 2021-02-20|733.9833333333333|
| 2021-02-04|           677.55|
| 2021-02-26|            583.5|
| 2021-02-18|576.8666666666667|
| 2021-02-10|541.2666666666667|
| 2021-02-25|           540.65|
| 2021-02-22|           537.05|
| 2021-02-06|524.1166666666667|
+-----------+-----------------+



## Q5

In [66]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1)
FROM
    fhvhv_table
GROUP BY 
    1
ORDER BY
    2 desc
LIMIT 10;
""").show()

+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
|              B02869|  429720|
|              B02887|  322331|
|              B02871|  312364|
|              B02864|  311603|
|              B02866|  311089|
+--------------------+--------+



In [68]:
df \
    .groupBy('dispatching_base_num') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+



Q6

In [94]:
from pyspark.sql import types

schema = types.StructType([
types.StructField("LocationID", types.StringType(), True),
types.StructField("Borough", types.StringType(), True),
types.StructField("Zone", types.StringType(), True),
types.StructField("service_zone", types.StringType(), True)
])

df_zone = spark.read \
                .option('haeder', 'true') \
                .schema(schema) \
                .csv("code/data/zone/")
df_zone.registerTempTable("zone")

In [95]:
df_zone.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [96]:
spark.sql("""
SELECT * FROM zone LIMIT 5;
""").show()

+----------+---------+--------------------+------------+
|LocationID|  Borough|                Zone|service_zone|
+----------+---------+--------------------+------------+
|LocationID|  Borough|                Zone|service_zone|
|         1|      EWR|      Newark Airport|         EWR|
|         2|   Queens|         Jamaica Bay|   Boro Zone|
|         3|    Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|Manhattan|       Alphabet City| Yellow Zone|
+----------+---------+--------------------+------------+



In [101]:
spark.sql("""
SELECT 
    concat(COALESCE(znpu.Zone, 'Unknown'), ' / ', COALESCE(zndo.Zone, 'Unknown')) AS pickup_dropoff,
    COUNT(1)
FROM 
    fhvhv_table AS F
JOIN zone AS znpu
    ON F.PULocationID = znpu.LocationID
JOIN zone AS zndo
    ON F.DOLocationID = zndo.LocationID
GROUP BY 
    1
ORDER BY 
    2 DESC
LIMIT 5;
""").take(5)

[Row(pickup_dropoff='East New York / East New York', count(1)=45041),
 Row(pickup_dropoff='Borough Park / Borough Park', count(1)=37329),
 Row(pickup_dropoff='Canarsie / Canarsie', count(1)=28026),
 Row(pickup_dropoff='Crown Heights North / Crown Heights North', count(1)=25976),
 Row(pickup_dropoff='Bay Ridge / Bay Ridge', count(1)=17934)]