In [19]:
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
#models
from sklearn.ensemble import RandomForestClassifier
from sklearn import svm
from xgboost import XGBClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neural_network import MLPClassifier

import alibi
print(alibi.__version__)
from alibi.explainers import CEM

0.9.6


In [20]:
df = pd.read_csv('healthcare-dataset-stroke-data.csv', sep=',', header=0)
feature_names = df.columns.tolist()
df = df.dropna(subset=['bmi']).reset_index(drop=True)

X = df.drop(['id','stroke'], axis=1)
y = df['stroke']

X['gender'] = X['gender'].map({'Female': 0, 'Male': 1, 'Other': 2})
X['ever_married'] = X['ever_married'].map({'No': 0, 'Yes': 1})
X['work_type'] = X['work_type'].map({'Private': 0, 'Self-employed': 1, 'Govt_job': 2, 'children': 3, 'Never_worked': 4})
X['Residence_type'] = X['Residence_type'].map({'Urban': 0, 'Rural': 1})
X['smoking_status'] = X['smoking_status'].map({'never smoked': 0, 'formerly smoked': 1, 'smokes': 2, 'Unknown' : 3})

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
numerical_columns = ['age', 'avg_glucose_level', 'bmi']

scaler = StandardScaler()
X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()

X_train_scaled[numerical_columns] = scaler.fit_transform(X_train[numerical_columns])
X_test_scaled[numerical_columns] = scaler.transform(X_test[numerical_columns])

smote = SMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train_scaled, y_train)
X_test_res, y_test_res = smote.fit_resample(X_test_scaled, y_test)


In [21]:
def get_models():
    return {
        "RandomForest": lambda: RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42),
        "XGBoost": lambda: XGBClassifier(n_estimators=2, max_depth=2, learning_rate=1, objective='binary:logistic'),
        "AdaBoost": lambda: AdaBoostClassifier(n_estimators=100),
        "SVM": lambda: svm.SVC(kernel='linear', probability=True, random_state=42, class_weight='balanced'),
        "MLP": lambda: MLPClassifier(hidden_layer_sizes=(50,), activation='relu', solver='adam', alpha=0.01, learning_rate_init=0.001,
                        max_iter=2000, random_state=42, early_stopping=True)
    }

def apply_cem(instance: np.ndarray, predict_fn, X_train_res) -> np.ndarray:
    cem = CEM(
        predict_fn,
        mode='PN',
        shape=instance.shape,
        kappa=0.2,
        beta=0.01,
        gamma=0.0,
        c_init=10.0,
        c_steps=10,
        max_iterations=1000,
        feature_range=(X_train_res.min(axis=0), X_train_res.max(axis=0)),
        clip=(X_train_res.min(axis=0), X_train_res.max(axis=0)),
        learning_rate_init=1e-2
    )
    cem.fit(instance)
    explanation = cem.explain(instance)

    return explanation

def predict(x: np.ndarray, model) -> np.ndarray:
    if isinstance(x, np.ndarray):
        x = pd.DataFrame(x, columns=X_train_res.columns)
    preds = model.predict_proba(x)
    return np.atleast_2d(preds)

def run_cem():
    for model_name, model in get_models().items():
        model = model()
        model.fit(X_train_res, y_train_res)

        res = []
        test = [10, 20, 30]
        for i in test: # size: 569
            instance = X_test_res.iloc[i].values.reshape(1, -1)
             #instance = X_test_res[i].reshape(1, -1)
            original_pred = predict(instance, model).argmax(axis=1)[0]
            original_prob = predict(instance, model)[0][original_pred]

            cem_explanation = apply_cem(instance, lambda x: predict(x, model), X_train_res)

            counterfactual = cem_explanation.PN if cem_explanation.PN is not None else instance
            success = 1 if cem_explanation.PN is not None else 0

            cf_pred = predict(counterfactual, model).argmax(axis=1)[0]
            cf_prob = predict(counterfactual, model)[0][cf_pred]

            row = {
                "model": model_name,
                "success": success,
                "original_pred": original_pred,
                "original_prob": original_prob,
                "cf_pred": cf_pred,
                "cf_prob": cf_prob,
            }
            diff = (counterfactual - instance)[0]
            for j, name in enumerate(X_train_res.columns):
                row[f"Δ_{name}"] = diff[j]

            res.append(row)

        df = pd.DataFrame(res)
        df.to_csv(f"cem_{model_name}_strokeprediction.csv", index=False)



In [22]:
if __name__ == "__main__":
    run_cem()

No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
No PN found!
