In [34]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
import os
plt.style.use("fivethirtyeight")
import warnings
warnings.filterwarnings("ignore")

# Preparing Data

## Create SparkSession

In [35]:
spark = SparkSession.builder.appName("NYC_ETA").getOrCreate()

## Read Parquet Files

In [36]:
# Merge all data parquet files

jul_data_path = "../data/yellow_tripdata_2024-07.parquet"
aug_data_path = "../data/yellow_tripdata_2024-08.parquet"
sep_data_path = "../data/yellow_tripdata_2024-09.parquet"
df_jul = spark.read.parquet(jul_data_path)
df_aug = spark.read.parquet(aug_data_path)
df_sep = spark.read.parquet(sep_data_path)

df = df_jul.union(df_aug).union(df_sep)

output_path = "../data/merged_yellow_tripdata_2024_Q3.parquet"
# df.write.parquet(output_path)

df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-07-01 00:34:56|  2024-07-01 00:46:49|              1|          3.2|         1|                 N|         140|          79|           1|       15.6|  3.5|    0.5|       3.

In [37]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



![data_schema.png](./data_schema.png)

### Merge Taxi Zone Data

In [38]:
zones_df = spark.read.csv("../data/taxi_zone_lookup.csv", header=True, inferSchema=True)
zones_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



#### Merge **zones_df** with **df** by **Pick Up Location**

In [39]:
df = df.join(zones_df, df.PULocationID == zones_df.LocationID, "inner")

df = df \
    .withColumnRenamed("Borough", "PU_Borough") \
    .withColumnRenamed("Zone", "PU_Zone") \
    .withColumnRenamed("service_zone", "PU_service_zone")

df = df.drop("LocationID")

df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+--------------------+---------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|PU_Borough|             PU_Zone|PU_service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+--------------------+---------------+
|       1| 2024-07-01 00:34:56|  2024-07-01

#### Merge **zones_df** with **df** by **Drop Off Location**

In [40]:
df = df.join(zones_df, df.DOLocationID == zones_df.LocationID, "inner")

df = df \
    .withColumnRenamed("Borough", "DO_Borough") \
    .withColumnRenamed("Zone", "DO_Zone") \
    .withColumnRenamed("service_zone", "DO_service_zone")

df = df.drop("LocationID")

df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+--------------------+---------------+----------+--------------------+---------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|PU_Borough|             PU_Zone|PU_service_zone|DO_Borough|             DO_Zone|DO_service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------

#### Drop some irrelevant columns which has no or little impact in estimate time arrival

In [41]:
columns_to_drop = [
    "store_and_fwd_flag", 
    "payment_type", 
    "fare_amount", 
    "extra", 
    "mta_tax", 
    "tip_amount", 
    "tolls_amount", 
    "improvement_surcharge", 
    "total_amount", 
    "PU_service_zone", 
    "DO_service_zone"
]

df = df.drop(*columns_to_drop)

df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------+------------+--------------------+-----------+----------+--------------------+----------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|PULocationID|DOLocationID|congestion_surcharge|Airport_fee|PU_Borough|             PU_Zone|DO_Borough|             DO_Zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------+------------+--------------------+-----------+----------+--------------------+----------+--------------------+
|       1| 2024-07-01 00:34:56|  2024-07-01 00:46:49|              1|          3.2|         1|         140|          79|                 2.5|        0.0| Manhattan|     Lenox Hill East| Manhattan|        East Village|
|       2| 2024-06-30 23:48:58|  2024-07-01 00:28:04|              1|        19.48|         2|         132|         113|        

# Preprocessing Data

## Data Cleaning

#### trip_distance


In [42]:
from pyspark.sql.functions import col, mean as spark_mean, stddev

# Calculate mean and standard deviation
trip_dis_stats = df.select(
    spark_mean(col("trip_distance")).alias("mean"),
    stddev(col("trip_distance")).alias("stddev")
).collect()[0]
trip_dis_stats

Row(mean=5.296567671395089, stddev=433.93545298571394)

In [43]:
trip_dis_mean = trip_dis_stats["mean"]
trip_dis_stddev = trip_dis_stats["stddev"]

print(f"Mean for Trip Duration is: {trip_dis_mean}")
print(f"Standard Deviation for Trip Duration is: {trip_dis_stddev}")

# Filter rows within 2 standard deviations
df = df.filter(
    (col("trip_distance") >= trip_dis_mean - 2 * trip_dis_stddev) &
    (col("trip_distance") <= trip_dis_mean + 2 * trip_dis_stddev) &
    (col("trip_distance") >= 1)
)

df.describe().show()

Mean for Trip Duration is: 5.296567671395089
Standard Deviation for Trip Duration is: 433.93545298571394




+-------+------------------+------------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+----------+--------------------+----------+--------------------+
|summary|          VendorID|   passenger_count|     trip_distance|        RatecodeID|      PULocationID|     DOLocationID|congestion_surcharge|        Airport_fee|PU_Borough|             PU_Zone|DO_Borough|             DO_Zone|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+----------+--------------------+----------+--------------------+
|  count|           7463340|           6606139|           7463340|           6606139|           7463340|          7463340|             6606139|            6606139|   7463340|             7463340|   7463340|             7463340|
|   mean| 1.770373452100534|1.3486086199518357|4.4061404505223205|2.7390846604953363|160

                                                                                

#### passenger_count

Filter rows where `passenger_count` is 7, 8, 9 or 0 because it is uncommon in NYC taxi data
> *(most taxis accommodate up to 6 passengers)*

In [45]:
from pyspark.sql.functions import col
df = df.filter(~(col("passenger_count").isin([7, 8, 9, 0])))

##### airport_fee to enter_airport

In [48]:
from pyspark.sql.functions import when

df = df.withColumn(
    "enter_airport", 
    when(col("airport_fee") > 0, "1").otherwise("0")
)
df.select("enter_airport").show(5)


+-------------+
|enter_airport|
+-------------+
|            0|
|            1|
|            0|
|            1|
|            0|
+-------------+
only showing top 5 rows



In [49]:
df = df.drop("airport_fee")

##### pick_up and drop_off datetime

In [50]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss")) \
       .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))

df.select("tpep_pickup_datetime", "tpep_dropoff_datetime").show(truncate=False)


+--------------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|
+--------------------+---------------------+
|2024-07-01 07:34:56 |2024-07-01 07:46:49  |
|2024-07-01 06:48:58 |2024-07-01 07:28:04  |
|2024-07-01 07:23:18 |2024-07-01 07:29:51  |
|2024-07-01 07:07:55 |2024-07-01 07:34:34  |
|2024-07-01 07:19:42 |2024-07-01 07:32:13  |
|2024-07-01 07:34:04 |2024-07-01 07:43:13  |
|2024-07-01 07:59:52 |2024-07-01 08:05:27  |
|2024-07-01 07:13:50 |2024-07-01 07:28:03  |
|2024-07-01 07:43:34 |2024-07-01 08:01:43  |
|2024-07-01 07:52:25 |2024-07-01 08:17:53  |
|2024-07-01 07:51:18 |2024-07-01 08:01:42  |
|2024-07-01 07:33:24 |2024-07-01 07:54:44  |
|2024-07-01 07:56:52 |2024-07-01 08:12:59  |
|2024-07-01 07:36:26 |2024-07-01 07:50:42  |
|2024-07-01 07:59:32 |2024-07-01 08:07:58  |
|2024-07-01 07:16:06 |2024-07-01 07:45:42  |
|2024-07-01 07:56:04 |2024-07-01 08:13:10  |
|2024-07-01 07:18:04 |2024-07-01 07:49:00  |
|2024-07-01 07:09:49 |2024-07-01 07:36:44  |
|2024-07-0

`tpep_pickup_datetime`

In [54]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

df = df.withColumn("pickup_date", F.to_date(F.col("tpep_pickup_datetime")))
df = df.withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))
df = df.withColumn("pickup_minute", F.minute(F.col("tpep_pickup_datetime")))
df = df.withColumn("pickup_weekday", F.dayofweek(F.col("tpep_pickup_datetime")) - 1)  # Adjust to 0-based (Sunday: 0, Monday: 1,...)
df = df.withColumn("pickup_weekofyear", F.weekofyear(F.col("tpep_pickup_datetime")))
df = df.withColumn("pickup_dayofyear", F.dayofyear(F.col("tpep_pickup_datetime")))
df = df.withColumn("pickup_month", F.month(F.col("tpep_pickup_datetime")))


# Weekly hour feature (captures patterns based on the time of day and the day of the week)
df = df.withColumn(
    "pickup_week_hour", F.col("pickup_weekday") * 24 + F.col("pickup_hour")  # 0 (Sunday midnight) to 167 (Saturday 11:00 PM)
)

`tpep_dropoff_datetime`

In [56]:
df = df.withColumn("dropoff_date", F.to_date(F.col("tpep_dropoff_datetime")))
df = df.withColumn("dropoff_hour", F.hour(F.col("tpep_dropoff_datetime")))
df = df.withColumn("dropoff_minute", F.minute(F.col("tpep_dropoff_datetime")))
df = df.withColumn("dropoff_weekday", F.dayofweek(F.col("tpep_dropoff_datetime")) - 1)  # Adjust to 0-based (Sunday: 0, Monday: 1,...)
df = df.withColumn("dropoff_month", F.month(F.col("tpep_dropoff_datetime")))

df.select(
    "pickup_date", "pickup_hour", "pickup_weekday", "pickup_weekofyear", "pickup_dayofyear", "pickup_month", "pickup_minute", 
    "pickup_week_hour", "dropoff_date", "dropoff_hour", "dropoff_minute", "dropoff_weekday", "dropoff_month"
).show(truncate=False)


+-----------+-----------+--------------+-----------------+----------------+------------+-------------+----------------+------------+------------+--------------+---------------+-------------+
|pickup_date|pickup_hour|pickup_weekday|pickup_weekofyear|pickup_dayofyear|pickup_month|pickup_minute|pickup_week_hour|dropoff_date|dropoff_hour|dropoff_minute|dropoff_weekday|dropoff_month|
+-----------+-----------+--------------+-----------------+----------------+------------+-------------+----------------+------------+------------+--------------+---------------+-------------+
|2024-07-01 |7          |1             |27               |183             |7           |34           |31              |2024-07-01  |7           |46            |1              |7            |
|2024-07-01 |6          |1             |27               |183             |7           |48           |30              |2024-07-01  |7           |28            |1              |7            |
|2024-07-01 |7          |1             |27   

In [None]:
# df = df.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")