In [1]:
# 1. Spark 세션 재생성
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("accident-severity-prediction") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

In [2]:
# 2. accident_df 다시 로드
import os
cwd = os.getcwd()
a_data_path = os.path.join(cwd, 'learning_spark_data', 'accident' , 'Accident_Information.csv')
a_file_path = f"file:///{a_data_path.replace(os.sep, '/')}"
accident_df = spark.read.csv(a_file_path, inferSchema=True, header=True)

In [3]:
# 불러오기
accident_df = spark.read.csv(a_file_path, inferSchema=True, header=True)

# StringIndexer로 fit 및 transform
from pyspark.ml.feature import StringIndexer

severity_indexer = StringIndexer(inputCol="Accident_Severity", outputCol="label")
severity_indexer_model = severity_indexer.fit(accident_df)

print(severity_indexer_model.labels)  # 레이블 순서 확인용

accident_df = severity_indexer_model.transform(accident_df)
accident_df.select("Accident_Severity", "label").show(10)


['Slight', 'Serious', 'Fatal']
+-----------------+-----+
|Accident_Severity|label|
+-----------------+-----+
|          Serious|  1.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
|           Slight|  0.0|
+-----------------+-----+
only showing top 10 rows



In [4]:
# vehicle 불러오기
v_data_path = os.path.join(cwd, 'learning_spark_data', 'accident' , 'Vehicle_Information.csv')
v_file_path = f"file:///{v_data_path.replace(os.sep, '/')}"

# 2. CSV 파일 읽기
vehicle_df = spark.read.csv(v_file_path, inferSchema=True, header=True)

# 3. 스키마 확인 (선택)
vehicle_df.printSchema()

root
 |-- Accident_Index: string (nullable = true)
 |-- Age_Band_of_Driver: string (nullable = true)
 |-- Age_of_Vehicle: string (nullable = true)
 |-- Driver_Home_Area_Type: string (nullable = true)
 |-- Driver_IMD_Decile: string (nullable = true)
 |-- Engine_Capacity_.CC.: string (nullable = true)
 |-- Hit_Object_in_Carriageway: string (nullable = true)
 |-- Hit_Object_off_Carriageway: string (nullable = true)
 |-- Journey_Purpose_of_Driver: string (nullable = true)
 |-- Junction_Location: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- Propulsion_Code: string (nullable = true)
 |-- Sex_of_Driver: string (nullable = true)
 |-- Skidding_and_Overturning: string (nullable = true)
 |-- Towing_and_Articulation: string (nullable = true)
 |-- Vehicle_Leaving_Carriageway: string (nullable = true)
 |-- Vehicle_Location.Restricted_Lane: string (nullable = true)
 |-- Vehicle_Manoeuvre: string (nullable = true)
 |-- Vehicle_Reference: intege

In [5]:
# vehicle_df rename 
vehicle_df = vehicle_df.withColumnRenamed("Engine_Capacity_.CC.", "Engine_Capacity")

# accident_df와 vehicle_df 조인 하기
df = accident_df.join(vehicle_df, on="Accident_Index", how="inner")

In [6]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "Age_Band_of_Driver",
    when(col("Age_Band_of_Driver") == "Data missing or out of range", "Unknown_Age")
    .otherwise(col("Age_Band_of_Driver"))
).withColumn(
    "Sex_of_Driver",
    when(col("Sex_of_Driver").isin("Not known", "Data missing or out of range"), "Unknown_Sex")
    .otherwise(col("Sex_of_Driver"))
).withColumn(
    "Vehicle_Type",
    when(col("Vehicle_Type").rlike("(?i)(unknown|other|Data missing)"), "Other_Vehicle")
    .otherwise(col("Vehicle_Type"))
).withColumn(
    "Journey_Purpose_of_Driver",
    when(col("Journey_Purpose_of_Driver").isin("Not known", "Other", "Other/Not known (2005-10)", "Data missing or out of range"), "Unknown_Purpose")
    .otherwise(col("Journey_Purpose_of_Driver"))
).withColumn(
    "Road_Type",
    when(col("Road_Type").isin("Unknown", "Data missing or out of range"), "Unknown_Road")
    .otherwise(col("Road_Type"))
).withColumn(
    "Light_Conditions",
    when(col("Light_Conditions").isin("Darkness - lighting unknown", "Data missing or out of range"), "Unknown_Light")
    .otherwise(col("Light_Conditions"))
).withColumn(
    "Urban_or_Rural_Area",
    when(col("Urban_or_Rural_Area") == "Unallocated", "Unknown_Area")
    .otherwise(col("Urban_or_Rural_Area"))
).withColumn(
    "Speed_limit",
    when(col("Speed_limit").cast("string").isin("NA", "0", "10", "15"), "Unknown_Speed")
    .otherwise(col("Speed_limit"))
)


In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

categorical_cols = [
    "Age_Band_of_Driver",
    "Sex_of_Driver",
    "Vehicle_Type",
    "Journey_Purpose_of_Driver",
    "Weather_Conditions",
    "Road_Type",
    "Day_of_Week",
    "Light_Conditions",
    "Urban_or_Rural_Area",
    "Speed_limit"
]

# 인덱서 + 인코더 정의
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec") for c in categorical_cols]

# 파이프라인 구성
pipeline = Pipeline(stages=indexers + encoders)
df_encoded = pipeline.fit(df).transform(df)


In [8]:
# Engine_Capacity 처리
from pyspark.sql.functions import mean
mean_val = df_encoded.select(mean("Engine_Capacity")).first()[0]
df_encoded = df_encoded.withColumn("Engine_Capacity", col("Engine_Capacity").cast("double"))
df_encoded = df_encoded.fillna({"Engine_Capacity": mean_val})

# VectorAssembler
from pyspark.ml.feature import VectorAssembler
feature_cols = [c + "_vec" for c in categorical_cols] + ["Engine_Capacity"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_df = assembler.transform(df_encoded)

In [9]:
from pyspark.sql.functions import when

# label_binary 생성
binary_df = final_df.withColumn(
    "label_binary",
    when(col("label") == 0.0, 0.0).otherwise(1.0)
)

In [10]:
binary_df.groupBy("label_binary").count().show()

+------------+-------+
|label_binary|  count|
+------------+-------+
|         0.0|1765650|
|         1.0| 292758|
+------------+-------+



In [11]:
# 로지스틱 회귀 모델 학습

from pyspark.ml.classification import LogisticRegression

lr_bin = LogisticRegression(featuresCol="features", labelCol="label_binary", maxIter=10)
lr_bin_model = lr_bin.fit(binary_df)

# 예측
pred_bin = lr_bin_model.transform(binary_df)
pred_bin.select("label_binary", "prediction", "probability").show(5, truncate=False)


+------------+----------+----------------------------------------+
|label_binary|prediction|probability                             |
+------------+----------+----------------------------------------+
|0.0         |0.0       |[0.878541105892094,0.121458894107906]   |
|0.0         |0.0       |[0.748624635479233,0.25137536452076703] |
|0.0         |0.0       |[0.862015857256882,0.137984142743118]   |
|0.0         |0.0       |[0.8906087431469926,0.10939125685300743]|
|0.0         |0.0       |[0.786721726665338,0.21327827333466198] |
+------------+----------+----------------------------------------+
only showing top 5 rows



In [12]:
# Accuracy, F1, AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator_auc = BinaryClassificationEvaluator(labelCol="label_binary", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_binary", predictionCol="prediction", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label_binary", predictionCol="prediction", metricName="accuracy")
evaluator_pre = MulticlassClassificationEvaluator(labelCol="label_binary", predictionCol="prediction", metricName="weightedPrecision")
evaluator_rec = MulticlassClassificationEvaluator(labelCol="label_binary", predictionCol="prediction", metricName="weightedRecall")

print(f"정확도: {evaluator_acc.evaluate(pred_bin):.4f}")
print(f"F1 Score: {evaluator_f1.evaluate(pred_bin):.4f}")
print(f"정밀도: {evaluator_pre.evaluate(pred_bin):.4f}")
print(f"재현률: {evaluator_rec.evaluate(pred_bin):.4f}")
print(f"AUC: {evaluator_auc.evaluate(pred_bin):.4f}")


정확도: 0.8587
F1 Score: 0.7986
정밀도: 0.8205


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_com

Py4JError: An error occurred while calling o1522.evaluate

In [None]:
# 혼동행렬
pred_bin.groupBy("label_binary", "prediction").count().orderBy("label_binary", "prediction").show()