<div style="text-align:center;">
<h1>PySpark Analysis of NYC TLC Trip Record Data (February 2021)</h1>
<hr>
<sub>Muhammad Difagama Ivanka</sub><br><br>
<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/b/b0/Old_checker_cab.jpg/2560px-Old_checker_cab.jpg" alt="Old Checker cab" style="width: 100%; max-width: 534px; height: 400px;"><br>
<sub><sub>Photo by <a href="https://www.flickr.com/photos/zombieite/4987163977/">Flickr</a></sub></sub></div>

## Setup: imports, Spark session, and data download

In [2]:
import numpy as np
np.random.seed(233)
import pandas as pd
import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F



In [3]:
# Local Spark session on all available cores; getOrCreate() reuses an
# existing session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('nyc_spark')
    .getOrCreate()
)

In [7]:
DATA_PATH = './resources/data'

!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet -O {DATA_PATH}/yellow_tripdata_2021-02.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet -O {DATA_PATH}/green_tripdata_2021-02.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet -O {DATA_PATH}/fhv_tripdata_2021-02.parquet

--2022-12-31 01:28:50--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.161.108.231, 18.161.108.141, 18.161.108.77, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.161.108.231|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21777258 (21M) [application/x-www-form-urlencoded]
Saving to: ‘./resources/data/yellow_tripdata_2021-02.parquet’


utime(./resources/data/yellow_tripdata_2021-02.parquet): Operation not permitted
2022-12-31 01:29:05 (1.56 MB/s) - ‘./resources/data/yellow_tripdata_2021-02.parquet’ saved [21777258/21777258]

--2022-12-31 01:29:06--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... ::ffff:18.161.108.231, ::ffff:18.161.108.141, ::ffff:18.161.108.77, ...
Connecting to d37ci6vzurychx.cloudfront.net (

In [10]:
# Load the three February 2021 trip files in order: yellow, green, FHV.
df_yellow, df_green, df_fhv = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        f'{DATA_PATH}/yellow_tripdata_2021-02.parquet',
        f'{DATA_PATH}/green_tripdata_2021-02.parquet',
        f'{DATA_PATH}/fhv_tripdata_2021-02.parquet',
    )
)

In [11]:
# Keep only trips picked up in the requested window (February 2021 by default).
def filtered_data(the_df, pickup_col: str, dropoff_col: str,
                  start: str = "2021-02-01 00:00:00",
                  end: str = "2021-03-01 00:00:00"):
    '''
    Restrict a trip DataFrame to trips picked up in [start, end).

    * the_df: Spark DataFrame of trip records.
    * pickup_col: name of the pickup datetime column.
    * dropoff_col: name of the dropoff datetime column.
    * start: inclusive lower bound for the pickup time; rows whose dropoff
      is earlier than this are removed as well.
    * end: exclusive upper bound for the pickup time. The dropoff has no
      upper bound, so trips that begin in the window but end after it are kept.
    -> return the filtered DataFrame.
    '''
    pickup = F.col(pickup_col)
    dropoff = F.col(dropoff_col)

    return the_df.where(
        (pickup >= start) & (pickup < end) & (dropoff >= start)
    )

df_yellow = filtered_data(df_yellow, pickup_col='tpep_pickup_datetime', dropoff_col='tpep_dropoff_datetime')
df_green = filtered_data(df_green, pickup_col='lpep_pickup_datetime', dropoff_col='lpep_dropoff_datetime')
# The FHV dropoff column is spelled with a capital "O".
df_fhv = filtered_data(df_fhv, pickup_col='pickup_datetime', dropoff_col='dropOff_datetime')

# Print each DataFrame's repr, which lists its column names and types.
for schema_df in (df_yellow, df_green):
    print(schema_df, "\n")
print(df_fhv)

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double] 

DataFrame[VendorID: bigint, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: int, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double] 

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dr

## 1. How many taxi trips were there on February 15, 2021?

In [12]:
def total_trips_cnt(the_df, pickup_time_col: str, beautify = True,
                    target_date: str = "2021-02-15"):
    '''
    Count the trips picked up on `target_date`.

    * the_df: Spark DataFrame of trip records.
    * pickup_time_col: name of the pickup datetime column.
    * beautify: if True, return the plain row count from `count()`;
      if False, return a DataFrame with columns 'Pickup Date' and 'Total Trips'.
    * target_date: pickup date to count, as 'YYYY-MM-DD'.
    '''
    pickup_date = F.to_date(F.col(pickup_time_col))
    on_target_day = the_df.where(pickup_date == target_date)

    if beautify:
        return on_target_day.count()

    # Alias the grouping key directly instead of renaming Spark's
    # auto-generated "to_date(<col>)" column name.
    return (
        on_target_day
        .groupBy(pickup_date.alias('Pickup Date'))
        .count()
        .withColumnRenamed('count', 'Total Trips')
    )

# Trip counts per service type for 2021-02-15 (the default target_date).
yel_cnt = total_trips_cnt(df_yellow, 'tpep_pickup_datetime')
grn_cnt = total_trips_cnt(df_green, 'lpep_pickup_datetime')
fhv_cnt = total_trips_cnt(df_fhv, 'pickup_datetime')

print(f"Yellow Taxi Trips on 15 February 2021\t\t\t: {yel_cnt}")
print(f"Green Taxi Trips on 15 February 2021\t\t\t: {grn_cnt}")
print(f"For-Hire Vehicle (FHV) Trips on 15 February 2021\t: {fhv_cnt}")
print(f"All Taxi Total Trips on 15 February 2021\t\t: {sum([yel_cnt, grn_cnt, fhv_cnt])}")

Yellow Taxi Trips on 15 February 2021			: 43734
Green Taxi Trips on 15 February 2021			: 1798
For-Hire Vehicle (FHV) Trips on 15 February 2021	: 35523
All Taxi Total Trips on 15 February 2021		: 81055


In [13]:
print('Yellow Taxi Trip Records')
# beautify=False -> one-row table instead of a bare count
total_trip_df_y = total_trips_cnt(df_yellow, pickup_time_col='tpep_pickup_datetime', beautify=False)
total_trip_df_y.show()

Yellow Taxi Trip Records
+-----------+-----------+
|Pickup Date|Total Trips|
+-----------+-----------+
| 2021-02-15|      43734|
+-----------+-----------+



In [14]:
print('Green Taxi Trip Records')
# beautify=False -> one-row table instead of a bare count
total_trip_df_g = total_trips_cnt(df_green, pickup_time_col='lpep_pickup_datetime', beautify=False)
total_trip_df_g.show()

Green Taxi Trip Records
+-----------+-----------+
|Pickup Date|Total Trips|
+-----------+-----------+
| 2021-02-15|       1798|
+-----------+-----------+



In [16]:
print('FHV Trip Records')
# beautify=False -> one-row table instead of a bare count
total_trip_df_fhv = total_trips_cnt(df_fhv, pickup_time_col='pickup_datetime', beautify=False)
total_trip_df_fhv.show()

FHV Trip Records
+-----------+-----------+
|Pickup Date|Total Trips|
+-----------+-----------+
| 2021-02-15|      35523|
+-----------+-----------+



## 2. The longest trip for each day

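### The Longest Duration (hours)

Duration is `dropoff − pickup` converted to hours and rounded to 2 decimals; the maximum is taken per pickup date. No outlier filtering is applied, so some daily maxima (e.g. 1848.65 hours for FHV on 2021-02-05) come from erroneous records rather than real trips.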

In [17]:
def duration_trips_cal(the_df, pickup_col: str, dropoff_col: str, type_find = 'duration',
                       res_name_col = 'Longest Duration', max_col = 'trip_duration'):
    '''
    Return the maximum trip duration or trip distance for each pickup date.

    * the_df: Spark DataFrame of trip records.
    * pickup_col: name of pickup datetime column; its date is the grouping key.
    * dropoff_col: name of dropoff datetime column (only used for 'duration').
    * type_find: 'duration' or 'distance'; any other value raises ValueError.
    * res_name_col: name of the output column holding the daily maximum.
    * max_col: column whose daily maximum is taken. For 'duration' it is
      created as (dropoff - pickup) in hours; for 'distance' it must already
      exist in the_df (e.g. 'trip_distance').\n
    -> DataFrame with columns 'Pickup Date' and `res_name_col`, sorted by date
       ascending, values rounded to 2 decimals (hours for 'duration').
    '''
    if type_find not in ('duration', 'distance'):
        raise ValueError(
            f"type_find must be 'duration' or 'distance', got {type_find!r}")

    org_col = 'Pickup Date'
    df_temp = the_df

    if type_find == 'duration':
        # Seconds between the two timestamps, converted to hours.
        df_temp = df_temp.withColumn(
            max_col,
            (F.col(dropoff_col).cast('long') - F.col(pickup_col).cast('long')) / 3600
        )

    # Alias outputs explicitly rather than renaming Spark's auto-generated
    # "max(...)" / "to_date(...)" column names.
    return (
        df_temp
        .groupBy(F.to_date(pickup_col).alias(org_col))
        .agg(F.max(F.round(F.col(max_col), 2)).alias(res_name_col))
        .orderBy(F.col(org_col).asc())
    )

# Display settings shared by the "longest trip" tables: show every day of
# February 2021 (28 rows) and do not truncate cell values.
N_show = 28
TRCT = False

print('\nYellow Taxi Trip Records')
long_trip_df_y = duration_trips_cal(
    df_yellow,
    pickup_col='tpep_pickup_datetime',
    dropoff_col='tpep_dropoff_datetime',
)
long_trip_df_y.show(n=N_show, truncate=TRCT)


Yellow Taxi Trip Records
+-----------+----------------+
|Pickup Date|Longest Duration|
+-----------+----------------+
|2021-02-01 |23.7            |
|2021-02-02 |23.98           |
|2021-02-03 |23.99           |
|2021-02-04 |24.0            |
|2021-02-05 |23.99           |
|2021-02-06 |23.98           |
|2021-02-07 |23.99           |
|2021-02-08 |23.99           |
|2021-02-09 |23.98           |
|2021-02-10 |23.99           |
|2021-02-11 |23.98           |
|2021-02-12 |23.99           |
|2021-02-13 |27.78           |
|2021-02-14 |23.97           |
|2021-02-15 |23.98           |
|2021-02-16 |23.98           |
|2021-02-17 |23.99           |
|2021-02-18 |23.98           |
|2021-02-19 |23.98           |
|2021-02-20 |23.98           |
|2021-02-21 |23.98           |
|2021-02-22 |23.98           |
|2021-02-23 |23.99           |
|2021-02-24 |23.99           |
|2021-02-25 |24.0            |
|2021-02-26 |23.98           |
|2021-02-27 |23.98           |
|2021-02-28 |23.99           |
+-----------+

In [18]:
print('\nGreen Taxi Trip Records')
long_trip_df_g = duration_trips_cal(
    df_green,
    pickup_col='lpep_pickup_datetime',
    dropoff_col='lpep_dropoff_datetime',
)
long_trip_df_g.show(n=N_show, truncate=TRCT)


Green Taxi Trip Records
+-----------+----------------+
|Pickup Date|Longest Duration|
+-----------+----------------+
|2021-02-01 |1.4             |
|2021-02-02 |23.12           |
|2021-02-03 |23.67           |
|2021-02-04 |23.83           |
|2021-02-05 |23.77           |
|2021-02-06 |23.86           |
|2021-02-07 |7.89            |
|2021-02-08 |23.75           |
|2021-02-09 |23.6            |
|2021-02-10 |23.59           |
|2021-02-11 |23.91           |
|2021-02-12 |23.68           |
|2021-02-13 |23.84           |
|2021-02-14 |23.54           |
|2021-02-15 |4.36            |
|2021-02-16 |23.99           |
|2021-02-17 |23.61           |
|2021-02-18 |22.67           |
|2021-02-19 |23.66           |
|2021-02-20 |23.85           |
|2021-02-21 |21.86           |
|2021-02-22 |23.87           |
|2021-02-23 |23.97           |
|2021-02-24 |23.99           |
|2021-02-25 |23.96           |
|2021-02-26 |23.79           |
|2021-02-27 |23.61           |
|2021-02-28 |23.42           |
+-----------+-

In [19]:
print('\nFHV Trip Records')
long_trip_df_fhv = duration_trips_cal(
    df_fhv,
    pickup_col='pickup_datetime',
    dropoff_col='dropOff_datetime',  # capital "O" in the FHV schema
)
long_trip_df_fhv.show(n=N_show, truncate=TRCT)


FHV Trip Records
+-----------+----------------+
|Pickup Date|Longest Duration|
+-----------+----------------+
|2021-02-01 |771.5           |
|2021-02-02 |23.18           |
|2021-02-03 |20.77           |
|2021-02-04 |667.25          |
|2021-02-05 |1848.65         |
|2021-02-06 |45.88           |
|2021-02-07 |21.77           |
|2021-02-08 |157.08          |
|2021-02-09 |24.33           |
|2021-02-10 |23.83           |
|2021-02-11 |53.66           |
|2021-02-12 |72.4            |
|2021-02-13 |140.38          |
|2021-02-14 |25.32           |
|2021-02-15 |244.5           |
|2021-02-16 |80.27           |
|2021-02-17 |71.41           |
|2021-02-18 |45.82           |
|2021-02-19 |150.2           |
|2021-02-20 |45.02           |
|2021-02-21 |24.85           |
|2021-02-22 |216.69          |
|2021-02-23 |672.53          |
|2021-02-24 |46.13           |
|2021-02-25 |674.82          |
|2021-02-26 |44.63           |
|2021-02-27 |284.73          |
|2021-02-28 |262.72          |
+-----------+--------

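### The Longest Distance (miles)

Maximum `trip_distance` per pickup date, for yellow and green taxis. No outlier filtering is applied, so values such as 221188.25 miles (yellow, 2021-02-16) are clearly bad records rather than real trips.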

In [20]:
print('\nYellow Taxi Trip Records')
long_dis_df_y = duration_trips_cal(
    df_yellow,
    pickup_col='tpep_pickup_datetime',
    dropoff_col='tpep_dropoff_datetime',
    type_find='distance',
    res_name_col='Longest Distance',
    max_col='trip_distance',
)
long_dis_df_y.show(n=N_show, truncate=TRCT)


Yellow Taxi Trip Records
+-----------+----------------+
|Pickup Date|Longest Distance|
+-----------+----------------+
|2021-02-01 |38.89           |
|2021-02-02 |73.24           |
|2021-02-03 |186079.73       |
|2021-02-04 |82.19           |
|2021-02-05 |91134.16        |
|2021-02-06 |48.35           |
|2021-02-07 |186510.67       |
|2021-02-08 |186617.92       |
|2021-02-09 |89416.24        |
|2021-02-10 |116.74          |
|2021-02-11 |54.4            |
|2021-02-12 |34346.07        |
|2021-02-13 |54381.65        |
|2021-02-14 |115928.92       |
|2021-02-15 |52.89           |
|2021-02-16 |221188.25       |
|2021-02-17 |95.3            |
|2021-02-18 |140145.44       |
|2021-02-19 |75.81           |
|2021-02-20 |188054.03       |
|2021-02-21 |47327.62        |
|2021-02-22 |55.87           |
|2021-02-23 |62.45           |
|2021-02-24 |90073.44        |
|2021-02-25 |48.53           |
|2021-02-26 |90796.21        |
|2021-02-27 |91.05           |
|2021-02-28 |29486.5         |
+-----------+

In [21]:
print('\nGreen Taxi Trip Records')
long_dis_df_g = duration_trips_cal(
    df_green,
    pickup_col='lpep_pickup_datetime',
    dropoff_col='lpep_dropoff_datetime',
    type_find='distance',
    res_name_col='Longest Distance',
    max_col='trip_distance',
)
long_dis_df_g.show(n=N_show, truncate=TRCT)


Green Taxi Trip Records
+-----------+----------------+
|Pickup Date|Longest Distance|
+-----------+----------------+
|2021-02-01 |27.52           |
|2021-02-02 |48.1            |
|2021-02-03 |36.33           |
|2021-02-04 |102620.98       |
|2021-02-05 |36.37           |
|2021-02-06 |38.75           |
|2021-02-07 |90.0            |
|2021-02-08 |5634.0          |
|2021-02-09 |34.64           |
|2021-02-10 |60382.7         |
|2021-02-11 |43174.56        |
|2021-02-12 |66659.27        |
|2021-02-13 |15872.69        |
|2021-02-14 |58.03           |
|2021-02-15 |44.04           |
|2021-02-16 |16191.56        |
|2021-02-17 |16240.75        |
|2021-02-18 |29501.25        |
|2021-02-19 |34.95           |
|2021-02-20 |4876.81         |
|2021-02-21 |34.29           |
|2021-02-22 |43.46           |
|2021-02-23 |79.3            |
|2021-02-24 |30195.95        |
|2021-02-25 |50422.63        |
|2021-02-26 |34.85           |
|2021-02-27 |34.5            |
|2021-02-28 |34.76           |
+-----------+-

## 3. Top 5 most frequent `dispatching_base_num` values (FHV)

In [22]:
# Number of FHV trips per dispatching base, most frequent first.
dbn_col = F.col('dispatching_base_num')
df_fhv_dbn = (
    df_fhv
    .groupBy(dbn_col)
    .count()
    .orderBy(F.col('count').desc())
)

df_fhv_dbn.show(n=5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|34605|
|              B01312|32845|
|              B01145|30614|
|              B02794|29998|
|              B03016|29520|
+--------------------+-----+
only showing top 5 rows



## 4. Top 5 most common location pairs (`PULocationID`, `DOLocationID`)

In [23]:
def top_5_pd(the_df):
    '''
    Count trips per (PULocationID, DOLocationID) pair, most frequent first.

    * the_df: Spark DataFrame with 'PULocationID' and 'DOLocationID' columns.
    -> DataFrame with the struct column 'PULocId_DOLocId-pairs' and 'count'.
       Every pair is returned (no limit is applied); callers in this notebook
       take the top 5 with .show(5). Ties on 'count' are broken by the pair
       itself so the order is deterministic across runs.
    '''
    pair_name = 'PULocId_DOLocId-pairs'
    pair_col = F.col(pair_name)
    return (
        the_df
        .withColumn(pair_name, F.struct(F.col('PULocationID'), F.col('DOLocationID')))
        .groupBy(pair_col)
        .count()
        .orderBy(F.col('count').desc(), pair_col.asc())
    )

print('Yellow Taxi Trip Records')
df_5_pd_y = top_5_pd(the_df=df_yellow)
df_5_pd_y.show(n=5)

Yellow Taxi Trip Records
+---------------------+-----+
|PULocId_DOLocId-pairs|count|
+---------------------+-----+
|           {237, 236}|11378|
|           {236, 237}| 9814|
|           {236, 236}| 8754|
|           {237, 237}| 7257|
|           {264, 264}| 5677|
+---------------------+-----+
only showing top 5 rows



In [24]:
print('Green Taxi Trip Records')
df_5_pd_g = top_5_pd(the_df=df_green)
df_5_pd_g.show(n=5)

Green Taxi Trip Records
+---------------------+-----+
|PULocId_DOLocId-pairs|count|
+---------------------+-----+
|             {74, 75}|  986|
|             {75, 74}|  943|
|             {74, 74}|  636|
|             {41, 42}|  529|
|             {74, 42}|  491|
+---------------------+-----+
only showing top 5 rows



In [25]:
pu_col_f = F.col('PULocationID')
do_col_f = F.col('DOLocationID')

# Drop FHV rows missing either location ID, then cast both IDs to int.
df_fhv_pu_cl = df_fhv.where(pu_col_f.isNotNull())
df_fhv_do_cl = (
    df_fhv_pu_cl
    .where(do_col_f.isNotNull())
    .withColumn('PULocationID', pu_col_f.cast('int'))
    .withColumn('DOLocationID', do_col_f.cast('int'))
)

print('FHV Trip Records')
df_5_pd_fhv = top_5_pd(the_df=df_fhv_do_cl)
df_5_pd_fhv.show(n=5)

FHV Trip Records
+---------------------+-----+
|PULocId_DOLocId-pairs|count|
+---------------------+-----+
|           {206, 206}| 2340|
|           {221, 206}| 2089|
|           {129, 129}| 1879|
|               {7, 7}| 1808|
|           {179, 179}| 1714|
+---------------------+-----+
only showing top 5 rows



## 5. Write Result to BigQuery

### Direct way (pandas-gbq, or the Spark BigQuery connector)

In [29]:
# https://googleapis.dev/python/pandas-gbq/latest/writing.html
import os

import pandas_gbq

# Destination is read from the environment instead of being hard-coded.
# TODO: set these before running, e.g.
#   GCP_PROJECT_ID=my-project  BQ_TABLE_ID=my_dataset.my_table
project_id = os.environ.get("GCP_PROJECT_ID")
table_id = os.environ.get("BQ_TABLE_ID")

if project_id and table_id:
    # pandas_gbq.to_gbq expects a pandas DataFrame, so convert the Spark
    # result. The struct pair column is flattened first into two plain
    # columns so no nested, hyphen-named field has to be written.
    # NOTE(review): df_5_pd_fhv holds every location pair, not only the
    # top 5 — add .limit(5) before toPandas() if only the top 5 is wanted.
    pair_col = F.col('PULocId_DOLocId-pairs')
    df = df_5_pd_fhv.select(
        pair_col.getField('PULocationID').alias('PULocationID'),
        pair_col.getField('DOLocationID').alias('DOLocationID'),
        F.col('count'),
    ).toPandas()

    pandas_gbq.to_gbq(df, table_id, project_id=project_id)
else:
    print("Skipping BigQuery upload: set GCP_PROJECT_ID and BQ_TABLE_ID first.")

In [28]:
# https://github.com/GoogleCloudDataproc/spark-bigquery-connector
# NOTE(review): the "bigquery" format needs the spark-bigquery connector on the
# Spark classpath; the SparkSession built at the top of this notebook does not
# configure it, so presumably it must be added (e.g. via spark.jars.packages).
# No .mode() is set, so Spark's default save mode (error if the target exists)
# applies when this cell is re-run.
(
    df_5_pd_fhv.write
    .format("bigquery")
    .option("writeMethod", "direct")
    .save("pyspark_nyc.five_fhv_pd")
)

### Indirect way (write Parquet, upload to GCS, then load into BigQuery)

In [191]:
from google.cloud import storage, bigquery

def save_as_parquet(the_df, filename, mode=None) -> None:
    '''
    Write a Spark DataFrame as Parquet to `{filename}.parquet`.

    * the_df: Spark DataFrame to write.
    * filename: output path without the '.parquet' suffix.
    * mode: optional Spark save mode (e.g. 'overwrite' so the cell can be
      re-run); None keeps Spark's default behaviour.

    NOTE: Spark writes this path as a directory of part files, so it cannot be
    passed as-is to upload_to_gcs, which uploads a single local file.
    '''
    the_df.write.parquet(f'{filename}.parquet', mode=mode)

def upload_to_gcs(bucket: str, object_name: str, local_file: str) -> None:
    """
    Upload one local file to an existing Google Cloud Storage bucket.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    * bucket: name of an existing GCS bucket
    * object_name: destination object path (and name) inside the bucket
    * local_file: path of the local file to upload\n
    -> returns None
    """
    target_blob = storage.Client().bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)

def gcs_to_bq(table_id: str, gcs_parquet_file: list) -> None:
    """
    Load Parquet file(s) stored in GCS into a BigQuery table.

    Ref: https://github.com/googleapis/python-bigquery/blob/main/samples/load_table_uri_parquet.py
    * table_id: YOUR_GCP_PROJECT.YOUR_DATASET.YOUR_TABLE_NAME
    * gcs_parquet_file: list of GCS URIs to load, each like
      'gs://YOUR_GCP_BUCKET_NAME/PATH_TO_THE_FILE_PARQUET'
      (a single URI string is also accepted).

    Blocks until the load job finishes, so load errors are raised here rather
    than lost. Rows are added with BigQuery's default load write disposition
    (append).
    Raises ValueError if no URI is given.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(gcs_parquet_file, str):
        uris = [gcs_parquet_file]
    else:
        uris = list(gcs_parquet_file)
    if not uris:
        raise ValueError("gcs_parquet_file must contain at least one gs:// URI")

    # Construct a BigQuery client object.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)

    # Use the load-job API rather than building a SQL string by hand, so the
    # table ID and URIs need no manual quoting.
    load_job = client.load_table_from_uri(uris, table_id, job_config=job_config)  # API request
    load_job.result()  # Waits for the job to complete; raises on failure.
    # rows = query_job.result()  # Waits for query to finish

### The End