In [None]:
!pip install findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os

import requests

url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet'
filename = 'fhv_tripdata_2021-02.parquet'
path = '.'

# Stream the file to disk in chunks instead of holding it all in memory, and
# fail loudly on an HTTP error rather than saving an error page as .parquet
# (the previous version printed "Download Success" regardless of status).
with requests.get(url, stream=True, timeout=60) as response:
    response.raise_for_status()
    with open(os.path.join(path, filename), 'wb') as f:
        for chunk in response.iter_content(chunk_size=1 << 20):
            f.write(chunk)
print("Download Success")

Download Success


In [None]:
from pyspark.sql import SparkSession

# Start (or reuse, if the kernel already has one) the Spark session that every
# query in this notebook runs on.
spark = (
    SparkSession.builder
    .appName("Project 5")
    .getOrCreate()
)

In [None]:
# Load the February 2021 FHV trip file written by the download cell.
df = spark.read.format("parquet").load("./fhv_tripdata_2021-02.parquet")

# Quick look at column types and the first few rows before querying.
df.printSchema()
df.show(5)

# Expose the DataFrame to Spark SQL under the name the queries below use.
df.createOrReplaceTempView("fhv_tripdata_2021")


root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropOff_datetime: timestamp_ntz (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-0

## Analyze Data


### How many taxi trips were there on February 15?


In [None]:
# Number of trips picked up on 2021-02-15.
# A half-open range on the raw column avoids applying DATE_TRUNC to every row
# and gives Spark a plain comparison it may push down to the Parquet scan.
taxi_trips = spark.sql("""
    SELECT COUNT(*) AS total_trip
    FROM fhv_tripdata_2021
    WHERE pickup_datetime >= '2021-02-15'
      AND pickup_datetime <  '2021-02-16'
""")
# The result is a single row, so show it in full.
taxi_trips.show()

+----------+
|total_trip|
+----------+
|     34814|
+----------+



### Find the longest trip for each day

In [None]:
# Longest single trip (in minutes) for each pickup day.
# MAX, not SUM: summing the durations gave the total minutes driven per day,
# which is not the "longest trip" the question asks for.
longest_trip = spark.sql("""
    SELECT date_trunc('day', pickup_datetime) AS pickup_date,
           MAX(TIMESTAMPDIFF(MINUTE, pickup_datetime, dropOff_datetime)) AS duration_minute
    FROM fhv_tripdata_2021
    GROUP BY pickup_date
    ORDER BY duration_minute DESC
""")
print("10 Top Longest Trip each day")
longest_trip.show(10)

10 Top Longest Trip each day
+-------------------+---------------+
|        pickup_date|duration_minute|
+-------------------+---------------+
|2021-02-04 00:00:00|        1075420|
|2021-02-25 00:00:00|        1026330|
|2021-02-03 00:00:00|        1003721|
|2021-02-23 00:00:00|         962173|
|2021-02-17 00:00:00|         957779|
|2021-02-24 00:00:00|         956015|
|2021-02-26 00:00:00|         952434|
|2021-02-10 00:00:00|         939262|
|2021-02-05 00:00:00|         937278|
|2021-02-11 00:00:00|         925845|
+-------------------+---------------+
only showing top 10 rows



### Find the top 5 most frequent `dispatching_base_num` values

In [None]:
# Trip count per dispatching base.
# dispatching_base_num values carry trailing whitespace in the raw data
# (e.g. 'B00021         '), so group by the *trimmed* value. Grouping by the
# raw column while only trimming in SELECT would report padded and unpadded
# spellings of the same base as separate rows with split counts.
most_frequent = spark.sql("""
    SELECT TRIM(dispatching_base_num) AS dispatching_base_num,
           COUNT(dispatching_base_num) AS total
    FROM fhv_tripdata_2021
    GROUP BY TRIM(dispatching_base_num)
    ORDER BY total DESC
""")

print("Find Top 5 Most frequent dispatching_base_num")
# The question asks for the top 5; the full ranking is kept for the upload.
most_frequent.show(5)


Find Top 5 Most frequent dispatching_base_num
+--------------------+-----+
|dispatching_base_num|total|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
|              B01239|24591|
|              B02735|21031|
|              B00310|18141|
|              B01899|16563|
|              B00900|16024|
+--------------------+-----+
only showing top 10 rows



### Find the top 5 most common location pairs (`PUlocationID`, `DOlocationID`)

In [None]:
# Most common (pickup zone, drop-off zone) pairs, skipping trips with an
# unknown zone. total_pasangan = number of trips for the pair.
# Spark SQL resolves column names case-insensitively; the SELECT spelling is
# kept as-is because it determines the output column names.
location_pairs_common = spark.sql("""
    SELECT PULocationID, DOLocationID, COUNT(*) AS total_pasangan
    FROM fhv_tripdata_2021
    WHERE PULocationID IS NOT NULL AND DOLocationID IS NOT NULL
    GROUP BY PULocationID, DOLocationID
    ORDER BY total_pasangan DESC
""")

# The question asks for the top 5; the full ranking is kept for the upload.
location_pairs_common.show(5)

+------------+------------+--------------+
|PULocationID|DOLocationID|total_pasangan|
+------------+------------+--------------+
|       206.0|       206.0|          2374|
|       221.0|       206.0|          2112|
|       129.0|       129.0|          1902|
|         7.0|         7.0|          1829|
|       179.0|       179.0|          1736|
|       221.0|       221.0|          1562|
|       223.0|       223.0|          1522|
|        92.0|        92.0|          1383|
|       206.0|       221.0|          1309|
|        56.0|        56.0|          1073|
+------------+------------+--------------+
only showing top 10 rows



### Write all of the results to BigQuery tables (optional – bonus points)

In [None]:
import os

from google.oauth2 import service_account
from google.cloud import bigquery

# Service-account key location. It can be overridden with the standard
# GOOGLE_APPLICATION_CREDENTIALS variable so the key need not sit next to the notebook.
key_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', './private-key-bq.json')

credentials = service_account.Credentials.from_service_account_file(key_path)
project_id = 'learning-379714'
data_set = 'project_5'
# Use the configured project explicitly. Previously project_id was unused and
# the client silently targeted whatever project the key file belongs to.
client = bigquery.Client(credentials=credentials, project=project_id)

In [None]:
import pandas as pd

#### Upload `taxi_trip` to BigQuery

In [None]:
# Collect the Spark result to the driver. toPandas() already returns a pandas
# DataFrame, so no extra pd.DataFrame(...) wrap is needed; the *_df name is
# kept because the upload cell reads it.
taxi_trip_pd = taxi_trips.toPandas()
taxi_trip_df = taxi_trip_pd

In [None]:
# Fully-qualified destination table. Client.dataset() is deprecated in
# google-cloud-bigquery; a "project.dataset.table" string is the supported form.
table_ref = f"{client.project}.{data_set}.taxi_trip"

# Overwrite the table's contents on every run.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
job = client.load_table_from_dataframe(taxi_trip_df, table_ref, job_config=job_config)
job.result()  # Wait for the job to complete

LoadJob<project=learning-379714, location=US, id=fe9fbfba-ddde-488c-b36d-eefe21875f88>

#### Upload `longest_trip_pd` to BigQuery

In [None]:
# Collect the Spark result to the driver. toPandas() already returns a pandas
# DataFrame, so no extra pd.DataFrame(...) wrap is needed; the *_df name is
# kept because the upload cell reads it.
longest_trip_pd = longest_trip.toPandas()
longest_trip_df = longest_trip_pd
longest_trip_df.head(5)

  series = series.astype(t, copy=False)


Unnamed: 0,pickup_date,duration_minute
0,2021-02-04,1075420
1,2021-02-25,1026330
2,2021-02-03,1003721
3,2021-02-23,962173
4,2021-02-17,957779


In [None]:
# Fully-qualified destination table. Client.dataset() is deprecated in
# google-cloud-bigquery; a "project.dataset.table" string is the supported form.
table_ref = f"{client.project}.{data_set}.longest_trip"

# Overwrite the table's contents on every run.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
job = client.load_table_from_dataframe(longest_trip_df, table_ref, job_config=job_config)
job.result()  # Wait for the job to complete

LoadJob<project=learning-379714, location=US, id=c84da8fb-36f0-4b44-84ad-4a0d7a2fb78c>

 
#### Upload frequent `dispatching_base_num` counts to BigQuery

In [None]:
# Collect the Spark result to the driver. toPandas() already returns a pandas
# DataFrame, so no extra pd.DataFrame(...) wrap is needed.
most_frequent_pd = most_frequent.toPandas()
most_frequent_df = most_frequent_pd

# Fully-qualified destination table. Client.dataset() is deprecated in
# google-cloud-bigquery; a "project.dataset.table" string is the supported form.
table_ref = f"{client.project}.{data_set}.frequent_dispatching_base_num"

# Overwrite the table's contents on every run.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
job = client.load_table_from_dataframe(most_frequent_df, table_ref, job_config=job_config)
job.result()  # Wait for the job to complete

LoadJob<project=learning-379714, location=US, id=54d7bb66-1840-4d2a-98a9-f0b0b4eb07bd>

#### Upload `location_pairs_common_pd` to BigQuery

In [None]:
# Collect the Spark result to the driver. toPandas() already returns a pandas
# DataFrame, so no extra pd.DataFrame(...) wrap is needed; both names are kept
# for the cells that reference them.
location_pairs_common_pd = location_pairs_common.toPandas()
location_pairs_common_df = location_pairs_common_pd
location_pairs_common_df.head(5)

Unnamed: 0,PULocationID,DOLocationID,total_pasangan
0,206.0,206.0,2374
1,221.0,206.0,2112
2,129.0,129.0,1902
3,7.0,7.0,1829
4,179.0,179.0,1736


In [None]:
# Fully-qualified destination table. Client.dataset() is deprecated in
# google-cloud-bigquery; a "project.dataset.table" string is the supported form.
table_ref = f"{client.project}.{data_set}.location_pairs_common"

# Overwrite the table's contents on every run.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
# Upload the *_df frame, consistent with the other upload cells.
job = client.load_table_from_dataframe(location_pairs_common_df, table_ref, job_config=job_config)
job.result()  # Wait for the job to complete

LoadJob<project=learning-379714, location=US, id=f2662aa6-6aa6-4f5b-b0df-081a5b756677>