#PART 3: Machine Learning

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer
from pyspark.ml.regression import DecisionTreeRegressor

In [0]:
spark = SparkSession.builder \
        .appName('at2_part3_ml') \
        .getOrCreate()

In [0]:
parquet_output_path = "/FileStore/my_dataframe/final_df"
df_final = spark.read.parquet(parquet_output_path)

In [0]:
df_final.createOrReplaceTempView("green_yellow_taxi_df")

In [0]:
df_final.columns

Out[5]: ['lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge',
 'trip_distance_km',
 'trip_duration_hours',
 'speed_kmph',
 'airport_fee',
 'taxi_color',
 'pickup_borough',
 'pickup_zone',
 'dropoff_borough',
 'dropoff_zone']

##Data Preparation

#### Features selection

In [0]:
col_list = ['passenger_count','lpep_pickup_datetime','trip_distance_km','trip_duration_hours','tip_amount','total_amount']

In [0]:
df_cleaned = df_final.select(col_list)

#### Raplace missing value with mean 

In [0]:
imputer = Imputer(
    inputCols=['passenger_count'], 
    outputCols=['passenger_count_imputed'],  
    strategy='mean'  
)

# Fit and transform the data
df_cleaned= imputer.fit(df_cleaned).transform(df_cleaned)

#### Split data

Filter the data for October, November, and December 2022 for the test set

In [0]:
test_set = df_cleaned.filter(
    (year(df_cleaned.lpep_pickup_datetime) == 2022) &
    (month(df_cleaned.lpep_pickup_datetime).isin([10, 11, 12]))
)

In [0]:
train_data = df_cleaned.subtract(test_set)

In [0]:
train_data = train_data.drop("lpep_pickup_datetime","passenger_count")
test_set = test_set.drop("lpep_pickup_datetime","passenger_count")

In [0]:
# Sample 0.0001 (0.01%) of the data for training
sampled_train = train_data.sample(fraction=0.0001, seed=42)
sampled_test = test_set.sample(fraction=0.0001, seed=42)

Split the train_data into training (80%) and validation (20%) sets

In [0]:
train_set, val_set = sampled_train.randomSplit([0.8, 0.2], seed=42)

## Baseline Model

In [0]:
Trip_stat ="""
SELECT
    taxi_color AS TaxiColor,
    CONCAT(pickup_zone, ' to ', dropoff_zone) AS Pickup_Dropoff,
    CONCAT(LPAD(MONTH(lpep_pickup_datetime), 2, '0')) AS Month,
    DATE_FORMAT(lpep_pickup_datetime, 'EEEE') AS DayOfWeek,
    HOUR(lpep_pickup_datetime) AS HourOfDay,
    COUNT(*) AS TotalTrips,
    AVG(trip_distance_km) AS AvgDistance,
    SUM(total_amount) / COUNT(*) AS AvgAmountPaid,
    SUM(total_amount) AS TotalAmountPaid
FROM
    green_yellow_taxi_df
WHERE trip_distance_km > 0
GROUP BY
    TaxiColor, Pickup_Dropoff, Month, DayOfWeek, HourOfDay
ORDER BY 
    TaxiColor, Pickup_Dropoff, Month, DayOfWeek, HourOfDay
"""

TripSummary = spark.sql(Trip_stat)

In [0]:
y_base = TripSummary.selectExpr("AvgAmountPaid").first()[0]

# Create a new DataFrame with predicted values (y_base - 'TotalAmountPaid')
baseline_predictions = TripSummary.withColumn("prediction", y_base - col("TotalAmountPaid"))

# Evaluate the baseline model using an appropriate evaluation metric
evaluator = RegressionEvaluator(labelCol="TotalAmountPaid", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(baseline_predictions)

# Print the RMSE of the baseline model
print(f"Baseline Model RMSE: {rmse}")

Baseline Model RMSE: 3334.553055253188


In [0]:
y_base = sampled_train.selectExpr("avg(total_amount) as prediction").first()[0]

# Create a new DataFrame with predicted values (y_base - 'TotalAmountPaid')
baseline_predictions = sampled_train.withColumn("prediction", y_base - col("total_amount"))

# Evaluate the baseline model using an appropriate evaluation metric
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(baseline_predictions)

# Print the RMSE of the baseline model
print(f"Baseline Model RMSE: {rmse}")

In [0]:
y_mean = sampled_train.selectExpr("avg(total_amount) as prediction").first().prediction

# Create a new DataFrame with predicted values
baseline = sampled_train.select("total_amount").withColumn("basline_pred",lit(y_mean))

# Evaluate the baseline model
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="basline_pred", metricName="rmse")
basline_rmse = evaluator.evaluate(baseline)

# Print the RMSE of the baseline model
print(f"Baseline Model RMSE: {basline_rmse}")

## Linear Regression

In [0]:
feature_columns = [
    'passenger_count_imputed',
    'trip_distance_km',
    'trip_duration_hours',
    'tip_amount'
]

In [0]:
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

#####Fitting model on Train set

In [0]:
lr = LinearRegression(featuresCol="features", labelCol="total_amount")
pipeline_lr = Pipeline(stages=[vector_assembler, lr])

In [0]:
model_lr = pipeline_lr.fit(train_set)

###### Save the model using Spark's built-in serialization

In [0]:
import os

# Directory path
directory_path = '/dbfs/mnt/at2-bde/'

# Create the directory 
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

# Specify the complete directory path where to save the model
model_path = "/dbfs/mnt/at2-bde/model_lr"
model_lr.write().overwrite().save(model_path)

In [0]:
from pyspark.ml.pipeline import PipelineModel

# Load the model
model_lr = PipelineModel.load('/dbfs/mnt/at2-bde/model_lr')

#####Train model on Validation set

In [0]:
lr_pred = model_lr.transform(val_set)

# Evaluate the model on the val data
evaluator_lreg = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
lr_rmse = evaluator_lreg.evaluate(lr_pred)

print("Linear Regression RMSE:", lr_rmse)

Linear Regression RMSE: 3.3556476540654008


## Decision Tree

##### Fiting the model on Train set

In [0]:
dt = DecisionTreeRegressor(featuresCol='features', labelCol='total_amount', maxDepth=5)
pipeline_dt = Pipeline(stages=[vector_assembler, dt])

model_dt = pipeline_dt.fit(train_set)

In [0]:
import os

# Directory path
directory_path = '/dbfs/mnt/at2-bde/'

# Create the directory 
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

# Specify the complete directory path where to save the model
model_path = "/dbfs/mnt/at2-bde/model_dt"
model_dt.write().overwrite().save(model_path)

In [0]:
from pyspark.ml.pipeline import PipelineModel

# Load the model
model_dt = PipelineModel.load('/dbfs/mnt/at2-bde/model_dt')

##### Fiting the model on Validation set

In [0]:
dt_pred = model_dt.transform(val_set)

evaluator_dt = RegressionEvaluator(labelCol='total_amount', predictionCol='prediction', metricName='rmse')
dt_rmse = evaluator_dt.evaluate(dt_pred)

print(f"Decision Tree RMSE: {dt_rmse}")

Decision Tree RMSE: 4.194063080024141


## Train the best model on Test set

### Linear regression

#### Fit the best model all test set

In [0]:
lr_pred_test = model_lr.transform(test_set)

# Evaluate the model on the val data
evaluator_lreg_test = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
lr_rmse_test = evaluator_lreg_test.evaluate(lr_pred_test)
print("Linear Regression RMSE on Test set:", lr_rmse_test)

Linear Regression RMSE on Test set: 6.967146645795853


#### Fit the best model with sampled test set

In [0]:
lr_pred_samtest = model_lr.transform(sampled_test)

# Evaluate the model on the val data
evaluator_lreg_samtest = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
lr_rmse_samtest = evaluator_lreg_samtest.evaluate(lr_pred_samtest)
print("Linear Regression RMSE on Test set:", lr_rmse_samtest)

Linear Regression RMSE on Test set: 6.729204590213371


###Random Forest

#### Fit the model all test set

In [0]:
dt_pred_test = model_dt.transform(test_set)

evaluator_dt_test = RegressionEvaluator(labelCol='total_amount', predictionCol='prediction', metricName='rmse')
dt_rmse_test = evaluator_dt_test.evaluate(dt_pred_test)

print(f"Decision Tree RMSE on Test set: {dt_rmse_test}")

Decision Tree RMSE on Test set: 8.061201048524683


#### Fit the model sampled test set

In [0]:
dt_pred_samtest = model_dt.transform(sampled_test)

evaluator_dt_test = RegressionEvaluator(labelCol='total_amount', predictionCol='prediction', metricName='rmse')
dt_rmse_test = evaluator_dt_test.evaluate(dt_pred_samtest)

print(f"Decision Tree RMSE on Test set: {dt_rmse_test}")

Decision Tree RMSE on Test set: 8.230829771750095
