##### Create Spark Session

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Reuse the active Spark session if there is one, otherwise start one named 'PROJSVC'
spark = (
    SparkSession.builder
    .appName('PROJSVC')
    .getOrCreate()
)

In [0]:
# Load the collision dataset: the first row holds the column names and Spark infers column types.
df = spark.read.csv('/FileStore/tables/collision_project.csv', inferSchema=True, header=True)

# Later cells query this data with spark.sql("... from collision"), so the DataFrame must be
# registered under that name. Without this, those queries fail on a fresh cluster because the
# table/view "collision" does not exist.
df.createOrReplaceTempView("collision")

#### Data pre-processing

In [0]:
# Import the required libraries
import pyspark.sql.functions as f

In [0]:
# Keep only the weather features, the day of week and the COLLISION label.
# Any other CSV columns (such as the date) are left out of `data`.
# Selecting straight from `df` means this cell does not depend on a registered "collision" view.
data = df.select(
    'AVE_HUMUDITY', 'AVE_PRESSURE', 'AVE_TEMP',
    'AVE_WIND_SPEED', 'DAY_OF_WEEK', 'COLLISION',
)

data.show()

In [0]:
# Show rows with a missing value in any column used by the models.
# COLLISION is checked too: it is the regression label, so a missing label should be reported
# here rather than surfacing later in the model-fitting cells.
# Filtering `df` directly means this check does not depend on a registered "collision" view.
df.filter(
    "AVE_HUMUDITY IS NULL "
    "OR AVE_PRESSURE IS NULL "
    "OR AVE_TEMP IS NULL "
    "OR AVE_WIND_SPEED IS NULL "
    "OR DAY_OF_WEEK IS NULL "
    "OR COLLISION IS NULL"
).show()

In [0]:
# Number of rows available for modelling (displayed as the cell output)
row_count = data.count()
row_count

In [0]:
# Create a 70-30 train/test split; the fixed seed keeps the split repeatable
# (given the same input data and partitioning)
train_data, test_data = data.randomSplit(weights=[0.7, 0.3], seed=2020)

#### Data Pipeline for Model Building

In [0]:
# Import the required libraries for Pipeline

from pyspark.ml.feature import VectorAssembler,StringIndexer,StandardScaler
from pyspark.ml import Pipeline

In [0]:
# Encode the categorical DAY_OF_WEEK column as a numeric index so it can go into the feature vector.
# handleInvalid='keep' maps labels not seen during fitting to an extra index instead of failing.
# NOTE(review): the index follows StringIndexer's label ordering (frequency-based by default),
# not calendar order, and is then used as a single numeric feature; one-hot encoding may suit
# the linear model better.
day_of_week_indexer = StringIndexer(
    handleInvalid='keep',
    inputCol='DAY_OF_WEEK',
    outputCol='day_of_week_index',
)

In [0]:
# Columns combined, in this order, into one vector column that feeds the scaler
feature_columns = ['AVE_HUMUDITY', 'AVE_PRESSURE', 'AVE_TEMP', 'AVE_WIND_SPEED', 'day_of_week_index']

assembler = VectorAssembler(outputCol="unscaled_features", inputCols=feature_columns)

In [0]:
# Scale the assembled features so the linear regression is not dominated by large-range features.
# withMean=False / withStd=True are Spark's defaults, spelled out here: each feature is divided
# by its standard deviation but not centred.
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
    withMean=False,
    withStd=True,
)

In [0]:
# The pipeline runs the indexer, assembler and scaler one after another. Fitting it once on the
# training data and reusing the fitted pipeline pre-processes the test data in exactly the same
# way as the training data.
preprocessing_stages = [day_of_week_indexer, assembler, scaler]
pipe = Pipeline(stages=preprocessing_stages)

In [0]:
# Learn the pre-processing state (day-of-week index, scaling statistics) from the training split only
fitted_pipe = pipe.fit(dataset=train_data)

In [0]:
# Apply the fitted pre-processing to the training split.
# NOTE: this rebinds train_data to the transformed frame, so re-running this cell without
# re-running the split cell will likely fail, because the pipeline's output columns already exist.
train_data = fitted_pipe.transform(dataset=train_data)

In [0]:
# Inspect the pre-processed training data (first 20 rows, long values truncated)
train_data.show(n=20, truncate=True)

In [0]:
# Pre-process the test split with the pipeline fitted on the training data.
# No predictions are made here; the models are applied in later cells.
test_data = fitted_pipe.transform(dataset=test_data)

#### ---------------------------------------------------------------------------------------------------------------------------------------------

#### Model#1: Linear Regression Model

###### Model#1: Building Linear Regression Model

In [0]:
# Import the required libraries for LR

from pyspark.ml.regression import LinearRegression

In [0]:
# Linear regression predicting COLLISION from the scaled "features" vector
lr_model = LinearRegression(featuresCol='features', labelCol='COLLISION')

In [0]:
# Train the linear regression on just the feature vector and the label
lr_training_input = train_data.select('features', 'COLLISION')
fit_model = lr_model.fit(lr_training_input)

In [0]:
# Score the test split; the returned DataFrame carries a "prediction" column
results = fit_model.transform(dataset=test_data)

In [0]:
# Actual vs. predicted collisions, side by side
results.select('COLLISION', 'prediction').show()

###### Model#1: Evaluating the Linear Regression Model

In [0]:
# Regression summary (residuals, RMSE, R2) of the linear model on the test split
test_results = fit_model.evaluate(dataset=test_data)

In [0]:
# Residuals of the linear model on the test split
lr_residuals = test_results.residuals
lr_residuals.show()

In [0]:
# Root mean squared error of the linear model on the test split
rmse_lr = test_results.rootMeanSquaredError
lr_message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse_lr
print(lr_message)

The root mean squared error is very high, indicating that the model's predictions are poor.

In [0]:
# Coefficient of determination of the linear model on the test split (cell output)
r2_lr = test_results.r2
r2_lr

The r-squared value implies that the model explains only about 7% of the variance in COLLISION.

#### Model#2: RandomForest Regression Model

###### Model#2: Building RandomForest Regression Model

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Random forest regressor on the same scaled features and COLLISION label as the linear model.
# The seed is fixed explicitly (matching the train/test split seed) so bootstrap sampling and
# feature-subset selection are repeatable. PySpark's default seed is derived from Python's hash()
# of the class name, which can differ between interpreter sessions unless PYTHONHASHSEED is set.
rf = RandomForestRegressor(labelCol="COLLISION", featuresCol="features", seed=2020)

In [0]:
# Train the random forest on the pre-processed training split
rf_model = rf.fit(dataset=train_data)

In [0]:
# Score the pre-processed test split with the trained forest
predictions = rf_model.transform(dataset=test_data)

In [0]:
# Actual vs. predicted collisions for the random forest
predictions.select(["COLLISION", "prediction"]).show()

###### Model#2: Evaluating RandomForest Regression Model

In [0]:
# RMSE evaluator comparing the COLLISION label with the "prediction" column
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="COLLISION",
    predictionCol="prediction",
)

In [0]:
# Test-set RMSE of the random forest
rmse_rf = evaluator.evaluate(dataset=predictions)

In [0]:
rf_message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse_rf
print(rf_message)

#### Model#3: Decision Tree Regression Model

###### Model#3: Building DecisionTree Regression Model

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

In [0]:
# Single decision tree on the same scaled features and label
dt = DecisionTreeRegressor(featuresCol="features", labelCol="COLLISION")

In [0]:
# Re-inspect the pre-processed training data before fitting the tree (repeats the earlier display)
train_data.show(n=20)

In [0]:
# Train the decision tree on the pre-processed training split
dt_model = dt.fit(dataset=train_data)

In [0]:
# Score the pre-processed test split with the trained tree
dt_predictions = dt_model.transform(dataset=test_data)

In [0]:
# Actual vs. predicted collisions for the decision tree
dt_predictions.select(["COLLISION", "prediction"]).show()

###### Model#3: Evaluating DecisionTree Regression Model

In [0]:
# Test-set RMSE of the decision tree (same evaluator settings as for the random forest)
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="COLLISION",
    metricName="rmse",
)
rmse_dt = evaluator.evaluate(dataset=dt_predictions)
dt_message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse_dt
print(dt_message)

#### Model#4: Gradient-Boosted Tree Regression Model

###### Model#4: Building Gradient-Boosted Tree Regression Model

In [0]:
from pyspark.ml.regression import GBTRegressor

In [0]:
# Gradient-boosted trees on the same features and label, limited to 10 boosting iterations
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="COLLISION",
    maxIter=10,
)

In [0]:
# Train the gradient-boosted trees on the pre-processed training split
gtb_model = gbt.fit(dataset=train_data)

In [0]:
# Score the pre-processed test split with the boosted model
gtb_predictions = gtb_model.transform(dataset=test_data)

In [0]:
# Actual vs. predicted collisions for the gradient-boosted trees
gtb_predictions.select(["COLLISION", "prediction"]).show()

###### Model#4: Evaluating Gradient-Boosted Tree Regression Model

In [0]:
# Test-set RMSE of the gradient-boosted trees (same evaluator settings as the other tree models)
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="COLLISION",
    metricName="rmse",
)
rmse_gtb = evaluator.evaluate(dataset=gtb_predictions)
gtb_message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse_gtb
print(gtb_message)

#### Model Summary:

In [0]:
# Side-by-side test-set RMSE of the four models, one line per model
model_rmses = [
    ("Model#1: Linear Regression Model", rmse_lr),
    ("Model#2: RandomForest Regression Model", rmse_rf),
    ("Model#3: DecisionTree Regression Model", rmse_dt),
    ("Model#4: Gradient-Boosted Tree Regression Model", rmse_gtb),
]
for model_name, model_rmse in model_rmses:
    print("%s: Root Mean Squared Error (RMSE) on test data = %g" % (model_name, model_rmse))