**Load the training data**

The training data has the following format:

"id, vendor_id, pickup_datetime, dropoff_datetime, passenger_count, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, store_and_fwd_flag, trip_duration"

This data has already been cleaned for the competition.

In [1]:
from google.colab import files

uploaded = files.upload()

Saving train.csv to train.csv


**Load the test data**

The test data has the following format:

"id, vendor_id, pickup_datetime, passenger_count, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, store_and_fwd_flag"

This data has also been cleaned for the competition. Unlike the training data, it has no dropoff_datetime or trip_duration columns.

In [2]:
from google.colab import files

uploaded = files.upload()

Saving test.csv to test.csv


Install PySpark in our Google Colab environment.

This provides all the libraries we need to build and run our model.

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 1.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=ae1bcb58b82fb1c034ea7f1f357a24c97740aa3d214f2ccca14c73958b7e057d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


**Training**

This section is strictly for training.

We train our model using linear regression. Running plain linear regression on the unprocessed training data gave an RMSE greater than 3000 seconds, so we engineered additional features and cleaned the data further.

We use the haversine formula to compute the great-circle distance between each trip's pickup and dropoff coordinates, and add it as a feature for our model.
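For reference, the haversine formula can be written as a small plain-Python function that mirrors the Spark column expression in the training code, assuming a spherical Earth with the same 6371 km radius. The helper name `haversine_km` is hypothetical and only meant for sanity-checking individual trips.

```python
import math

EARTH_RADIUS_KM = 6371.0  # same Earth radius constant as in the Spark code


def haversine_km(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """Great-circle distance in km between two points given in degrees (hypothetical helper)."""
    dlat = math.radians(dropoff_lat - pickup_lat)
    dlon = math.radians(dropoff_lon - pickup_lon)
    # a = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(pickup_lat)) * math.cos(math.radians(dropoff_lat))
         * math.sin(dlon / 2) ** 2)
    # distance = 2 * R * asin(sqrt(a))
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# One degree of latitude along a meridian is about 111.19 km
print(round(haversine_km(40.0, -73.9, 41.0, -73.9), 2))  # prints 111.19
```

Note that this is a straight-line distance; the actual driving route through the street grid is usually longer.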

We also extract several datetime features from pickup_datetime (hour of day, day of week, and month) to further improve our RMSE. We would have liked to use dropoff_datetime as well, but it is not provided in the test data.
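Spark's `hour`, `dayofweek`, and `month` functions return the hour (0 to 23), the day of the week numbered from 1 = Sunday to 7 = Saturday, and the month (1 to 12). The plain-Python sketch below reproduces those conventions for a single timestamp, which can help when checking feature values by hand; the helper name `pickup_time_features` is hypothetical.

```python
from datetime import datetime


def pickup_time_features(pickup):
    """Hour, day of week, and month using Spark's conventions (hypothetical helper)."""
    # isoweekday(): Monday = 1 ... Sunday = 7; Spark's dayofweek: Sunday = 1 ... Saturday = 7
    spark_dayofweek = pickup.isoweekday() % 7 + 1
    return {
        "pickup_hour": pickup.hour,
        "pickup_dayofweek": spark_dayofweek,
        "pickup_month": pickup.month,
    }


# 2016-01-01 was a Friday, which Spark encodes as 6
print(pickup_time_features(datetime(2016, 1, 1, 17, 24, 0)))
# {'pickup_hour': 17, 'pickup_dayofweek': 6, 'pickup_month': 1}
```

Because these features are plain integers, linear regression treats them as ordered values (for example, hour 23 looks "far" from hour 0); one-hot encoding is a common alternative if that becomes a problem.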

We use an 80/20 split into training and validation data to see how well our model performs on data it has not seen.
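Spark's `randomSplit([0.8, 0.2], seed=42)` assigns rows to splits at random, so the resulting sizes are close to, but not exactly, 80% and 20%. The plain-Python sketch below imitates that idea with a seeded per-row assignment; it illustrates the concept and is not Spark's actual sampling implementation. The helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train or validation (illustrative sketch)."""
    rng = random.Random(seed)  # seeded so the split is reproducible
    train, validation = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction
        (train if rng.random() < train_fraction else validation).append(row)
    return train, validation


train, validation = random_split(range(10_000))
print(len(train), len(validation))  # roughly 8000 and 2000
```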

We report RMSE, MSE, R², and MAE on the validation set and display a random sample of actual versus predicted trip durations.
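The four metrics reduce to simple formulas over the actual durations and the predictions: MSE is the mean squared error, RMSE is its square root (in seconds, the same unit as trip_duration), MAE is the mean absolute error, and R² = 1 - SS_res / SS_tot. The plain-Python sketch below computes them on a toy example; the helper name `regression_metrics` is hypothetical.

```python
import math


def regression_metrics(actual, predicted):
    """Return MSE, RMSE, MAE, and R^2 for paired lists of values (hypothetical helper)."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_actual = sum(actual) / n
    ss_res = sum(e * e for e in errors)  # residual sum of squares
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae, "r2": 1 - ss_res / ss_tot}


print(regression_metrics([1, 2, 3, 4], [1, 2, 3, 5]))
# {'mse': 0.25, 'rmse': 0.5, 'mae': 0.25, 'r2': 0.8}
```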




In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, radians, asin, sin, cos, sqrt, hour, dayofweek, month
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Spark session
spark = SparkSession.builder.appName("NYC_Taxi_Trip_Time_Prediction").getOrCreate()

# Load training data as a Spark DataFrame
# Training data is formatted as "id, vendor_id, pickup_datetime, dropoff_datetime, passenger_count, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, store_and_fwd_flag, trip_duration"
training_data_path = "train.csv"
df_train = spark.read.csv(training_data_path, header=True, inferSchema=True)


# Haversine to get distance in km and add as column
df_train = df_train.withColumn(
    "distance_km",
    asin(
        sqrt(
            sin(radians(col("dropoff_latitude") - col("pickup_latitude")) / 2) ** 2 +
            cos(radians(col("pickup_latitude"))) * cos(radians(col("dropoff_latitude"))) *
            sin(radians(col("dropoff_longitude") - col("pickup_longitude")) / 2) ** 2
        )
    ) * 2 * 6371  # Radius of Earth in kilometers
)

# Add additional feature data from the pickup_datetime to improve scores
df_train = df_train.withColumn("pickup_hour", hour("pickup_datetime"))
df_train = df_train.withColumn("pickup_dayofweek", dayofweek("pickup_datetime"))
df_train = df_train.withColumn("pickup_month", month("pickup_datetime"))

# Drop unnecessary columns
columns_to_drop = ["id", "dropoff_datetime", "store_and_fwd_flag", "pickup_datetime"]
df_train = df_train.drop(*columns_to_drop)

# Display the updated schema and first few rows of the training data
df_train.printSchema()
try:
    df_train.show(5)
except Exception as e:
    print(f"An error occurred while displaying the DataFrame: {e}")

# Remove outlier trips with extremely short (< 1 minute) or long (> 1 hour) durations,
# and distances that are 0 or greater than 100 km (NYC border to border is about 35 miles).
# Filter before assembling the feature vector, while distance_km is still a column.
df_train = df_train.filter((col("trip_duration") >= 60) & (col("trip_duration") <= 3600))
df_train = df_train.filter((col("distance_km") > 0) & (col("distance_km") <= 100))

# Data preprocessing
feature_columns = ["vendor_id", "passenger_count", "pickup_longitude", "pickup_latitude",
                   "dropoff_longitude", "dropoff_latitude", "distance_km", "pickup_hour",
                   "pickup_dayofweek", "pickup_month"]
label_column = "trip_duration"

# Create a feature vector and keep only the features and the label
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_train = assembler.transform(df_train).select("features", label_column)

# 80% training, 20% validation
(training_data, validation_data) = df_train.randomSplit([0.8, 0.2], seed=42)

# Linear Regression
lr = LinearRegression(featuresCol="features", labelCol=label_column)

# Define the parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01, 0.001])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Evaluate the model with the RMSE score for cross validation
evaluator = RegressionEvaluator(labelCol=label_column, predictionCol="prediction", metricName="rmse")

# Cross-validator with k-fold cross validation
cross_validate = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Perform cross-validation and get the best model
cv_model = cross_validate.fit(training_data)
best_model = cv_model.bestModel

# Make predictions on the validation set with the best model
predictions = best_model.transform(validation_data)

# Evaluate Root Mean Squared Error
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on validation data = {rmse}")

# Evaluate Mean Squared Error (MSE), overriding the evaluator's metric for this call only
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
print(f"Mean Squared Error (MSE): {mse}")

# Evaluate the coefficient of determination (R2); classification accuracy does not apply to regression
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print(f"R-squared (R2): {r2}")

# Evaluate Mean Absolute Error (MAE)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print(f"Mean Absolute Error (MAE): {mae}")

# Show actual duration vs predicted
# Select a random sample of 10% of the predictions
random_sample = predictions.sample(fraction=0.1, seed=42)

# Show the result
random_sample.select("trip_duration", "prediction").show()


# Testing stage

print("\nWe are now testing our model\n")

root
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)

+---------+---------------+------------------+------------------+------------------+------------------+-------------+------------------+-----------+----------------+------------+
|vendor_id|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|trip_duration|       distance_km|pickup_hour|pickup_dayofweek|pickup_month|
+---------+---------------+------------------+------------------+------------------+------------------+-------------+----------

**Testing**

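As with our training data, we add the same engineered features (haversine distance, pickup hour, day of week, and month), since the model expects the same feature columns it was trained on.

We print a random sample of our predictions and then save the full set of predictions to a CSV file.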
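Because the same `assembler` is reused on the test data, the test DataFrame must contain every column in `feature_columns`, or the transform fails. A quick check like the plain-Python sketch below, run on `df_test.columns`, can catch a missing or misspelled feature early. The helper name `missing_features` is hypothetical; the column lists are copied from the notebook's feature list and the test schema printed by the next cell.

```python
def missing_features(available_columns, feature_columns):
    """Return the feature columns absent from a DataFrame's columns (hypothetical helper)."""
    available = set(available_columns)
    return [c for c in feature_columns if c not in available]


feature_columns = ["vendor_id", "passenger_count", "pickup_longitude", "pickup_latitude",
                   "dropoff_longitude", "dropoff_latitude", "distance_km", "pickup_hour",
                   "pickup_dayofweek", "pickup_month"]

# Columns of df_test after feature engineering, as shown by printSchema()
test_columns = ["id", "vendor_id", "pickup_datetime", "passenger_count", "pickup_longitude",
                "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "distance_km",
                "pickup_hour", "pickup_dayofweek", "pickup_month"]

print(missing_features(test_columns, feature_columns))  # []
```

In the notebook, the same check would be `missing_features(df_test.columns, feature_columns)`.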

In [6]:
# Load testing data
test_data_path = "test.csv"
df_test = spark.read.csv(test_data_path, header=True, inferSchema=True)

# Drop unnecessary columns
# Testing data format: id, vendor_id, pickup_datetime, passenger_count, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, store_and_fwd_flag
columns_to_drop_test = ["store_and_fwd_flag"]
df_test = df_test.drop(*columns_to_drop_test)

# Calculate haversine distance for testing data
df_test = df_test.withColumn(
    "distance_km",
    asin(
        sqrt(
            sin(radians(col("dropoff_latitude") - col("pickup_latitude")) / 2) ** 2 +
            cos(radians(col("pickup_latitude"))) * cos(radians(col("dropoff_latitude"))) *
            sin(radians(col("dropoff_longitude") - col("pickup_longitude")) / 2) ** 2
        )
    ) * 2 * 6371  # Radius of Earth in kilometers
)

# Add additional feature data from the pickup_datetime to improve scores
df_test = df_test.withColumn("pickup_hour", hour("pickup_datetime"))
df_test = df_test.withColumn("pickup_dayofweek", dayofweek("pickup_datetime"))
df_test = df_test.withColumn("pickup_month", month("pickup_datetime"))

# Display the updated schema and first few rows of the testing data
df_test.printSchema()
try:
    df_test.show(5)
except Exception as e:
    print(f"An error occurred while displaying the DataFrame: {e}")

# Data preprocessing for testing data
df_test = assembler.transform(df_test).select("id", "features")

# Make predictions on the testing set
test_predictions = best_model.transform(df_test)

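# Get a random sample of 10% of the test predictions for display
random_sample = test_predictions.sample(fraction=0.1, seed=42)

# Display the predicted travel times for the sampled test rows
random_sample.select("id", "prediction").show(1000)

# Output directory for the results (Spark writes a directory of part files)
csv_path = "results"

# Save all test predictions, not only the displayed sample;
# coalesce(1) writes them as a single part file inside the directory
test_predictions.select("id", "prediction").coalesce(1).write.csv(csv_path, header=True, mode="overwrite")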

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)

+---------+---------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+-----------+----------------+------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|       distance_km|pickup_hour|pickup_dayofweek|pickup_month|
+---------+---------+-------------------+---------------+--