In [1]:
pip install pyspark 

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import when



In [3]:
# Initialize Spark Session
spark = SparkSession.builder.appName("InjurySeverityPrediction").getOrCreate()

# Load data
df = spark.read.csv("cleaned_traffic_crashes.csv", header=True, inferSchema=True)



In [4]:

df = df.withColumn("INJURY_SEVERITY", 
                   when(df["MOST_SEVERE_INJURY"] == 'FATAL', 'Fatal')
                   .when(df["MOST_SEVERE_INJURY"] == 'INCAPACITATING INJURY', 'Incapacitating Injury')
                   .when(df["MOST_SEVERE_INJURY"] == 'NONINCAPACITATING INJURY', 'Non-Incapacitating Injury')
                   .when(df["MOST_SEVERE_INJURY"] == 'REPORTED, NOT EVIDENT', 'Reported Not Evident Injury')
                   .otherwise('No Injury'))

distinct_injury_severity = df.select("INJURY_SEVERITY").distinct()
distinct_injury_severity.show()

binary_df = df.withColumn(
    "BINARY_INJURY_SEVERITY",
    when(
        (df["INJURY_SEVERITY"] == 'Fatal') |
        (df["INJURY_SEVERITY"] == 'Incapacitating Injury') |
        (df["INJURY_SEVERITY"] == 'Non-Incapacitating Injury') |
        (df["INJURY_SEVERITY"] == 'Reported Not Evident Injury'),
        'Injury'
    ).otherwise('No Injury')
)
#distinct_injury_severity = binary_df.select("BINARY_INJURY_SEVERITY").distinct()
#distinct_injury_severity.show()

+--------------------+
|     INJURY_SEVERITY|
+--------------------+
|Non-Incapacitatin...|
|               Fatal|
|Reported Not Evid...|
|           No Injury|
|Incapacitating In...|
+--------------------+



In [5]:
# Define the features

features = ['POSTED_SPEED_LIMIT', 'WEATHER_CONDITION', 'LIGHTING_CONDITION', 
            'TRAFFIC_CONTROL_DEVICE', 'DEVICE_CONDITION','ROADWAY_SURFACE_COND',
            'ROAD_DEFECT','CRASH_TYPE']


In [6]:

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in features]

In [7]:
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers], outputCols=[col+"_encoded" for col in features])

In [8]:

assembler = VectorAssembler(inputCols=[col+"_encoded" for col in features], outputCol="features")

labelIndexer = StringIndexer(inputCol="INJURY_SEVERITY", outputCol="label").fit(df)

In [9]:
pipeline = Pipeline(stages=indexers + [encoder, assembler, labelIndexer])

pipelineModel = pipeline.fit(df)
transformed_df = pipelineModel.transform(df)

transformed_df.select(["features", "label"]).show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(98,[2,44,56,63,8...|  0.0|
|(98,[0,44,56,63,8...|  0.0|
|(98,[0,44,56,61,7...|  0.0|
|(98,[2,44,56,60,8...|  0.0|
|(98,[0,44,57,61,7...|  0.0|
|(98,[0,44,56,60,7...|  0.0|
|(98,[1,44,56,61,8...|  0.0|
|(98,[5,44,56,60,7...|  0.0|
|(98,[0,44,56,61,8...|  1.0|
|(98,[0,44,56,61,7...|  0.0|
|(98,[1,44,55,60,7...|  0.0|
|(98,[0,44,56,61,7...|  0.0|
|(98,[0,44,56,61,7...|  0.0|
|(98,[3,44,57,61,7...|  0.0|
|(98,[5,44,55,60,7...|  0.0|
|(98,[0,44,56,60,7...|  0.0|
|(98,[0,44,57,60,7...|  0.0|
|(98,[0,44,57,60,7...|  3.0|
|(98,[0,44,57,60,7...|  0.0|
|(98,[0,44,56,60,7...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [10]:
#1. Randomforestclassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Build the pipeline
pipeline = Pipeline(stages=indexers + [encoder, assembler, labelIndexer, rf])

# Splitting the dataset
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate the model
rf_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator.evaluate(predictions)
rf_precision = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "weightedPrecision"})
rf_recall = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "weightedRecall"})
rf_f1 = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "f1"})

# Print the evaluation metrics
print("RF Accuracy = " + str(rf_accuracy))
print("RF Precision = " +str(rf_precision))
print("RF Recall = "+str(rf_recall))
print("RF F1-score = "+str(rf_f1))



RF Accuracy = 0.8635177335889758
RF Precision = 0.7456628762226413
RF Recall = 0.8635177335889758
RF F1-score = 0.8002745160750989


In [11]:
# 2.LogisticRegressionClassifier

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features")

# Build the pipeline with all stages including the Logistic Regression classifier
pipeline = Pipeline(stages=indexers + [encoder, assembler, labelIndexer, lr])

# Splitting the dataset
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate the model
lr_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = lr_evaluator.evaluate(predictions)
lr_precision = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "weightedPrecision"})
lr_recall = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "weightedRecall"})
lr_f1 = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "f1"})

print("LR Accuracy = " + str(lr_accuracy))
print("LR Precision = " +str(lr_precision))
print("LR Recall = "+str(lr_recall))
print("LR F1-score = "+str(lr_f1))

LR Accuracy = 0.8633510102791333
LR Precision = 0.8109876681324307
LR Recall = 0.8633510102791332
LR F1-score = 0.8235537512996971


In [12]:
# 3. DecisionTreeClassifier

from pyspark.ml.classification import GBTClassifier,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Build the pipeline with all stages including the Decision Tree classifier
pipeline = Pipeline(stages=indexers + [encoder, assembler, labelIndexer, dt])

# Splitting the dataset
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate the model
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(predictions)
dt_precision = dt_evaluator.evaluate(predictions, {dt_evaluator.metricName: "weightedPrecision"})
dt_recall = dt_evaluator.evaluate(predictions, {dt_evaluator.metricName: "weightedRecall"})
dt_f1 = dt_evaluator.evaluate(predictions, {dt_evaluator.metricName: "f1"})

# Print the evaluation metrics
print("DT Accuracy = " + str(dt_accuracy))
print("DT Precision = " +str(dt_precision))
print("DT Recall = "+str(dt_recall))
print("DT F1-score = "+str(dt_f1))

DT Accuracy = 0.8637614061187455
DT Precision = 0.7865504827202008
DT Recall = 0.8637614061187455
DT F1-score = 0.8015812928742783


In [13]:
## 4. LinearSVC classifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LinearSVC


binary_df = df.withColumn(
    "BINARY_INJURY_SEVERITY",
    when(
        (df["INJURY_SEVERITY"] == 'Fatal') |
        (df["INJURY_SEVERITY"] == 'Incapacitating Injury') |
        (df["INJURY_SEVERITY"] == 'Non-Incapacitating Injury') |
        (df["INJURY_SEVERITY"] == 'Reported Not Evident Injury'),
        'Injury'
    ).otherwise('No Injury')
)

binary_labelIndexer = StringIndexer(inputCol="BINARY_INJURY_SEVERITY", outputCol="label").fit(binary_df)

svc = LinearSVC(labelCol="label", featuresCol="features")

pipeline_svc = Pipeline(stages=indexers + [encoder, assembler, binary_labelIndexer, svc])

(train_svc, test_svc) = binary_df.randomSplit([0.8, 0.2], seed=42)

model_svc = pipeline_svc.fit(train_svc)

predictions_svc = model_svc.transform(test_svc)

binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
svc_auc = binary_evaluator.evaluate(predictions_svc)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
f1 = evaluator.evaluate(predictions_svc, {evaluator.metricName: "f1"})
weightedRecall = evaluator.evaluate(predictions_svc, {evaluator.metricName: "weightedRecall"})
weightedPrecision = evaluator.evaluate(predictions_svc, {evaluator.metricName: "weightedPrecision"})
accuracy = evaluator.evaluate(predictions_svc, {evaluator.metricName: "accuracy"})

# Print metrics
#print("SVC AUC = " + str(svc_auc))
print("SVC Accuracy = " + str(accuracy))
print("SVC Precision = " + str(weightedPrecision))
print("SVC Recall = " + str(weightedRecall))
print("SVC F1 Score = " + str(f1))

SVC Accuracy = 0.8875451275112699
SVC Precision = 0.8866783678936732
SVC Recall = 0.8875451275112698
SVC F1 Score = 0.8871031098846179


In [14]:
# 5. Gradient boosting classifier

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

pipeline_gbt = Pipeline(stages=indexers + [encoder, assembler, binary_labelIndexer, gbt])

(train_gbt, test_gbt) = binary_df.randomSplit([0.8, 0.2], seed=42)

model_gbt = pipeline_gbt.fit(train_gbt)

predictions_gbt = model_gbt.transform(test_gbt)

gbt_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
gbt_auc = gbt_evaluator.evaluate(predictions_gbt)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
f1 = evaluator.evaluate(predictions_gbt, {evaluator.metricName: "f1"})
weightedRecall = evaluator.evaluate(predictions_gbt, {evaluator.metricName: "weightedRecall"})
weightedPrecision = evaluator.evaluate(predictions_gbt, {evaluator.metricName: "weightedPrecision"})
accuracy = evaluator.evaluate(predictions_gbt, {evaluator.metricName: "accuracy"})

#print("GBT AUC = " + str(gbt_auc))
print("GBT Accuracy = " + str(accuracy))
print("GBT Precision = " + str(weightedPrecision))
print("GBT Recall = " + str(weightedRecall))
print("GBT F1 Score = " + str(f1))


GBT Accuracy = 0.8895842818393429
GBT Precision = 0.9038458245421899
GBT Recall = 0.8895842818393428
GBT F1 Score = 0.8950201892133081


Accuracy: The Gradient Boosting Classifier (GBT) has the highest accuracy (0.8896), indicating it correctly classifies the highest percentage of instances among the models tested.

Precision and Recall: GBT also leads in precision (0.9038), suggesting it has the lowest false positive rate, and has a high recall (0.8896), indicating it correctly identifies a high percentage of actual positives.

F1 Score: The F1 score is the harmonic mean of precision and recall, providing a balance between the two. GBT again has the highest F1 score (0.8950), suggesting it has the most balanced performance between precision and recall among the models evaluated.

In [None]:
spark.stop()