In [0]:
%fs
ls mnt/usaccidentdl/

path,name,size,modificationTime
dbfs:/mnt/usaccidentdl/processed/,processed/,0,0
dbfs:/mnt/usaccidentdl/raw/,raw/,0,0


In [0]:
us_df = spark.read.format("csv").option("header","true").option("inferSchema",'True').load('/mnt/usaccidentdl/processed/final_df.csv')

In [0]:
# SINCE DATA IS TOO LARGE WE HAVE EXTRACTED ONLY FIRST 2 LAKH RECORDS FOR MODEL BUILDING
limit = 200000
us_df = us_df.limit(limit)

In [0]:
us_df.show(10)

+-------+-------+--------+---------+------------------+---------+------------------+------------+-----------+-------+----------+--------------+-------------+-----------+------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+-------+-------+-------+-----+------------+--------------+----+-------------------+-------------------+-------+-------------------+-------------------+
|    _c0| Source|Severity|Start_Lat|         Start_Lng|  End_Lat|           End_Lng|Distance(mi)|       City|Zipcode|  Timezone|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Junction|No_Exit|Railway|Station| Stop|Turning_Loop|Sunrise_Sunset|Year|              Month|                Day|Weekday|               Hour|Time Duration (min)|
+-------+-------+--------+---------+------------------+---------+------------------+------------+-----------+-------+----------+------

In [0]:
us_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: boolean (nullable = true)
 |-- Bump: boolean (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- No_Exit: boolean (nullable =

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorIndexer
from pyspark.ml import Pipeline

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek, hour
from pyspark.sql.functions import to_timestamp

In [0]:
us_df = us_df.withColumn('Month',month(col("Month"))).withColumn('Hour',
    hour(col("Hour")))

In [0]:
us_df = us_df.withColumn('Day',dayofmonth(col('Day')))

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer  
from pyspark.sql.functions import col

In [0]:
# List of columns that need preprocessing
categorical_cols = ["City", "Zipcode", "Timezone", "Wind_Direction", "Sunrise_Sunset","Weather_Condition"]

# Create a list of stages for the pipeline
stages = []

# String Indexing for categorical columns
for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index")
    stages.append(indexer)

# One-Hot Encoding for indexed categorical columns
for col_name in [f"{col_name}_index" for col_name in categorical_cols]:
    encoder = OneHotEncoder(inputCol=col_name, outputCol=f"{col_name}_encoded")
    stages.append(encoder)

# List of columns after one-hot encoding
encoded_cols = [f"{col_name}_encoded" for col_name in [f"{col_name}_index" for col_name in categorical_cols]]

# Assemble features into a vector column
feature_cols = ["Severity", "Start_Lat", "Start_Lng", "End_Lat", "End_Lng", "Distance(mi)", "Temperature(F)",
                "Wind_Chill(F)", "Humidity(%)", "Pressure(in)", "Wind_Speed(mph)", "Precipitation(in)",
                "Amenity", "Bump", "Crossing", "Junction", "No_Exit", "Railway", "Station", "Stop",
                "Turning_Loop", "Year", "Month", "Day", "Weekday", "Hour", "Time Duration (min)"]
assembler = VectorAssembler(inputCols=encoded_cols + feature_cols, outputCol="features")
stages.append(assembler)

# Create a pipeline with all stages
pipeline = Pipeline(stages=stages)

# Fit and transform the pipeline
pipeline_model = pipeline.fit(us_df)
preprocessed_df = pipeline_model.transform(us_df)

# Show the resulting DataFrame
preprocessed_df.show()

+--------+---------+------------------+-----------------+------------------+------------------+-----------+-------+----------+--------------+-------------+-----------+------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+-------+-------+-------+-----+------------+--------------+----+-----+---+-------+----+-------------------+----------+-------------+--------------+--------------------+--------------------+-----------------------+-------------------+---------------------+----------------------+----------------------------+----------------------------+-------------------------------+--------------------+
|Severity|Start_Lat|         Start_Lng|          End_Lat|           End_Lng|      Distance(mi)|       City|Zipcode|  Timezone|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Junction|No_Exit|Railway|Station| Stop|Turning_Loop|Sunrise_Su

In [0]:
encoded_cols

['City_index_encoded',
 'Zipcode_index_encoded',
 'Timezone_index_encoded',
 'Wind_Direction_index_encoded',
 'Sunrise_Sunset_index_encoded',
 'Weather_Condition_index_encoded']

In [0]:
preprocessed_df.select('features').show()

+--------------------+
|            features|
+--------------------+
|(64739,[348,11119...|
|(64739,[93,13842,...|
|(64739,[90,15209,...|
|(64739,[1259,8524...|
|(64739,[93,9207,6...|
|(64739,[90,8827,6...|
|(64739,[541,13009...|
|(64739,[65,11120,...|
|(64739,[379,18668...|
|(64739,[2896,1149...|
|(64739,[65,11120,...|
|(64739,[80,18630,...|
|(64739,[93,10577,...|
|(64739,[2811,1524...|
|(64739,[20,43434,...|
|(64739,[80,11124,...|
|(64739,[1579,1031...|
|(64739,[66,8885,6...|
|(64739,[66,10719,...|
|(64739,[90,8827,6...|
+--------------------+
only showing top 20 rows



In [0]:
us_df.count()

200000

In [0]:
final_df = preprocessed_df.select('features','Severity')

In [0]:
train_data,test_data = final_df.randomSplit([0.7,0.3],seed=42)

# LOGISTIC REGRESSION

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

In [0]:
log_reg = LogisticRegression(featuresCol='features',labelCol='Severity')

In [0]:
log_reg_model = log_reg.fit(train_data)

In [0]:
predictions = log_reg_model.transform(test_data)

In [0]:
predictions.show()

+--------------------+--------+--------------------+--------------------+----------+
|            features|Severity|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|(64739,[0,7075,64...|       2|[-5.4626466728440...|[1.32491690972221...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4631935265005...|[2.62948516632558...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4601945316914...|[9.48239838559812...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4591629974348...|[1.49241208732829...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4628480811400...|[3.53600062426869...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4619476467814...|[4.06026403963626...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4629158682354...|[7.63034917757077...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4610728323604...|[8.91825438597174...|       2.0|
|(64739,[0,7075,64...|       2|[-5.4619321900545...|[1.9376894939

In [0]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Severity", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area Under ROC Curve (AUC) = %g" % auc)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-2500396693763356>, line 2[0m
[1;32m      1[0m evaluator [38;5;241m=[39m BinaryClassificationEvaluator(rawPredictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mrawPrediction[39m[38;5;124m"[39m, labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mSeverity[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124mareaUnderROC[39m[38;5;124m"[39m)
[0;32m----> 2[0m auc [38;5;241m=[39m evaluator[38;5;241m.[39mevaluate(predictions)
[1;32m      3[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mArea Under ROC Curve (AUC) = [39m[38;5;132;01m%g[39;00m[38;5;124m"[39m [38;5;241m%[39m auc)

File [0;32m/databricks/spark/python/pyspark/ml/evaluation.py:111[0m, in [0;36mEvaluator.evaluate[0;34m(self, dataset, params)[0m
[1;32m    109[0m         [38;5

In [0]:
# Accuracy, Precision, and Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

#print(f"AUC-ROC: {AUC:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.9945
Precision: 0.9948
Recall: 0.9945


# DECISION TREES

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

In [0]:
tree = DecisionTreeClassifier(featuresCol='features',labelCol='Severity',maxDepth=5,maxBins=32)
tree_model = tree.fit(train_data)

In [0]:
# Create predictions on test data
pred_dt= tree_model.transform(test_data)

In [0]:
tree_model.featureImportances

SparseVector(64739, {64712: 1.0})

In [0]:
# Accuracy, Precision, and Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(pred_dt, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(pred_dt, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(pred_dt, {multi_evaluator.metricName: "weightedRecall"})

#print(f"AUC-ROC: {AUC:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000


# RANDOM FOREST CLASSIFIER

In [0]:
from pyspark.ml.classification import RandomForestClassifier

In [0]:
rfc = RandomForestClassifier(numTrees=10, maxDepth=5, labelCol="Severity", seed=1)
model = rfc.fit(train_data)

In [0]:
predictions_rf = model.transform(test_data)

In [0]:
# Accuracy, Precision, and Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions_rf, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions_rf, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions_rf, {multi_evaluator.metricName: "weightedRecall"})

#print(f"AUC-ROC: {AUC:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.9571
Precision: 0.9161
Recall: 0.9571
