In [None]:
import lime
import re
import shap
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from lime.lime_tabular import LimeTabularExplainer
import matplotlib.pyplot as plt


In [None]:
cc_data = pd.read_excel('default of credit card clients.xls')

# missing values
cc_data.dropna(inplace=True)
print(cc_data.shape)
cc_data.head(3)


In [None]:
# dataset preprocessing

# dataset and target variable
dc_data = cc_data.drop(columns=['default payment next month'], axis=1)
# 1: yes; 0: no
m_payment = cc_data['default payment next month']

# scale the dataset
data_columns = dc_data.columns
scaler =MinMaxScaler()
dc_data_scaled = scaler.fit_transform(dc_data)
dc_data = pd.DataFrame(dc_data_scaled, columns= data_columns)

# split dataset 70:30
x_train, x_test, y_train, y_test = train_test_split(dc_data, m_payment, test_size=0.3, random_state=45)

In [None]:
# Using SMOTE to balance the dataset because this dataset is imbalanced, making it biased to the majority class.
# WHY? Improves model's fairness, boosts performance and automates variable selection while preserving the structure of the dataset.

from collections import Counter

CANDIDATE_VARS = [
    ("x_train", "y_train"),
    ("x_tr", "y_tr"),
    ("x_train_enc", "y_train_enc"),
    ("x_train_scaled", "y_train"),
    ("x_train_final", "y_train_final"),
]

# locate the valid training variables
x_train_var, y_train_var = None, None
for xv, yv in CANDIDATE_VARS:
    if xv in globals() and yv in globals():
        x_train_var, y_train_var = xv, yv
        break

if x_train_var is None:
    raise NameError("Could not find X_train / y_train. Edit [MOD A] to set your variable names.")

print(f"[MOD A] Using training vars: {x_train_var}, {y_train_var}")
print("[MOD A] Class distribution before:", Counter(globals()[y_train_var]))


# apply the SMOTE technique
try:
    from imblearn.over_sampling import SMOTE
except Exception as e:
    raise ImportError("imblearn (imbalanced-learn) is required for SMOTE.") from e

sm = SMOTE(random_state=42, k_neighbors=5)
X_train_sm, y_train_sm = sm.fit_resample(globals()[x_train_var], globals()[y_train_var])

print("[MOD A] Class distribution after:", Counter(y_train_sm))

# dataset column preservation
import pandas as pd
if hasattr(globals()[x_train_var], "columns"):
    X_train_sm = pd.DataFrame(X_train_sm, columns=globals()[x_train_var].columns)

In [None]:
# Applying class weight to balance the dataset, as it tells the model to pay attention to the minority class during training.
# classification algorithms to train the model for the predicted outcome - SVM, RF, Logistic Regression

class_weights_strategy = "balanced"   # or a dict like {0: 1.0, 1: 2.5}

def make_model(model_type="logreg", **kwargs):
    if model_type == "logreg":
        from sklearn.linear_model import LogisticRegression
        return LogisticRegression(class_weight=class_weights_strategy, max_iter=1000, **kwargs)
    elif model_type == "svm":
        from sklearn.svm import SVC
        return SVC(class_weight=class_weights_strategy, probability=True, **kwargs)
    elif model_type == "rf":
        from sklearn.ensemble import RandomForestClassifier
        return RandomForestClassifier(class_weight=class_weights_strategy, n_estimators=300, random_state=42, **kwargs)
    else:
        raise ValueError("Unsupported model_type. Extend make_model().")

In [None]:
#  Tuning the model through calibration and threshold tuning,
# Calibration ensures the predicted probabilities to reflect the true likelihoods; threshold tuning adjusts the cutoff for converting probabilities into class labe;s

from sklearn.model_selection import StratifiedKFold
from sklearn.calibration import CalibratedClassifierCV

# using the SMOTE and class weights function
if 'base_model' not in globals():
    base_model = make_model("svm", kernel="rbf", C=1.0, gamma="scale")

X_for_fit = X_train_sm if 'X_train_sm' in globals() else globals()[x_train_var]
y_for_fit = y_train_sm if 'y_train_sm' in globals() else globals()[y_train_var]

X_cal_train, X_cal_valid, y_cal_train, y_cal_valid = train_test_split(X_for_fit, y_for_fit, test_size=0.2, random_state=42, stratify=y_for_fit)

base_model.fit(X_cal_train, y_cal_train)


# wrapping the base model in the calibration technique
# isotonic: more flexible but requires more data; OR sigmoid: simpler, works well with less data
# uses cross-validation to carry out calibration on the validation set and avoid overfitting on the dataset.

calibration_method = "isotonic"  # or "sigmoid"
calibrated_model = CalibratedClassifierCV(estimator=base_model, method=calibration_method, cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42))
calibrated_model.fit(X_cal_valid, y_cal_valid)

# threshold tuning
def tune_threshold(model, X, y):
    proba = model.predict_proba(X)[:, 1]
    thresholds = np.linspace(0.1, 0.9, 81)

    from sklearn.metrics import f1_score
    best_t, best_f1 = 0.5, -1.0

    for t in thresholds:
        yhat = (proba >= t).astype(int)
        f1 = f1_score(y, yhat, zero_division=0)
        if f1 > best_f1:
            best_t, best_f1 = t, f1
    return best_t

THRESHOLD = tune_threshold(calibrated_model, X_cal_valid, y_cal_valid)
print(f"[MOD C] Using decision threshold: {THRESHOLD:.3f}")

In [None]:
# Classification algorithms to train the model for the predicted outcome
# Run to get the accuracy, recall and F1 score - IF NEEDED

# ML model A
svm_model = make_model("svm", kernel="rbf", C=1.0, gamma="scale")
svm_model.fit(X_train_sm, y_train_sm)

# ML model B
rf_model = make_model("rf")
rf_model.fit(X_train_sm, y_train_sm)

# ML model C
logr_model = make_model("logreg")
logr_model.fit(X_train_sm, y_train_sm)
#
#
# predict
rf_pred = rf_model.predict(x_test)
svm_pred = svm_model.predict(x_test)
logr_pred = logr_model.predict(x_test)


# RF
accuracy = accuracy_score(y_test, rf_pred)
print("Random Forest Classifier")
print(classification_report(y_test, rf_pred))
print('\n')

# SVM
accuracy = accuracy_score(y_test,svm_pred)
print("SVM Classifier")
print(classification_report(y_test,svm_pred))
print('\n')

# LogR
accuracy = accuracy_score(y_test,logr_pred)
print("Log R Classifier")
print(classification_report(y_test, logr_pred))
print('\n')


In [None]:
# Feature importance selection
# SHAP: highlight the global importance

# performance boost
background_data = X_train_sm.sample(n=100, random_state=42)
masker = shap.maskers.Independent(background_data)

# # svm model (output is the same when ran for the logistic regression model)
svm_shap_explainer = shap.Explainer(svm_model.predict_proba, masker)
svm_s_values = svm_shap_explainer.shap_values(x_test[:100])

In [None]:
# # global feature importance -
# SHAP
# Mean absolute SHAP value per feature

# svm model
svm_s_values_total = svm_s_values[:, :, 1]
svm_feature_importance = pd.DataFrame({
        'feature': x_test.columns,
        'importance': np.abs(svm_s_values_total).mean(axis=0)
        }).sort_values(by='importance', ascending=False)    # the higher the number, the more influential it is

print('SVM: SHAP\n', svm_feature_importance)

svm_shap_values = svm_feature_importance

In [None]:
# causal graph from selected features
# uses the FCI and DirectLINGAM method
# FCI: constraint-based method that infers causal skeletons from conditional independence tests
# DirectLINGAM: functional method that assumes linear non-guassian relationships to infer causal directions
# both: FCI handles latent cofounders, and LiNGAM gives directionality; improves reliability of causal edges
# output: A directed causal graph that is grounded in both statistical independence and functional modeling, that provides causal validity

import networkx as nx
import matplotlib.pyplot as plt
from causallearn.search.ConstraintBased.FCI import fci
from causallearn.utils.cit import fisherz
from lingam import DirectLiNGAM

# important features
important_features = svm_shap_values.iloc[:, 0].astype(str).tolist()
# identified features from the dataset
f_data = cc_data[important_features]

print("Selected columns:", important_features)
print("f_data shape:", f_data.shape)

# Remove rows with missing values (FCI and Lingam don't handle NaNs well)
f_data = f_data.dropna().reset_index(drop=True)

# convert to numeric datatype
f_data = f_data.apply(lambda col:pd.factorize(col)[0] if col.dtypes == 'object' else col)

# Normalize (required for Lingam to perform well)
norm_f_data = ((f_data - f_data.mean()) / f_data.std())


########################################
# Step 1: Constraint-Based Search (FCI)
########################################
data_d = norm_f_data.to_numpy()
fci_result = fci(data_d, fisherz, alpha=0.01, verbose=False)
fci_graph = fci_result[0]

print("\nFCI Skeleton Edges:")
for edge in fci_graph.get_graph_edges():
    i = edge.node1
    j = edge.node2
    print(f"{i} <-> {j}")
    # print(f"{f_data.columns[i]} <-> {f_data.columns[j]}")

########################################
# Step 2: Functional Causal Discovery (DirectLiNGAM)
########################################
model = DirectLiNGAM()
model.fit(data_d)
lingam_adj = model.adjacency_matrix_

print("\nLiNGAM Causal Edges:")
for i in range(len(lingam_adj)):
    for j in range(len(lingam_adj)):
        if lingam_adj[i, j] != 0:
            print(f"{f_data.columns[i]} --> {f_data.columns[j]}")

########################################
# Step 3: Merge FCI Skeleton with LiNGAM directions
########################################
combined_graph = nx.DiGraph()

# Add nodes
combined_graph.add_nodes_from(f_data.columns)
# map edge_nodes to a feature name
column_map = {f"X{i}": col for i, col in enumerate (f_data.columns)}
# Add FCI undirected edges first
columns = list(f_data.columns)

for edge in fci_graph.get_graph_edges():
    i_name = edge.node1.get_name()
    j_name = edge.node2.get_name()
    if i_name in column_map and j_name in column_map:
        i = column_map[i_name]
        j = column_map[j_name]
        combined_graph.add_edge(i, j, causal_source='fci')

# Add directions from LiNGAM if they match FCI edges
for i in range(len(lingam_adj)):
    for j in range(len(lingam_adj)):
        if lingam_adj[i, j] != 0:
            src, tgt = f_data.columns[i], f_data.columns[j]
            if combined_graph.has_edge(src, tgt):
                # Already exists: keep as directed
                combined_graph[src][tgt]['weight'] = 'lingam'
            elif combined_graph.has_edge(tgt, src):
                # Reverse direction if undirected in FCI
                combined_graph.remove_edge(tgt, src)
                combined_graph.add_edge(src, tgt, causal_source='lingam')
            else:
                combined_graph.add_edge(src, tgt, causal_source='lingam')

# ########################################
# # Step 4: Visualize the Combined Graph
# ########################################
# plt.figure(figsize=(10, 6))
# pos = nx.spring_layout(combined_graph, seed=42, weight = None)
# edge_labels = nx.get_edge_attributes(combined_graph, 'causal_source')
#
# nx.draw(combined_graph, pos, with_labels=True, node_size=2000, node_color='lightblue', font_size=10, arrowsize=20)
# nx.draw_networkx_edge_labels(combined_graph, pos, edge_labels=edge_labels)
# plt.title("Combined Causal Graph (FCI + LiNGAM)")
# plt.show()


In [None]:
# feature extraction - immutable and actionable features

def detect_actionable_immutable(df):
    immutable_f = {col for col in df.columns if any(word in col.lower() for word in ["sex", "age",  "ID", "gender", "race", "marital-status", 'ethnicity', 'birth', 'native-country', 'education'])}
    actionable_f = set(df.columns) - immutable_f - {"class"}
    return sorted(actionable_f), sorted(immutable_f)

actionable_f, immutable_f = detect_actionable_immutable(cc_data)

# # Reserve 1–2 intermediate features for plausibility
# These must have parents in the graph
plausibility_nodes = ['PAY_AMT1', 'BILL_AMT4']  # <- adjust based on your graph

# Remove from actionable/immutable
actionable_f = [f for f in actionable_f if f not in plausibility_nodes]
immutable_f = [f for f in immutable_f if f not in plausibility_nodes]


In [None]:
# SEM to enforce causal validity for the counterfactual generation (validate causal assumption)
# DAG: a graph with directed edges and no cycles, represents causal flow
# SEM: a set of equations that models each variable as a function of its causal parents

# deterministic
def estimate_sems(df, graph):
    sem = {}
    for node in nx.topological_sort(graph):
        parents = list(graph.predecessors(node))
        if not parents:
            sem[node] = ([], 0)
        else:
            X = df[parents]
            y = df[node]
            beta = np.linalg.pinv(X.values) @ y.values
            sem[node] = (parents, beta)
    return sem

# probabilistic
# captures noise and variability, required to calculate the plausibility of the generated counterfactual
def estimate_p_sems(df, graph):
    sem = {}
    for node in nx.topological_sort(graph):
        parents = list(graph.predecessors(node))

        if not parents:
            mu = df[node].mean()
            sigma = df[node].std()
            sem[node] = ([], np.array([mu]), sigma)  # store mean as beta-like vector
        else:
            X = df[parents]
            y = df[node]
            beta = np.linalg.pinv(X.values) @ y.values
            residuals = y - X @ beta
            sigma = np.std(residuals)
            sem[node] = (parents, beta, sigma)
    return sem


# extract a DAG with only LiNGAM-directed edges (to ensure acyclicity)
DAG_graph = nx.DiGraph(
    (u, v, d) for u, v, d in combined_graph.edges(data=True)
    if d.get("causal_source") == "lingam")

 #Optional: Check if DAG is valid
assert nx.is_directed_acyclic_graph(DAG_graph), "Graph must be a DAG for SEM estimation"

# calling the SEM - follows linear_regression
sem_equations = estimate_p_sems(norm_f_data, DAG_graph)

In [None]:
# SEM - conversion to callable functions, so it can be used for the counterfactual generation
# Enables modular, reusable causal modeling to ensure causal dependencies are respected (causal plausibility enforcement)

# a deterministic SEM for the causal plausibility - returns a single value
def make_linear_sem_function(parents, beta):
    def sem_func(X_parent_values):
        return X_parent_values @ beta
    return sem_func

# a probabilistic SEM for the causal plausibility - returns a distribution
def make_probabilistic_sem_function(parents, beta, sigma):
    def sem_func(X_parent_values):
        if not parents:
            mu = beta[0] if isinstance(beta, np.ndarray) else beta
            return np.full((X_parent_values.shape[0], 1), mu), sigma
        else:
            mu = X_parent_values @ beta
            return mu, sigma
    return sem_func

default_sigma = 0.5
sem_functions = {}

for node, values in sem_equations.items():
    if len(values) == 3:
        # Probabilistic SEM with parents
        parents, beta, sigma = values
        sem_func = make_probabilistic_sem_function(parents, beta, sigma)
        sem_functions[node] = (sem_func, parents)
    elif len(values) == 2:
        # Root node: values = ([], mu)
        _, beta = values
        sigma = default_sigma  # assign a default sigma to root node
        sem_func = make_probabilistic_sem_function([], beta, sigma)
        sem_functions[node] = (sem_func, [])
    else:
        raise ValueError(f"Unexpected SEM format for node {node}: {values}")

for node, val in sem_functions.items():
    print(node, type(val), len(val))


In [None]:
# ----------------------------
# Helper functions for mutation
# ----------------------------
# generates candidates for mutation
# maintains structural validity, avoids premature convergence, prevents invalid mutations, and provides efficient mutation of multiple candidates
# encourages exploration and robustness

# perturbs actionable features of a single data point and propagate changes using SEM to maintain causal consistency
# ensures mutation are realistic and causally plausible and used for generating diverse counterfactuals

# balances exploration (diversity) and causal realism
def batch_mutate_diverse(X, actionable_idx, sem_functions, step=0.05,
                         rng=None, mutation_rate=0.1,
                         categorical_idx=None):
    """
    Mutates a batch of solutions X according to SEM functions,
    ensuring diversity in each generation.

    Parameters
    ----------
    X : np.ndarray
        Candidate solutions (n_solutions, n_features)
    actionable_idx : list[int]
        Indices of actionable features.
    sem_functions : dict
        Mapping feature index -> (sem_func, parents).
    step : float
        Magnitude of perturbation for continuous root nodes.
    rng : np.random.Generator
        Random number generator for reproducibility.
    mutation_rate : float
        Probability of mutating each actionable feature.
    categorical_idx : set[int] or None
        Indices of categorical actionable features (optional).
    """
    rng = np.random.default_rng() if rng is None else rng
    categorical_idx = set() if categorical_idx is None else set(categorical_idx)

    X_new = X.copy()

    for i in range(X.shape[0]):
        updated = set()

        # ✅ Guarantee at least one actionable feature will mutate
        forced_mutation = rng.choice(actionable_idx)

        for j, (sem_func, parents) in sem_functions.items():
            if j not in actionable_idx:
                continue  # skip immutable features

            do_mutate = (rng.random() < mutation_rate) or (j == forced_mutation)

            if do_mutate:
                if parents:
                    # Dependent node: recompute from parents + small noise for diversity
                    parent_vals = X_new[i, parents]
                    val = sem_func(parent_vals)
                    if j not in categorical_idx:
                        val += rng.normal(0, step)  # add diversity noise
                    X_new[i, j] = val
                else:
                    # Root node: SEM output + stochastic step
                    val = sem_func([])
                    if j not in categorical_idx:
                        val += rng.normal(0, step)
                    X_new[i, j] = val

                updated.add(j)

            # Propagate if any parent was updated
            elif any(p in updated for p in parents):
                parent_vals = X_new[i, parents]
                val = sem_func(parent_vals)
                if j not in categorical_idx:
                    val += rng.normal(0, step/2)  # small noise even if parents updated
                X_new[i, j] = val
                updated.add(j)

    return X_new


In [None]:
# NSGA-III Counterfactual Generation

import networkx as nx
from pymoo.core.problem import Problem
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from typing import Callable, List, Dict

# generates causally, valid, diverse, and efficient counterfactuals.

# ----------------------------
# CounterfactualGeneration Class
# ----------------------------
class CounterfactualGeneration(Problem):
    def __init__(self, model, original_data, desired_class, actionable, immutable, sem_functions, causal_graph, target_column, stochastic=True):
        self.model = model
        self.original_data = original_data.reset_index(drop=True)
        self.desired_class = desired_class
        self.actionable = actionable
        self.immutable = immutable
        self.sem_functions = sem_functions
        self.graph = causal_graph
        self.target_column = target_column
        self.stochastic = stochastic


        self.features = list(original_data.columns)
        if target_column in self.actionable:
            self.actionable.remove(target_column)
        if target_column in self.immutable:
            self.immutable.remove(target_column)

        self.feature_dim = len(self.features)
        self.mutable_indices = [self.features.index(f) for f in actionable if f in self.features]


        # bounds for all features
        original_values = original_data.drop(columns=[target_column]).iloc[0].values
        epsilon = 0.5
        self.min_bounds = original_values - epsilon
        self.max_bounds = original_values + epsilon

        super().__init__(
            n_var=len(original_data.columns) - 1,  # total features excluding target
            n_obj=5,
            n_constr = 2,  # validity, proximity constraint
            xl=self.min_bounds,
            xu=self.max_bounds
                )


    # Integrated _evaluate with constraint
    # The constraint ensures the predicted probability for the desired class is >= 0.5
    def _evaluate(self, X, out, *args, **kwargs):
        x_orig = self.original_data.iloc[0].copy()
        objective_matrix = []
        chain_lengths = []
        epsilon = 1e-8
        pred_probs = []

        # constraint for proximity
        max_proximity = 3.0


        for i, cf_vec in enumerate(X):
            x_cf = x_orig.copy()
            for j, feat_idx in enumerate(self.mutable_indices):
                x_cf[self.features[feat_idx]] = cf_vec[j]

            x_input = x_cf.drop(self.target_column).values.reshape(1, -1)
            pred = self.model.predict(x_input)[0]
            p1 = self.model.predict_proba(x_input)[0, self.desired_class]
            pred_probs.append(p1)

            # Validity
            validity = 0 if pred == self.desired_class else 1

            # Proximity
            prox = np.linalg.norm(x_cf.drop(self.target_column).values - x_orig.drop(self.target_column).values)


            # Effort
            r_chain = self.build_recourse_chain(x_orig.drop(self.target_column).copy(),
                                                x_cf.drop(self.target_column).copy())
            effort = len(r_chain)
            chain_lengths.append(effort)

            # Plausibility
            plausibility = 0.0
            x_cf_raw = x_cf.copy()
            for node in nx.topological_sort(self.graph):
                if node in self.immutable or node in self.actionable:
                    continue
                parents = list(self.graph.predecessors(node))
                if all(p in x_cf_raw for p in parents):
                    inputs = x_cf_raw[parents].values.reshape(1, -1)
                    sem_func, _ = self.sem_functions[node]
                    result = sem_func(inputs)
                    if isinstance(result, tuple) and len(result) == 2:
                        mu, sigma = result
                    else:
                        mu = result
                        sigma = 0.5
                    mu_scalar = float(mu.flatten()[0])
                    sampled_val = np.random.normal(mu_scalar, sigma) if self.stochastic else mu_scalar
                    x_cf[node] = sampled_val
                    nll = 0.5 * np.log(2 * np.pi * sigma ** 2) + ((sampled_val - mu_scalar) ** 2)/(2 * sigma ** 2)
                    plausibility += float(nll.item())

            # Recourse efficiency
            downstream_features = set()
            for node in self.actionable:
                downstream_features.update(nx.descendants(self.graph, node))

            features_without_target = list(x_orig.drop(self.target_column).index)
            impact_features = list(downstream_features.intersection(features_without_target))

            x_orig_model_inputs = x_orig.drop(self.target_column).values
            x_cf_model_inputs = x_cf.drop(self.target_column).values

            impact_indices = [features_without_target.index(f) for f in impact_features]
            causal_impact = np.sum(np.abs(x_cf_model_inputs[impact_indices] - x_orig_model_inputs[impact_indices]))

            # recourse_efficiency = effort / (causal_impact + epsilon)
            recourse_efficiency = (causal_impact + epsilon) / effort


            objective_matrix.append([
                float(validity),
                float(prox),
                float(effort),
                float(plausibility),
                float(recourse_efficiency)
                ])

        out["F"] = np.array(objective_matrix)
        self._last_population = X
        self._last_objectives = np.array(objective_matrix)
        self._last_chains = chain_lengths

        # Constraint:
        # validity - desired class probability >= threshold
        threshold = 0.5
        pred_probs = np.array(pred_probs)
        validity_constraint = np.maximum(0.0, threshold - pred_probs).reshape(-1, 1)
        proximity_vals = np.array([obj[1] for obj in objective_matrix])
        proximity_constraint = np.maximum(0.0, proximity_vals - max_proximity).reshape(-1, 1)

        out["G"] = np.hstack([validity_constraint, proximity_constraint])


    # validates candidates by ensuring full causal consistency
    def _apply_sem(self, x: pd.Series) -> pd.Series:
        # Propagate changes forward in the DAG; ensures that non-actionable, non-immutable features are updated based on their causal parents
        sorted_nodes = list(nx.topological_sort(self.graph))

        for node in sorted_nodes:
            if node in self.immutable or node in self.actionable:
                continue

            parents = list(self.graph.predecessors(node))
            if all(p in x for p in parents):
                inputs = x[parents].values.reshape(1, -1)
                sem_func, _ = self.sem_functions[node]
                result = sem_func(inputs)

                if isinstance(result, tuple) and len(result) == 2:
                    mu, sigma = result
                else:
                    mu = result
                    d_sigma = 0.5
                    sigma = d_sigma
                x[node] = np.random.normal(mu[0], sigma) if self.stochastic else mu[0]

        return x


    # sequential chain using topological order
    def build_recourse_chain(self, x_orig: pd.Series, x_cf: pd.Series):

        changes = []
        x_current = x_orig.copy()

        for node in nx.topological_sort(self.graph):
            if node in self.immutable:
                continue

            original_value = x_orig[node]
            new_value = x_cf[node]

            if not np.isclose(original_value, new_value):
                changes.append({
                    "features": node,
                    "original_value": original_value,
                    "new_value" : new_value,
                   "type": "direct_change" if node in self.actionable else "causal_propagation"
                    })

                # SEM application to propagate descendants
                x_current[node] = new_value
                x_current = self._apply_sem(x_current)

        return changes


    # Pareto filtering and chain output
    # return counterfactuals and recourse chains for pareto-optimal solutions as a list[Dict]
    def extract_pareto_c_chains(self):
        """
        Returns a list of dicts for each Pareto-optimal counterfactual:
            - 'cf': the full counterfactual instance (pd.Series)
            - 'chain': the recourse chain as List[(feature, new_value)]
            - 'objectives': list of objective values [validity, prox, sparse, effort, plausibility, diversity]
        """

        nd_indices = NonDominatedSorting().do(self._last_objectives, only_non_dominated_front=True)
        x_orig = self.original_data.iloc[0].drop(self.target_column).copy()

        pareto_c_chains = []
        for i in nd_indices:
            cf_values = self._last_population[i]
            objective = self._last_objectives[i]

            # reconstruct the full counterfactual
            x_cf = x_orig.copy()
            for j, feat_idx in enumerate(self.mutable_indices):
                x_cf[self.features[feat_idx]] = cf_values[j]
            x_cf = self._apply_sem(x_cf)

            recourse_chain = self.build_recourse_chain(x_orig.copy(), x_cf.copy())
            pareto_c_chains.append({
                "counterfactual": x_cf,
                "recourse_chain": recourse_chain,
                "objectives": objective,
                })

        return pareto_c_chains


In [None]:
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.termination import get_termination
from pymoo.core.callback import Callback
from pymoo.optimize import minimize
from pymoo.operators.crossover.sbx import SBX
from pymoo.core.termination import Termination

# ----------------------------
# Logging callback
# ----------------------------
# tracks and visualize the search for optimal counterfactuals
class LogCallback(Callback):
    def __init__(self):
        super().__init__()
        self.data["gen"] = []
        self.data["n_evals"] = []
        self.data["opt"] = []

    def notify(self, algorithm):
        self.data["gen"].append(algorithm.n_gen)
        self.data["n_evals"].append(algorithm.evaluator.n_eval)
        self.data["opt"].append(algorithm.opt.get("F"))

# ----------------------------
# Safe log odds
# ----------------------------
# transforms model outputs into a space where optimization is more effective or interpretable
def safe_log_odds(prob):
    eps = 1e-9
    prob = np.clip(prob, eps, 1 - eps)
    return np.log(prob / (1 - prob))

# ----------------------------
# Early Termination
# ----------------------------
class CounterfactualTermination(Termination):
    def __init__(self, N=5, validity_index=0):
        """
        Parameters
        ----------
        N : int
            Number of valid counterfactuals to stop after.
        validity_index : int
            Column index in F where validity is stored (default=0).
        """
        super().__init__()
        self.N = N
        self.validity_index = validity_index

    def _update(self, algorithm):
        # Access the objective matrix
        F = algorithm.opt.get("F")

        # Valid CFs = rows where validity == 0
        valid_count = np.sum(F[:, self.validity_index] == 0)

        # Stop when enough valid CFs are found
        return valid_count >= self.N


# ----------------------------
# Population init near boundary
# ----------------------------
def init_population_near_boundary(model, X_pool, original_instance, actionable_features, n_pop=100,
                           eps=1, epsilon=0.05, noise_scale=0.01, rng=None):
    """
    Initialize a population for counterfactual search that is both:
      1. Close to the original instance (low proximity)
      2. Near the decision boundary (predicted probability ≈ 0.5)

    Parameters
    ----------
    model : scikit-learn model
        Trained model with predict_proba.
    X_pool : pd.DataFrame
        Full feature set (to select boundary candidates).
    original_instance : pd.Series
        Original instance for proximity reference.
    actionable_features : list
        Names of actionable features.
    n_pop : int
        Population size.
    eps : float
        Probability window around 0.5 for boundary selection.
    epsilon : float
        Max deviation from original instance for proximity.
    noise_scale : float
        Gaussian noise scale for actionable feature perturbation.
    rng : int or np.random.Generator
        Random seed or generator.

    Returns
    -------
    X_init : np.ndarray
        Initialized population of shape (n_pop, n_features).
    """
    if rng is None:
        rng = np.random.default_rng()
    elif isinstance(rng, int):
        rng = np.random.default_rng(rng)

    # Step 1: Select boundary-near candidates
    X_full = X_pool.copy().values
    p1 = model.predict_proba(X_full)[:, 1]
    mask = (p1 >= 0.5 - eps) & (p1 <= 0.5 + eps)
    candidates = X_full[mask] if np.any(mask) else X_full
    n_candidates = len(candidates)

    # Step 2: Initialize population
    X_init = np.zeros((n_pop, X_full.shape[1]))
    X_init = X_init.astype(np.float32)
    orig_values = original_instance.values

    for i in range(n_pop):
        # Pick a random boundary candidate
        idx = rng.integers(0, n_candidates)
        x = candidates[idx].copy()

        # Perturb actionable features:
        for f in actionable_features:
            col_idx = X_pool.columns.get_loc(f)
            # Stay close to the original (low proximity) + small Gaussian noise
            x[col_idx] = orig_values[col_idx] + rng.uniform(-epsilon, epsilon) \
                         + rng.normal(0, noise_scale)

        X_init[i] = x

    return X_init


# ----------------------------
# Counterfactual generation
# ----------------------------
def generate_counterfactual(input_instance, n_generations=100, n_partitions=5, seed=42, N= 4, multiple_instance_mode = False):
    model = input_instance.model
    x_input = input_instance.original_data.drop(input_instance.target_column, axis=1).values.reshape(1, -1)

    # --- Initial probability & log-odds ---
    if hasattr(model, "predict_proba"):
        proba = model.predict_proba(x_input)[0]
        desired_prob = proba[input_instance.desired_class]
    elif hasattr(model, "decision_function"):
        score = model.decision_function(x_input)
        if score.ndim > 1:
            score = score[:, input_instance.desired_class]
        desired_prob = 1 / (1 + np.exp(-score))
    else:
        raise ValueError("Model does not support probability or decision_function output.")

    print(f"Initial probability for desired class {input_instance.desired_class}: {desired_prob:.4f}")
    log_odds = safe_log_odds(desired_prob)
    print(f"Log-odds for desired class {input_instance.desired_class}: {log_odds:.4f}")

    if abs(log_odds) > 0.2:
        print("⚠ Far from decision boundary — increasing search budget.")
        n_generations = int(n_generations * 1.5)

    # NSGA-III setup
    n_obj = input_instance.n_obj
    ref_dirs = get_reference_directions("das-dennis", n_dim=n_obj, n_partitions=n_partitions)
    pop_size = len(ref_dirs)
    print(f"[INFO] Using {pop_size} reference directions for {n_obj} objectives (n_partitions={n_partitions})")

    # Initial population
    x_orig = input_instance.original_data.drop(columns=[input_instance.target_column]).iloc[0].values
    rng = np.random.default_rng(seed)
    initial_population = np.tile(x_orig, (pop_size, 1)).astype(float)

    for idx in input_instance.mutable_indices:
        initial_population[:, idx] += rng.normal(0, 0.01, size=pop_size)

    actionable_idx = input_instance.mutable_indices

    # NSGA-III algorithm
    algorithm = NSGA3(
        ref_dirs=ref_dirs, pop_size=pop_size, sampling=initial_population,
        crossover=SBX(prob=0.9, eta=15),
        mutation=lambda problem, X, **kwargs: batch_mutate_diverse(
            X,
            actionable_idx=actionable_idx,
            sem_functions=sem_functions,
            step=0.02,
            rng=np.random.default_rng(seed),
            mutation_rate=0.5,
            categorical_idx={0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}
        ),
        eliminate_duplicates=True
    )

    termination = CounterfactualTermination(N=N, validity_index=0)
    callback = LogCallback()

    result = minimize(
        problem=input_instance,
        algorithm=algorithm,
        termination=termination,
        seed=seed,
        callback=callback,
        save_history=False,
        verbose=False
    )
    # ----------------------------
    # Post-hoc filtering
    # ----------------------------
     # --- Extract valid counterfactuals ---
    cf_results = input_instance.extract_pareto_c_chains()

    if len(cf_results) == 0:
        print("⚠ No valid counterfactuals found.")
        return None, None

    # Use dict keys instead of attributes
    pop = np.array([cf['counterfactual'] for cf in cf_results])
    objs = np.array([cf['objectives'] for cf in cf_results])

    # Keep only valid counterfactuals (validity = 0 at index 0)
    valid_mask = objs[:, 0] == 0
    pop = pop[valid_mask]
    objs = objs[valid_mask]

    if len(pop) == 0:
        print("⚠ No valid counterfactuals after filtering.")
        return None, None

    # Apply N-constraint and multiple-instance mode
    cap = int(N * 2) if multiple_instance_mode else N
    if len(pop) > cap:
        # Sort by proximity (second objective)
        sorted_idx = np.argsort(objs[:, 1])
        pop = pop[sorted_idx][:cap]
        objs = objs[sorted_idx][:cap]

    print(f"Returning {len(pop)} valid counterfactual(s) from extract_pareto_c_chains.")
    return pop, objs



In [None]:
# prep the data for counterfactual generation
x_test['default payment next month'] = y_test.values
print(len(x_test))

# x_test.head(5)

# single instance
cf_data_instance = x_test[x_test['default payment next month'] == 0].iloc[[4]].copy()
#multiple instance
cf_data_instances = x_test[x_test['default payment next month'] == 0].iloc[:200].copy()


In [None]:
# Counterfactual Generation
# warning occurs because the prediction is made without the column name, while it was trained with the column name
import warnings
warnings.filterwarnings("ignore", message="X does not have valid feature names")

input_instance = CounterfactualGeneration(model = rf_model,
                                          original_data= cf_data_instance,
                                          desired_class = 1,
                                          actionable = actionable_f,
                                          immutable = immutable_f,
                                          sem_functions = sem_functions,
                                          causal_graph = DAG_graph,
                                          target_column = "default payment next month",
                                          stochastic = True )

import time
start_time = time.time()
cf_result = generate_counterfactual(input_instance)
end_time = time.time()
print(f"Time taken to generate counterfactuals: {end_time - start_time}")

cf_results = input_instance.extract_pareto_c_chains()

In [None]:
for idx, cf_entry in enumerate(cf_results, start=1):
    print(f"=== Counterfactual {idx} ===")

    # Print counterfactual values
    cf_vector = cf_entry.get("counterfactual", None)
    if cf_vector is not None:
        print("Counterfactual vector:")
        print(cf_vector.to_string(float_format=lambda x: f"{x:.6f}"))
    else:
        print("No counterfactual data.")

    print("\nRecourse Chain:")
    recourse_chain = cf_entry.get("recourse_chain", None)
    if not recourse_chain:
        print("  None")
    else:
        for step in recourse_chain:
            feature = step.get('features')
            old_val = step.get('original_value')
            new_val = step.get('new_value')
            change_type = step.get('type')
            print(f"  - {feature}: {old_val:.6f} -> {new_val:.6f} ({change_type})")

    print("\nObjectives:", cf_entry.get("objectives", None))
    print()

In [None]:
# best counterfactual with validity as zero (successful counterfactuals)

def is_pareto_efficient(costs):
    """
    Find the Pareto-efficient points.
    Returns a boolean array indicating whether each point is Pareto efficient.
    """
    is_efficient = np.ones(costs.shape[0], dtype=bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            is_efficient[is_efficient] = (
                np.any(costs[is_efficient] < c, axis=1) |
                np.all(costs[is_efficient] == c, axis=1)
            )
            is_efficient[i] = True
    return is_efficient

# Extract all objective vectors from cf_chains
objectives_array = np.array([cf['objectives'] for cf in cf_results])

# Find Pareto-efficient CFs
pareto_mask = is_pareto_efficient(objectives_array)

# Filter further for validity == 1
# Assuming 'validity' is the first objective in the list
validity_mask = objectives_array[:, 0] == 0

# Combined filter
final_mask = pareto_mask & validity_mask

print(f"\nPrinting only Pareto-efficient counterfactuals with validity = 0 ({final_mask.sum()} found):\n")

for idx, (cf_entry, is_best) in enumerate(zip(cf_results, final_mask), start=1):
    if not is_best:
        continue

    print(f"=== Counterfactual {idx} ===")

    # Counterfactual vector
    cf_vector = cf_entry.get("counterfactual", None)
    if cf_vector is not None:
        print("Counterfactual vector:")
        if hasattr(cf_vector, "to_string"):
            print(cf_vector.to_string(float_format=lambda x: f"{x:.6f}"))
        else:
            print(cf_vector)

    print("\nRecourse Chain:")
    recourse_chain = cf_entry.get("recourse_chain", None)
    if not recourse_chain:
        print("  None")
    else:
        for step in recourse_chain:
            if isinstance(step, dict):
                feature = step.get('features')
                old_val = step.get('original_value')
                new_val = step.get('new_value')
                change_type = step.get('type', '')
                print(f"  - {feature}: {old_val:.6f} -> {new_val:.6f} {change_type}")
            else:
                print(f"  Unexpected step format: {step}")

    print("\nObjectives:", cf_entry.get("objectives", None))
    print("-" * 50)


In [None]:
# Decode and inverse-transform generated counterfactuals
decoded_cf_list = []

target_column = "risk"
desired_class = 1

for cf_entry, is_best in zip(cf_results, final_mask):
    if not is_best:
        continue

    cf_vector = cf_entry.get("counterfactual", None)
    if cf_vector is None:
        continue

    data_columns = dc_data.columns

    # Convert to array
    cf_data_array = np.array(cf_vector).reshape(1, -1)

    # Inverse transform to original scale
    cf_data_unscaled = scaler.inverse_transform(cf_data_array)

    # Convert to DataFrame
    counterfactual_df = pd.DataFrame(cf_data_unscaled, columns=data_columns)

    decoded_cf_list.append(counterfactual_df)

# Combine all into one DataFrame
decoded_cf_df = pd.concat(decoded_cf_list, ignore_index=True)

In [None]:
decoded_cf_df

In [None]:
# decode the original instance
cf_data_instance = x_test[x_test['default payment next month'] == 0].iloc[[4]].copy()
original_data= cf_data_instance

# Drop the target column before inverse transform
features_only = cf_data_instance.drop(columns=['default payment next month'])

# Inverse transform
original_data_unscaled = scaler.inverse_transform(features_only)

# Convert back to DataFrame for readability
original_decoded = pd.DataFrame(original_data_unscaled, columns=features_only.columns)

In [None]:
original_decoded


In [None]:
# visible highlight the changes

def highlight_changes(val):
    if isinstance(val, str) and "→" in val:
        return "background-color: #d4edda; color: green; font-weight: bold;"  # light green
    return ""

counterfactual_gen = decoded_cf_df
# Repeat the original row 22 times to match CF rows
original_repeated = pd.concat([original_decoded]*counterfactual_gen.shape[0], ignore_index=True)

# Convert both DataFrames to string
original_str = original_repeated.astype(str)
cf_str = decoded_cf_df.astype(str)

# Align columns just in case
original_str, cf_str = original_str.align(cf_str, join='inner', axis=1)

# Create comparison DataFrame to show old → new values where different
comparison_df = original_str.copy()

for col in comparison_df.columns:
    diff_mask = original_str[col] != cf_str[col]
    comparison_df.loc[diff_mask, col] = original_str.loc[diff_mask, col] + " → " + cf_str.loc[diff_mask, col]

# Now apply highlight with Styler
styled_df = comparison_df.style.applymap(highlight_changes)

styled_df  # In Jupyter this will display the styled DataFrame


In [None]:
# Objectives

objectives_list = []

for cf_entry, is_best in zip(cf_results, final_mask):
    if not is_best:
        continue

    objectives = cf_entry.get("objectives", None)
    if objectives is None:
        continue

    obj_names = ['validity', 'proximity', 'effort', 'plausibility', 'recourse_efficiency']
    obj_dict = {name: float(obj) for name, obj in zip(obj_names, objectives)}
    objectives_list.append(obj_dict)

# Convert list of dicts to DataFrame
objectives_df = pd.DataFrame(objectives_list)
objectives_df
