<a href="https://colab.research.google.com/github/rhezapal/digitalskola_project5/blob/main/Project5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project 5

Task: Download the February 2021 data from the TLC Trip Record website (For-Hire Vehicle
Trip Records) (https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) and use
PySpark to analyze it and answer the questions below. Upload your script to GitHub or
Google Drive.

1. How many taxi trips were there on February 15?
2. Find the longest trip for each day.
3. Find the top 5 most frequent `dispatching_base_num`.
4. Find the top 5 most common location pairs (`PUlocationID` and `DOlocationID`).
5. Write all of the results to BigQuery tables (optional, for bonus points).

Note: Google Colab may be used to complete this task.

In [3]:
# Install findspark, which is imported and initialised in the following cells.
!pip install -q findspark

In [4]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=8739fceb1ddc8325661f20a08be1d78e91087e2d3c6e90df096ab2b4b58d29c1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [5]:
import findspark

In [6]:
# Initialise findspark before importing pyspark in the next cell.
# NOTE(review): per the findspark docs, init() locates the Spark installation
# and makes pyspark importable — confirm it is still needed with a pip-installed pyspark.
findspark.init()

In [7]:
#Import Package
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [8]:
# Build (or reuse) the SparkSession for this notebook: local master using all
# cores ('local[*]'), application name 'project5'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('project5')
    .getOrCreate()
)

In [9]:
# Echo the SparkSession object as the cell output to confirm it was created.
spark

In [10]:
# Read the February 2021 FHV trip records (parquet file uploaded to the Colab
# runtime under /content) into a Spark DataFrame.
parquet_path = '/content/fhv_tripdata_2021-02.parquet'
df = spark.read.parquet(parquet_path)

In [11]:
# Preview the loaded trips (show() prints the first 20 rows by default).
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
|              B00037|2021-02-01 00:00:3

In [12]:
# Print column names and types; the datetime column types shown here drive the
# conversions done in the analysis cells below.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropOff_datetime: timestamp_ntz (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



# 1. How many taxi trips were there on February 15?

In [13]:
# Reduce pickup_datetime (yyyy-MM-dd HH:mm:ss) to a calendar date (yyyy-MM-dd)
# by overwriting the column with to_date(...), so trips can be matched per day.
pickup_day = to_date(col('pickup_datetime'), "yyyy-MM-dd")
df_1 = df.withColumn('pickup_datetime', pickup_day)

# Keep only the trips picked up on 2021-02-15, then count them. Grouping by the
# date keeps the date column next to the count in the one-row result.
df_1_result = (
    df_1
    .where(col('pickup_datetime') == '2021-02-15')
    .groupBy('pickup_datetime')
    .agg(count("*").alias("total_trip"))
)

In [14]:
# Display the trip count for 2021-02-15.
df_1_result.show()

+---------------+----------+
|pickup_datetime|total_trip|
+---------------+----------+
|     2021-02-15|     34814|
+---------------+----------+



# 2. Find the longest trip for each day

In [46]:
# dropOff_datetime and pickup_datetime are loaded as timestamp_ntz. They are
# first converted with to_timestamp (pattern yyyy-MM-dd HH:mm:ss) because,
# according to the original author, the initial type could not be converted
# directly for the arithmetic below.
ts_format = "yyyy-MM-dd HH:mm:ss"
pickup_ts = to_timestamp(col('pickup_datetime'), ts_format)
dropoff_ts = to_timestamp(col('dropOff_datetime'), ts_format)

# Trip duration: cast both timestamps to long (epoch seconds), subtract pickup
# from drop-off and divide by 60 so the result is expressed in minutes.
duration_minutes = (col('dropOff_datetime').cast("long") - col('pickup_datetime').cast("long")) / 60

df_2 = (
    df
    .withColumn('pickup_datetime', pickup_ts)
    .withColumn('dropOff_datetime', dropoff_ts)
    .withColumn('trip_duration_minutes', duration_minutes)
    # New column date_group: the pickup calendar day used to partition the trips.
    .withColumn('date_group', to_date(col('pickup_datetime'), "yyyy-MM-dd"))
)

# Rank the trips of each day from longest to shortest.
windowSpec = Window.partitionBy('date_group').orderBy(desc("trip_duration_minutes"))
df_2 = df_2.withColumn("row_number", row_number().over(windowSpec))

# The longest trip of each day is the row ranked 1; order the days
# chronologically and drop the helper rank column before displaying.
longest_trip = (
    df_2
    .select('date_group', 'dispatching_base_num', 'trip_duration_minutes', 'row_number')
    .where(col('row_number') == 1)
    .orderBy('date_group')
    .drop('row_number')
)
longest_trip.show()

+----------+--------------------+---------------------+
|date_group|dispatching_base_num|trip_duration_minutes|
+----------+--------------------+---------------------+
|2021-02-01|              B00477|              46290.0|
|2021-02-02|              B01899|   1390.7833333333333|
|2021-02-03|              B02653|    40034.88333333333|
|2021-02-04|              B03297|             110919.0|
|2021-02-05|              B03242|   3184.6833333333334|
|2021-02-06|     B00889         |               1440.0|
|2021-02-07|              B03284|   1121.3333333333333|
|2021-02-08|              B03292|    9424.916666666666|
|2021-02-09|     B02416         |   1459.9833333333333|
|2021-02-10|              B01899|   3219.8166666666666|
|2021-02-11|     B03268         |   2819.2166666666667|
|2021-02-12|              B03211|               4344.0|
|2021-02-13|              B03275|    8422.683333333332|
|2021-02-14|     B00889         |               1519.4|
|2021-02-15|              B03275|             14

# 3. Find the top 5 most frequent `dispatching_base_num`

In [51]:
# Top 5 most frequent dispatching bases.
# The raw dispatching_base_num values are padded inconsistently (the dataset
# preview shows both 'B00013' and 'B00021' followed by trailing spaces), so the
# column is trimmed first; otherwise the same base could be split across
# several groups and under-counted.
df_3 = (
    df
    .withColumn('dispatching_base_num', trim(col('dispatching_base_num')))
    .groupBy('dispatching_base_num')
    # count(column) ignores nulls, so trips without a base number are not ranked.
    .agg(count('dispatching_base_num').alias('total'))
    .sort(desc('total'))
    .limit(5)
)
df_3.show()

+--------------------+-----+
|dispatching_base_num|total|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



# 4. Find the top 5 most common location pairs (`PUlocationID` and `DOlocationID`)

In [55]:
# Only trips that have both a pickup and a drop-off zone can form a location pair.
df_4 = df.filter(col('PUlocationID').isNotNull() & col('DOlocationID').isNotNull())

# Count the trips per (pickup, drop-off) pair and keep the 5 most common ones.
# count('*') counts rows; counting a nullable column such as
# dispatching_base_num would skip trips where that column is null and
# under-report the pair.
df_4 = df_4.groupBy('PUlocationID', 'DOlocationID') \
           .agg(count('*').alias('total')) \
           .sort(desc('total')) \
           .limit(5)
df_4.show()

+------------+------------+-----+
|PUlocationID|DOlocationID|total|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+



# 5. Write all of the results to BigQuery tables (optional, for bonus points)

In [56]:
# pandas backs the toPandas() conversions used in the upload cells below.
!pip install pandas

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [57]:
# pandas-gbq provides pandas_gbq.to_gbq, used below to write the results to BigQuery.
!pip install pandas-gbq

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [58]:
import os
import time
import pandas_gbq, pandas

In [61]:
# BigQuery destination shared by all upload cells: project and dataset.
project_id = "digitalskola-rheza"
dataset_id = "project_5"

# Service-account key file uploaded to the Colab runtime; its path is exported
# through the GOOGLE_APPLICATION_CREDENTIALS environment variable.
path_to_credentials = '/content/digitalskola-rheza-f5faffb7283c.json'
os.environ.update({'GOOGLE_APPLICATION_CREDENTIALS': path_to_credentials})

## Upload Jawaban Soal 1

In [67]:
# Upload the answer to question 1 (trip count for 2021-02-15).
table_id = "total_trip_per_day"
destination_table = f"{dataset_id}.{table_id}"

# Bring the Spark result to pandas and store the date column as string,
# which the original author found necessary for the upload.
pandas_df = df_1_result.toPandas().astype({'pickup_datetime': 'string'})

pandas_gbq.to_gbq(
    pandas_df,
    destination_table,
    project_id=project_id,
    if_exists='replace',  # overwrite the table on every run
)
print(pandas_df.dtypes)

100%|██████████| 1/1 [00:00<00:00, 8594.89it/s]

pickup_datetime    string
total_trip          int64
dtype: object





## Upload Jawaban Soal 2

In [66]:
# Upload the answer to question 2 (longest trip of each day).
table_id = "longest_trip_per_day"
destination_table = f"{dataset_id}.{table_id}"

# Bring the Spark result to pandas and store the date column as string,
# which the original author found necessary for the upload.
pandas_df = longest_trip.toPandas().astype({'date_group': 'string'})

pandas_gbq.to_gbq(
    pandas_df,
    destination_table,
    project_id=project_id,
    if_exists='replace',  # overwrite the table on every run
)
print(pandas_df.dtypes)

100%|██████████| 1/1 [00:00<00:00, 2362.99it/s]

date_group                string
dispatching_base_num      object
trip_duration_minutes    float64
dtype: object





## 3. Upload Jawaban Soal 3

In [68]:
# Upload the answer to question 3 (top 5 most frequent dispatching_base_num).
table_id = "top_5_frequent_dispatching_base"
destination_table = f"{dataset_id}.{table_id}"

# No dtype conversion is applied here: the frame is uploaded as returned by toPandas().
pandas_df = df_3.toPandas()

pandas_gbq.to_gbq(
    pandas_df,
    destination_table,
    project_id=project_id,
    if_exists='replace',  # overwrite the table on every run
)
print(pandas_df.dtypes)

100%|██████████| 1/1 [00:00<00:00, 8050.49it/s]

dispatching_base_num    object
total                    int64
dtype: object





## 4. Upload Jawaban Soal 4

In [69]:
# Upload the answer to question 4 (top 5 most common PUlocationID / DOlocationID pairs).
table_id = "top_5_location_pairs"
destination_table = f"{dataset_id}.{table_id}"

# No dtype conversion is applied here: the frame is uploaded as returned by toPandas().
pandas_df = df_4.toPandas()

pandas_gbq.to_gbq(
    pandas_df,
    destination_table,
    project_id=project_id,
    if_exists='replace',  # overwrite the table on every run
)
print(pandas_df.dtypes)

100%|██████████| 1/1 [00:00<00:00, 2445.66it/s]

PUlocationID    float64
DOlocationID    float64
total             int64
dtype: object



