In [0]:
# Read the combined green/yellow taxi dataset stored as Parquet
combined_data_path = "/FileStore/green_yellow_location_combined"
combined_data = spark.read.parquet(combined_data_path)

In [0]:
# Preview the first five rows of the loaded dataframe
combined_data.show(n=5)

+--------+-------------------+---------------+-------------+------------+------------+----------+------------+----------------+-------------------+------------------+----------+---------+---------+
|VendorID|    pickup_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|tip_amount|total_amount|duration_seconds|     duration_hours|         speed_mph|taxi_color|PUBorough|DOBorough|
+--------+-------------------+---------------+-------------+------------+------------+----------+------------+----------------+-------------------+------------------+----------+---------+---------+
|       2|2015-05-01 00:07:45|            1.0|         1.09|         255|         112|       1.0|         7.8|             305|0.08472222222222223|12.865573770491803|    yellow| Brooklyn| Brooklyn|
|       2|2015-05-01 00:26:23|            1.0|         5.73|         255|          13|      4.66|       27.96|            1729| 0.4802777777777778|11.930595720069405|     green| Brooklyn|Manhattan|
|       2|

In [0]:
# Timestamp that separates the modelling period from the hold-out period
holdout_start = '2022-10-01 00:00:00'

# Modelling data: trips picked up strictly before the cut-off (i.e. through September 2022)
final_model_data = combined_data.filter(combined_data["pickup_datetime"] < holdout_start)

# Hold-out data: trips picked up on or after the cut-off, reserved for testing the
# ultimately trained model.
# NOTE: the name `test_data` is rebound by the random train/test splits further down.
test_data = combined_data.filter(combined_data["pickup_datetime"] >= holdout_start)

In [0]:
# Inspect the column names and types of the modelling dataframe
final_model_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- duration_seconds: long (nullable = true)
 |-- duration_hours: double (nullable = true)
 |-- speed_mph: double (nullable = true)
 |-- taxi_color: string (nullable = true)
 |-- PUBorough: string (nullable = true)
 |-- DOBorough: string (nullable = true)



In [0]:
# Names of the columns carried through the modelling steps
cols_list = [
    "passenger_count",
    "trip_distance",
    "tip_amount",
    "duration_seconds",
    "duration_hours",
    "speed_mph",
    "taxi_color",
    "total_amount",
]

In [0]:
# Keep only the columns named in cols_list
final_model_data = final_model_data.select(*cols_list)

In [0]:
# Discard every row in which any of the columns later used as model inputs is null
required_inputs = ["trip_distance", "duration_hours", "taxi_color"]
final_model_data = final_model_data.dropna(subset=required_inputs)

In [0]:
# Confirm that only the columns from cols_list remain
final_model_data.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- duration_seconds: long (nullable = true)
 |-- duration_hours: double (nullable = true)
 |-- speed_mph: double (nullable = true)
 |-- taxi_color: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [0]:
# Start with no pipeline stages; the transformers are added in the following cells
stages = list()

In [0]:
# Feature transformers used to build the model input
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Map each taxi_color string to a numeric index ...
col_indexer = StringIndexer(
    inputCol="taxi_color",
    outputCol="taxi_color_ind",
)
# ... and one-hot encode that index
col_encoder = OneHotEncoder(
    inputCols=["taxi_color_ind"],
    outputCols=["taxi_color_ohe"],
)
# Register both transformers, indexer first, as pipeline stages
stages.extend([col_indexer, col_encoder])

In [0]:
# VectorAssembler is imported again here, as in the original cell
from pyspark.ml.feature import VectorAssembler

# Combine the two numeric inputs and the encoded taxi colour into one "features" vector
assembler_inputs = ["trip_distance", "duration_hours", "taxi_color_ohe"]
vector_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [0]:
# The assembler runs last, after the indexer and encoder already in stages
stages.append(vector_assembler)

In [0]:
# Import Pipeline from pyspark.ml
from pyspark.ml import Pipeline
# Chain the transformers collected in `stages` into a single Pipeline
pipeline = Pipeline(stages=stages)

In [0]:
# Fit the pipeline stages on the modelling data
pipeline_model = pipeline.fit(final_model_data)

In [0]:
# Apply the fitted pipeline; this adds the taxi_color_ind, taxi_color_ohe and features columns
final_model_data = pipeline_model.transform(final_model_data)

In [0]:
# Keep the assembled features vector followed by the columns listed in cols_list
final_model_data = final_model_data.select("features", *cols_list)
# Display the resulting dataframe
final_model_data.show()

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+
|[1.09,0.084722222...|            1.0|         1.09|       1.0|             305| 0.08472222222222223|12.865573770491803|    yellow|         7.8|
|[5.73,0.480277777...|            1.0|         5.73|      4.66|            1729|  0.4802777777777778|11.930595720069405|     green|       27.96|
|   [2.35,0.1725,1.0]|            1.0|         2.35|       0.0|             621|              0.1725|13.623188405797103|    yellow|        10.8|
|[2.68,0.210833333...|            1.0|         2.68|      3.08|             759| 0.21083333333333334|12.711462450592885|     green

In [0]:
# Give the prepared dataframe one name per ML model.
# Plain assignment does not copy anything: both names refer to the same DataFrame object.
final_model_data_lr = final_model_data_dtr = final_model_data

In [0]:
## Linear Regression Model

In [0]:
# Randomly partition the linear-regression data: 75% for training, 25% for testing.
# NOTE: this rebinds `test_data`, which until now held the rows from 2022-10-01 onward.
lr_split_weights = [0.75, 0.25]
train_data, test_data = final_model_data_lr.randomSplit(lr_split_weights, seed=8)

In [0]:
# Import LinearRegression from pyspark.ml.regression
from pyspark.ml.regression import LinearRegression

# Configure a linear regression that predicts total_amount from the features vector
linearRegressor = LinearRegression(
    labelCol="total_amount",
    featuresCol="features",
)
# Train it on the training partition
lr_model = linearRegressor.fit(train_data)

In [0]:
# Get the summary object attached to the fitted linear regression model
lr_model_summary = lr_model.summary

In [0]:
# Read the root mean squared error from the model summary (training data)
train_rmse = lr_model_summary.rootMeanSquaredError

In [0]:
# Print the linear regression RMSE on the training data
print(train_rmse)

4.133512342436048


In [0]:
# Score the training partition with the fitted linear regression model;
# transform() appends a "prediction" column
lr_model_train_preds = lr_model.transform(train_data)

In [0]:
# Show five rows of the training partition together with their predictions
lr_model_train_preds.show(n=5)

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|       prediction|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|[0.45,0.016944444...|            1.0|         0.45|       0.0|              61|0.016944444444444446|26.557377049180324|     green|        13.0|5.070840288915878|
|[0.45,0.016944444...|            2.0|         0.45|       0.0|              61|0.016944444444444446|26.557377049180324|    yellow|         4.3| 5.07273850600082|
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.3|5.076171119658134|
|[0.45,0.017222222...|

In [0]:
# Score the test partition of the random split with the fitted linear regression model
lr_model_test_preds = lr_model.transform(test_data)

In [0]:
# Show five rows of the test partition together with their predictions
lr_model_test_preds.show(n=5)

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+------------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|        prediction|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+------------------+
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.8| 5.076171119658134|
|   [0.45,0.0175,1.0]|            1.0|         0.45|       0.0|              63|              0.0175| 25.71428571428571|    yellow|         4.3|5.0834001674853315|
|[0.45,0.018333333...|            1.0|         0.45|       0.0|              66|0.018333333333333333|24.545454545454547|     green|         4.3| 5.097494442627156|
|[0.45,0.0183333

In [0]:
# Evaluate the linear regression model on the test partition of the random split
# (rows from the modelling period, not the rows from 2022-10-01 onward)
lr_model_test_eval = lr_model.evaluate(test_data)
# Read the RMSE from the evaluation summary and print it
test_rmse = lr_model_test_eval.rootMeanSquaredError
print(test_rmse)

4.752936382407733


In [0]:
## Decision Tree Regression Model

In [0]:
# Randomly partition the decision-tree data: 75% for training, 25% for testing.
# Same weights and seed as the linear-regression split; rebinds train_data and test_data.
dtr_split_weights = [0.75, 0.25]
train_data, test_data = final_model_data_dtr.randomSplit(dtr_split_weights, seed=8)

In [0]:
# Import DecisionTreeRegressor from pyspark.ml.regression
from pyspark.ml.regression import DecisionTreeRegressor

# Configure a decision tree regressor that predicts total_amount from the features vector
decisionTreeRegressor = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
)
# Train it on the training partition
dtr_model = decisionTreeRegressor.fit(train_data)

In [0]:
# Score the training partition with the fitted decision tree;
# transform() appends a "prediction" column
dtr_model_train_preds = dtr_model.transform(train_data)
# Show five rows of the training partition together with their predictions
dtr_model_train_preds.show(n=5)

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|       prediction|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|[0.45,0.016944444...|            1.0|         0.45|       0.0|              61|0.016944444444444446|26.557377049180324|     green|        13.0|6.378293191334128|
|[0.45,0.016944444...|            2.0|         0.45|       0.0|              61|0.016944444444444446|26.557377049180324|    yellow|         4.3|6.378293191334128|
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.3|6.378293191334128|
|[0.45,0.017222222...|

In [0]:
# Import RegressionEvaluator from pyspark.ml.evaluation
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator comparing the "prediction" column against total_amount using RMSE
dtr_train_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
# RMSE of the decision tree on the training partition
train_rmse = dtr_train_evaluator.evaluate(dtr_model_train_preds)
print(train_rmse)

4.656714814275851


In [0]:
# Score the test partition of the random split with the fitted decision tree
dtr_model_test_preds = dtr_model.transform(test_data)
# Show five rows of the test partition together with their predictions
dtr_model_test_preds.show(n=5)

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|       prediction|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.8|6.378293191334128|
|   [0.45,0.0175,1.0]|            1.0|         0.45|       0.0|              63|              0.0175| 25.71428571428571|    yellow|         4.3|6.378293191334128|
|[0.45,0.018333333...|            1.0|         0.45|       0.0|              66|0.018333333333333333|24.545454545454547|     green|         4.3|6.378293191334128|
|[0.45,0.018333333...|

In [0]:
# Evaluator comparing the "prediction" column against total_amount using RMSE
dtr_test_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
# RMSE of the decision tree on the test partition
test_rmse = dtr_test_evaluator.evaluate(dtr_model_test_preds)
print(test_rmse)

5.2195066182423275


In [0]:
## Predictions

In [0]:
# Creating an empty list called stages
stages = []

In [0]:
# Dropping rows with NA values in the desired features
test_data = test_data.na.drop(subset=["trip_distance", "duration_hours", "taxi_color"])

In [0]:
# Import OneHotEncoder, StringIndexer, VectorAssembler from pyspark.ml.feature 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# Instantiate StringIndexer and OneHotEncoder for taxi_color variable and add them to stages
col_indexer = StringIndexer(inputCol="taxi_color", outputCol="taxi_color_ind")
col_encoder = OneHotEncoder(inputCols=["taxi_color_ind"], outputCols=["taxi_color_ohe"])
stages += [col_indexer, col_encoder]

In [0]:
# Importing VectorAssembler
from pyspark.ml.feature import VectorAssembler
# Instantiate a VectorAssembler with the desired features
vector_assembler = VectorAssembler(inputCols = ["trip_distance", "duration_hours", "taxi_color_ohe"], outputCol = "features")

In [0]:
# Add vector_assembler to stages
stages += [vector_assembler]

In [0]:
# Import Pipeline from pyspark.ml
from pyspark.ml import Pipeline
# Instantiate a Pipeline with stages
pipeline = Pipeline(stages=stages)

In [0]:
# Fit the pipeline test_data
pipeline_model = pipeline.fit(test_data)

In [0]:
# Select the features column, target variable and all the rest of the variables
test_data = test_data.select(['features'] + cols_list)
test_data.show()

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.8|
|   [0.45,0.0175,1.0]|            1.0|         0.45|       0.0|              63|              0.0175| 25.71428571428571|    yellow|         4.3|
|[0.45,0.018333333...|            1.0|         0.45|       0.0|              66|0.018333333333333333|24.545454545454547|     green|         4.3|
|[0.45,0.018333333...|            1.0|         0.45|       0.0|              66|0.018333333333333333|24.545454545454547|    yellow

In [0]:
# Score test_data with the fitted decision tree; transform() appends a "prediction" column
dtr_model_test_data_preds = dtr_model.transform(test_data)
# Show five rows of test_data together with their predictions
dtr_model_test_data_preds.show(n=5)

+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|            features|passenger_count|trip_distance|tip_amount|duration_seconds|      duration_hours|         speed_mph|taxi_color|total_amount|       prediction|
+--------------------+---------------+-------------+----------+----------------+--------------------+------------------+----------+------------+-----------------+
|[0.45,0.017222222...|            1.0|         0.45|       0.0|              62|0.017222222222222222|26.129032258064516|     green|         4.8|6.378293191334128|
|   [0.45,0.0175,1.0]|            1.0|         0.45|       0.0|              63|              0.0175| 25.71428571428571|    yellow|         4.3|6.378293191334128|
|[0.45,0.018333333...|            1.0|         0.45|       0.0|              66|0.018333333333333333|24.545454545454547|     green|         4.3|6.378293191334128|
|[0.45,0.018333333...|

In [0]:
# Evaluator comparing the "prediction" column against total_amount using RMSE
dtr_test_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
# RMSE of the decision tree predictions made on test_data
test_data_rmse = dtr_test_evaluator.evaluate(dtr_model_test_data_preds)


In [0]:
# Print the decision tree RMSE computed on test_data
print(test_data_rmse)

5.2195066182423275
