In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql import functions as fn
from pyspark.ml import feature, regression, evaluation, Pipeline
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer, VectorAssembler
# Obtain the SparkSession used by every cell below (getOrCreate reuses an
# already-running session instead of building a second one).
spark = SparkSession.builder.getOrCreate()
# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [2]:
# Do not delete or change this cell

import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    """Report whether the notebook is running in a Databricks environment.

    Returns:
        bool: True when the ``DATABRICKS_RUNTIME_VERSION`` environment variable
        is set (to any value, including an empty string), False otherwise.
    """
    # `is not None` is the idiomatic None check, and the comparison already
    # yields the boolean the caller needs.
    return os.getenv("DATABRICKS_RUNTIME_VERSION") is not None

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):
    """Build the path from which a data file should be loaded.

    This only constructs the path; it does not open or read the file.

    Args:
        data_file_name: Base name of the data file.

    Returns:
        ``/FileStore/tables/<data_file_name>`` when running on Databricks,
        otherwise ``data_file_name`` unchanged (the file is then expected to
        sit next to this notebook).
    """
    # Local run: the file is assumed to live in the notebook's own directory.
    if not is_databricks():
        return data_file_name

    # Databricks run: uploaded tables are looked up under /FileStore/tables.
    return "/FileStore/tables/%s" % data_file_name

In [3]:
# Load the training data.
# header=True takes column names from the first row; inferSchema=True asks
# Spark to infer column types from the data.

us_train = spark.read.csv(get_training_filename('USAccident_balanced_train_categorical_OHE.csv'), header = True, inferSchema = True)

In [4]:
# Load the held-out data (the "validation" CSV), used as the test set in the
# cells below. Read with the same header / inferSchema options as the
# training file.

us_test = spark.read.csv(get_training_filename('USAccident_validation_OHE.csv'), header = True, inferSchema = True)

In [5]:
# Drop 'hour_day' and 'week_of_year' from the training frame only. us_test is
# not modified here; the feature list used later is built from
# us_train.columns, so these two columns are never assembled into features.
us_train=us_train.drop('hour_day', 'week_of_year')

In [6]:
# Check the class balance of the training set: row count per Severity value.

us_train.groupBy('Severity').count().show()

+--------+------+
|Severity| count|
+--------+------+
|       3|234316|
|       4|219321|
|       2|263797|
+--------+------+



In [7]:
# Check the class balance of the test set: row count per Severity value.

us_test.groupBy('Severity').count().show()

+--------+------+
|Severity| count|
+--------+------+
|       3| 58617|
|       4|  5993|
|       2|131790|
+--------+------+



In [8]:
# Recode Severity 2 -> 0 in the test set; every other value is kept as is.
# Run once and in cell order: the notebook later recodes 4 -> 2, so re-running
# this cell afterwards would fold that class into 0.

us_test = us_test.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 2, 0).otherwise(fn.col("Severity")),
)

In [9]:
# Recode Severity 2 -> 0 in the training set; every other value is kept as is.
# Run once and in cell order: the notebook later recodes 4 -> 2, so re-running
# this cell afterwards would fold that class into 0.

us_train = us_train.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 2, 0).otherwise(fn.col("Severity")),
)

In [10]:
# Recode Severity 3 -> 1 in the test set; every other value is kept as is.

us_test = us_test.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 3, 1).otherwise(fn.col("Severity")),
)

In [11]:
# Recode Severity 3 -> 1 in the training set; every other value is kept as is.

us_train = us_train.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 3, 1).otherwise(fn.col("Severity")),
)

In [12]:
# Recode Severity 4 -> 2 in the test set; every other value is kept as is.
# This has to run after the 2 -> 0 step, otherwise the 2s created here would
# be turned into 0 by that step.

us_test = us_test.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 4, 2).otherwise(fn.col("Severity")),
)

In [13]:
# Recode Severity 4 -> 2 in the training set; every other value is kept as is.
# This has to run after the 2 -> 0 step, otherwise the 2s created here would
# be turned into 0 by that step.

us_train = us_train.withColumn(
    "Severity",
    fn.when(fn.col("Severity") == 4, 2).otherwise(fn.col("Severity")),
)

In [14]:
# Assemble every training column except the target `Severity` into a single
# vector column named `features`, which the downstream stages consume.

va = VectorAssembler(
    inputCols=[name for name in us_train.columns if name != 'Severity'],
    outputCol='features',
)

In [15]:
# Mean-center the assembled features for the Logistic Regression.
# withMean=True subtracts each feature's mean; withStd=False leaves the scale
# untouched, so this stage centers the data but does not standardize it.

center = feature.StandardScaler(withMean=True, withStd=False, inputCol='features', outputCol='centered_features')

In [16]:
# Index the target column `Severity` into the `label` column the classifiers
# are trained on.
# StringIndexer's default ordering ("frequencyDesc") numbers classes by how
# often they occur, so `label` would equal the 0/1/2 Severity codes only while
# class 0 is the most frequent and class 2 the least frequent in the fitted
# data. The classification reports in this notebook compare `prediction`
# against the raw Severity codes, so the ordering is pinned to the label
# values themselves ("0" < "1" < "2") instead of relying on class counts.

label_stringIdx = StringIndexer(inputCol="Severity", outputCol="label", stringOrderType="alphabetAsc")

# Logistic Regression Multiclass Base Model

In [17]:
# Create the base LogisticRegression estimator. No regularization arguments
# are passed, so the library defaults apply.
lr = LogisticRegression(labelCol="label", featuresCol="centered_features")

# Base-model pipeline (not yet fitted): index the label, assemble the
# features, center them, then run the logistic regression.

lrModel = Pipeline(stages=[label_stringIdx,va, center, lr])

# Fit the whole pipeline on the training data; lr_fit is the fitted result
# used for prediction below.

lr_fit = lrModel.fit(us_train)

In [18]:
# Evaluator that scores the `prediction` column against `label` using
# accuracy.

evaluator_mul = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [19]:
# Accuracy of the fitted base pipeline on the test data.

print("Accuracy is",evaluator_mul.evaluate(lr_fit.transform(us_test)))

Accuracy is 0.5682993890020367


In [20]:
# Base-model predictions on the test set, as a pandas Series.
# Select the single `prediction` column before toPandas(): the transformed
# frame also carries every input column plus the assembled and centered
# feature vectors, and collecting all of that to the driver only to keep one
# column is wasteful.
# NOTE(review): this Series is later compared position-by-position with
# labels collected separately from us_test; that assumes both collections
# return rows in the same order -- confirm.

prediction_lrm = lr_fit.transform(us_test).select("prediction").toPandas()["prediction"]

In [21]:
# True (recoded) Severity labels of the test set, as a pandas Series.
# Only the `Severity` column is collected; converting the whole test frame to
# pandas just to read one column pulls every feature column to the driver.

true_labels = us_test.select("Severity").toPandas()["Severity"]

In [22]:
# Initializing Classification Report from sklearn

from sklearn.metrics import classification_report

In [23]:
# Per-class precision / recall / F1 for the base model: predictions from the
# fitted pipeline versus the recoded Severity values of the test set.

print(classification_report(y_pred=prediction_lrm,y_true=true_labels))

              precision    recall  f1-score   support

           0       0.89      0.48      0.62    131790
           1       0.52      0.74      0.61     58617
           2       0.13      0.92      0.23      5993

   micro avg       0.57      0.57      0.57    196400
   macro avg       0.51      0.71      0.49    196400
weighted avg       0.75      0.57      0.60    196400



In [24]:
# Coefficient matrix of the base model as a NumPy array. stages[-1] is the
# last stage of the fitted pipeline, i.e. the logistic regression.

coef_L1_mul=lr_fit.stages[-1].coefficientMatrix.toArray()

In [27]:
# np.squeeze drops axes of length 1; it does not merge rows. The per-class
# rows are still indexed separately (0, 1, 2) in the cells below.

coeft_L1_mb = np.squeeze(coef_L1_mul)

In [28]:
# Row 0 of the base-model coefficient matrix: one weight per feature for the
# first class.

coef_one_b = coeft_L1_mb[0]

In [29]:
# Row 1 of the base-model coefficient matrix: one weight per feature for the
# second class.

coef_two_b = coeft_L1_mb[1]

In [30]:
# Row 2 of the base-model coefficient matrix: one weight per feature for the
# third class.

coef_three_b = coeft_L1_mb[2]

# Number of Features with Zero Weight in the Base Model (fitted without explicit regularization)

In [31]:
# For each class of the base model, take the absolute value of the weights and
# count how many are exactly zero, i.e. how many features the model dropped.

# The coefficient arrays are replaced by their absolute values.
coef_one_b = np.absolute(coef_one_b)
coef_two_b = np.absolute(coef_two_b)
coef_three_b = np.absolute(coef_three_b)

# (ordinal used in the printed message, absolute weights of that class)
abs_coefs_by_class_b = [('1st', coef_one_b), ('2nd', coef_two_b), ('3rd', coef_three_b)]

for class_ordinal, class_coefs in abs_coefs_by_class_b:
    print('Total number of features for ' + class_ordinal + ' class are', len(class_coefs))

for class_ordinal, class_coefs in abs_coefs_by_class_b:
    # A weight of exactly 0 means the feature does not contribute to this class.
    n_eliminated = int(np.count_nonzero(class_coefs == 0))
    print('Eliminated features for ' + class_ordinal + ' class out of ' + str(len(class_coefs)) + ' are', n_eliminated)

Total number of features for 1st class are 120
Total number of features for 2nd class are 120
Total number of features for 3rd class are 120
Eliminated features for 1st class out of 120 are 0
Eliminated features for 2nd class out of 120 are 0
Eliminated features for 3rd class out of 120 are 0


# LR Multiclass Grid Search Model 

In [34]:
# LogisticRegression estimator for the grid search (an estimator, not yet a
# pipeline); its regParam and elasticNetParam are the values tuned below.

lr_new = LogisticRegression(labelCol="label", featuresCol="centered_features")

In [35]:
# Hyper-parameter grid for the Logistic Regression: three regParam values
# crossed with three elasticNetParam values (9 combinations).

paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr_new.regParam, [0.01, 0.04, 0.07])
    .addGrid(lr_new.elasticNetParam, [0.2, 0.5, 0.8])
    .build()
)

In [36]:
# Pipeline (not yet fitted) that the cross validator will fit: the same
# preprocessing stages as the base model, ending in the tunable lr_new.

cvModel_lrmu = Pipeline(stages=[label_stringIdx,va,center,lr_new])

In [37]:
# Accuracy evaluator used to score each grid-search candidate.
# NOTE(review): this import and evaluator repeat, with identical arguments,
# the ones created earlier in the notebook; the cell rebinds evaluator_mul.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_mul = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [38]:
# 5-fold cross validation over the parameter grid, fitted on the training
# data (seed fixed at 42 for the fold split).
# Note that `cv` is bound to the result of .fit(), i.e. the fitted
# cross-validation model, not to the CrossValidator itself.

cv = CrossValidator(estimator=cvModel_lrmu, estimatorParamMaps=paramGrid_lr, evaluator=evaluator_mul, numFolds=5,seed=42).fit(us_train)

In [39]:
# Hyper-parameters of the best model. stages[-1] is the last stage of the
# best pipeline, i.e. the logistic regression fitted from lr_new.

cv.bestModel.stages[-1].extractParamMap()

{Param(parent='LogisticRegression_df8793b0c6b3', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_df8793b0c6b3', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.2,
 Param(parent='LogisticRegression_df8793b0c6b3', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_df8793b0c6b3', name='featuresCol', doc='features column name'): 'centered_features',
 Param(parent='LogisticRegression_df8793b0c6b3', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_df8793b0c6b3', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_df8793b0c6b3', name='maxIter', doc='maximum number of iterations (

In [40]:
# Accuracy of the best cross-validated pipeline on the test data.

print("Accuracy is",evaluator_mul.evaluate(cv.bestModel.transform(us_test)))

Accuracy is 0.5435234215885947


In [41]:
# Coefficient matrix of the best grid-search model as a NumPy array
# (taken from the logistic regression, the last stage of the best pipeline).

coeft_L1_m=cv.bestModel.stages[-1].coefficientMatrix.toArray()

In [42]:
# np.squeeze drops axes of length 1; it does not merge rows. The per-class
# rows are still indexed separately (0, 1, 2) in the cells below.

coeft_L1_m = np.squeeze(coeft_L1_m)

In [43]:
# Row 0 of the tuned-model coefficient matrix: one weight per feature for the
# first class.

coef_one = coeft_L1_m[0]

In [44]:
# Row 1 of the tuned-model coefficient matrix: one weight per feature for the
# second class.

coef_two = coeft_L1_m[1]

In [45]:
# Row 2 of the tuned-model coefficient matrix: one weight per feature for the
# third class.

coef_three = coeft_L1_m[2]

# Number of Features Eliminated by L1 Regularization for Grid Model

In [46]:
# For each class of the tuned model, take the absolute value of the weights
# and count how many are exactly zero, i.e. how many features the
# regularized model eliminated.

# The coefficient arrays are replaced by their absolute values; the feature
# weight table built later in the notebook reads coef_three in this form.
coef_one = np.absolute(coef_one)
coef_two = np.absolute(coef_two)
coef_three = np.absolute(coef_three)

# (ordinal used in the printed message, absolute weights of that class)
abs_coefs_by_class = [('1st', coef_one), ('2nd', coef_two), ('3rd', coef_three)]

for class_ordinal, class_coefs in abs_coefs_by_class:
    print('Total number of features for ' + class_ordinal + ' class are', len(class_coefs))

for class_ordinal, class_coefs in abs_coefs_by_class:
    # A weight of exactly 0 means the feature was eliminated for this class.
    n_eliminated = int(np.count_nonzero(class_coefs == 0))
    print('Eliminated features for ' + class_ordinal + ' class out of ' + str(len(class_coefs)) + ' are', n_eliminated)

Total number of features for 1st class are 120
Total number of features for 2nd class are 120
Total number of features for 3rd class are 120
Eliminated features for 1st class out of 120 are 62
Eliminated features for 2nd class out of 120 are 84
Eliminated features for 3rd class out of 120 are 65


In [47]:
# Tuned-model predictions on the test set, as a pandas Series.
# Select the single `prediction` column before toPandas(): the transformed
# frame also carries every input column plus the assembled and centered
# feature vectors, and collecting all of that to the driver only to keep one
# column is wasteful.
# NOTE(review): this Series is later compared position-by-position with
# labels collected separately from us_test; that assumes both collections
# return rows in the same order -- confirm.

prediction_lrmt = cv.bestModel.transform(us_test).select("prediction").toPandas()["prediction"]

In [48]:
# True (recoded) Severity labels of the test set, as a pandas Series.
# Only the `Severity` column is collected; converting the whole test frame to
# pandas just to read one column pulls every feature column to the driver.

true_labels = us_test.select("Severity").toPandas()["Severity"]

In [49]:
# Initializing Classification Report from sklearn

from sklearn.metrics import classification_report

In [50]:
# Per-class precision / recall / F1 for the tuned model: predictions from the
# best cross-validated pipeline versus the recoded Severity values of the
# test set.

print(classification_report(y_pred=prediction_lrmt,y_true=true_labels))

              precision    recall  f1-score   support

           0       0.90      0.43      0.58    131790
           1       0.51      0.76      0.61     58617
           2       0.12      0.95      0.22      5993

   micro avg       0.54      0.54      0.54    196400
   macro avg       0.51      0.71      0.47    196400
weighted avg       0.76      0.54      0.58    196400



In [51]:
# Table of feature name vs. weight for the 3rd class of the tuned model,
# sorted ascending so zero-weight (eliminated) features come first.
# coef_three holds absolute values at this point (it was replaced by
# np.absolute above), so `weight` is a magnitude, not a signed coefficient.
# The names are paired by position and use the same column expression as the
# VectorAssembler input list.

feat_imp_tuned_lrt3 = pd.DataFrame(list(zip([i for i in us_train.columns if i!='Severity'], coef_three)),
            columns = ['column', 'weight']).sort_values('weight')

In [52]:
# First 10 rows of the weight table for class 3. The table is sorted by
# ascending weight, so these are the smallest-weight features.

feat_imp_tuned_lrt3.head(10)

Unnamed: 0,column,weight
59,month_of_year_Index_3,0.0
80,Wind_Direction_Index_16,0.0
79,Wind_Direction_Index_9,0.0
78,Wind_Direction_Index_15,0.0
77,Wind_Direction_Index_5,0.0
76,Wind_Direction_Index_6,0.0
75,Wind_Direction_Index_13,0.0
74,Wind_Direction_Index_10,0.0
69,Wind_Direction_Index_11,0.0
81,Wind_Direction_Index_12,0.0
