In [37]:
# CrashScope Data Preprocessing
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pathlib import Path

# Start (or reuse) the Spark session and limit its logging to errors
spark = (
    SparkSession.builder
    .appName("CrashScope")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# Dataset folders are resolved relative to the parent of the working directory
ROOT = Path.cwd()
DATASETS_DIR = ROOT.parent / "datasets"
RAW_DIR = DATASETS_DIR / "RawData"
PROCESSED_DIR = DATASETS_DIR / "ProcessedData"
# Only the output folder is created here; the raw folder must already exist
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Read the raw accident records and report their size
raw_path = RAW_DIR / "accidents_2022_2024_full.parquet"
df = spark.read.parquet(str(raw_path))
n_records, n_columns = df.count(), len(df.columns)
print(f"Loaded {n_records:,} records with {n_columns} columns")

Loaded 382,421 records with 40 columns


In [38]:
# Feature selection and data cleaning
features = {
    'numeric': ['jaar_ongeval', 'aantal_partijen', 'maximum_snelheid', 'lon', 'lat'],
    'categorical': ['verkeersongeval_afloop', 'aard_ongeval', 'bebouwde_kom', 'wegverlichting', 'weersgesteldheid']
}

# Placeholder strings that are treated as a missing value in string columns
MISSING_TOKENS = ["", "Unknown", "NULL", "N/A"]


def _null_if_missing(name):
    """Return column `name` with every MISSING_TOKENS value replaced by null."""
    column = col(name)
    return when(column.isin(MISSING_TOKENS), None).otherwise(column).alias(name)


# Clean missing values in one projection.
# Calling withColumn once per column in a loop adds a projection each time and
# can produce very large query plans; a single select() yields the same frame
# (same columns, same order) without that overhead.
string_cols = {name for name, dtype in df.dtypes if dtype == 'string'}
df_clean = df.select(
    *[_null_if_missing(name) if name in string_cols else col(name) for name in df.columns]
)

# Select features and remove nulls (any row with a null in a selected column is dropped)
all_features = features['numeric'] + features['categorical']
df_processed = df_clean.select(*all_features).dropna()

print(f"Records after preprocessing: {df_processed.count():,}")
print(f"Features: {len(all_features)} ({len(features['numeric'])} numeric + {len(features['categorical'])} categorical)")

Records after preprocessing: 238,525
Features: 10 (5 numeric + 5 categorical)


In [39]:
# Build ML pipeline
# NOTE(review): the pipeline is fitted on all of df_processed, and the
# train/test split is taken from its output afterwards, so indexer and
# scaler statistics include the test rows -- confirm this is intended.

# One StringIndexer -> OneHotEncoder pair per categorical column, in order
stages = []
for cat_col in features['categorical']:
    idx_col = f"{cat_col}_idx"
    vec_col = f"{cat_col}_vec"
    stages.append(StringIndexer(inputCol=cat_col, outputCol=idx_col, handleInvalid="skip"))
    stages.append(OneHotEncoder(inputCol=idx_col, outputCol=vec_col))

# Numeric columns first, then the one-hot vectors, assembled into one vector
feature_cols = [*features['numeric'], *(f"{name}_vec" for name in features['categorical'])]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw", handleInvalid="skip")

# Standardise the assembled vector (centre on the mean, scale to unit std)
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)
stages += [assembler, scaler]

# Fit on the cleaned data and apply the fitted pipeline to the same data
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_processed)
df_ml_ready = model.transform(df_processed)

first_vector = df_ml_ready.select('features').first()['features']
print(f"ML pipeline fitted. Feature vector size: {first_vector.size}")

ML pipeline fitted. Feature vector size: 33


In [40]:
# Save preprocessed data
# Cache the transformed frame before splitting. Spark evaluates lazily, so
# without this every write and every count() below would re-run the whole
# load -> clean -> pipeline lineage. With the cache, the splits and the
# reported counts are all derived from the same materialised rows.
df_ml_ready = df_ml_ready.cache()
train, test = df_ml_ready.randomSplit([0.8, 0.2], seed=42)

# Columns persisted to disk: the original feature columns plus the scaled vector
output_cols = [*all_features, "features"]

# Save the ML-ready dataset (this is what you use for ML) and the train/test splits
for frame, filename in [
    (df_ml_ready, "ml_ready.parquet"),
    (train, "train.parquet"),
    (test, "test.parquet"),
]:
    frame.select(*output_cols).write.mode('overwrite').parquet(str(PROCESSED_DIR / filename))

# Save preprocessing pipeline
model.write().overwrite().save(str(PROCESSED_DIR / "preprocessing_model"))

# Count once, and verify that the two splits partition the full dataset
total_count = df_ml_ready.count()
train_count = train.count()
test_count = test.count()
assert train_count + test_count == total_count, "train/test split does not cover the full dataset"

print("Files saved:")
print(f"- ml_ready.parquet: {total_count:,} records (USE THIS FOR ML)")
print(f"- train.parquet: {train_count:,} records")
print(f"- test.parquet: {test_count:,} records")
print("- preprocessing_model: fitted pipeline for new data")

                                                                                

Files saved:
- ml_ready.parquet: 238,525 records (USE THIS FOR ML)


                                                                                

- train.parquet: 190,829 records




- test.parquet: 47,696 records
- preprocessing_model: fitted pipeline for new data


                                                                                