In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.util import MLUtils
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, concat, to_timestamp, year, month, day, hour, sum, concat_ws, lit, to_date, ceil, dayofweek, when, lag, isnull

# Initialize SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("Motor Vehicle Collisions") \
    .getOrCreate()

# Load Data

In [4]:
# Load data
data = spark.read.csv("Motor_Vehicle_Collisions_-_Crashes.csv", header=True, inferSchema=True)
data = data.select("CRASH DATE","NUMBER OF PERSONS INJURED", "NUMBER OF PERSONS KILLED", "NUMBER OF PEDESTRIANS INJURED", "NUMBER OF PEDESTRIANS KILLED", "NUMBER OF CYCLIST INJURED", "NUMBER OF CYCLIST KILLED", "NUMBER OF MOTORIST INJURED", "NUMBER OF MOTORIST KILLED")
data.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)



# Preprocessing

In [5]:
# Typecast CRASH DATE and CRASH TIME into a single timestamp column
data = data.withColumn("CRASH DATE", to_date(col("CRASH DATE"), "MM/dd/yyyy"))
# Extract year, month, day, and hour from the timestamp column
data = data.withColumn("YEAR", year(col("CRASH DATE")))
data = data.withColumn("MONTH", month(col("CRASH DATE")))
data = data.withColumn("DAY", day(col("CRASH DATE")))
# Calculate the total number of killed and injured persons
data = data.withColumn("TOTAL_KILLED", col("NUMBER OF PERSONS KILLED") + col("NUMBER OF PEDESTRIANS KILLED") + col("NUMBER OF CYCLIST KILLED") + col("NUMBER OF MOTORIST KILLED"))
data = data.withColumn("TOTAL_INJURED", col("NUMBER OF PERSONS INJURED") + col("NUMBER OF PEDESTRIANS INJURED") + col("NUMBER OF CYCLIST INJURED") + col("NUMBER OF MOTORIST INJURED"))
# Melakukan ceil pada output
data = data.withColumn("TOTAL KILLED", ceil(col("TOTAL_KILLED")))
data = data.withColumn("TOTAL INJURED", ceil(col("TOTAL_INJURED")))
data = data.fillna(0, subset=["TOTAL KILLED", "TOTAL INJURED"])
data.show(5)

+----------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+------------+-------------+------------+-------------+
|CRASH DATE|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|YEAR|MONTH|DAY|TOTAL_KILLED|TOTAL_INJURED|TOTAL KILLED|TOTAL INJURED|
+----------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+------------+-------------+------------+-------------+
|2021-09-11|                        2|                       0|                            0|                   

In [6]:
data = data.select("YEAR", "MONTH", "DAY", "TOTAL KILLED", "TOTAL INJURED")
data.show()

+----+-----+---+------------+-------------+
|YEAR|MONTH|DAY|TOTAL KILLED|TOTAL INJURED|
+----+-----+---+------------+-------------+
|2021|    9| 11|           0|            4|
|2022|    3| 26|           0|            2|
|2022|    6| 29|           0|            0|
|2021|    9| 11|           0|            0|
|2021|   12| 14|           0|            0|
|2021|    4| 14|           0|            0|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            4|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            0|
|2021|   12| 13|           0|            0|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            4|
|2021|   12| 14|           0|            0|
|2021|   12| 14|           0|            8|
|2021|   12| 14|           0|            6|
|2021|   12| 11|           0|            2|
|2021|   12| 14|           0|   

# Model Building

In [7]:
data = data.select("YEAR", "MONTH", "DAY", "TOTAL KILLED", "TOTAL INJURED")
data = data.dropna()
data.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- TOTAL KILLED: long (nullable = true)
 |-- TOTAL INJURED: long (nullable = true)



In [8]:
vectorAssembler = VectorAssembler(inputCols=["YEAR", "MONTH", "DAY"], outputCol="features")
data = vectorAssembler.transform(data)

## CrossValidation

In [9]:
#split data
train_data = data.filter(data["YEAR"] < 2020)
test_data = data.filter(data["YEAR"] > 2020)

In [10]:
#Define hyperparameters
gbt = GBTRegressor(featuresCol="features", labelCol="TOTAL KILLED", maxIter=10, maxDepth=10, seed=42, lossType="squared")
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [5, 10]).addGrid(gbt.maxBins, [50, 100]).addGrid(gbt.minInstancesPerNode, [10, 50]).build()

In [11]:
#train model
crossval = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol="TOTAL KILLED"), numFolds=3)
cvModel = crossval.fit(train_data)

In [12]:
#evaluate model
predictions = cvModel.transform(test_data)
evaluator = RegressionEvaluator(labelCol="TOTAL KILLED")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 0.108018


In [25]:
# Forecasting
futureData = spark.createDataFrame(
    pd.DataFrame(
        {
            "YEAR": [2025, 2025, 2025, 2025, 2025],
            "MONTH": [1, 3, 5, 7, 9],
            "DAY": [2, 4, 6, 8, 10]
        }
    )
)

vectorAssembler = VectorAssembler(inputCols=["YEAR", "MONTH", "DAY",], outputCol="features")
futureData = vectorAssembler.transform(futureData)

# Pastikan cvModel sudah dilatih sebelumnya dan siap untuk prediksi
try:
    predictions = cvModel.transform(futureData)
    predictedKilled = predictions.select("prediction").toPandas()
    print("Predicted total killed for next 5 years:", predictedKilled.values.tolist())
except Exception as e:
    print("Error during prediction:", e)

Error during prediction: An error occurred while calling o9290.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3842.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3842.0 (TID 15249) (DESKTOP-V4CS9LO.mshome.net executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.r

## HoldOut

In [7]:
# Split data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

# Define hyperparameters
gbt = GBTRegressor(featuresCol="features", labelCol="TOTAL KILLED", maxDepth=5, maxBins=50, minInstancesPerNode=1)

# Train model
model = gbt.fit(train_data)

In [None]:
# Create prediction on test data
prediction = model.transform(test_data)

# Show the prediction results
prediction.show()

+----+-----+---+------------+-------------+------------------+--------------------+
|YEAR|MONTH|DAY|TOTAL KILLED|TOTAL INJURED|          features|          prediction|
+----+-----+---+------------+-------------+------------------+--------------------+
|2012|    9| 25|           0|            0| [2012.0,9.0,25.0]|0.003468517870580...|
|2016|    6|  6|           0|            0|  [2016.0,6.0,6.0]|0.002171788876053...|
|2017|   10| 11|           0|            0|[2017.0,10.0,11.0]|0.002426131389444...|
|2017|   11|  9|           0|            0| [2017.0,11.0,9.0]|0.004320031503587245|
|2017|   11| 10|           0|            0|[2017.0,11.0,10.0]|0.002829747403615473|
|2017|   11| 15|           0|            0|[2017.0,11.0,15.0]|0.002294930407449...|
|2017|   11| 20|           0|            0|[2017.0,11.0,20.0]|0.002094381080981...|
|2017|   11| 20|           0|            0|[2017.0,11.0,20.0]|0.002094381080981...|
|2017|   11| 22|           0|            0|[2017.0,11.0,22.0]|0.002137663845

In [None]:
evaluator = RegressionEvaluator(labelCol="TOTAL KILLED", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 0.08067391746596747
