# Practice Case 6
#### Muhammad Zaky Aonillah

## Import Library

In [33]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Dataset Preparation

In [2]:
# Create (or reuse) the Spark session for this notebook, using the
# `local[*]` master and the application name "Practice-Case6".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Practice-Case6")
    .getOrCreate()
)

In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

In [24]:
# Load the three downloaded parquet files, one DataFrame per service.
df_yellow, df_green, df_fhv = [
    spark.read.parquet(path)
    for path in (
        "yellow_tripdata_2021-02.parquet",
        "green_tripdata_2021-02.parquet",
        "fhv_tripdata_2021-02.parquet",
    )
]

In [28]:
# The yellow and green files prefix their timestamp columns differently
# (tpep_ / lpep_). Give both frames the same neutral column names.
df_yellow = (
    df_yellow
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
)

df_green = (
    df_green
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
)

In [29]:
# Remove the airport_fee column from the yellow frame.
# NOTE(review): presumably because the green data has no such column — confirm.
df_yellow = df_yellow.drop('airport_fee')

In [32]:
# Columns present in both frames, kept in the green frame's column order.
yellow_columns = set(df_yellow.columns)
common_colums = [name for name in df_green.columns if name in yellow_columns]

In [34]:
# Keep only the shared columns and tag every row as a green-taxi trip.
df_green_sel = (
    df_green
    .select(common_colums)
    .withColumn("service_type", F.lit("green"))
)

In [35]:
# Keep only the shared columns and tag every row as a yellow-taxi trip.
df_yellow_sel = (
    df_yellow
    .select(common_colums)
    .withColumn("service_type", F.lit("yellow"))
)

In [36]:
# Stack the green and yellow selections into a single trips frame.
# Both sides were selected with the same column list, so the positional
# column matching done by union lines up.
df_trips_data = df_green_sel.union(df_yellow_sel)

In [37]:
# Sanity check: number of rows contributed by each service_type.
df_trips_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green|  64572|
|      yellow|1371709|
+------------+-------+



In [38]:
# List the column names of the combined trips frame.
df_trips_data.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [68]:
# Preview pickup timestamps next to their calendar date and the trip distance.
# The date is derived here from pickup_datetime instead of being read from a
# pickup_date column, so the cell also works on a fresh top-to-bottom run,
# where that column is only added to df_trips_data by a later cell.
df_trips_data.select(
    "pickup_datetime",
    F.col("pickup_datetime").cast("date").alias("pickup_date"),
    "trip_distance",
).show()

+-------------------+-----------+-------------+
|    pickup_datetime|pickup_date|trip_distance|
+-------------------+-----------+-------------+
|2021-02-01 07:34:03| 2021-02-01|         3.66|
|2021-02-01 07:04:00| 2021-02-01|          1.1|
|2021-02-01 07:18:51| 2021-02-01|         4.93|
|2021-02-01 07:53:27| 2021-02-01|          6.7|
|2021-02-01 07:57:46| 2021-02-01|         1.89|
|2021-02-01 07:33:03| 2021-02-01|          3.3|
|2021-02-01 07:18:43| 2021-02-01|         2.51|
|2021-02-01 07:06:50| 2021-02-01|         1.68|
|2021-02-01 08:25:20| 2021-02-01|         1.44|
|2021-02-01 09:56:55| 2021-02-01|          0.0|
|2021-02-01 09:53:46| 2021-02-01|          1.9|
|2021-02-01 09:34:48| 2021-02-01|          1.9|
|2021-02-01 09:32:28| 2021-02-01|         1.73|
|2021-02-01 10:40:41| 2021-02-01|         0.94|
|2021-02-01 10:58:34| 2021-02-01|         3.24|
|2021-02-01 11:46:50| 2021-02-01|         1.67|
|2021-02-01 11:59:12| 2021-02-01|          1.2|
|2021-02-01 10:59:27| 2021-02-01|       

In [67]:
# Add a pickup_date column: the pickup timestamp cast to a date.
df_trips_data = df_trips_data.withColumn(
    "pickup_date",
    F.col("pickup_datetime").cast("date"),
)

In [70]:
# Expose the combined trips frame to Spark SQL under the name `trips_data`.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0; re-running the cell replaces the view.
df_trips_data.createOrReplaceTempView('trips_data')



In [85]:
# Expose the FHV frame to Spark SQL under the name `fhv_data`.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0; re-running the cell replaces the view.
df_fhv.createOrReplaceTempView('fhv_data')



## 1. How many taxi trips were there on February 15?

In [118]:
# Count the rows of trips_data whose pickup timestamp falls on 2021-02-15,
# using a half-open range: from Feb 15 00:00:00 up to, but not including,
# Feb 16 00:00:00.
total_trip_15feb = spark.sql(""" 

    SELECT COUNT(1) as total_trip
    FROM trips_data
    WHERE pickup_datetime >= '2021-02-15 00:00:00' AND pickup_datetime < '2021-02-16 00:00:00' 

""")

In [119]:
# Display the February 15 trip count.
total_trip_15feb.show()

+----------+
|total_trip|
+----------+
|     45497|
+----------+



## 2. Find the longest trip for each day

In [82]:
# Largest trip_distance per pickup_date in trips_data, limited to pickup
# dates from 2021-02-01 up to (not including) 2021-03-01 and sorted by date.
# NOTE(review): the recorded output has daily maxima far above 10,000 on many
# days, next to days that peak below 100. Those look like bad records rather
# than real trips — consider filtering trip_distance before taking MAX; confirm.
df_longtrip = spark.sql("""
    SELECT pickup_date, MAX(trip_distance) as longest_trip
    FROM trips_data
    WHERE pickup_date >= "2021-02-01" AND pickup_date < "2021-03-01"
    GROUP BY 1
    ORDER BY 1
""")

In [83]:
# Show up to 35 rows so that every day of the month is visible.
df_longtrip.show(35)

+-----------+------------+
|pickup_date|longest_trip|
+-----------+------------+
| 2021-02-01|       38.89|
| 2021-02-02|       73.24|
| 2021-02-03|   186079.73|
| 2021-02-04|   102620.98|
| 2021-02-05|    91134.16|
| 2021-02-06|       48.35|
| 2021-02-07|   186510.67|
| 2021-02-08|   186617.92|
| 2021-02-09|    89416.24|
| 2021-02-10|     60382.7|
| 2021-02-11|    43174.56|
| 2021-02-12|    66659.27|
| 2021-02-13|    54381.65|
| 2021-02-14|   115928.92|
| 2021-02-15|       52.89|
| 2021-02-16|   221188.25|
| 2021-02-17|   140145.44|
| 2021-02-18|    29501.25|
| 2021-02-19|       75.81|
| 2021-02-20|   188054.03|
| 2021-02-21|    47327.62|
| 2021-02-22|       55.87|
| 2021-02-23|        79.3|
| 2021-02-24|    90073.44|
| 2021-02-25|    50422.63|
| 2021-02-26|    90796.21|
| 2021-02-27|       91.05|
| 2021-02-28|     29486.5|
+-----------+------------+



## 3. Find the top 5 most frequent `dispatching_base_num` values

In [125]:
# Count FHV rows per dispatching_base_num, most frequent first.
# The query has no LIMIT, so this frame holds every base number; only the
# first five rows are displayed when the result is shown.
top_5_dbm = spark.sql("""
    SELECT dispatching_base_num, count(dispatching_base_num) as count
    FROM fhv_data
    GROUP BY 1
    ORDER BY 2 DESC
""")

In [126]:
# Display the five most frequent dispatching base numbers.
top_5_dbm.show(5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+
only showing top 5 rows



## 4. Find the top 5 most common location pairs (`PULocationID` and `DOLocationID`)

In [116]:
# Top 5 pickup location IDs by trip count, ranked on their own.
most_pu = spark.sql(""" 
    SELECT PUlocationID, COUNT(PUlocationID) as count_pu
    FROM trips_data
    GROUP BY 1
    ORDER BY 2 DESC
""").limit(5)

# The question asks for location *pairs*. Ranking pickups and drop-offs
# separately does not answer it, because the busiest pickup zone and the
# busiest drop-off zone need not form the busiest route. Count trips per
# (pickup, drop-off) combination and keep the five most common.
most_pairs = spark.sql("""
    SELECT PUlocationID, DOlocationID, COUNT(*) AS count_trips
    FROM trips_data
    GROUP BY PUlocationID, DOlocationID
    ORDER BY count_trips DESC
    LIMIT 5
""")
most_pairs.show()

In [113]:
# Top 5 drop-off location IDs by trip count, ranked independently of the
# pickup location.
most_do = spark.sql(""" 
    SELECT DOlocationID, COUNT(DOlocationID) as count_do
    FROM trips_data
    GROUP BY 1
    ORDER BY 2 DESC
""").limit(5)


In [117]:
# Display the five most frequent pickup locations.
most_pu.show()

+------------+--------+
|PUlocationID|count_pu|
+------------+--------+
|         236|   75062|
|         237|   72922|
|         141|   46266|
|         239|   45049|
|         186|   44304|
+------------+--------+



In [114]:
# Display the five most frequent drop-off locations.
most_do.show()

+------------+--------+
|DOlocationID|count_do|
+------------+--------+
|         236|   74619|
|         237|   62488|
|         141|   44876|
|         239|   42888|
|         238|   42250|
+------------+--------+

