<a href="https://colab.research.google.com/github/mhaniffajari/Data-Engineering-IYKRA/blob/main/Practice_Case_Session_6_Spark_SQL_and_Data_Frames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation Installation Pyspark

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [6]:
import findspark
findspark.init()

In [7]:
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [9]:
spark

# Data Preparation

In [10]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet

--2022-10-03 01:48:22--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.171, 65.8.245.178, 65.8.245.51, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.171|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21777258 (21M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2021-02.parquet’


2022-10-03 01:48:23 (57.3 MB/s) - ‘yellow_tripdata_2021-02.parquet’ saved [21777258/21777258]



In [11]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet

--2022-10-03 01:48:23--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.171, 65.8.245.178, 65.8.245.51, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.171|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1145679 (1.1M) [binary/octet-stream]
Saving to: ‘green_tripdata_2021-02.parquet’


2022-10-03 01:48:24 (7.43 MB/s) - ‘green_tripdata_2021-02.parquet’ saved [1145679/1145679]



In [12]:
df_green = spark.read.parquet("green_tripdata_2021-02.parquet")
df_yellow = spark.read.parquet("yellow_tripdata_2021-02.parquet")

In [13]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 00:34:03|  2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

In [14]:
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 00:40:47|  2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [15]:
df_green = df_green.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [16]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [17]:
common_colums

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [18]:
from pyspark.sql import functions as F

In [19]:
df_green_sel = df_green.select(common_colums).withColumn('service_type', F.lit('green'))

In [20]:
df_yellow_sel = df_yellow.select(common_colums).withColumn('service_type', F.lit('yellow'))

In [21]:
df_join = df_green_sel.unionAll(df_yellow_sel)

In [22]:
df_join.show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2021-02-01 00:34:03|2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.5|      10.0|         0.0|       

In [23]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet 

--2022-10-03 01:48:37--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.171, 65.8.245.178, 65.8.245.51, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.171|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet’


2022-10-03 01:48:38 (42.0 MB/s) - ‘fhv_tripdata_2021-02.parquet’ saved [10645466/10645466]



In [24]:
df_fhv = spark.read.parquet("fhv_tripdata_2021-02.parquet")

In [25]:
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
|              B00037|2021-02-01 00:00:3

# How many taxi trips were there on February 15?

In [26]:
df_join

DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, payment_type: double, congestion_surcharge: double, service_type: string]

In [27]:
df_1 = df_join.withColumn('pickup_datetime',F.to_date(df_join.pickup_datetime)).filter((df_join['pickup_datetime']>='2021-02-15')&(df_join['pickup_datetime']<'2021-02-16')).select('pickup_datetime').groupBy('pickup_datetime').count()

In [28]:
df_1.show()

+---------------+-----+
|pickup_datetime|count|
+---------------+-----+
|     2021-02-15|42100|
+---------------+-----+



taxi trips on February 15 is 42100

# Find the longest trip for each day ?

In [35]:
df_2 = df_join.withColumn('pickup_datetime',F.to_date(df_join.pickup_datetime)).select('pickup_datetime','trip_distance').filter(df_join['pickup_datetime']>='2021-02-01').groupBy('pickup_datetime').max('trip_distance').sort('pickup_datetime')

In [36]:
df_2.show()

+---------------+------------------+
|pickup_datetime|max(trip_distance)|
+---------------+------------------+
|     2021-02-01|             38.89|
|     2021-02-02|             73.24|
|     2021-02-03|         186079.73|
|     2021-02-04|             82.19|
|     2021-02-05|          91134.16|
|     2021-02-06|              43.5|
|     2021-02-07|         186510.67|
|     2021-02-08|         186617.92|
|     2021-02-09|           60382.7|
|     2021-02-10|           60382.7|
|     2021-02-11|          43174.56|
|     2021-02-12|          66659.27|
|     2021-02-13|         115928.92|
|     2021-02-14|             58.03|
|     2021-02-15|             52.89|
|     2021-02-16|         221188.25|
|     2021-02-17|         140145.44|
|     2021-02-18|             75.81|
|     2021-02-19|              70.4|
|     2021-02-20|         188054.03|
+---------------+------------------+
only showing top 20 rows



# Find Top 5 Most frequent `dispatching_base_num` ?

In [31]:
df_3 = df_fhv.groupBy("dispatching_base_num").count().sort('count',ascending=False).limit(5)

In [32]:
df_3.show()

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



# Find Top 5 Most common location pairs (PUlocationID and DOlocationID)

In [33]:
df_4 = df_fhv.filter('PUlocationID is not NULL AND DOlocationID is not NULL').groupBy(['PUlocationID','DOlocationID']).count().sort('count',ascending=False).limit(5)

In [34]:
df_4.show()

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+



# Write all of the result to BigQuery table (additional - point plus)

In [None]:
df_1.write.format('bigquery').option('table','cleaver-1.taxi.answer_1').save()

In [None]:
df_2.write.format('bigquery').option('table','cleaver-1.taxi.answer_2').save()

In [None]:
df_3.write.format('bigquery').option('table','cleaver-1.taxi.answer_3').save()

In [None]:
df_4.write.format('bigquery').option('table','cleaver-1.taxi.answer_4').save()