In [1]:
import findspark
# Raw string: in a normal literal '\s' is an invalid escape sequence, which
# Python reports as a DeprecationWarning (SyntaxWarning on 3.12+). The path
# value itself is unchanged.
# findspark.init() has to run before pyspark is imported so that the Spark
# installation at this location is the one that ends up on sys.path.
findspark.init(r'E:\spark')
import pyspark
from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName('ModelSelection').getOrCreate()

## 3.5 Data Formatting

In [2]:
# Load the preprocessed accident data. The first line of the file is used as
# the header and Spark infers each column's type from the values.
la_traffic_df = spark.read.csv(
    path='LA_Accidents_preprocessed.csv',
    inferSchema=True,
    header=True,
)
# Peek at the first row to sanity-check the inferred schema.
la_traffic_df.head()

Row(Severity=2, Wind_Direction='V', Weather_Condition='Clear', Amenity=False, Bump=False, Crossing=False, Give_Way=False, Junction=True, No_Exit=False, Railway=False, Roundabout=False, Station=False, Stop=False, Traffic_Calming=False, Traffic_Signal=False, Turning_Loop=False, Sunrise_Sunset='Day', Civil_Twilight='Day', Nautical_Twilight='Day', Astronomical_Twilight='Day', Distance(mi)=0.0, Temperature(F)=82.9, Wind_Chill(F)=64.92501173068268, Humidity(%)=47.0, Pressure(in)=29.95, Visibility(mi)=10.0, Wind_Speed(mph)=4.6, Precipitation(in)=0.0034345074298293894, Weekday=1, Hour=10)

In [3]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,RandomForestClassifier,NaiveBayes,MultilayerPerceptronClassifier
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, roc_auc_score
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, ChiSqSelector, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import when, log10
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
# Collect the Spark DataFrame into a pandas DataFrame on the driver so the
# pandas encoding helpers can be applied to it.
la_traffic_pd = la_traffic_df.toPandas()

# Label encoding: add an integer-typed copy of the target as
# 'Severity_encoded' (appended as the last column) and drop the original
# 'Severity' column. assign() returns a new frame, so la_traffic_pd is left
# untouched.
encoded_df = (
    la_traffic_pd
    .assign(Severity_encoded=la_traffic_pd['Severity'].astype(int))
    .drop(columns="Severity")
)

In [5]:
# One-hot encode the string-valued columns: each listed column is replaced by
# one indicator column per distinct value, named '<column>_<value>'.
categorical_columns = [
    "Wind_Direction",
    "Weather_Condition",
    "Sunrise_Sunset",
    "Civil_Twilight",
    "Nautical_Twilight",
    "Astronomical_Twilight",
]
encoded_df = pd.get_dummies(encoded_df, columns=categorical_columns)

# Snapshot of the column names after encoding (label column included).
columns = encoded_df.columns.tolist()

In [6]:
# Show every column name present after one-hot encoding (label column included).
print(columns)

['Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Distance(mi)', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weekday', 'Hour', 'Severity_encoded', 'Wind_Direction_CALM', 'Wind_Direction_E', 'Wind_Direction_ENE', 'Wind_Direction_ESE', 'Wind_Direction_N', 'Wind_Direction_NE', 'Wind_Direction_NNE', 'Wind_Direction_NNW', 'Wind_Direction_NW', 'Wind_Direction_S', 'Wind_Direction_SE', 'Wind_Direction_SSE', 'Wind_Direction_SSW', 'Wind_Direction_SW', 'Wind_Direction_V', 'Wind_Direction_VAR', 'Wind_Direction_W', 'Wind_Direction_WNW', 'Wind_Direction_WSW', 'Weather_Condition_Blowing Dust', 'Weather_Condition_Clear', 'Weather_Condition_Cloudy', 'Weather_Condition_Cloudy / Windy', 'Weather_Condition_Drizzle', 'Weather_Condition_Duststorm', 'Weather_Condition_Fair', 'Weather_Condition_Fair / Windy', 'Weathe

## 4 Data Transformation

In [7]:
# Select relevant columns
import copy  # unused in this cell; kept only in case cells outside this view rely on it

LABEL_COL = "Severity_encoded"  # target column; must not be part of the feature vector
NUM_TOP_FEATURES = 60           # number of features ChiSqSelector keeps

# Back to Spark: build a Spark DataFrame from the encoded pandas frame.
la_traffic_df = spark.createDataFrame(encoded_df)

# Every column except the label is a candidate feature. Fail fast (with the
# same exception type list.remove() would raise) if the label is missing.
if LABEL_COL not in columns:
    raise ValueError(f"{LABEL_COL!r} is not among the encoded columns")
selected_columns = [name for name in columns if name != LABEL_COL]

# Assemble feature vectors
assembler = VectorAssembler(inputCols=selected_columns, outputCol="features", handleInvalid="keep")
la_traffic_df = assembler.transform(la_traffic_df)

# Perform feature selection using ChiSqSelector
# NOTE(review): the selector is fitted on the full dataset, before any
# train/test split, so rows later used for testing influence which features
# are kept. Consider fitting it on the training split only.
selector = ChiSqSelector(
    numTopFeatures=NUM_TOP_FEATURES,
    featuresCol="features",
    outputCol="selected_features",
    labelCol=LABEL_COL,
)
model = selector.fit(la_traffic_df)
la_traffic_df = model.transform(la_traffic_df)

# show the name of selected features
# selectedFeatures holds indices into the assembled vector, i.e. positions in
# selected_columns.
selected_features = model.selectedFeatures
selected_feature_names = [selected_columns[i] for i in selected_features]
print("Selected Feature Names:", selected_feature_names)

Selected Feature Names: ['Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'Railway', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Distance(mi)', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weekday', 'Hour', 'Wind_Direction_CALM', 'Wind_Direction_E', 'Wind_Direction_ENE', 'Wind_Direction_N', 'Wind_Direction_NE', 'Wind_Direction_NNE', 'Wind_Direction_NW', 'Wind_Direction_S', 'Wind_Direction_SSE', 'Wind_Direction_SSW', 'Wind_Direction_SW', 'Wind_Direction_V', 'Wind_Direction_VAR', 'Wind_Direction_W', 'Wind_Direction_WNW', 'Wind_Direction_WSW', 'Weather_Condition_Clear', 'Weather_Condition_Cloudy', 'Weather_Condition_Fair', 'Weather_Condition_Fair / Windy', 'Weather_Condition_Fog', 'Weather_Condition_Haze', 'Weather_Condition_Heavy Rain', 'Weather_Condition_Light Rain', 'Weather_Condition_Mostly Cloudy', 'Weather_Condition_Overcast', 'Weather_Condition_Partly Cloudy', 'Weather_Condition_Partl

## 6.1 Conduct exploratory analysis and discuss the results

### 6.1.1 Logistic Regression

In [8]:
# Peek at the first row of the transformed DataFrame (encoded columns plus the
# assembled 'features' and 'selected_features' vectors).
la_traffic_df.head()

Row(Amenity=False, Bump=False, Crossing=False, Give_Way=False, Junction=True, No_Exit=False, Railway=False, Roundabout=False, Station=False, Stop=False, Traffic_Calming=False, Traffic_Signal=False, Turning_Loop=False, Distance(mi)=0.0, Temperature(F)=82.9, Wind_Chill(F)=64.92501173068268, Humidity(%)=47.0, Pressure(in)=29.95, Visibility(mi)=10.0, Wind_Speed(mph)=4.6, Precipitation(in)=0.0034345074298293894, Weekday=1, Hour=10, Severity_encoded=2, Wind_Direction_CALM=0, Wind_Direction_E=0, Wind_Direction_ENE=0, Wind_Direction_ESE=0, Wind_Direction_N=0, Wind_Direction_NE=0, Wind_Direction_NNE=0, Wind_Direction_NNW=0, Wind_Direction_NW=0, Wind_Direction_S=0, Wind_Direction_SE=0, Wind_Direction_SSE=0, Wind_Direction_SSW=0, Wind_Direction_SW=0, Wind_Direction_V=1, Wind_Direction_VAR=0, Wind_Direction_W=0, Wind_Direction_WNW=0, Wind_Direction_WSW=0, Weather_Condition_Blowing Dust=0, Weather_Condition_Clear=1, Weather_Condition_Cloudy=0, Weather_Condition_Cloudy / Windy=0, Weather_Condition_D

In [9]:
# 90/10 train/test split. The seed is fixed: without it every run draws a
# different split, so the accuracies and confusion matrices printed below
# could not be reproduced.
(trainingData, testData) = la_traffic_df.randomSplit([0.9, 0.1], seed=42)

# Create a LogisticRegression classifier
lr = LogisticRegression(featuresCol="selected_features", labelCol="Severity_encoded", maxIter=500)

# Train the logistic regression model
lr_model = lr.fit(trainingData)

# Make predictions on both splits
# NOTE(review): each evaluate()/crosstab() call below is a separate Spark
# action over these DataFrames; if this cell is too slow, caching the two
# prediction frames may help -- confirm against available memory.
test_predictions = lr_model.transform(testData)
train_predictions = lr_model.transform(trainingData)


# Evaluate the model on the training data
train_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
train_accuracy = train_evaluator.evaluate(train_predictions)
print(f"Training set accuracy = {train_accuracy}")

# Evaluate the model on the test data
test_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
test_accuracy = test_evaluator.evaluate(test_predictions)
print(f"Test set accuracy = {test_accuracy}")


# Print the confusion matrix on training set (rows: true label, columns: prediction)
train_confusion_matrix = train_predictions.crosstab("Severity_encoded", "prediction")
train_confusion_matrix.show()

# Print the confusion matrix on test set (rows: true label, columns: prediction)
test_confusion_matrix = test_predictions.crosstab("Severity_encoded", "prediction")
test_confusion_matrix.show()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "E:\spark\python\lib\py4j-0.10.9.3-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "E:\spark\python\lib\py4j-0.10.9.3-src.zip\py4j\clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "E:\anaconda\envs\python39\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Show the model coefficients and intercept
# coefficientMatrix has one row per class; interceptVector has one entry per class.
coefficients = lr_model.coefficientMatrix.toArray()
intercept = lr_model.interceptVector
print("Coefficients: ", coefficients)
print("Intercept: ", intercept)

# Show the model equation
feature_columns = selected_feature_names

# Each coefficient must have a feature name to be printed against; stop with a
# clear error instead of silently mislabeling or truncating the equation.
if coefficients.shape[1] != len(feature_columns):
    raise ValueError(
        f"{coefficients.shape[1]} coefficients per class but "
        f"{len(feature_columns)} feature names"
    )

for class_idx, class_coefs in enumerate(coefficients):
    # Use this class's own intercept (previously the whole intercept vector
    # was printed in every equation), and keep the class index separate from
    # the per-feature iteration (previously both loops reused `i`).
    terms = "".join(
        f" + ({coef:.4f} * {feature_name})"
        for feature_name, coef in zip(feature_columns, class_coefs)
    )
    equation = f"Logit(P(y={class_idx})) = {intercept[class_idx]:.4f}{terms}"
    print("Model Equation: ", equation)

### 6.1.2 Decision Tree and Random Forest

In [None]:

# Split the data into training and test sets (90/10). The seed is fixed so the
# split, and therefore every metric printed below, is reproducible.
(trainingData, testData) = la_traffic_df.randomSplit([0.9, 0.1], seed=42)

# Create a DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol="selected_features", labelCol="Severity_encoded")

# Create a pipeline (single stage: the classifier itself)
pipeline = Pipeline(stages=[dt])

# Train the Decision Tree model
dt_model = pipeline.fit(trainingData)

# Make predictions on both splits
test_predictions = dt_model.transform(testData)
train_predictions = dt_model.transform(trainingData)

# Evaluate the model on the training data
train_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
train_accuracy = train_evaluator.evaluate(train_predictions)
print(f"Training set accuracy = {train_accuracy}")

# Evaluate the model on the test data
test_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
test_accuracy = test_evaluator.evaluate(test_predictions)
print(f"Test set accuracy = {test_accuracy}")

# Print the confusion matrix on training set (rows: true label, columns: prediction)
train_confusion_matrix = train_predictions.crosstab("Severity_encoded", "prediction")
train_confusion_matrix.show()

# Print the confusion matrix on test set (rows: true label, columns: prediction)
test_confusion_matrix = test_predictions.crosstab("Severity_encoded", "prediction")
test_confusion_matrix.show()

In [None]:
# Re-imported here so that RandomForestClassifier is bound to the Spark ML
# class: the imports cell also imports a class of the same name from sklearn.
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Split the data into training and test sets (90/10). The seed is fixed so the
# split, and therefore every metric printed below, is reproducible.
(trainingData, testData) = la_traffic_df.randomSplit([0.9, 0.1], seed=42)

# Create a RandomForestClassifier
rf = RandomForestClassifier(featuresCol="selected_features", labelCol="Severity_encoded")

# Create a pipeline (single stage: the classifier itself)
pipeline = Pipeline(stages=[rf])

# Train the Random Forest model
rf_model = pipeline.fit(trainingData)

# Make predictions on both splits
test_predictions = rf_model.transform(testData)
train_predictions = rf_model.transform(trainingData)

# Evaluate the model on the training data
train_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
train_accuracy = train_evaluator.evaluate(train_predictions)
print(f"Training set accuracy = {train_accuracy}")

# Evaluate the model on the test data
test_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
test_accuracy = test_evaluator.evaluate(test_predictions)
print(f"Test set accuracy = {test_accuracy}")

# Print the confusion matrix on training set (rows: true label, columns: prediction)
train_confusion_matrix = train_predictions.crosstab("Severity_encoded", "prediction")
train_confusion_matrix.show()

# Print the confusion matrix on test set (rows: true label, columns: prediction)
test_confusion_matrix = test_predictions.crosstab("Severity_encoded", "prediction")
test_confusion_matrix.show()

### 6.1.4 Multinomial Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Split the data into training and test sets (90/10). The seed is fixed so the
# split, and therefore every metric printed below, is reproducible.
(trainingData, testData) = la_traffic_df.randomSplit([0.9, 0.1], seed=42)

# Create a NaiveBayes classifier
# NOTE(review): no modelType is passed, so Spark's default is used --
# presumably "multinomial", which expects non-negative feature values. Confirm
# that continuous columns such as 'Temperature(F)' and 'Wind_Chill(F)' never
# go below zero in this dataset.
nb = NaiveBayes(featuresCol="selected_features", labelCol="Severity_encoded")

# Create a pipeline (single stage: the classifier itself)
pipeline = Pipeline(stages=[nb])

# Train the Naive Bayes model
nb_model = pipeline.fit(trainingData)

# Make predictions on both splits
test_predictions = nb_model.transform(testData)
train_predictions = nb_model.transform(trainingData)

# Evaluate the model on the training data
train_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
train_accuracy = train_evaluator.evaluate(train_predictions)
print(f"Training set accuracy = {train_accuracy}")

# Evaluate the model on the test data
test_evaluator = MulticlassClassificationEvaluator(labelCol="Severity_encoded", predictionCol="prediction", metricName="accuracy")
test_accuracy = test_evaluator.evaluate(test_predictions)
print(f"Test set accuracy = {test_accuracy}")

# Print the confusion matrix on training set (rows: true label, columns: prediction)
train_confusion_matrix = train_predictions.crosstab("Severity_encoded", "prediction")
train_confusion_matrix.show()

# Print the confusion matrix on test set (rows: true label, columns: prediction)
test_confusion_matrix = test_predictions.crosstab("Severity_encoded", "prediction")
test_confusion_matrix.show()