# Crashlytics

In this notebook we investigate [US Car Accident Data](https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents). 

Our goal is to create a classifier that can predict the severity of a car accident given the other features in the dataset. We will use three methods of classification (Decision Trees, XGBoost, and a Neural Network) and compare the results.


## Update to project goal


Severity is defined rather cryptically in this dataset. According to the [original paper](https://arxiv.org/pdf/1906.05409.pdf) detailing the creation of the dataset, crash severity is a native feature of the data, which was collected from both Bing and MapQuest. Bing and MapQuest use different scales for severity, [1,4] for Bing and [0,4] for MapQuest, the similarity between the two ranking systems being that a larger number indicates a more severe accident. The only insight into the meaning of accident severity is that it is typically correlated with duration, distance, and delay where:
- **Duration**: how long was traffic impacted by the accident
- **Distance**: how far did the impact on traffic extend
- **Delay**: how much delay in traffic flow was caused

However, these three features are very poorly recorded in the dataset. In the majority of records, crash duration is recorded as either 30 or 45 minutes and crash distance is recorded as 0 miles or approximately 0 miles. Delay is nowhere to be found. So it is impossible to determine whether severity is actually determined by some combination of duration, distance, and delay.

Though the original intent of this project was to create a car accident severity predictor that could predict the severity of an accident given the circumstances the accident occured under, the classifiers are actually predicting how either Bing or MapQuest would classify the severity of an accident given the less-than-ideal data those services either generate or receive.

## Import/install dependencies, create SparkSession, and read in data

In [None]:
try:
    import pyspark
except ModuleNotFoundError:
    !pip3 install pyspark
    import pyspark
try:
    import pandas as pd
except ModuleNotFoundError:
    !pip3 install pandas
    import pandas as pd
    import csv
try:
    import matplotlib.pyplot as plt
except ModuleNotFoundError:
    !pip3 install matplotlib
    import matplotlib.pyplot as plt

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, IndexToString, PCA, StandardScaler, Tokenizer, Word2Vec
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [None]:
try: 
    import sklearn
except:
    !pip3 install scikit-learn
    import sklearn
try:
    from xgboost import XGBClassifier
    from xgboost.spark import SparkXGBClassifier
except ModuleNotFoundError:
    !pip3 install xgboost
    from xgboost import XGBClassifier
    from xgboost.spark import SparkXGBClassifier

In [None]:
#May need to install java for this to work
ss=SparkSession.builder.master("local").appName("crashlytics").getOrCreate()

In [None]:
rawDf = ss.read.csv("reduced_crash_data.csv", header=True, inferSchema=True)
#rawDf.printSchema()

## Imbalance in Data

One important characteristic of our dataset is that a crash severity of 2 or 3 is much more common than any other severity ranking, which we will have to consider when building and evaluating our classifiers.

In [None]:
severityCount = rawDf.groupBy(F.col("Severity")).count()
#severityCount.show()

extremeSeverities = rawDf.filter(F.col("Severity") != 3).filter(F.col("Severity") != 2)
#extremeSeverities.show()

## Data preprocessing

There are a number of steps we need to take to prepare our data for KMeans Clustering and PCA and for use by the various classifiers.

First, we will add a crash duration column derived from the start_time and end_time columns to give our various models a data type they can work with (double) rather than a time. This also allows us to drop the start_time and end_time columns.

In [None]:
cleanedDf = rawDf.withColumn("Duration", (F.col("End_Time").cast("long") - F.col("Start_Time").cast("long"))/60)
cleanedDf = cleanedDf.drop("Start_Time").drop("End_Time")

We will also adjust the range of severity values from [1,4] to [0,3], as some of the models we will be using require that the label values start at 0.

In [None]:
cleanedDf = cleanedDf.withColumn("Severity", F.col("Severity")-1)

Convert categorical string columns into integer indices so that they can be ingested by the models.

In [None]:
labelIndexer = StringIndexer(inputCols = ["State", "Sunrise_Sunset"], outputCols = ["StateId", "DaytimeId"]).fit(rawDf)
cleanedDf = labelIndexer.transform(cleanedDf)

We drop non-categorical string columns that cannot be easily indexed and we do not expect to relevant to determining crash severity.

In [None]:

cleanedDf = cleanedDf.drop("ID", "Airport_Code", "Zipcode", "Source", "Start_Time", "End_Time", "End_Lat", "End_Lng", "City", "County", "Zipcode", "Country", "Timezone", \
                         "Weather_Timestamp", "Wind_Direction", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight", "Turning_Loop", "State", "Street", "Sunrise_Sunset")

For string columns that we may want to keep, we can try using a method to convert the values into vectors that still retain some of the semantic information of the values (by magic). 

The model we tried using for this conversion is called Word2Vec, but, though it succeeded, it increased the dimensionality of our data drastically. We think the increased dimensionality is too large a price to pay for **potentially** retaining some data about the weather during an accident and the description of the accident, which may not even be useful in our classification models.

In [None]:
vectorizeStrings = False

if vectorizeStrings:
    descTokenizer = Tokenizer(inputCol="Description", outputCol="descWords")
    tokenizedDescDf = descTokenizer.transform(cleanedDf)
    descVectorizer = Word2Vec(vectorSize=5, minCount=1, inputCol="descWords", outputCol="descWordsVec")
    descModel = descVectorizer.fit(tokenizedDescDf)
    vectorizedDescriptionDf = descModel.transform(tokenizedDescDf)

    tokenizer_weather = Tokenizer(inputCol="Weather_Condition", outputCol="weatherWords")
    vectorizedDescriptionDf = vectorizedDescriptionDf.fillna({"Weather_Condition": ""})
    tokenizedWeatherDf = tokenizer_weather.transform(vectorizedDescriptionDf)
    weatherVectorizer = Word2Vec(vectorSize=3, minCount=1, inputCol="weatherWords", outputCol="weatherWordsVec")
    weatherModel = weatherVectorizer.fit(tokenizedWeatherDf)
    vectorizedWeatherDf = weatherModel.transform(tokenizedWeatherDf)

    #wordVecData_weather.select("Description", "desc_word2vec", "Weather_Condition", "weather_word2vec").show(420,truncate=False)

    cleanedDf = vectorizedWeatherDf.drop("Description", "Weather_Condition", "desc_words", "weather_words")

else:
    cleanedDf = cleanedDf.drop("Description", "Weather_Condition")

Some of the columns in the dataset have lots of null values, so we replace any null values in those columns with 0. We believe that for these columns 0 is a fairly obvious stand-in value – if the wind chill, precipitation, or wind speed were not recorded they were likely not important factors in the accident.

In [None]:
cleanedDf = cleanedDf.fillna({"Wind_Chill(F)": 0, "Precipitation(in)": 0, "Wind_Speed(mph)": 0})

Our dataframe cleanedDf is now ready for use in KMeans, PCA, and classification.

## Vectorize our feature columns for use in K-Means clustering and PCA

The rest of the processing we do on our data prefers our table be condensed into a single vectorized column, so we do that here.

For K-Means and PCA we will retain severity in the features column, later for the classifier we will drop that information from the features column.

In [None]:
#Ignore reamining null values
kmeansAssembler = VectorAssembler(inputCols=cleanedDf.columns, outputCol="features", handleInvalid="skip")
kmeansData = kmeansAssembler.transform(cleanedDf)

lostRecordCount = cleanedDf.count() - kmeansData.count()
print("Lost records due to null values: ", lostRecordCount) 
print("Fraction of total records lost: ", lostRecordCount/cleanedDf.count())

## K-Means Clustering

This data has a large number of features, so we will perform K-Means Clustering to see if we can find some patterns in our data and PCA to better understand which features of our dataset affect crash severity.

In [None]:
def fit_kmeans(df_input,num_cluster_centers=3):
  kmeans = KMeans().setK(num_cluster_centers).setSeed(1)
  
  model = kmeans.fit(df_input)
  
  clustered_data = model.transform(df_input)

  wcss = model.summary.trainingCost
  
  return clustered_data, wcss

In [None]:
kValues = range(2, 20)
wcss_scores = []

for k in kValues:
  clustered_data, wcss = fit_kmeans(kmeansData,num_cluster_centers=k)
  wcss_scores.append(wcss)

In [None]:
plt.plot(kValues, wcss_scores, marker='o')
plt.title("Elbow Method for Optimal K")
plt.xlabel("Number of Clusters (K)")
plt.ylabel("WCSS Scores")
plt.show()

After performing K-Means clustering with 2-20 clusters, there is no obvious candidate for optimal number of clusters. This could indicate that our data does not have well-separated, distinct clusters. Reducing the dimensionality of our data may lead to better results, so after completing PCA and identifying the principle components of our dataset we will revisit K-Means.

## PCA

This data has a large number of features, so we will perform PCA to see what the true dimensionality of our data is, and see if we can produce better K-Means results after reducing our data

In [None]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scalerModel = scaler.fit(kmeansData)
scaledData = scalerModel.transform(kmeansData)

pca = PCA(k=18, inputCol="scaledFeatures", outputCol="pcaFeatures")
pcaModel = pca.fit(scaledData)
result = pcaModel.transform(scaledData)

print("Explained Variance: ", sum(pcaModel.explainedVariance))

We will find the minimum number of principle components needed to explain at least 95% of the variance. We want to reduce the dimensionality of our data as much as possible but still retain a high amount of the variance.

In [None]:
principleComponents = range(1, 26)
bestPcCount = 1
for pc in principleComponents:
    pca=PCA(k=pc, inputCol="scaledFeatures", outputCol="pcaFeatures")
    pcaModel = pca.fit(scaledData)
    if sum(pcaModel.explainedVariance) > 0.95:
        bestPcCount = pc
        break
bestPca = PCA(k=bestPcCount, inputCol="scaledFeatures", outputCol="pcaFeatures")
bestPcaModel = bestPca.fit(scaledData)
pcaReducedDf = bestPcaModel.transform(scaledData)

print("Princple Components needed for >95% explained variance: ", bestPcCount)
print("Explained Variance: ", sum(bestPcaModel.explainedVariance))

In [None]:
def fit_kmeans_column(df_input,column_name='pcaFeatures',num_cluster_centers=3):
  kmeans = KMeans(featuresCol=column_name).setK(num_cluster_centers).setSeed(1)
  
  model = kmeans.fit(df_input)
  
  clustered_data = model.transform(df_input)

  wcss = model.summary.trainingCost
  
  return clustered_data, wcss

In [None]:
wcss_scores_pca = []

for k in kValues:
  clustered_data, wcss = fit_kmeans_column(pcaReducedDf,column_name="pcaFeatures",num_cluster_centers=k)
  wcss_scores_pca.append(wcss)

In [None]:
plt.plot(kValues, wcss_scores_pca, marker='o')
plt.title("Elbow Method for Optimal K")
plt.xlabel("Number of Clusters (K)")
plt.ylabel("WCSS Scores")
plt.show()

Unfortunately the results of K-Means clustering are still inconclusive after performing PCA and extracting the principle components of our dataset. Moreover, we were not able to greatly reduce the dimensionality of our data by performing PCA, in order to explain 95% of the variance in our data we needed to retain 21 principle components from 26 features.

## Vectorize our feature columns for use by classifiers

For the classifier we will drop severity from the features column, as that will be the label column that the classifier will be trying to predict.

We also split our data into a training set and test set, using 75% of the data in training and reserving 25% for testing. As we will be using these two sets for each of the classifiers, we cache them using the persist() command.

In [None]:
classifierAssembler = VectorAssembler(inputCols=cleanedDf.drop("Severity").columns, outputCol="features", handleInvalid="skip")
classifierData = classifierAssembler.transform(cleanedDf)

trainingData, testData = classifierData.randomSplit([0.75, 0.25], seed=42)
trainingData.persist()
testData.persist()

## Decision Tree Classifier

Create a Decision tree classifier using arbitrary hyperparameters

In [None]:
dtClassifier = DecisionTreeClassifier(featuresCol="features", labelCol="Severity", maxDepth=6, minInstancesPerNode=2, maxBins=32)
dtModel = dtClassifier.fit(trainingData)
testPredictions = dtModel.transform(testData)

accuracyEvaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="accuracy")
accuracy = accuracyEvaluator.evaluate(testPredictions)
print(f"Accuracy: {accuracy}")

f1Evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="f1")
f1 = f1Evaluator.evaluate(testPredictions)
print(f"F1: {f1}")

We perform hyperparameter tuning on the classifier to identify the best set of hyperparameters.

In [None]:
dtHyperparamsEvaluationDf = pd.DataFrame(columns = ["max depth", "min instances per node", "training f1", "testing f1", "training accuracy", "testing accuracy", "best model"])

index = 0
bestIndex = 0
highestTestF1 = 0
bestDtModel = None

maxDepthList = range(2, 10)
minInstancesPerNodeList = range(2,15)

for maxDepth in maxDepthList:
    for minInstances in minInstancesPerNodeList:
        seed = 42
        dtClassifier = DecisionTreeClassifier(featuresCol="features", labelCol="Severity", maxDepth=maxDepth, minInstancesPerNode=minInstances, seed=seed)
        dtModel = dtClassifier.fit(trainingData)
        trainingPredictions = dtModel.transform(trainingData)
        testPredictions = dtModel.transform(testData)
        trainingF1 = f1Evaluator.evaluate(trainingPredictions)
        testingF1 = f1Evaluator.evaluate(testPredictions)
        trainingAccuracy = accuracyEvaluator.evaluate(trainingPredictions)
        testingAccuracy = accuracyEvaluator.evaluate(testPredictions)

        dtHyperparamsEvaluationDf.loc[index] = [maxDepth, minInstances, trainingF1, testingF1, trainingAccuracy, testingAccuracy, 0]

        if testingF1 > highestTestF1:
            highestTestF1 = testingF1
            bestIndex = index
            bestDtModel = dtModel

        index += 1

dtHyperparamsEvaluationDf.loc[bestIndex, "best model"] = 1
print(dtHyperparamsEvaluationDf.loc[bestIndex])

Our classifier achieved an f1 score on validation data of 0.85 with 85% accuracy. Let's visualize our decision tree to see if we can better understand how it is making its predictions.

In [None]:
from decision_tree_plot.decision_tree_parser import decision_tree_parse
try:
    from decision_tree_plot.decision_tree_plot import plot_trees
except ModuleNotFoundError:
    !pip3 install jinja2
    from decision_tree_plot.decision_tree_plot import plot_trees

In [None]:
bestModelPath = "bestDecisionTreeModel"
bestTree = decision_tree_parse(bestDtModel, ss, bestModelPath)
column = dict([(str(idx), i) for idx, i in enumerate(classifierData.columns)])
plot_trees(bestTree, column = column, output_path = 'decisionTreeVisualization.html')

The visualization of the decision tree is dense and hard to decipher. Later we will see how many features we need to keep to achieve similar amounts of predictive ability, and the visualization should be easier to understand if that succeeds.

## Classification with XGBoost

We want to try classification using an XGBoost model, but PySpark does not natively support this. So, we use the SparkXGBClassifier provided by [xgboost](https://github.com/dmlc/xgboost).

In [None]:
xgbClassifier = SparkXGBClassifier(features_col="features", label_col="Severity", max_depth=5, n_estimators=20, seed=42, learning_rate=0.2)
xgbModel = xgbClassifier.fit(trainingData)
trainingPredictions = xgbModel.transform(trainingData)
testPredictions = xgbModel.transform(testData)

accuracy = accuracyEvaluator.evaluate(testPredictions)
print(f"Accuracy: {accuracy}")

f1 = f1Evaluator.evaluate(testPredictions)
print(f"F1: {f1}")

In [None]:
xgbHyperparamsEvaluationDf = pd.DataFrame(columns = ["max depth", "num boosting rounds", "learning rate", "training f1", "testing f1", "training accuracy", "testing accuracy", "best model"])

index = 0
bestIndex = 0
highestTestF1 = 0
bestXgbModel = None

maxDepthList = range(2, 10)
numBoostingRounds = range(20,140,20)
learningRates = [0.01, 0.05, 0.1, 0.15, 0.2, 0.3]

for maxDepth in maxDepthList:
    for boostingRounds in numBoostingRounds:
        for learningRate in learningRates:
            seed=42
            xgbClassifier = SparkXGBClassifier(features_col="features", label_col="Severity", max_depth=maxDepth, n_estimators=boostingRounds, seed=seed, learning_rate=learningRate)
            xgbModel = xgbClassifier.fit(trainingData)
            trainingPredictions = xgbModel.transform(trainingData)
            testPredictions = xgbModel.transform(testData)
            trainingF1 = f1Evaluator.evaluate(trainingPredictions)
            testingF1 = f1Evaluator.evaluate(testPredictions)
            trainingAccuracy = accuracyEvaluator.evaluate(trainingPredictions)
            testingAccuracy = accuracyEvaluator.evaluate(testPredictions)

            xgbHyperparamsEvaluationDf.loc[index] = [maxDepth, boostingRounds, learningRate, trainingF1, testingF1, trainingAccuracy, testingAccuracy, 0]

            if testingF1 > highestTestF1:
                highestTestF1 = testingF1
                bestIndex = index
                bestXgbModel = xgbModel

            index += 1

xgbHyperparamsEvaluationDf.loc[bestIndex, "best model"] = 1
print(xgbHyperparamsEvaluationDf.loc[bestIndex])

The XGBoost classifier performed substantially better than the decision tree model. Granted, we tuned more hyperparameters here but even when using 2 hyperparameter as we did with the decision tree the results were better. XGBoost supposedly handles imbalanced data better than standard decision trees, and that may be the reason for the improved performance.

## Classification with a neural network (multilayer perceptron)

Neural Networks are generally outperformed by tree-based methods such as decision trees as xgboost in classifying tabular data, so we would expect that this method of classification would perform worst on our dataset. 

First we create an MLP Classifier with arbitrary hidden layer sizes. The input layer must have 25 nodes to ingest all 25 features of our dataset and the output layer must have 4 nodes for each of the 4 severity classes. The hyperparameter we will be tuning here is the hidden layer sizes i.e. the layers of nodes between the input and output layers.

In [None]:
mlpClassifier = MultilayerPerceptronClassifier(featuresCol="features", labelCol="Severity", layers=[25,12,4,16,4,4])
mlpModel = mlpClassifier.fit(trainingData)
trainingPredictions = mlpModel.transform(trainingData)

accuracy = accuracyEvaluator.evaluate(testPredictions)
print(f"Accuracy: {accuracy}")

f1 = f1Evaluator.evaluate(testPredictions)
print(f"F1: {f1}")

In [None]:
mlpHyperparamsEvaluationDf = pd.DataFrame(columns = ["hidden layer sizes", "training f1", "testing f1", "training accuracy", "testing accuracy", "best model"])

index = 0
bestIndex = 0
highestTestF1 = 0
bestMlpModel = None

hiddenLayerSizes = [[10], [20], [30], [10, 10], [10, 20], [10, 30], [20, 10], [20, 20], [20, 30], [30, 10], [30, 20], [30, 30]]

for layerSize in hiddenLayerSizes:
    seed=42
    mlpClassifier = MultilayerPerceptronClassifier(featuresCol="features", labelCol="Severity", layers=[25] + layerSize + [4])
    mlpModel = mlpClassifier.fit(trainingData)
    trainingPredictions = mlpModel.transform(trainingData)
    testPredictions = mlpModel.transform(testData)
    trainingF1 = f1Evaluator.evaluate(trainingPredictions)
    testingF1 = f1Evaluator.evaluate(testPredictions)
    trainingAccuracy = accuracyEvaluator.evaluate(trainingPredictions)
    testingAccuracy = accuracyEvaluator.evaluate(testPredictions)

    mlpHyperparamsEvaluationDf.loc[index] = [layerSize, trainingF1, testingF1, trainingAccuracy, testingAccuracy, 0]

    if testingF1 > highestTestF1:
        highestTestF1 = testingF1
        bestIndex = index
        bestMlpModel = mlpModel

    index += 1

mlpHyperparamsEvaluationDf.loc[bestIndex, "best model"] = 1
print(mlpHyperparamsEvaluationDf.loc[bestIndex])

As expected, the mlp classifier performed much worse than either tree-based classifier. With two hidden layers with 20 nodes, the model only achieved an F1 score of 0.6. It is possible that with better tuning of the hidden layer sizes this approach would work better, so we will increase the number of hidden layer sizes when running on ICDS and see if the results improve significantly.

## Results of classification

After training and tuning three classification models (decision tree, XGBoost, and multilayer perceptron) we can conclude that, of the models examined, the **XGBoost model is the most powerful predictor of car crash severity**, given this specific dataset. Neither the decision tree nor the multilayer perceptron classifier was able to achieve an F1 score >90% on the test data.

In [104]:
#ss.stop()