In [1]:
#Objective: predicting the trip duration accurately

Get relevant imports

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
class c:
    PURPLE = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    END = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

Extract data we've been given and divide into reasonable chunks

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.appName("AIProject").config("spark.driver.memory", "10g").getOrCreate()
data_frame = spark.read.csv('./training_dataset/training_dataset.csv', header=True, inferSchema=True)
data_frame.describe()


DataFrame[summary: string, ID: string, vendorid: string, passenger_count: string, trip_distance: string, ratecodeid: string, store_and_fwd_flag: string, pulocationid: string, dolocationid: string, payment_type: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, improvement_surcharge: string, total_amount: string, congestion_surcharge: string, airport_fee: string, duration: string]

Process the data. Remove whatever that's uncessary and check for missing values. Either remove the missing or replace them. 

But first, check what we're missing and working with.

In [4]:
# Check for missing values
missing_values = data_frame.select([sum(col(c).isNull().cast("int")).alias(c) for c in data_frame.columns])
missing_values.show()
print("Number of missing values are: ", missing_values) 

print("-------------------------------------------------")


+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+
| ID|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|duration|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+
|  0|       0|                   0|                    0|        3155336|            0|   3155336|           3155336|           0|           0|     

We're missing a lot of data from these columns: 

1. passenger_count          3155336

2. ratecodeid               3155336

3. store_and_fwd_flag       3155336

4. congestion_surcharge     3155336

5. airport_fee              3155336


The first four columns are useless for the objective at hand. We'll drop them. 

Whilst we're at it we will remove all other columns that are not useful.

In [5]:
# YOUR CODE HERE
data_frame.drop("ratecodeid", "payment_type", "congestion_surcharge", "passenger_count", 
                         "ID", "airport_fee", "vendorid", "extra", "mta_tax", "tip_amount", "tolls_amount", 
                         "improvement_surcharge", "fare_amount")
data_frame.fillna(0)


DataFrame[ID: string, vendorid: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, ratecodeid: double, store_and_fwd_flag: string, pulocationid: int, dolocationid: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, duration: double]

Missing values have been addressed as well as useless columns. Now, we'll process the data that is relevant. 

In [6]:
print(data_frame.describe())

DataFrame[summary: string, ID: string, vendorid: string, passenger_count: string, trip_distance: string, ratecodeid: string, store_and_fwd_flag: string, pulocationid: string, dolocationid: string, payment_type: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, improvement_surcharge: string, total_amount: string, congestion_surcharge: string, airport_fee: string, duration: string]


Check for weird values \
Process data \
Train-test-split \
Choose relevant models
- K-neighbour
- Random forest
- SVC
- ... 

Cross validation \
Draw on graph

In [7]:
# TODO Check for weird values and eliminate
column_names = ['trip_distance', 'pulocationid', 'dolocationid', 'total_amount', 'duration']

for column in column_names:
    print(f"{column}:")
    data_frame.groupBy(col(column)).count().show()
    print()


trip_distance:
+-------------+-----+
|trip_distance|count|
+-------------+-----+
|         3.26|21621|
|        19.98| 1114|
|         0.66|82181|
|         8.51| 3817|
|         2.86|28584|
|         15.5| 4175|
|        17.52| 3195|
|        17.95| 3212|
|         14.9| 3696|
|        10.65| 3061|
|         9.13| 4141|
|         13.4| 3997|
|        17.56| 3418|
|        26.72|  139|
|        40.11|   10|
|        23.04|  192|
|        12.32| 1291|
|        53.82|    4|
|         26.7|  390|
|        61.95|    2|
+-------------+-----+
only showing top 20 rows


pulocationid:
+------------+------+
|pulocationid| count|
+------------+------+
|         148|359744|
|         243|  8606|
|          31|   290|
|         137|349384|
|          85|  4369|
|         251|    58|
|          65| 25298|
|         255| 24980|
|          53|  1019|
|         133|  2413|
|          78|  2661|
|         155|  3434|
|         108|  1715|
|         211|270049|
|         193| 17808|
|          34|  1928

Here we can see that all headers are of type int64, which is good because we don't have to convert the values from numerical to categorical. \
It is visible that some trips are extremely long, such as the 96873 (assmued miles) which is unrealistic for a taxi trip in NYC. We can eliminate this, together with any other unrealistic values. \
We can identify these values using some simple statistical analysis.
[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]


In [8]:
# Compute Q1, Q3, and IQR using approxQuantile
quantiles = data_frame.approxQuantile("trip_distance", [0.25, 0.75], 0.01)  # 0.01 is the error tolerance

distance_Q1 = quantiles[0]  # 25th percentile
distance_Q3 = quantiles[1]  # 75th percentile
distance_IQR = distance_Q3 - distance_Q1

# Compute lower and upper bounds for outlier detection
distance_lower_bound = distance_Q1 - 1.5 * distance_IQR
distance_upper_bound = distance_Q3 + 1.5 * distance_IQR

# Identify outliers
distance_outliers = data_frame.filter((col("trip_distance") < distance_lower_bound) | (col("trip_distance") > distance_upper_bound))

# Count the number of outliers
outlier_count = distance_outliers.count()
print(f"Number of distance outliers: {outlier_count}")

# Filter out outliers to create a clean DataFrame
df_clean = data_frame.filter((col("trip_distance") >= distance_lower_bound) & (col("trip_distance") <= distance_upper_bound))


Number of distance outliers: 4249424


Now outliers for distance have been removed. Next we should look for outliers in duration. This is done in the same way to be consistent in the calculation.

In [9]:
# Compute Q1, Q3, and IQR using approxQuantile
quantiles = df_clean.approxQuantile("duration", [0.25, 0.75], 0.01)
duration_Q1 = quantiles[0]
duration_Q3 = quantiles[1]
duration_IQR = duration_Q3 - duration_Q1

# Compute lower and upper bounds
duration_lower_bound = duration_Q1 - 1.5 * duration_IQR
duration_upper_bound = duration_Q3 + 1.5 * duration_IQR

# Identify outliers
duration_outliers = df_clean.filter((col("duration") < duration_lower_bound) | (col("duration") > duration_upper_bound))

# Count the number of outliers
outlier_count = duration_outliers.count()
print(f"Number of distance outliers: {outlier_count}")

# Filter out outliers to create a clean DataFrame
df_clean = df_clean.filter(
    (col("duration") >= duration_lower_bound) &
    (col("duration") <= duration_upper_bound) &
    (col("duration").isNotNull())  # Ensure no NULL values
)


df_clean.selectExpr("max(duration)", "min(duration)", "avg(duration)").show()


Number of distance outliers: 828403
+-------------+-------------+-----------------+
|max(duration)|min(duration)|    avg(duration)|
+-------------+-------------+-----------------+
|       1956.0|          0.0|744.1257937917635|
+-------------+-------------+-----------------+



Now that distance and time outliers have been dealt with, next step is to train the models. 

In [14]:
from pyspark.sql.functions import col, hour
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Ensure pickup_hour is extracted correctly
df_clean = df_clean.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

# One-Hot Encoding for Pickup and Dropoff Locations
indexer_pu = StringIndexer(inputCol="pulocationid", outputCol="pulocationid_indexed")
indexer_do = StringIndexer(inputCol="dolocationid", outputCol="dolocationid_indexed")

encoder_pu = OneHotEncoder(inputCol="pulocationid_indexed", outputCol="pulocationid_encoded")
encoder_do = OneHotEncoder(inputCol="dolocationid_indexed", outputCol="dolocationid_encoded")

# Feature Assembler: Combines all features into a single column
feature_assembler = VectorAssembler(
    inputCols=["trip_distance", "pickup_hour", "pulocationid_encoded", "dolocationid_encoded"],
    outputCol="features"
)

# Train-Test Split
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

# Define regression models
regressors = {
    "Decision Tree Regressor": DecisionTreeRegressor(featuresCol="features", labelCol="duration"),
    "Random Forest Regressor": RandomForestRegressor(featuresCol="features", labelCol="duration", numTrees=100)
}

# Evaluator for Regression
evaluator = RegressionEvaluator(labelCol="duration", metricName="rmse")

# Train and Evaluate Models
cv_results = {}
for name, model in regressors.items():
    pipeline = Pipeline(stages=[indexer_pu, indexer_do, encoder_pu, encoder_do, feature_assembler, model])
    
    trained_model = pipeline.fit(train_data)  # ✅ train_data is now defined

    predictions = trained_model.transform(test_data)
    rmse = evaluator.evaluate(predictions)
    cv_results[name] = rmse

# Print Model Performance
print("Model Performance:")
for model, rmse in cv_results.items():
    print(f"{model}: RMSE = {rmse:.4f}")

# Best Model Selection (Lowest RMSE)
best_model = min(cv_results, key=cv_results.get)
print(f"\nBest model: {best_model} with RMSE = {cv_results[best_model]:.4f}")


Model Performance:
Decision Tree Regressor: RMSE = 0.0000
Random Forest Regressor: RMSE = 0.0000

Best model: Decision Tree Regressor with RMSE = 0.0000
