In [None]:
import pandas as pd
import numpy as np
import xgboost as xgb
import matplotlib.pyplot as plt
import seaborn as sns
import joblib

from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import RandomOverSampler

# Load dataset
csv_path = "air_quality_health_impact_data.csv"
df = pd.read_csv(csv_path)

In [None]:
# Prepare features and target
# Drop RecordID and target-related columns
feature_cols = [
    "AQI", "PM10", "PM2_5", "NO2", "SO2", "O3",
    "Temperature", "Humidity", "WindSpeed",
    "RespiratoryCases", "CardiovascularCases", "HospitalAdmissions"
]

X = df[feature_cols]
y = df["HealthImpactClass"].astype(int)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"Training set: {X_train.shape}")
print(f"Test set: {X_test.shape}")
print(f"Class distribution in train:\n{y_train.value_counts().sort_index()}")

In [None]:
def evaluate_model(name, model, X_train, y_train, X_test, y_test):
    model.fit(X_train, y_train)
    pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, pred)
    weighted_f1 = f1_score(y_test, pred, average="weighted")

    print("\n" + "=" * 80)
    print(f"{name}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Weighted-F1: {weighted_f1:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, pred, zero_division=0))

    cm = confusion_matrix(y_test, pred)
    disp = ConfusionMatrixDisplay(cm, display_labels=sorted(y.unique()))
    fig, ax = plt.subplots(figsize=(7, 6))
    disp.plot(ax=ax, cmap="Blues", values_format="d", colorbar=False)
    ax.set_title(f"Confusion Matrix - {name}")
    plt.tight_layout()
    plt.show()


print("Train distribution (before):", Counter(y_train))

# 1) RandomOverSampler + XGBoost
ros_pipeline = ImbPipeline(
    steps=[
        ("scaler", StandardScaler()),
        ("ros", RandomOverSampler(random_state=42)),
        (
            "xgb",
            xgb.XGBClassifier(
                n_estimators=300,
                max_depth=6,
                learning_rate=0.05,
                subsample=0.9,
                colsample_bytree=0.9,
                random_state=42,
                eval_metric="mlogloss",
            ),
        ),
    ]
)

evaluate_model("RandomOverSampler + XGBoost", ros_pipeline, X_train, y_train, X_test, y_test)

In [None]:
#Save model
xgb_model = ros_pipeline.named_steps['xgb']
scaler = ros_pipeline.named_steps['scaler']
joblib.dump(ros_pipeline, 'xgb_model.pkl')
joblib.dump(scaler, 'scaler.pkl')


In [None]:
import numpy as np
import shap
explainer = shap.TreeExplainer(xgb_model)

# SHAP Global (All classes)
# In multi-class, SHAP values are class-specific. This loops and plots one class at a time.

raw_shap_values = explainer.shap_values(X_test_scaled)
class_labels = sorted(y.unique())

if not isinstance(raw_shap_values, list):
    raw = np.asarray(raw_shap_values)
    if raw.ndim == 3:
        # Convert 3D -> list of 2D per class
        if raw.shape[2] == len(feature_cols):
            raw_shap_values = [raw[:, i, :] for i in range(raw.shape[1])]
        elif raw.shape[1] == len(feature_cols):
            raw_shap_values = [raw[:, :, i] for i in range(raw.shape[2])]
        else:
            raise ValueError(f"Unexpected SHAP shape: {raw.shape}")
    else:
        raise ValueError("This helper cell is intended for multi-class outputs.")

for class_idx, class_label in enumerate(class_labels):
    shap_vals_2d = raw_shap_values[class_idx]
    base_val = np.asarray(explainer.expected_value)[class_idx]

    exp_k = shap.Explanation(
        values=shap_vals_2d,
        base_values=np.repeat(base_val, shap_vals_2d.shape[0]),
        data=X_test.values,
        feature_names=feature_cols,
    )

    shap.plots.beeswarm(exp_k, max_display=12, show=False)
    plt.gcf().set_size_inches(10, 6)
    plt.title(f"SHAP Beeswarm Plot (Class {class_label})")
    plt.tight_layout()
    plt.show()

    shap.plots.bar(exp_k, max_display=12, show=False)
    plt.gcf().set_size_inches(10, 6)
    plt.title(f"SHAP Bar Plot (Mean) - (Class {class_label})")
    plt.tight_layout()
    plt.show()

In [None]:
# SHAP Local Explanation
# Select a specific instance to explain (e.g., first test instance)
instance_idx = 0
instance = X_test_scaled[instance_idx : instance_idx + 1]
instance_original = X_test.iloc[instance_idx : instance_idx + 1]

# Get prediction
pred_class = xgb_model.predict(instance)[0]
pred_proba = xgb_model.predict_proba(instance)[0]

print(f"Instance {instance_idx}:")
print(f"Predicted Class: {pred_class}")
print(f"Predicted Probabilities: {pred_proba}")

shap_values_instance = explainer.shap_values(instance)

local_exp = shap.Explanation(
    values=shap_values_instance[pred_class][0],
    base_values=explainer.expected_value[pred_class],
    data=instance_original.values[0],
    feature_names=feature_cols,
)

shap.plots.waterfall(local_exp, max_display=len(feature_cols), show=False)
plt.title(f"SHAP Local - Waterfall - Instance {instance_idx} (Class {pred_class})")
plt.tight_layout()
plt.show()


In [None]:
from dice_ml import Data, Model, Dice
from sklearn.pipeline import Pipeline

# Counterfactual Explanation using DiCE
# DiCE needs to work with the same preprocessing (scaling) as the model.
# We create a pipeline that includes the scaler + model.

pipeline = Pipeline([
    ('scaler', scaler),
    ('classifier', xgb_model)
])

# Prepare data for DiCE (use original unscaled training data)
dice_data = Data(
    dataframe=pd.concat([X_train, y_train.rename("HealthImpactClass")], axis=1),
    continuous_features=feature_cols,
    outcome_name="HealthImpactClass"
)

# Wrap the pipeline for DiCE
dice_model = Model(model=pipeline, backend="sklearn", model_type="classifier")

# Create DiCE explainer with genetic method (more robust than random)
dice_exp = Dice(dice_data, dice_model, method="genetic")

# Select an instance to generate counterfactuals
cf_instance_idx = 0
query_instance = X_test.iloc[cf_instance_idx:cf_instance_idx+1]

print(f"Original Instance {cf_instance_idx}:")
print(f"Predicted Class: {xgb_model.predict(X_test_scaled[cf_instance_idx:cf_instance_idx+1])[0]}")
print(f"True Class: {y_test.iloc[cf_instance_idx]}")

# Generate counterfactuals (change to a different class)
desired_class = 1 if y_test.iloc[cf_instance_idx] == 0 else 0

try:
    counterfactuals = dice_exp.generate_counterfactuals(
        query_instance,
        total_CFs=3,
        desired_class=desired_class,
        proximity_weight=0.5,
        diversity_weight=1.0
    )
    
    print(f"\nCounterfactuals to change prediction to class {desired_class}:")
    counterfactuals.visualize_as_dataframe(show_only_changes=True)
except Exception as e:
    print(f"\nCould not generate counterfactuals: {e}")
    print("This may happen if the model is very confident or the desired class is hard to reach.")