In [1]:
!pip install pyspark==3.0.0
!pip install findspark

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044158 sha256=5ff5581961c9527d885747a9610b6c339c4fe3c41f89921bad33468710a4dbb8
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attem

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, count, when, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark=SparkSession.builder.appName("Accidentseverityclassification").getOrCreate()

df=spark.read.csv("/content/RTA Dataset.csv",header=True,inferSchema=True)

In [3]:
df.show()

+--------+-----------+------------------+-------------+------------------+-----------------------+------------------+--------------------+----------------+-----------------------+-----------------+---------------------+--------------------+--------------------+-----------------+-----------------+-----------------------+--------------------+------------------+--------------------+---------------------------+--------------------+----------------+---------------+---------------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------------+
|    Time|Day_of_week|Age_band_of_driver|Sex_of_driver| Educational_level|Vehicle_driver_relation|Driving_experience|     Type_of_vehicle|Owner_of_vehicle|Service_year_of_vehicle|Defect_of_vehicle|Area_accident_occured|    Lanes_or_Medians|     Road_allignment|Types_of_Junction|Road_surface_type|Road_surface_conditions|    Light_conditions|Weather_conditions|   Type_of_collision

In [4]:
# converting the features to its original datatype
df = df.withColumn("Time", df["Time"].cast("timestamp"))
df = df.withColumn("Casualty_severity", df["Casualty_severity"].cast("integer"))

In [5]:
df.show()

+-------------------+-----------+------------------+-------------+------------------+-----------------------+------------------+--------------------+----------------+-----------------------+-----------------+---------------------+--------------------+--------------------+-----------------+-----------------+-----------------------+--------------------+------------------+--------------------+---------------------------+--------------------+----------------+---------------+---------------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+-----------------+
|               Time|Day_of_week|Age_band_of_driver|Sex_of_driver| Educational_level|Vehicle_driver_relation|Driving_experience|     Type_of_vehicle|Owner_of_vehicle|Service_year_of_vehicle|Defect_of_vehicle|Area_accident_occured|    Lanes_or_Medians|     Road_allignment|Types_of_Junction|Road_surface_type|Road_surface_conditions|    Light_conditions|Weather_condition

In [6]:
df.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- Day_of_week: string (nullable = true)
 |-- Age_band_of_driver: string (nullable = true)
 |-- Sex_of_driver: string (nullable = true)
 |-- Educational_level: string (nullable = true)
 |-- Vehicle_driver_relation: string (nullable = true)
 |-- Driving_experience: string (nullable = true)
 |-- Type_of_vehicle: string (nullable = true)
 |-- Owner_of_vehicle: string (nullable = true)
 |-- Service_year_of_vehicle: string (nullable = true)
 |-- Defect_of_vehicle: string (nullable = true)
 |-- Area_accident_occured: string (nullable = true)
 |-- Lanes_or_Medians: string (nullable = true)
 |-- Road_allignment: string (nullable = true)
 |-- Types_of_Junction: string (nullable = true)
 |-- Road_surface_type: string (nullable = true)
 |-- Road_surface_conditions: string (nullable = true)
 |-- Light_conditions: string (nullable = true)
 |-- Weather_conditions: string (nullable = true)
 |-- Type_of_collision: string (nullable = true)
 |-- Number_of_ve

In [7]:
# checking number of rows and columns
rows=df.count()
columns=len(df.columns)

print('Number of Rows',rows)
print('Number of Columns',columns)

Number of Rows 12316
Number of Columns 32


In [16]:
#drop unwanted columns
#drop unwanted columns
df=df.drop(*['Time','Vehicle_driver_relation', 'Work_of_casuality', 'Fitness_of_casuality','Sex_of_driver',
            'Educational_level','Owner_of_vehicle', 'Sex_of_casualty','Age_band_of_driver','Age_band_of_casualty'])

In [17]:
df.columns

['Day_of_week',
 'Driving_experience',
 'Type_of_vehicle',
 'Service_year_of_vehicle',
 'Defect_of_vehicle',
 'Area_accident_occured',
 'Lanes_or_Medians',
 'Road_allignment',
 'Types_of_Junction',
 'Road_surface_type',
 'Road_surface_conditions',
 'Light_conditions',
 'Weather_conditions',
 'Type_of_collision',
 'Number_of_vehicles_involved',
 'Number_of_casualties',
 'Vehicle_movement',
 'Casualty_class',
 'Casualty_severity',
 'Pedestrian_movement',
 'Cause_of_accident',
 'Accident_severity']

In [18]:
# checking duplicates
duplicate_records = df.groupBy(df.columns).agg(count("*").alias("count")).filter(col("count") > 1)
duplicate_records.show()

+-----------+------------------+---------------+-----------------------+-----------------+---------------------+----------------+---------------+-----------------+-----------------+-----------------------+----------------+------------------+-----------------+---------------------------+--------------------+----------------+--------------+-----------------+-------------------+-----------------+-----------------+-----+
|Day_of_week|Driving_experience|Type_of_vehicle|Service_year_of_vehicle|Defect_of_vehicle|Area_accident_occured|Lanes_or_Medians|Road_allignment|Types_of_Junction|Road_surface_type|Road_surface_conditions|Light_conditions|Weather_conditions|Type_of_collision|Number_of_vehicles_involved|Number_of_casualties|Vehicle_movement|Casualty_class|Casualty_severity|Pedestrian_movement|Cause_of_accident|Accident_severity|count|
+-----------+------------------+---------------+-----------------------+-----------------+---------------------+----------------+---------------+-------------

In [19]:
# Define numerical columns
numerical_cols = ["Number_of_vehicles_involved", "Number_of_casualties","Casualty_severity"]
# Replace 'unknown' and null values in numerical columns with the mean value
for col_name in numerical_cols:
    # Calculate the mean value
    mean_value = df.select(mean(col(col_name))).collect()[0][0]
    # Replace 'unknown' and null values with the mean value
    df = df.withColumn(col_name, when((col(col_name) == 'unknown') | col(col_name).isNull(), mean_value).otherwise(col(col_name)))

In [20]:
# Define categorical columns
categorical_cols = ['Day_of_week','Driving_experience','Type_of_vehicle','Service_year_of_vehicle',
                    'Defect_of_vehicle','Area_accident_occured','Lanes_or_Medians','Road_allignment',
                    'Types_of_Junction','Road_surface_type','Road_surface_conditions','Light_conditions',
                    'Weather_conditions','Type_of_collision','Vehicle_movement','Casualty_class','Pedestrian_movement',
                    'Cause_of_accident']

# Replace 'unknown' and null values in categorical columns with the most frequent category
for col_name in categorical_cols:
    # Calculate the most frequent category
    mode_value = df.groupBy(col_name).count().orderBy('count', ascending=False).first()[0]
    # Replace 'unknown' and null values with the most frequent category
    df = df.withColumn(col_name, when((col(col_name) == 'unknown') | col(col_name).isNull(), mode_value).otherwise(col(col_name)))

df.show()

+-----------+------------------+--------------------+-----------------------+-----------------+---------------------+--------------------+--------------------+-----------------+-----------------+-----------------------+--------------------+------------------+--------------------+---------------------------+--------------------+----------------+---------------+------------------+--------------------+--------------------+-----------------+
|Day_of_week|Driving_experience|     Type_of_vehicle|Service_year_of_vehicle|Defect_of_vehicle|Area_accident_occured|    Lanes_or_Medians|     Road_allignment|Types_of_Junction|Road_surface_type|Road_surface_conditions|    Light_conditions|Weather_conditions|   Type_of_collision|Number_of_vehicles_involved|Number_of_casualties|Vehicle_movement| Casualty_class| Casualty_severity| Pedestrian_movement|   Cause_of_accident|Accident_severity|
+-----------+------------------+--------------------+-----------------------+-----------------+---------------------

In [24]:
# data preprocessing

from pyspark.ml.feature import StringIndexer,OneHotEncoder


categorical_cols=['Day_of_week','Driving_experience','Type_of_vehicle','Service_year_of_vehicle',
                    'Defect_of_vehicle','Area_accident_occured','Lanes_or_Medians','Road_allignment',
                    'Types_of_Junction','Road_surface_type','Road_surface_conditions','Light_conditions',
                    'Weather_conditions','Type_of_collision','Vehicle_movement','Casualty_class','Pedestrian_movement',
                    'Cause_of_accident']

indexer=[StringIndexer(inputCol=col, outputCol=col + "_indexed", handleInvalid="skip") for col in categorical_cols]

encoder=[OneHotEncoder(inputCol=col + "_indexed", outputCol=col + "_encoded") for col in categorical_cols]

# Indexer for the label column
label_indexer = StringIndexer(inputCol="Accident_severity", outputCol="label")


In [25]:
from pyspark.ml import Pipeline

pipeline=Pipeline(stages=indexer + encoder + [label_indexer])
pipeline=pipeline.fit(df)

df=pipeline.transform(df)

In [26]:
# Assemble all feature columns into a feature vector
assembler_input_cols = numerical_cols + [f"{col}_encoded" for col in categorical_cols]
assembler = VectorAssembler(inputCols=assembler_input_cols, outputCol="features")
df = assembler.transform(df)

In [32]:
df.select('features','label').show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(139,[0,1,2,7,12,...|  0.0|
|(139,[0,1,2,7,11,...|  0.0|
|(139,[0,1,2,10,14...|  0.0|
|(139,[0,1,2,3,10,...|  0.0|
|(139,[0,1,2,3,11,...|  0.0|
|(139,[0,1,2,3,12,...|  1.0|
|(139,[0,1,2,8,12,...|  1.0|
|(139,[0,1,2,4,12,...|  0.0|
|(139,[0,1,2,4,9,1...|  0.0|
|(139,[0,1,2,4,11,...|  1.0|
|(139,[0,1,2,7,10,...|  0.0|
|(139,[0,1,2,6,13,...|  1.0|
|(139,[0,1,2,4,10,...|  1.0|
|(139,[0,1,2,5,10,...|  0.0|
|(139,[0,1,2,5,9,1...|  0.0|
|(139,[0,1,2,3,10,...|  0.0|
|(139,[0,1,2,4,11,...|  0.0|
|(139,[0,1,2,4,13,...|  0.0|
|(139,[0,1,2,8,13,...|  0.0|
|(139,[0,1,2,8,9,1...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [33]:
data=df.select('features','label')
data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(139,[0,1,2,7,12,...|  0.0|
|(139,[0,1,2,7,11,...|  0.0|
|(139,[0,1,2,10,14...|  0.0|
|(139,[0,1,2,3,10,...|  0.0|
|(139,[0,1,2,3,11,...|  0.0|
|(139,[0,1,2,3,12,...|  1.0|
|(139,[0,1,2,8,12,...|  1.0|
|(139,[0,1,2,4,12,...|  0.0|
|(139,[0,1,2,4,9,1...|  0.0|
|(139,[0,1,2,4,11,...|  1.0|
|(139,[0,1,2,7,10,...|  0.0|
|(139,[0,1,2,6,13,...|  1.0|
|(139,[0,1,2,4,10,...|  1.0|
|(139,[0,1,2,5,10,...|  0.0|
|(139,[0,1,2,5,9,1...|  0.0|
|(139,[0,1,2,3,10,...|  0.0|
|(139,[0,1,2,4,11,...|  0.0|
|(139,[0,1,2,4,13,...|  0.0|
|(139,[0,1,2,8,13,...|  0.0|
|(139,[0,1,2,8,9,1...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [34]:
# Train-test split
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)

In [50]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(labelCol="label", featuresCol="features")
model_lr=lr.fit(train_df)
# Make predictions on the test data
predictions_lr = model_lr.transform(test_df)

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model_dt=dt.fit(train_df)
# Make predictions on the test data
predictions_dt = model_dt.transform(test_df)

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=200)
model_rf=rf.fit(train_df)
# Make predictions on the test data
predictions_rf = model_rf.transform(test_df)

nb = NaiveBayes(labelCol="label", featuresCol="features")
model_nb=nb.fit(train_df)
# Make predictions on the test data
predictions_nb = model_nb.transform(test_df)

In [51]:
predictions_lr.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(139,[0,1,2,3,9,1...|  1.0|[2.06135385395914...|[0.68381356657025...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[2.34852228547104...|[0.84062224117351...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[2.54997761752030...|[0.71668582919977...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3.57143120556823...|[0.81812327053488...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[1.86857798688019...|[0.79750362108599...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3.77271417227242...|[0.86776169932519...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[1.76345036024959...|[0.80212832928762...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3.85902551934572...|[0.88132836171923...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[2.15422281087061...|[0.78573207064292...|       0.0|
|(139,[0,1,2,3,9

In [52]:
predictions_dt.show(10)

+--------------------+-----+-------------------+--------------------+----------+
|            features|label|      rawPrediction|         probability|prediction|
+--------------------+-----+-------------------+--------------------+----------+
|(139,[0,1,2,3,9,1...|  1.0| [629.0,200.0,16.0]|[0.74437869822485...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0| [629.0,200.0,16.0]|[0.74437869822485...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|   [196.0,44.0,2.0]|[0.80991735537190...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3725.0,512.0,59.0]|[0.86708566108007...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3725.0,512.0,59.0]|[0.86708566108007...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3725.0,512.0,59.0]|[0.86708566108007...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[3725.0,512.0,59.0]|[0.86708566108007...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[3725.0,512.0,59.0]|[0.86708566108007...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|      [3.0,4.0,0.0]|[0.42857142857142...|       1.0|
|(139,[0,1,2,3,9,1...|  0.0|

In [53]:
predictions_rf.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(139,[0,1,2,3,9,1...|  1.0|[162.118921759061...|[0.81059460879530...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[164.807423383409...|[0.82403711691704...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[166.298264529010...|[0.83149132264505...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[170.232023252775...|[0.85116011626387...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[169.625147352362...|[0.84812573676181...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[170.994561099192...|[0.85497280549596...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[169.497760250533...|[0.84748880125266...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[169.582077837425...|[0.84791038918712...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[167.192987557998...|[0.83596493778999...|       0.0|
|(139,[0,1,2,3,9

In [54]:
predictions_nb.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(139,[0,1,2,3,9,1...|  1.0|[-86.297911111396...|[0.79835611018467...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-85.028823534987...|[0.87059652056677...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[-88.476517768290...|[0.76563169450656...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-89.852000689915...|[0.82662320850905...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-93.711254866015...|[0.80511483680333...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-97.445978964963...|[0.86715458121198...|       0.0|
|(139,[0,1,2,3,9,1...|  1.0|[-92.599188227083...|[0.83269989802993...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-92.601286246796...|[0.86407867621425...|       0.0|
|(139,[0,1,2,3,9,1...|  0.0|[-99.796978163524...|[0.82211663151014...|       0.0|
|(139,[0,1,2,3,9

In [55]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
# Muti Classification Evaluator for AUC
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Multiclass Classification Evaluator for Accuracy, Precision, and Recall
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

# Calculate metrics for Logistic Regression
auc_lr = evaluator_auc.evaluate(predictions_lr)
accuracy_lr = evaluator_accuracy.evaluate(predictions_lr)
precision_lr = evaluator_precision.evaluate(predictions_lr)
recall_lr = evaluator_recall.evaluate(predictions_lr)

# Calculate metrics for Decision Tree
auc_dt = evaluator_auc.evaluate(predictions_dt)
accuracy_dt = evaluator_accuracy.evaluate(predictions_dt)
precision_dt = evaluator_precision.evaluate(predictions_dt)
recall_dt = evaluator_recall.evaluate(predictions_dt)

# Calculate metrics for Random Forest
auc_rf = evaluator_auc.evaluate(predictions_rf)
accuracy_rf = evaluator_accuracy.evaluate(predictions_rf)
precision_rf = evaluator_precision.evaluate(predictions_rf)
recall_rf = evaluator_recall.evaluate(predictions_rf)

# Calculate metrics for Naive Bayes
auc_nb = evaluator_auc.evaluate(predictions_nb)
accuracy_nb = evaluator_accuracy.evaluate(predictions_nb)
precision_nb = evaluator_precision.evaluate(predictions_nb)
recall_nb = evaluator_recall.evaluate(predictions_nb)

# Print the results
print(f"Logistic Regression - AUC: {auc_lr}, Accuracy: {accuracy_lr}, Precision: {precision_lr}, Recall: {recall_lr}")
print(f"Decision Tree - AUC: {auc_dt}, Accuracy: {accuracy_dt}, Precision: {precision_dt}, Recall: {recall_dt}")
print(f"Random Forest - AUC: {auc_rf}, Accuracy: {accuracy_rf}, Precision: {precision_rf}, Recall: {recall_rf}")
print(f"naive Bayes - AUC: {auc_nb}, Accuracy: {accuracy_nb}, Precision: {precision_nb}, Recall: {recall_nb}")


Logistic Regression - AUC: 0.48197835170421305, Accuracy: 0.8502807236431691, Precision: 0.7269129138337787, Recall: 0.8502807236431691
Decision Tree - AUC: 0.4726110622047537, Accuracy: 0.8565190268247037, Precision: 0.8204255559207139, Recall: 0.8565190268247036
Random Forest - AUC: 0.5943362305183925, Accuracy: 0.852776044915783, Precision: 0.7272269827822055, Recall: 0.852776044915783
naive Bayes - AUC: 0.4897213990800097, Accuracy: 0.8521522145976295, Precision: 0.7271486125811233, Recall: 0.8521522145976295
