<a href="https://colab.research.google.com/github/pulindu-seniya-silva/ASAS_FDM/blob/preprocessing-uvindu/Accident_Severity_Analysis_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Pre-processing

Importing relevent libraries

Importing dataset

Keeping save copies of the dataset

In [1]:
import os, re, json
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from scipy import sparse
import joblib

from sklearn.metrics import (
    classification_report, confusion_matrix, f1_score,
    balanced_accuracy_score, recall_score
)
from sklearn.model_selection import StratifiedKFold, GridSearchCV, RandomizedSearchCV

CSV_PATH = "Road Accident Data.csv"   # drag and drop dataset directly to the files in colab and keep this as it is
OUTDIR   = "artifacts"
TEST_SIZE = 0.20
RANDOM_STATE = 42

os.makedirs(OUTDIR, exist_ok=True)

# load and keep a safe raw copy
df_in  = pd.read_csv(CSV_PATH)
df_raw = df_in.copy(deep=True)   # audit/rollback
df     = df_in.copy(deep=True)   # working copy

print("Loaded:", df.shape)
print("Dataset:")
df.head(5)

def evaluate(y_true, y_pred, title=""):
    print(f"\n=== {title} ===")
    print("Macro F1:       ", round(f1_score(y_true, y_pred, average="macro"), 4))
    print("Recall (macro): ", round(recall_score(y_true, y_pred, average="macro"), 4))
    print("Balanced Acc:   ", round(balanced_accuracy_score(y_true, y_pred), 4))
    print("\nClassification report:\n", classification_report(y_true, y_pred))
    cm = confusion_matrix(y_true, y_pred, labels=np.unique(y_true))
    cm_norm = (cm.T / cm.sum(axis=1, where=cm.sum(axis=1)!=0)).T  # safe normalize
    print("Confusion matrix (row-normalized):\n", np.round(cm_norm, 3))



Loaded: (307973, 21)
Dataset:


drop purely irrelevant columns early

In [2]:
early_drop = [c for c in [
    "Accident_Index",
    "Local_Authority_(District)",
    "Police_Force",
    "Latitude", "Longitude",
    "Vehicle_Type",
    "Carriageway_Hazards"
] if c in df.columns]

df = df.drop(columns=early_drop, errors="ignore")
print("After early drop:", df.shape)
print("Dropped:", early_drop)


After early drop: (307973, 14)
Dropped: ['Accident_Index', 'Local_Authority_(District)', 'Police_Force', 'Latitude', 'Longitude', 'Vehicle_Type', 'Carriageway_Hazards']


Implimenting string hygene utils

In [3]:
def strip_collapse(s):
    if pd.isna(s): return s
    return re.sub(r"\s+", " ", str(s).strip())

def map_values(series, mapping):
    return series.replace(mapping)

def bucket_rare(series, min_count=100, other_label="Other"):
    vc = series.value_counts(dropna=False)
    rare = vc[vc < min_count].index
    return series.where(~series.isin(rare), other_label)

# trim whitespace on all object columns to avoid duplicate labels
for col in df.select_dtypes(include="object").columns:
    df[col] = df[col].apply(strip_collapse)


**Basic cleaning**

- Fix tatget label typos and remove missing values

In [4]:
if "Accident_Severity" in df.columns:
    df["Accident_Severity"] = df["Accident_Severity"].astype("object")
    # from your counts: "Fetal" is a typo
    df["Accident_Severity"] = map_values(df["Accident_Severity"],
                                         {"Fetal": "Fatal", "fetal": "Fatal"})
    before = df.shape[0]
    df = df[~df["Accident_Severity"].isna()].copy()
    print("Dropped rows with missing Accident_Severity:", before - df.shape[0])


Dropped rows with missing Accident_Severity: 0


- parse "accident_date" and recompute "Day_of_Week"

In [5]:
if "Accident Date" in df.columns:
    df["Accident Date"] = pd.to_datetime(df["Accident Date"], errors="coerce", dayfirst=True)
    print("Unparseable dates:", df["Accident Date"].isna().sum())

    # recompute Day_of_Week from parsed date (consistent + fixes 1 NaN)
    df["Day_of_Week"] = df["Accident Date"].dt.day_name()


Unparseable dates: 184996


- normalize "Time" to HH:MM and flag invalid

In [6]:
if "Time" in df.columns:
    def normalize_time(x):
        if pd.isna(x): return np.nan
        s = str(x).strip()
        m = re.match(r"^(\d{1,2}):(\d{2})$", s)
        if not m: return np.nan
        hh, mm = int(m.group(1)), int(m.group(2))
        if 0 <= hh <= 23 and 0 <= mm <= 59:
            return f"{hh:02d}:{mm:02d}"
        return np.nan

    df["Time"] = df["Time"].apply(normalize_time)
    print("Invalid/NaN times:", df["Time"].isna().sum())


Invalid/NaN times: 17


- Harmonize "unknown" values and shorten long labels

In [7]:
# unify explicit "Unknown" labels
if "Junction_Control" in df.columns:
    df["Junction_Control"] = map_values(df["Junction_Control"],
                                        {"Data missing or out of range": "Unknown"})

if "Junction_Detail" in df.columns:
    df["Junction_Detail"] = map_values(df["Junction_Detail"],
                                       {"Not at junction or within 20 metres": "Not at junction"})

# fill NaN -> "Unknown" for key categoricals
for col in ["Light_Conditions","Weather_Conditions","Road_Surface_Conditions",
            "Road_Type","Junction_Control","Junction_Detail",
            "Urban_or_Rural_Area"]:
    if col in df.columns:
        df[col] = df[col].fillna("Unknown")

# Compact some verbose labels (optional but cleaner for OHE and plots)
if "Light_Conditions" in df.columns:
    df["Light_Conditions"] = map_values(df["Light_Conditions"], {
        "Darkness - lights lit": "Dark_lit",
        "Darkness - lights unlit": "Dark_unlit",
        "Darkness - no lighting": "Dark_none",
        "Darkness - lighting unknown": "Dark_unknown",
    })

if "Weather_Conditions" in df.columns:
    df["Weather_Conditions"] = map_values(df["Weather_Conditions"], {
        "Fine + high winds": "Fine_high_winds",
        "Raining + high winds": "Rain_high_winds",
        "Raining no high winds": "Rain_no_high",
        "Snowing + high winds": "Snow_high_winds",
        "Snowing no high winds": "Snow_no_high",
        "Fog or mist": "Fog_mist",
        "Fine no high winds": "Fine_no_high",
    })

if "Road_Surface_Conditions" in df.columns:
    df["Road_Surface_Conditions"] = map_values(df["Road_Surface_Conditions"],
                                               {"Flood over 3cm. deep": "Flood_3cm_plus"})


In [8]:
df.head(5)

Unnamed: 0,Accident Date,Day_of_Week,Junction_Control,Junction_Detail,Accident_Severity,Light_Conditions,Number_of_Casualties,Number_of_Vehicles,Road_Surface_Conditions,Road_Type,Speed_limit,Time,Urban_or_Rural_Area,Weather_Conditions
0,2021-01-01,Friday,Give way or uncontrolled,T or staggered junction,Serious,Daylight,1,2,Dry,One way street,30,15:11,Urban,Fine_no_high
1,2021-01-05,Tuesday,Give way or uncontrolled,Crossroads,Serious,Daylight,11,2,Wet or damp,Single carriageway,30,10:59,Urban,Fine_no_high
2,2021-01-04,Monday,Give way or uncontrolled,T or staggered junction,Slight,Daylight,1,2,Dry,Single carriageway,30,14:19,Urban,Fine_no_high
3,2021-01-05,Tuesday,Auto traffic signal,T or staggered junction,Serious,Daylight,1,2,Frost or ice,Single carriageway,30,08:10,Urban,Other
4,2021-01-06,Wednesday,Auto traffic signal,Crossroads,Serious,Dark_lit,1,2,Dry,Single carriageway,30,17:25,Urban,Fine_no_high


**Feature engineering**

In [9]:
# Time_of_Day from Time
if "Time" in df.columns:
    def time_bucket(t):
        if pd.isna(t): return "Unknown"
        hh = int(t.split(":")[0])
        if 5 <= hh < 11:  return "Morning"
        if 11 <= hh < 16: return "Afternoon"
        if 16 <= hh < 20: return "Evening"
        return "Night"
    df["Time_of_Day"] = df["Time"].apply(time_bucket)

# Month / Season / Day_name from Accident Date
if "Accident Date" in df.columns:
    df["Month"] = df["Accident Date"].dt.month
    def season(m):
        if m in [12,1,2]:  return "Winter"
        if m in [3,4,5]:   return "Spring"
        if m in [6,7,8]:   return "Summer"
        if m in [9,10,11]: return "Autumn"
        return "Unknown"
    df["Season"]   = df["Month"].apply(season)
    df["Day_name"] = df["Accident Date"].dt.day_name()

# High speed flag
if "Speed_limit" in df.columns:
    df["High_Speed"] = (pd.to_numeric(df["Speed_limit"], errors="coerce") >= 60).astype(int)


**Bucketing rare categories**

In [10]:
df.head()

Unnamed: 0,Accident Date,Day_of_Week,Junction_Control,Junction_Detail,Accident_Severity,Light_Conditions,Number_of_Casualties,Number_of_Vehicles,Road_Surface_Conditions,Road_Type,Speed_limit,Time,Urban_or_Rural_Area,Weather_Conditions,Time_of_Day,Month,Season,Day_name,High_Speed
0,2021-01-01,Friday,Give way or uncontrolled,T or staggered junction,Serious,Daylight,1,2,Dry,One way street,30,15:11,Urban,Fine_no_high,Afternoon,1.0,Winter,Friday,0
1,2021-01-05,Tuesday,Give way or uncontrolled,Crossroads,Serious,Daylight,11,2,Wet or damp,Single carriageway,30,10:59,Urban,Fine_no_high,Morning,1.0,Winter,Tuesday,0
2,2021-01-04,Monday,Give way or uncontrolled,T or staggered junction,Slight,Daylight,1,2,Dry,Single carriageway,30,14:19,Urban,Fine_no_high,Afternoon,1.0,Winter,Monday,0
3,2021-01-05,Tuesday,Auto traffic signal,T or staggered junction,Serious,Daylight,1,2,Frost or ice,Single carriageway,30,08:10,Urban,Other,Morning,1.0,Winter,Tuesday,0
4,2021-01-06,Wednesday,Auto traffic signal,Crossroads,Serious,Dark_lit,1,2,Dry,Single carriageway,30,17:25,Urban,Fine_no_high,Evening,1.0,Winter,Wednesday,0


**Enforce numeric dtypes and quick sanity report**

In [11]:
for c in ["Speed_limit","Number_of_Vehicles","Number_of_Casualties","Month"]:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

print("\n=== Post-cleaning snapshot ===")
cols_to_peek = ["Accident_Severity","Day_of_Week","Light_Conditions","Weather_Conditions",
                "Road_Surface_Conditions","Road_Type","Junction_Control","Junction_Detail",
                "Urban_or_Rural_Area"]
for col in cols_to_peek:
    if col in df.columns:
        print(f"{col}: {df[col].nunique()} unique")
        print(df[col].value_counts(dropna=False).head(5).to_dict(), "\n")



=== Post-cleaning snapshot ===
Accident_Severity: 3 unique
{'Slight': 263280, 'Serious': 40740, 'Fatal': 3953} 

Day_of_Week: 7 unique
{nan: 184996, 'Saturday': 20200, 'Friday': 19111, 'Thursday': 18450, 'Wednesday': 18408} 

Light_Conditions: 5 unique
{'Daylight': 227286, 'Dark_lit': 60093, 'Dark_none': 16528, 'Dark_unknown': 2924, 'Dark_unlit': 1142} 

Weather_Conditions: 8 unique
{'Fine_no_high': 244496, 'Rain_no_high': 37841, 'Other': 8802, 'Snow_no_high': 4839, 'Fog_mist': 4783} 

Road_Surface_Conditions: 5 unique
{'Dry': 208967, 'Wet or damp': 81796, 'Frost or ice': 12078, 'Snow': 4758, 'Flood_3cm_plus': 374} 

Road_Type: 5 unique
{'Single carriageway': 230612, 'Dual carriageway': 45467, 'Roundabout': 20929, 'One way street': 6197, 'Slip road': 4768} 

Junction_Control: 7 unique
{'Give way or uncontrolled': 150045, 'Unknown': 98056, 'Auto traffic signal': 32256, 'Not at junction or within 20 metres': 25378, 'Stop sign': 1685} 

Junction_Detail: 9 unique
{'Not at junction': 12309

**Definiing input(x) and target(y) features**

In [12]:
assert "Accident_Severity" in df.columns
y = df["Accident_Severity"].astype("category")

candidate_X = [
    "Day_name","Time_of_Day","Month","Season",
    "Weather_Conditions","Light_Conditions","Road_Surface_Conditions",
    "Road_Type","Junction_Detail","Junction_Control",
    "Urban_or_Rural_Area","Speed_limit","High_Speed",
]
X_cols = [c for c in candidate_X if c in df.columns]
X = df[X_cols].copy()


assert "Number_of_Casualties" not in X.columns
assert "Accident_Severity" not in X.columns

print("X (classification):", X_cols)
print("y distribution:\n", y.value_counts(normalize=True).round(3))


X (classification): ['Day_name', 'Time_of_Day', 'Month', 'Season', 'Weather_Conditions', 'Light_Conditions', 'Road_Surface_Conditions', 'Road_Type', 'Junction_Detail', 'Junction_Control', 'Urban_or_Rural_Area', 'Speed_limit', 'High_Speed']
y distribution:
 Accident_Severity
Slight     0.855
Serious    0.132
Fatal      0.013
Name: proportion, dtype: float64


**Split x and y and stratify bt class**

In [13]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.20, random_state=42, stratify=y
)


**Building preprocessors**

In [14]:
cat_cols = X.select_dtypes(include=["object"]).columns.tolist()
num_cols = [c for c in X_cols if c not in cat_cols]

cat_pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot",  OneHotEncoder(handle_unknown="ignore", sparse_output=True))
])

num_pipe_linear = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler",  StandardScaler())
])

num_pipe_tree = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median"))
])

preproc_linear = ColumnTransformer(
    transformers=[
        ("cat", cat_pipe, cat_cols),
        ("num", num_pipe_linear, num_cols),
    ],
    remainder="drop"
)

preproc_tree = ColumnTransformer(
    transformers=[
        ("cat", cat_pipe, cat_cols),
        ("num", num_pipe_tree, num_cols),
    ],
    remainder="drop"
)

**Fit and transform**

In [15]:
preproc_linear.fit(X_train)
preproc_tree.fit(X_train)

#For linear classification models
Xtr_lin = preproc_linear.transform(X_train)
Xte_lin = preproc_linear.transform(X_test)

#for tree classification models
Xtr_tree = preproc_tree.transform(X_train)
Xte_tree = preproc_tree.transform(X_test)

print("Linear shapes:", Xtr_lin.shape, Xte_lin.shape)
print("Tree shapes:  ", Xtr_tree.shape, Xte_tree.shape)

Linear shapes: (246378, 61) (61595, 61)
Tree shapes:   (246378, 61) (61595, 61)


**Support vector machine**

In [None]:
from sklearn.svm import SVC

# 1) Baseline (RBF)
svm = SVC(
    kernel="rbf",
    class_weight="balanced",
    probability=False,     # needed for predict_proba & thresholding
    random_state=42
)
svm.fit(Xtr_lin, y_train)
pred_svm = svm.predict(Xte_lin)
evaluate(y_test, pred_svm, "SVM (RBF) — Baseline")

# 2) Tuning (small grid; expand if time allows)
param_grid = {
    "C": [0.5, 1, 2, 5],
    "gamma": ["scale", "auto", 0.1, 0.01]
}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
svm_gs = GridSearchCV(
    estimator=SVC(kernel="rbf", class_weight="balanced", probability=True, random_state=42),
    param_grid=param_grid,
    scoring="f1_macro",
    cv=cv,
    n_jobs=-1,
    verbose=1
)
svm_gs.fit(Xtr_lin, y_train)
print("Best SVM params:", svm_gs.best_params_)
print("Best CV macro-F1:", svm_gs.best_score_)
best_svm = svm_gs.best_estimator_

# 3) Final test evaluation
pred_svm_best = best_svm.predict(Xte_lin)
evaluate(y_test, pred_svm_best, "SVM (RBF) — Tuned")


