In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when, col

In [24]:
spark = SparkSession.builder \
    .appName("CrashRemedyPrediction") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.debug.maxToStringFields", "200") \
    .getOrCreate()

In [25]:
df = spark.read.parquet('/Users/swaifahaque/Documents/BigDataProject/merged')
df.printSchema()

root
 |-- municipality: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- region: string (nullable = true)
 |-- crash_configuration: string (nullable = true)
 |-- total_casualty: integer (nullable = true)
 |-- pedestrian_involved: boolean (nullable = true)
 |-- crash_severity: string (nullable = true)
 |-- day: string (nullable = true)
 |-- is_intersection_crash: boolean (nullable = true)
 |-- street: string (nullable = true)
 |-- time: string (nullable = true)
 |-- total_crashes: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- cross_street: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- city: string (nullable = true)
 |-- collision_type: string (nullable = true)
 |-- light: string (nullable = true)
 |-- road_condition: string (nullable = true)
 |-- weather: string (nullable = true)
 |-- road_surface: string (nullable = true)
 |-- speed_limit_km_h: integer (nullable = true)
 |-- speed_a

In [26]:
from pyspark.sql import functions as F

# Group by lat-long and aggregate required fields
agg_df = df.groupBy("latitude", "longitude").agg(
    F.sum("total_crashes").alias("total_crashes"),
    F.mode("crash_severity").alias("crash_severity"),
    F.mode("time_period").alias("time_period"),
    F.mode("is_weekend").alias("is_weekend"),
    F.mode("season").alias("season"),
    F.mode("road_condition").alias("road_condition"),
    F.mode("weather").alias("weather"),
    F.avg("speed_limit_km_h").alias("avg_speed_limit")
)

In [27]:
# Step 5: Compute crash quantiles to assign labels
quantiles = agg_df.approxQuantile("total_crashes", [0.5, 0.9], 0.01)
q50, q90 = quantiles

# Add risk level labels
agg_df = agg_df.withColumn(
    "hotspot_level",
    when(col("total_crashes") >= q90, "High")
    .when((col("total_crashes") >= q50) & (col("total_crashes") < q90), "Moderate")
    .otherwise("Low")
)


In [28]:
# Step 6: Index categorical variables
indexers = [
    StringIndexer(inputCol=col_name, outputCol=col_name + "_idx", handleInvalid="keep")
    for col_name in ["time_period", "season", "road_condition", "weather", "hotspot_level"]
]
for indexer in indexers:
    agg_df = indexer.fit(agg_df).transform(agg_df)

# Step 7: Feature Assembling
feature_cols = [
    "total_crashes", "avg_speed_limit",
    "time_period_idx", "season_idx", "road_condition_idx", "weather_idx"
]

                                                                                

In [29]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(agg_df)

MODEL 1 : Random Forest

In [30]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 8: Train/Test Split
train_df, test_df = assembled_df.randomSplit([0.8, 0.2], seed=42)

# Step 9: Model Training
rf = RandomForestClassifier(labelCol="hotspot_level_idx", featuresCol="features", numTrees=100)
model = rf.fit(train_df)



In [31]:
# Step 10: Evaluation
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="hotspot_level_idx", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy:.4f}")



Model Accuracy: 0.9769


                                                                                

In [32]:
# Optional: Show sample predictions
predictions.select("latitude", "longitude", "hotspot_level", "prediction").show(10)



+---------+-----------+-------------+----------+
| latitude|  longitude|hotspot_level|prediction|
+---------+-----------+-------------+----------+
|48.378034| -123.72435|          Low|       0.0|
|48.387265|-123.687465|          Low|       0.0|
|48.392989|-123.710245|          Low|       0.0|
|48.411309|-123.357495|          Low|       0.0|
|48.414061|-123.371208|          Low|       0.0|
|48.416907|-123.352117|          Low|       0.0|
|48.419429|-123.353021|          Low|       0.0|
|48.421199|-123.502767|          Low|       0.0|
|48.423074|-123.302171|          Low|       0.0|
|48.423141|-123.365744|          Low|       0.0|
+---------+-----------+-------------+----------+
only showing top 10 rows



                                                                                

MODEL 2 : MLP

In [37]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Define the neural network structure
input_dim = len(train_df.select("features").first()[0])
layers = [input_dim, 10, 5, 3]  # Adjust as needed: input -> hidden -> hidden -> output (3 classes)

mlp = MultilayerPerceptronClassifier(
    maxIter=100,
    layers=layers,
    blockSize=128,
    labelCol="hotspot_level_idx",
    featuresCol="features",
    seed=1234
)

mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(labelCol="hotspot_level_idx", predictionCol="prediction", metricName="accuracy")
mlp_accuracy = evaluator.evaluate(mlp_predictions)

print(f"MLP Classifier Accuracy: {mlp_accuracy:.4f}")

25/04/01 15:29:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/04/01 15:29:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS

MLP Classifier Accuracy: 0.8828


                                                                                

HEATMAP VISUALIZATION

In [33]:
# Convert predictions to Pandas
heatmap_df = predictions.select("latitude", "longitude", "prediction").toPandas()

# Map predictions back to labels
label_map = {0.0: "Low", 1.0: "Moderate", 2.0: "High"}
heatmap_df["hotspot_label"] = heatmap_df["prediction"].map(label_map)

# Assign weight (optional: to enhance heatmap intensity)
risk_weight = {"Low": 1, "Moderate": 3, "High": 5}
heatmap_df["weight"] = heatmap_df["hotspot_label"].map(risk_weight)

                                                                                

In [34]:
import folium
from folium.plugins import HeatMap

# Initialize map centered around average lat/lon
center_lat = heatmap_df["latitude"].mean()
center_lon = heatmap_df["longitude"].mean()
m = folium.Map(location=[center_lat, center_lon], zoom_start=10)

# Prepare data: [lat, lon, weight]
heat_data = heatmap_df[["latitude", "longitude", "weight"]].values.tolist()

# Add heatmap layer
HeatMap(heat_data, radius=15, blur=10, max_zoom=13).add_to(m)

# Save and display
m.save("hotspot_heatmap.html")
m