In [None]:
''''''
# Pedestrian crash severity analysis using Adaptive Interpretive Modeling (AIM)
# Author:   Amir Rafe (amir.rafe@usu.edu)
# File:     AIM_TabNet.ipynb
# Date:     Spring 2024
# Version:  1.47  
# About:    AIM pipeline to pedestrian crash severity analysis using TabNet method
''''''

In [None]:
## Loading packages

import numpy as np
import pandas as pd
import torch
from imblearn.over_sampling import SMOTE
from pytorch_tabnet.tab_model import TabNetClassifier
import optuna
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.externals.joblib import dump
from sklearn.cluster import KMeans
import shap
import matplotlib.pyplot as plt

## Data Preparation and Preprocessing

In [None]:
# Load the dataset
Ped_df = pd.read_csv('df.csv')

# List of features to use for modeling
features = ['PersonType','Sex','AgeText','Aggressive','AlcoholSuspected','AlcResult',
                       'UrbanRural','FunctionClass','CommercialMotorVehInvolved','Light',
                       'Weather','RoadwaySurf','DisregardTrafficControl','DistractedDriving',
                       'DomesticAnimalRelated','DrowsyDriving','DrugsSuspected','OlderDriverInvolved',
                       'TeenageDriverInvolved','DUI','HeavyTruckInvolved','OverturnRollover','RightTurn',
                       'TransitVehicleInvolved','HolidayCrash','HolidayCrashYN','Intersection',
                       'LeftUTurnInvolved','VerticalAlignment','WorkZoneInvolved','WrongWayDriving']

# Features for confounding factors
confounders = ['AgeText', 'AlcResult', 'Light', 'Weather', 'OlderDriverInvolved', 'TeenageDriverInvolved']

# Target variable
target = 'Severity'

# Convert all columns to numeric, handling errors by coercing to NaNs
Ped_df = Ped_df.apply(pd.to_numeric, errors='coerce')

# Fill NaN values with 0
Ped_df.fillna(0, inplace=True)

# Convert the 'Severity' column to integer type
Ped_df['Severity'] = Ped_df['Severity'].astype(int)

# Feature representing a continuous variable
numerical_features = ['AgeText']

# Determine categorical features by subtracting the set of numerical features from the set of all features
categorical_features = list(set(features) - set(numerical_features))

# Preprocessing pipeline for numerical and categorical data
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(), categorical_features)
    ]
)

# Apply the preprocessing pipeline to the data
X = preprocessor.fit_transform(Ped_df[features])
y = Ped_df[target].values

# Create a new column in 'Ped_df' for spatial clusters
kmeans = KMeans(n_clusters=5, random_state=42).fit(Ped_df[['LAT', 'LNG']])
Ped_df['spatial_cluster'] = kmeans.labels_

# Stratify by 'AgeText' and 'Light' confounders
Ped_df['AgeText_binned'] = pd.qcut(Ped_df['AgeText'], 4, labels=False, duplicates='drop')
Ped_df['composite_key'] = Ped_df['spatial_cluster'].astype(str) + "_" + Ped_df['AgeText_binned'].astype(str) + "_" + Ped_df['Light'].astype(str)
Ped_df['composite_key_encoded'] = pd.factorize(Ped_df['composite_key'])[0]

# Split the dataset 80-20
X = preprocessor.fit_transform(Ped_df[features])
y = Ped_df[target].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=Ped_df['composite_key_encoded'], random_state=42)

# Optuna objective function
def objective(trial):
    # Placeholder for accumulated accuracies across folds
    fold_accuracies = []
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    
    for train_index, test_index in skf.split(X_train, y_train):
        X_train_fold, y_train_fold = X_train[train_index], y_train[train_index]
        X_test_fold, y_test_fold = X_train[test_index], y_train[test_index]
        
        # Apply SMOTE
        smote = SMOTE(random_state=42)
        X_train_fold_smote, y_train_fold_smote = smote.fit_resample(X_train_fold, y_train_fold)
        
        # Suggest hyperparameters within the trial
        n_d = trial.suggest_int('n_d', 16, 64)
        n_a = trial.suggest_int('n_a', 16, 64)
        n_steps = trial.suggest_int('n_steps', 1, 10)
        gamma = trial.suggest_float('gamma', 1.0, 2.0)
        n_independent = trial.suggest_int('n_independent', 1, 10)
        n_shared = trial.suggest_int('n_shared', 1, 10)
        lambda_sparse = trial.suggest_loguniform('lambda_sparse', 1e-5, 1e-1)
        lr = trial.suggest_loguniform('lr', 1e-5, 1e-1)

        # Initialize and train TabNetClassifier with suggested hyperparameters
        clf = TabNetClassifier(
            n_d=n_d, n_a=n_a, n_steps=n_steps,
            gamma=gamma, n_independent=n_independent, n_shared=n_shared,
            lambda_sparse=lambda_sparse,
            optimizer_fn=torch.optim.Adam, optimizer_params=dict(lr=lr),
            scheduler_params={"step_size":10, "gamma":0.9}, scheduler_fn=torch.optim.lr_scheduler.StepLR,
            mask_type='entmax', input_dim=X_train_fold.shape[1]
        )

        clf.fit(X_train_fold_smote, y_train_fold_smote, 
                eval_set=[(X_test_fold, y_test_fold)], 
                eval_name=['valid'], eval_metric=['accuracy'], 
                max_epochs=100, patience=20, batch_size=256, 
                virtual_batch_size=128, num_workers=0, drop_last=False)

        # Predict on the validation set
        y_pred = clf.predict(X_test_fold)
        
        # Calculate and store the accuracy for the current fold
        fold_accuracies.append(accuracy_score(y_test_fold, y_pred))
    
    # The objective is to maximize the average accuracy across all folds
    average_accuracy = np.mean(fold_accuracies)
    return average_accuracy

# Create an Optuna study that seeks to maximize the average accuracy across folds
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

# The best hyperparameters
best_params = study.best_params
print('Best hyperparameters:', best_params)


# TabNet 

In [None]:

def create_train_model(X_train_fold_smote, y_train_fold_smote):
    """
    Create and train a TabNet model.

    Parameters:
    - X_train_fold_smote: Training features
    - y_train_fold_smote: Training labels

    Returns:
    - clf: Trained TabNet classifier
    """
    # Initialize TabNetClassifier with predefined hyperparameters
    clf = TabNetClassifier(
        n_d=53, n_a=58, n_steps=1,
        gamma=1.9526677094303813, n_independent=8, n_shared=6,
        lambda_sparse=0.023989318086231295, momentum=0.3, clip_value=2.,
        optimizer_fn=torch.optim.Adam, optimizer_params=dict(lr=0.007566831910358642),
        scheduler_params={"step_size":10, "gamma":0.9}, scheduler_fn=torch.optim.lr_scheduler.StepLR,
        mask_type='entmax', input_dim=X_train_fold_smote.shape[1]
    )
    
    # Train the model on the provided training set
    clf.fit(
        X_train_fold_smote, y_train_fold_smote,
        eval_set=[(X_train_fold_smote, y_train_fold_smote)],
        eval_name=['train'],
        eval_metric=['accuracy'],
        max_epochs=1000, patience=20,
        batch_size=256, virtual_batch_size=128,
        num_workers=0,
        weights=1,
        drop_last=False
    )
    
    return clf

def predict(model, X_test_fold):
    """
    Make predictions using a trained TabNet model.

    Parameters:
    - model: Trained TabNet classifier
    - X_test_fold: Test features

    Returns:
    - preds: Predictions as integer values
    """
    # Predict using the model and round off to get integer class labels
    preds = model.predict(X_test_fold)
    return np.round(preds).astype(int)

# Initialize variables to keep track of the best model
n_models = 1
best_model = None
best_accuracy = 0

# Train and evaluate n_models TabNet models
for i in range(n_models):
    # Create bootstrap sample to simulate resampling
    sample_idxs = np.random.choice(len(X_train_fold_smote), size=len(X_train_fold_smote), replace=True)
    X_sample, y_sample = X_train_fold_smote[sample_idxs], y_train_fold_smote[sample_idxs]
    
    # Train model on the bootstrap sample
    model = create_train_model(X_sample, y_sample)
    
    # Predict on the test set
    y_pred = predict(model, X_test_fold)
    
    # Calculate accuracy and other performance metrics
    accuracy = accuracy_score(y_test_fold, y_pred)
    cm = confusion_matrix(y_test_fold, y_pred)
    cr = classification_report(y_test_fold, y_pred)
    
    # Display performance metrics
    print(f'Model {i+1}:')
    print('Accuracy:', accuracy)
    print('Confusion matrix:')
    print(cm)
    print('Classification report:')
    print(cr)
    print('----------------------------------')

    # If the current model is the best so far, update best_model and best_accuracy
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = model

# Save the best model to a file
dump(best_model, 'best_model.joblib')

# Get feature importances from the best model
feature_importances = best_model.feature_importances_

# Map feature names to their importances
importance_df = pd.DataFrame({
    'Feature': features,  
    'Importance': feature_importances
})

# Save feature importances to a CSV file
importance_df.to_csv('feature_importances.csv', index=False)


# SHAP interpretation 

In [None]:
# Sample a background dataset from features. This subset is used as a reference point to compute SHAP values.
# 150 samples are typically enough to approximate the SHAP values without being too computationally intensive.
background = shap.sample(X_train_fold_smote, 150)

# Initialize a SHAP explainer object. This uses the KernelExplainer, which is model-agnostic and can be used 
# with any machine learning model. The explainer requires a prediction function and a background dataset.
# Here, `best_model.predict_proba` is passed to compute SHAP values for classification models, which outputs
# probabilities for each class. The background dataset helps in comparing the feature's effect when present vs. absent.
explainer = shap.KernelExplainer(best_model.predict_proba, background)

# Compute SHAP values for each feature in dataset X. SHAP values quantify the impact of each feature on the model's prediction.
# This computation is done for all instances in X, allowing to interpret the model's behavior. The `n_jobs=-1` parameter
# enables parallel computation, using all available CPUs to speed up the calculation.
shap_values = explainer.shap_values(background, n_jobs=-1)


In [None]:
#shap summary plot (feature importance)

features2 = ['PersonType','Sex','Age','Aggressive','DrugResult','AlcResult','UrbanRural','FunctionClass','CommercialVehInvolved',
                       'LightingCondition','Weather','RoadwaySurf','DisregardTrafficControl','DistractedDriving','DomesticAnimalRelated',
                       'DrowsyDriving','DrugsSuspected','OlderDriverInvolved','TeenageDriverInvolved','DUI','HeavyTruckInvolved',
                       'OverturnRollover','RightTurnInvolved','TransitVehicleInvolved','Aggressive',
                       'HolidayCrashYN','Intersection','LeftUTurnInvolved','VerticalAlignment','WorkZoneInvolved','WrongWayDriving']

shap.summary_plot(shap_values, X_train, feature_names=features2 , plot_type= 'bar' , plot_size=(12,12) ,  class_names= {
    0: 'Possible injury',
    1: 'Minor injury',
    2: 'No injury/PDO',
    3: 'Serious injury',
    4: 'Fatal'
},show=False)

# Get the current figure and axes objects
fig, ax = plt.gcf(), plt.gca()

# Modifying main plot parameters
ax.tick_params(labelsize=16)
ax.set_xlabel("mean(|SHAP value|) (average impact on model output magnitude)", fontsize=16 ,  labelpad=15)

# Get colorbar
plt.legend(fontsize=14)
plt.tight_layout()
plt.savefig("shap_summary_TabNet.png",dpi=300) 
plt.show()

In [None]:
#shap summary plot (dot plot)

shap.summary_plot(shap_values[0], X_train, feature_names=features2 , plot_type= 'dot' , plot_size=(10,10),show=False)

# Get the current figure and axes objects
fig, ax = plt.gcf(), plt.gca()

# Modifying main plot parameters
ax.tick_params(labelsize=16)
ax.set_xlabel("SHAP value (impact on model output)", fontsize=16  , labelpad=15)

# Get colorbar
cb_ax = fig.axes[1] 
cb_ax.tick_params(labelsize=16)
cb_ax.set_ylabel("Feature value", fontsize=16)
plt.tight_layout()
plt.savefig("shap_summary_TabNet_Possible.png", dpi=300)
plt.show()

# Post-hoc analysis

In [None]:
# PDP
X_df = pd.DataFrame(X_train, columns=features2)
shap.dependence_plot('LightingCondition', shap_values[4], X_df, interaction_index='Age' , dot_size=2, x_jitter=0.5 , show=False)
plt.savefig("PDP_TabNet_Fatal.png",dpi=300) 

In [None]:
# Performance metrics

y_pred = best_model.predict(X_test)

# Compute and print metrics
accuracy = accuracy_score(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)
cr = classification_report(y_test, y_pred)

print('Accuracy:', accuracy)
print('Confusion matrix:')
print(cm)
print('Classification report:')
print(cr)

In [None]:
# Sensetivity analysis 

# Light' feature index 
light_feature_index = features.index('Light')

# Function to simulate the impact of a policy intervention on the 'Light' feature
def simulate_policy_intervention_and_plot_shap(X_test, shap_values, light_feature_index, severity_classes, intervention_mapping):
    # Make a copy of the original dataset
    X_intervention = np.copy(X_test)
    
    # Apply the policy intervention simulation: change 'Dark-Not lighted' to 'Dark-Lighted'
    # Replace all instances of '1' with '0' in the 'Light' feature
    X_intervention[X_intervention[:, light_feature_index] == intervention_mapping['from']] = intervention_mapping['to']
    
    # Calculate the SHAP values for the dataset with the intervention
    shap_values_intervention = [explainer.shap_values(X_intervention)[i] for i in range(5)]
    
    # Plotting the SHAP values distribution before and after the intervention
    fig, axes = plt.subplots(1, 5, figsize=(20, 4))
    fig.suptitle('SHAP Values Distribution for Policy Intervention on Lighting Condition')

    # Plot SHAP values distribution for each severity class
    for i in range(5):
        axes[i].hist(shap_values[i][:, light_feature_index], bins=20, alpha=0.5, label='Original')
        axes[i].hist(shap_values_intervention[i][:, light_feature_index], bins=20, alpha=0.5, label='Intervention')
        axes[i].set_title(f'Severity {severity_classes[i]}')
        axes[i].set_xlabel('SHAP Value')
        axes[i].set_ylabel('Frequency')
        axes[i].legend()

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

# Intervention mapping: from 'Dark-Not lighted' to 'Dark-Lighted'
intervention_mapping = {'from': 1, 'to': 0}

# Call the function
severity_classes = ['Possible injury', 'Minor injury', 'No injury', 'Serious injury', 'Fatal']  
simulate_policy_intervention_and_plot_shap(X_test, shap_values, light_feature_index, severity_classes, intervention_mapping)
