In [9]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types

from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql import functions as F


In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [13]:
#Question 1.

spark.version

'3.2.0'

In [14]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv \
    -O ./data/raw/fhvhv/2021/02/fhvhv_tripdata_2021-02.csv

--2022-02-24 03:10:32--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.248.84
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.248.84|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘./data/raw/fhvhv/2021/02/fhvhv_tripdata_2021-02.csv’


2022-02-24 03:10:47 (48.8 MB/s) - ‘./data/raw/fhvhv/2021/02/fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [60]:
fhvhv_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])


In [4]:
year = 2021
month = 2



input_path = f'data/raw/fhvhv/{year}/{month:02d}/'
output_path = f'data/pq/fhvhv/{year}/{month:02d}/'

df_fhvhv = spark.read \
    .option("header", "true") \
    .csv(input_path)

In [6]:
df_fhvhv.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [11]:
fhvhv_schema =  types.StructType([
    types.StructField('hvfhs_license_num',types.StringType(),True),
    types.StructField('dispatching_base_num',types.StringType(),True),
    types.StructField('pickup_datetime',types.TimestampType(),True),
    types.StructField('dropoff_datetime',types.TimestampType(),True),
    types.StructField('PULocationID',types.IntegerType(),True),
    types.StructField('DOLocationID',types.IntegerType(),True),
    types.StructField('SR_Flag',types.StringType(),True)
])

In [12]:
year = 2021
month = 2



input_path = f'data/raw/fhvhv/{year}/{month:02d}/'
output_path = f'data/pq/fhvhv/{year}/{month:02d}/'

df_fhvhv = spark.read \
    .option("header", "true") \
    .schema(fhvhv_schema) \
    .csv(input_path)

In [13]:
df_fhvhv.count()

11613942

In [14]:
df_fhvhv = df_fhvhv.repartition(24)


In [15]:
df_fhvhv.rdd.getNumPartitions()

24

In [16]:
df_fhvhv \
    .write.parquet(output_path,mode='overwrite')

In [17]:
!du -h ./data/pq/fhvhv/2021/02 


211M	./data/pq/fhvhv/2021/02


In [18]:
#Question 2
!ls -hs ./data/pq/fhvhv/2021/02 

total 209M
   0 _SUCCESS
8.7M part-00000-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00001-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00002-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00003-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00004-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00005-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00006-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00007-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00008-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00009-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00010-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00011-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-00012-f85e9f4b-4370-4500-af94-bb4b7110dcb5-c000.snappy.parquet
8.7M part-0

In [19]:
input_path = './data/pq/fhvhv/2021/02'

df = spark.read \
    .parquet(input_path)

In [20]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [22]:
df.select(col("pickup_datetime")).head(5)

[Row(pickup_datetime=datetime.datetime(2021, 2, 5, 18, 53, 32)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 3, 20, 4, 21)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 4, 0, 26, 3)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 5, 8, 21, 19)),
 Row(pickup_datetime=datetime.datetime(2021, 2, 1, 21, 54, 16))]

In [23]:
df.filter(col("pickup_datetime").between('2021-02-15','2021-02-16')).count()

367174

In [24]:
df.filter("pickup_datetime > date'2021-02-15'").filter("pickup_datetime < date'2021-02-16'").count()

367165

# Question 4. Longest trip for each day
Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [41]:
#Calculate Time difference in Seconds
input_path = './data/pq/fhvhv/2021/02'

df = spark.read \
    .parquet(input_path)


df2 = df.withColumn('pickup_date', F.date_trunc('DAY', col('pickup_datetime'))) \
        .withColumn('trip_duration_seconds',col("dropoff_datetime").cast("long") - col('pickup_datetime').cast("long")) \
        .select('pickup_date','trip_duration_seconds')
  
df2.groupBy('pickup_date').agg(F.max('trip_duration_seconds').alias('max_trip_duration_seconds')) \
            .orderBy(F.desc('max_trip_duration_seconds')).show(5)


+-------------------+-------------------------+
|        pickup_date|max_trip_duration_seconds|
+-------------------+-------------------------+
|2021-02-11 00:00:00|                    75540|
|2021-02-17 00:00:00|                    57221|
|2021-02-20 00:00:00|                    44039|
|2021-02-03 00:00:00|                    40653|
|2021-02-19 00:00:00|                    37577|
+-------------------+-------------------------+
only showing top 5 rows



# Question 5. Most frequent dispatching_base_num

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages does this spark job has?

Note: the answer may depend on how you write the query, so there are multiple correct answers. 
Select the one you have.

In [46]:
input_path = './data/pq/fhvhv/2021/02'

df = spark.read \
    .parquet(input_path)


In [47]:
df.groupBy('dispatching_base_num')\
    .agg(F.count(col('pickup_datetime')).alias('trip_count')) \
        .orderBy(F.desc('trip_count')) \
            .take(1)

[Row(dispatching_base_num='B02510', trip_count=3233664)]

Skipped Stages: 73
Completed Stages: 1

![Details of Job](hw5_q5_stages.png)

# Question 6. Most common locations pair
Find the most common pickup-dropoff pair.

For example:
"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash
If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East".

In [50]:
input_path = './data/pq/fhvhv/2021/02'

df = spark.read \
    .parquet(input_path)

df.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02866|2021-02-05 18:53:32|2021-02-05 19:00:26|          91|         155|   null|
|           HV0003|              B02876|2021-02-03 20:04:21|2021-02-03 20:19:09|          29|         154|   null|
|           HV0003|              B02888|2021-02-04 00:26:03|2021-02-04 00:40:33|         151|         229|   null|
|           HV0003|              B02887|2021-02-05 08:21:19|2021-02-05 08:26:49|         215|         215|   null|
|           HV0003|              B02764|2021-02-01 21:54:16|2021-02-01 22:09:23|          25|          49|   null|
+-----------------+--------------------+-------------------+-------------------+

In [69]:
lkup = spark.read.parquet('zones')
lkup = lkup.filter(~(col('Borough') == 'Unknown'))
lkup.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [71]:
lkup.printSchema()
lkup = lkup.withColumn("LocationID",col("LocationID").cast(types.IntegerType()))
lkup.printSchema()



root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [82]:
trips = df.join(lkup,df["PULocationID"] == lkup["LocationID"])\
.withColumn('pickup_zone',col('zone'))\
.select('pickup_zone','DOLocationID')\
.join(lkup,df["DOLocationID"] == lkup["LocationID"])\
.withColumn('drop_off_zone',col('zone'))\
.select('pickup_zone','drop_off_zone')

trips.withColumn('pickup_dropoff_pair', F.concat_ws(" / ", col('pickup_zone'),col('drop_off_zone'))) \
    .groupBy('pickup_dropoff_pair').agg(F.count('pickup_dropoff_pair').alias('trip_count')) \
        .orderBy(F.desc('trip_count')) \
        .take(1)

[Row(pickup_dropoff_pair='East New York / East New York', trip_count=45041)]

# Bonus question. Join type
(not graded)

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages your spark job has?

![Trip Pair Count](hw5_q6_stages.png)