In [None]:
import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
        )
logger = logging.getLogger("CFE-Pipeline")

In [None]:
# MODULE ONE - DATA CONTEXT MANAGER

In [None]:
import shap
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

In [None]:
# german data
column_names = ['age', 'sex', 'job', 'housing', 'saving account', 'checking account',
                'credit amount', 'duration', 'purpose', 'risk']

german_data = pd.read_csv('german_credit_data.csv', header = None, names= column_names, na_values='?', skipinitialspace=True)
german_data.dropna(inplace=True)
print("Data has been cleaned. All rows with '?' have been removed.")
len(german_data)

In [None]:
german_data.head(10)

In [None]:
# dataset preprocessing

# dataset and target variable
g_data = german_data.drop(columns=['risk'], axis=1)
target = german_data['risk'].apply(lambda x: 1 if x == 'good' else 0)

# duplicate dataframe
gd_data = g_data.copy()

# encode categorical features
num_cols = ['age', 'job', 'credit amount', 'duration']
# to keep the order
ordinal_cols = ['saving account', 'checking account']
l_encoder_cols = ['sex', 'housing', 'purpose']

 #  label encoder
label_encoders = {}

for c in l_encoder_cols:
    le = LabelEncoder()
    gd_data[c] = le.fit_transform(gd_data[c])
    label_encoders[c] = le # Store the entire fitted encoder object

# ordinal encoder
# Performs ordinal encoding on multiple DataFrame columns.
def ordinal_encode(data_f, columns, category_orders):
    """
    Args:
        df (pd.DataFrame): The DataFrame to modify.
        columns (list): A list of column names to encode.
        category_orders (list of lists): order for each column.
    Returns:
        pd.DataFrame: The DataFrame with the ordinal encoded columns.
    """
    # Create a copy to avoid modifying the original DataFrame
    data_f_encoded = data_f.copy()

    # Check that the number of columns matches the number of order lists
    if len(columns) != len(category_orders):
        raise ValueError("The number of columns must match the number of category order lists.")

    # Iterate over the columns and their corresponding orders
    for i, column in enumerate(columns):
        order = category_orders[i]
        mapping = {category: j for j, category in enumerate(order)}
        data_f_encoded[column] = data_f_encoded[column].map(mapping)

    return data_f_encoded

# ordering based on the amount of money in the bank as stated from the dataset
s_account_order = ['little', 'moderate', 'rich', 'quite rich']
# ordering based on qty of money
c_account_order = ['little', 'moderate', 'rich']

order_list = [s_account_order, c_account_order]

# data for the model
g_features = ordinal_encode(gd_data, ordinal_cols, order_list)

# split dataset
x_train, x_test, y_train, y_test = train_test_split(g_features, target, test_size=0.3, random_state=45)

In [None]:
# scale numerical data features
scaler = StandardScaler().fit(x_train[num_cols])

x_train_scaled = scaler.transform(x_train[num_cols])
x_test_scaled = scaler.transform(x_test[num_cols])

# Combine scaled numerical columns with the already-encoded categorical columns
x_train_transformed = np.hstack((x_train_scaled, x_train[ordinal_cols].values, x_train[l_encoder_cols].values))
x_test_transformed = np.hstack((x_test_scaled, x_test[ordinal_cols].values, x_test[l_encoder_cols].values))

In [None]:
# decode the encoded values
def manual_inverse_transform(transformed_data, scaler, label_encs,
                             ord_cols, ord_orders, num_cols, l_enc_cols):

    # Preserve index to avoid row-splitting
    idx = transformed_data.index

    num_col_count = len(num_cols)
    ord_col_count = len(ord_cols)

    data_num_scaled = transformed_data.iloc[:, :num_col_count]
    data_ord_encoded = transformed_data.iloc[:, num_col_count : num_col_count + ord_col_count]
    data_nom_encoded = transformed_data.iloc[:, num_col_count + ord_col_count :]

    # Inverse numerical features
    inv_num = scaler.inverse_transform(data_num_scaled)
    df_num = pd.DataFrame(inv_num, columns=num_cols, index=idx)

    # Inverse ordinal features
    df_ord = pd.DataFrame(data_ord_encoded, columns=ord_cols, index=idx)
    for i, col in enumerate(ord_cols):
        inv_map = {j: category for j, category in enumerate(ord_orders[i])}
        df_ord[col] = (
            df_ord[col]
            .round()
            .astype(int)
            .map(inv_map)
        )

    # Inverse nominal (label-encoded) features
    df_nom = pd.DataFrame(index=idx)
    for col in l_enc_cols:
        raw_vals = data_nom_encoded[col].round().astype(int)

        classes = label_encs[col].classes_
        clipped_vals = raw_vals.clip(0, len(classes) - 1)

        df_nom[col] = label_encs[col].inverse_transform(clipped_vals)

    # Concatenate safely (same index everywhere)
    full_df = pd.concat([df_num, df_ord, df_nom], axis=1)

    # Reorder columns to original layout
    return full_df[g_data.columns]


In [None]:
# COMBINATION OF SMOTE AND CLASS WEIGHTS TRAINING
# SMOTE balances the training data by generating synthetic samples of the minority class; Class weights reinforce the importance of correctly classifying the examples..
# Tackles imbalance both at the data level and within the model's learning process, to lead to better model performance.

In [None]:
# classification algorithms to train the model for the predicted outcome

# ML model A
rf_model = RandomForestClassifier(random_state=42).fit(x_train, y_train)
# ML model B
svm_model = SVC(kernel='rbf', gamma=0.15, C=100, probability=True).fit(x_train, y_train)
# ML model C - change to NN?
log_r_model = LogisticRegression(max_iter=1000, solver='lbfgs').fit(x_train, y_train)

In [None]:
# # predict
rf_pred = rf_model.predict(x_test)
rf_accuracy = accuracy_score(y_test, rf_pred)

svm_pred = svm_model.predict(x_test)
svm_accuracy = accuracy_score(y_test, svm_pred)

logr_pred = log_r_model.predict(x_test)
logr_accuracy = accuracy_score(y_test, logr_pred)

# print scores
print(f" RF model accuracy: {rf_accuracy:.3f}")
print(f" SVM model accuracy: {svm_accuracy:.3f}")
print(f" LogR model accuracy: {logr_accuracy:.3f}")

In [None]:
# feature importance selection
# svm model
svm_shap_explainer = shap.KernelExplainer(svm_model.predict_proba, x_train)
svm_s_values = svm_shap_explainer.shap_values(x_test)


In [None]:
# # global feature importance -
# SHAP
# svm model
svm_s_values_total = svm_s_values[:, :, 1]
svm_feature_importance = pd.DataFrame({
        'feature': x_test.columns,
        'importance': np.abs(svm_s_values_total).mean(axis=0)
        }).sort_values(by='importance', ascending=False)

print('SVM: SHAP\n', svm_feature_importance)

# aggregate feature importance
svm_shap_values = svm_feature_importance


In [None]:
# causal graph from selected features
# uses the FCI and DirectLINGAM method
# FCI: constraint-based method that infers causal skeletons from conditional independence tests
# DirectLINGAM: functional method that assumes linear non-guassian relationships to infer causal directions
# both: FCI handles latent cofounders, and LiNGAM gives directionality; improves reliability of causal edges
# output: A directed causal graph that is grounded in both statistical independence and functional modeling, that provides causal validity

import networkx as nx
import matplotlib.pyplot as plt
from causallearn.search.ConstraintBased.FCI import fci
from causallearn.utils.cit import fisherz
from lingam import DirectLiNGAM
import pandas as pd

# Number of features to select, or None to use all
num_features = 10

# important features - select top `num_features` from l_values
if num_features is not None:
    important_features = svm_shap_values.iloc[:, 0].astype(str).tolist()[:num_features]
else:
    important_features = svm_shap_values.iloc[:, 0].astype(str).tolist()

# match features
def match_feature(feat, columns):
    # Return the first column name containing feat as substring, else None
    for col in columns:
        if feat in col:
            return col
    return None

important_features_mapped = []
for feat in important_features:
    matched_col = match_feature(feat, german_data.columns)
    if matched_col:
        important_features_mapped.append(matched_col)
    else:
        print(f"Warning: No match found for {feat}")

important_features = important_features_mapped

# identified features from the dataset
f_data = german_data[important_features]
print("Selected columns:", important_features)
print("f_data shape:", f_data.shape)

# Remove rows with missing values (FCI and Lingam don't handle NaNs well)
f_data = f_data.dropna().reset_index(drop=True)

# convert to numeric datatype
f_data = f_data.apply(lambda col: pd.factorize(col)[0] if col.dtypes == 'object' else col)

# Normalize (required for Lingam to perform well)
norm_f_data = (f_data - f_data.mean()) / f_data.std()

########################################
# Step 1: Constraint-Based Search (FCI)
########################################
data_d = norm_f_data.to_numpy()
fci_result = fci(data_d, fisherz, alpha=0.01, verbose=False)
fci_graph = fci_result[0]

print("\nFCI Skeleton Edges:")
for edge in fci_graph.get_graph_edges():
    i = edge.node1
    j = edge.node2
    print(f"{i} <-> {j}")

########################################
# Step 2: Functional Causal Discovery (DirectLiNGAM)
########################################
model = DirectLiNGAM()
model.fit(data_d)
lingam_adj = model.adjacency_matrix_

print("\nLiNGAM Causal Edges:")
for i in range(len(lingam_adj)):
    for j in range(len(lingam_adj)):
        if lingam_adj[i, j] != 0:
            print(f"{f_data.columns[i]} --> {f_data.columns[j]}")

########################################
# Step 3: Merge FCI Skeleton with LiNGAM directions
########################################
combined_graph = nx.DiGraph()

# Add nodes
combined_graph.add_nodes_from(f_data.columns)
# map edge_nodes to a feature name
column_map = {f"X{i}": col for i, col in enumerate(f_data.columns)}

# Add FCI undirected edges first
columns = list(f_data.columns)

for edge in fci_graph.get_graph_edges():
    i_name = edge.node1.get_name()
    j_name = edge.node2.get_name()
    if i_name in column_map and j_name in column_map:
        i = column_map[i_name]
        j = column_map[j_name]
        combined_graph.add_edge(i, j, causal_source='fci')

# Add directions from LiNGAM if they match FCI edges
for i in range(len(lingam_adj)):
    for j in range(len(lingam_adj)):
        if lingam_adj[i, j] != 0:
            src, tgt = f_data.columns[i], f_data.columns[j]
            if combined_graph.has_edge(src, tgt):
                # Already exists: keep as directed
                combined_graph[src][tgt]['weight'] = 'lingam'
            elif combined_graph.has_edge(tgt, src):
                # Reverse direction if undirected in FCI
                combined_graph.remove_edge(tgt, src)
                combined_graph.add_edge(src, tgt, causal_source='lingam')
            else:
                combined_graph.add_edge(src, tgt, causal_source='lingam')


plt.figure(figsize=(10, 6))
pos = nx.spring_layout(combined_graph, seed=42, weight=None)
edge_labels = nx.get_edge_attributes(combined_graph, 'causal_source')

# nx.draw(combined_graph, pos, with_labels=True, node_size=2000, node_color='lightblue', font_size=10, arrowsize=20)
# nx.draw_networkx_edge_labels(combined_graph, pos, edge_labels=edge_labels)
# plt.title("Combined Causal Graph (FCI + LiNGAM)")
# plt.show()


In [None]:
# feature extraction - immutable and actionable features
def detect_actionable_immutable(df):
    immutable_f = {col for col in df.columns if any(word in col.lower() for word in ["sex", "age",  "gender", "race", "marital-status", 'ethnicity', 'birth', 'native-country', 'education'])}
    actionable_f = set(df.columns) - immutable_f - {"class"}
    return sorted(actionable_f), sorted(immutable_f)

actionable_f, immutable_f = detect_actionable_immutable(german_data)

# # Reserve 1â€“2 intermediate features for plausibility
# These must have parents in the graph
plausibility_nodes = ['purpose', 'job']

# Remove from actionable/immutable
actionable_f = [f for f in actionable_f if f not in plausibility_nodes]
immutable_f = [f for f in immutable_f if f not in plausibility_nodes]


In [None]:
# SEM to enforce causal validity for the counterfactual generation (validate causal assumption)
# DAG: a graph with directed edges and no cycles, represents causal flow
# SEM: a set of equations that models each variable as a function of its causal parents

# deterministic
def estimate_sems(df, graph):
    sem = {}
    for node in nx.topological_sort(graph):
        parents = list(graph.predecessors(node))
        if not parents:
            sem[node] = ([], 0)
        else:
            X = df[parents]
            y = df[node]
            beta = np.linalg.pinv(X.values) @ y.values
            sem[node] = (parents, beta)
    return sem

# probabilistic
# captures noise and variability, required to calculate the plausibility of the generated counterfactual
def estimate_p_sems(df, graph):
    sem = {}
    for node in nx.topological_sort(graph):
        parents = list(graph.predecessors(node))

        if not parents:
            mu = df[node].mean()
            sigma = df[node].std()
            sem[node] = ([], np.array([mu]), sigma)  # store mean as beta-like vector
        else:
            X = df[parents]
            y = df[node]
            beta = np.linalg.pinv(X.values) @ y.values
            residuals = y - X @ beta
            sigma = np.std(residuals)
            sem[node] = (parents, beta, sigma)
    return sem

# extract a DAG with only LiNGAM-directed edges (to ensure acyclicity)
DAG_graph = nx.DiGraph(
    (u, v, d) for u, v, d in combined_graph.edges(data=True)
    if d.get("causal_source") == "lingam")

 #Optional: Check if DAG is valid
assert nx.is_directed_acyclic_graph(DAG_graph), "Graph must be a DAG for SEM estimation"

# calling the SEM - follows linear_regression
sem_equations = estimate_p_sems(norm_f_data, DAG_graph)

In [None]:
# SEM - conversion to callable functions, so it can be used for the counterfactual generation
# Enables modular, reusable causal modeling to ensure causal dependencies are respected (causal plausibility enforcement)

# a deterministic SEM for the causal plausibility - returns a single value
def make_linear_sem_function(parents, beta):
    def sem_func(X_parent_values):
        return X_parent_values @ beta
    return sem_func

# a probabilistic SEM for the causal plausibility - returns a distribution
def make_probabilistic_sem_function(parents, beta, sigma):
    def sem_func(X_parent_values):
        if not parents:
            mu = beta[0] if isinstance(beta, np.ndarray) else beta
            return np.full((X_parent_values.shape[0], 1), mu), sigma
        else:
            mu = X_parent_values @ beta
            return mu, sigma
    return sem_func

default_sigma = 0.5
sem_functions = {}

for node, values in sem_equations.items():
    if len(values) == 3:
        # Probabilistic SEM with parents
        parents, beta, sigma = values
        sem_func = make_probabilistic_sem_function(parents, beta, sigma)
        sem_functions[node] = (sem_func, parents)
    elif len(values) == 2:
        # Root node: values = ([], mu)
        _, beta = values
        sigma = default_sigma  # assign a default sigma to root node
        sem_func = make_probabilistic_sem_function([], beta, sigma)
        sem_functions[node] = (sem_func, [])
    else:
        raise ValueError(f"Unexpected SEM format for node {node}: {values}")

# debugging
for node, val in sem_functions.items():
    print(node, type(val), len(val))  # should be tuple, len == 2


In [None]:
# MODULE TWO: PREFERENCE ELICITATION MODULE

In [None]:
from dataclasses import dataclass

# class definition
@dataclass

class DataContext:
    def __init__( self, x: np.ndarray, x_df: pd.DataFrame, feature_names: list,
                  actionable_features: list, immutable_features: list,
                  feature_ranges: dict, causal_graph: dict, model: object,
                  num_cols, ord_cols, ord_orders, l_enc_cols):
        self.x = x
        self.x_df = x_df
        self.feature_names = feature_names
        self.actionable_features = actionable_features
        self.immutable_features = immutable_features
        self.feature_ranges = feature_ranges
        self.causal_graph = causal_graph
        self.model = model
        self.num_cols = num_cols
        self.ord_cols = ord_cols
        self.ord_orders = ord_orders
        self.l_enc_cols = l_enc_cols

In [None]:
# feature ranges definition
# using an hybrid approach - the min-max values of the features and incorporating domain knowledge

# domain feature ranges changes per dataset and its features
DOMAIN_FEATURE_RANGES = {
    "age": (18, 90),                 # legal working age
    "housing": (0, 3),        # German dataset encoding
    "saving account": (0, 3),       # realistic work hours
    "checking account": (0, 3)     # IRS-like practical bound
}

# the dataset without the target column
X = g_data
def build_feature_ranges(X, numerical_features, domain_ranges):
    feature_ranges = {}

    for f in numerical_features:
        if f in domain_ranges:
            feature_ranges[f] = domain_ranges[f]
        else:
            feature_ranges[f] = (
                X[f].quantile(0.01),
                X[f].quantile(0.99)
            )
    return feature_ranges

feature_ranges = build_feature_ranges(X=X, numerical_features=num_cols, domain_ranges=DOMAIN_FEATURE_RANGES)

In [None]:
# data prep for the input data context
x_test_transformed = np.hstack((x_test_scaled, x_test[ordinal_cols].values, x_test[l_encoder_cols].values))
encoded_columns = num_cols + ordinal_cols + l_encoder_cols
x_test_encoded = pd.DataFrame(x_test_transformed, columns=encoded_columns)
x_test_encoded["risk"] = y_test.values

# sanity check
print((x_test_encoded['risk'] == 0).sum())

# data instance
cf_data_instance = x_test_encoded[x_test_encoded['risk'] == 0].iloc[[20]].copy()
cf_data_instance_t = cf_data_instance.drop(columns = ['risk']).to_numpy()
features_names = g_data.columns.tolist()

# actual data instance = raw data for this index
data_instance = manual_inverse_transform( transformed_data=cf_data_instance.drop(columns=["risk"]),
                                scaler = scaler, label_encs=label_encoders,
                                ord_cols=ordinal_cols, ord_orders=order_list,
                                num_cols=num_cols, l_enc_cols=l_encoder_cols)

context = DataContext(
    x=cf_data_instance_t,
    x_df=data_instance,
    feature_names=features_names,
    actionable_features=actionable_f,
    immutable_features=immutable_f,
    feature_ranges=feature_ranges,
    causal_graph=DAG_graph,
    model=svm_model,
    num_cols=num_cols,
    ord_cols=ordinal_cols,
    ord_orders=order_list,
    l_enc_cols=l_encoder_cols
)

TARGET = "risk"

all_features = [f for f in g_data.columns if f != TARGET]
actionable_features = [f for f in actionable_f if f != TARGET]
immutable_features = [f for f in immutable_f if f != TARGET]

# Debugging purpose - confirming the instances are the same in encoded and transformed state.
encoded_instance = cf_data_instance_t
print(encoded_instance)

raw_instance = data_instance
raw_instance

In [None]:
# USER PROXY
# define a preference schema that determines the kind of preferences that exist,
# how they are expressed, and how they influence the CFE generation (traceability)

u_preference = {
    'feature': str,
    'importance_rank': int,
    'hard': bool,
}

# prompt constructor for the user (LLM-agent simulated user)
def build_preference_prompt(context: DataContext) -> str:
    """
    Constructs a prompt to simulate a user providing feature-level preferences
    for counterfactual generation.
    """
    # current instance
    instance_desc = context.x_df.to_dict(orient="records")[0]
    # features that can be modified
    actionable = ", ".join(context.actionable_features)

    prompt = f"""
    You are a user seeking to improve your desired outcome in a decision system.

    Your current profile:
    {instance_desc}

    You are ONLY allowed to change the following features:
    {actionable}

    Instructions:
    - Select **at least 3 and at most 5** features you would prefer to change
    - You do NOT need to select exactly 5 features
    - Do NOT include immutable features
    - For each selected feature, specify:
      - importance_rank: 1 = most important, higher numbers = less important
      - whether it is a hard constraint
      - a short rationale why you want it changed

    Return your response as JSON with this format:
    [
      {{
        "feature": "...",
        "importance_rank": 1,
        "hard": true|false
      }}
    ]
    Before the JSON, briefly explain your reasoning in plain text.
    """
    return prompt


In [None]:
# interface for LLM agents
from abc import ABC, abstractmethod

class UserPreferenceAgent(ABC):
    @abstractmethod
    def elicit_preferences(self, context: DataContext) -> list:
        pass

# OpenAI / GPT-Based Agent
# current model: GPT-4.1-mini
import logging
from openai import OpenAI
import json
import re

def extract_json(llm_output: str):
    """
    Extracts the first JSON array or object from an LLM response.
    Assumes reasoning text may precede the JSON.
    """
    match = re.search(r"(\[.*\]|\{.*\})", llm_output, re.DOTALL)
    if not match:
        raise ValueError("No JSON found in LLM output")

    return json.loads(match.group())

# # WORKS! Doesn't show the simulated user reasoning or thought process
# temperature = 0.3 (mostly consistent); 0.5-0.7: has more randomness /high creativity;  0.0 - 0.2: very deterministic/factual/repitive
class GPTUserPreferenceAgent(UserPreferenceAgent):
    def __init__(self, model="gpt-4.1-mini", temperature=0.5):
        self.client = OpenAI()
        self.model = model
        self.temperature = temperature
        self.logger = logging.getLogger("CFE.PreferenceAgent")

    def elicit_preferences(self, context: DataContext) -> list:
        self.logger.info("Preference elicitation started")

        prompt = build_preference_prompt(context)
        self.logger.info("Prompt constructed for LLM")
        self.logger.debug("PROMPT:\n%s", prompt)

        self.logger.info("Invoking LLM model=%s", self.model)

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature
        )

        self.logger.info("LLM response received")

        raw_output = response.choices[0].message.content
        self.logger.debug("RAW LLM OUTPUT:\n%s", raw_output)

        try:
            preferences = json.loads(raw_output)
        except json.JSONDecodeError:
            self.logger.error("Invalid JSON returned by LLM")
            raise ValueError("LLM did not return valid JSON")

        self.logger.info("Preference parsing successful")
        return preferences

In [None]:
# simulating multiple user personas
# WHY: Real users are inconsistent, may not fully understand features semtnatics, and priorites change over time. Additionally, it provides robustness through preference diversity as it can enable the pipeline to detect preference instability, identify domninant features, and discard CFEs that suit a narrow preference profile.

SIMULATED_USER_PERSONAS = [
    {
        "name": "Effort-Minimizing User",
        "system_prompt": (
            "You prefer changes that require minimal effort, "
            "avoid education or occupation changes, "
            "and favor small numeric adjustments."
        ),
        "temperature": 0.9
    },
    {
        "name": "Income-Maximizing User",
        "system_prompt": (
            "You prioritize changes that most strongly improve income, "
            "even if they require substantial effort."
        ),
        "temperature": 0.7
    },
    {
        "name": "Stability-Oriented User",
        "system_prompt": (
            "You prefer changes that preserve demographic and employment stability."
        ),
        "temperature": 0.5
    }
]

# preference stochasticity - introduce controlled randomness into each simulated user's elicitation process. It captures ambiguity in user articulation, tradeoff between competing priorities and sensitivity to prompt phrasing. Promotes robust CFE generation as it enables expected utility and worst case analysis
def inject_preference_stochasticity(preferences, drop_prob=0.2):
    """
    Randomly drops preferences to simulate uncertainty.
    """
    noisy_prefs = []
    for p in preferences:
        if random.random() > drop_prob:
            noisy_prefs.append(p)
    return noisy_prefs


import random
class MultiUserPreferenceAgent:
    def __init__(self, model="gpt-4.1-mini", verbose=True):
        self.client = OpenAI()
        self.model = model
        self.verbose = verbose
        self.logger = logging.getLogger("CFE.PreferenceAgent")

    def elicit_preferences(self, context, num_users=3):
        self.logger.info("Starting multi-user preference elicitation")

        sampled_personas = random.sample(
            SIMULATED_USER_PERSONAS,
            min(num_users, len(SIMULATED_USER_PERSONAS))
        )

        all_preferences = []

        for persona in sampled_personas:
            prefs = self._elicit_single_user(context, persona)
            all_preferences.append({
                "persona": persona["name"],
                "preferences": prefs
            })

        return all_preferences

    def _elicit_single_user(self, context, persona):
        prompt = build_preference_prompt(context)

        if self.verbose:
            print(f"\nðŸ§  Simulated User: {persona['name']}")
            print("--------------------------------------------------")
            print(prompt)
            print("--------------------------------------------------")

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": persona["system_prompt"]},
                {"role": "user", "content": prompt}
            ],
            temperature=persona["temperature"]
        )
        raw_output = response.choices[0].message.content

        if self.verbose:
            print("\nðŸ§  Raw Response")
            print("--------------------------------------------------")
            print(raw_output)
            print("--------------------------------------------------")

        preferences = extract_json(raw_output)
        preferences = inject_preference_stochasticity(preferences, drop_prob=0.0)

        return preferences

In [None]:
# threshold represents consensus constraints, a minimum level of consensus among simulated users required for a feature to be accepted as a global preference constraints

from collections import Counter
def aggregate_preferences(multi_user_prefs, threshold=0.5):
    """
    Keeps features preferred by at least `threshold` fraction of users.

    Accepts:
    - Single user dict
    - List of user dicts
    - Dicts with 'preferences' or 'features'
    - Flat list of preference dicts (single user)
    """
    if not multi_user_prefs:
        return []

    # ---- Normalize to list of users ----
    if isinstance(multi_user_prefs, dict):
        users = [multi_user_prefs]
    elif isinstance(multi_user_prefs, list):
        users = multi_user_prefs
    else:
        raise TypeError(
            f"Expected dict or list, got {type(multi_user_prefs)}"
        )

    feature_votes = Counter()
    valid_users = 0

    for user in users:
        # ---- Normalize user preferences ----
        if isinstance(user, dict):
            prefs = (
                user.get("preferences")
                or user.get("features")
            )
        elif isinstance(user, list):
            # already a flat list of prefs
            prefs = user
        else:
            continue

        if not prefs:
            continue

        valid_users += 1

        for p in prefs:
            if isinstance(p, dict) and "feature" in p:
                feature_votes[p["feature"]] += 1

    if valid_users == 0:
        return []

    # ---- Aggregate by threshold ----
    return [
        feature
        for feature, count in feature_votes.items()
        if count / valid_users >= threshold
    ]

# for single user
def normalize_user_preferences(user_prefs):
    """
    Converts single-user preferences into the format expected by `aggregate_preferences`.
    Output format:
    {"preferences": [{"feature": FEATURE_NAME}, ...]}
    """
    if isinstance(user_prefs, dict):
        # Already a dict; make sure inner list has 'feature'
        prefs = user_prefs.get("preferences") or user_prefs.get("features") or []
        normalized = []
        for p in prefs:
            if isinstance(p, dict):
                if "feature" in p:
                    normalized.append(p)
                else:
                    # assume key is the feature name
                    feature_name = list(p.keys())[0]
                    normalized.append({"feature": feature_name})
        return {"preferences": normalized}

    elif isinstance(user_prefs, list):
        # list of dicts
        normalized = []
        for p in user_prefs:
            if isinstance(p, dict):
                if "feature" in p:
                    normalized.append(p)
                else:
                    feature_name = list(p.keys())[0]
                    normalized.append({"feature": feature_name})
        return {"preferences": normalized}

    else:
        raise TypeError(f"Unsupported preference type: {type(user_prefs)}")

In [None]:
# Validate user preference adheres to the defined preference
# single user
def validate_single_user_preferences(preferences, context):
    """
    Validates a single user's preference list.
    """
    valid = []

    for p in preferences:
        feature = p["feature"]

        if feature not in context.actionable_features:
            continue
        if feature in context.immutable_features:
            continue

        valid.append(p)

    return valid

# multiple user
def validate_multi_user_preferences(multi_user_prefs, context):
    """
    Validates preferences for each simulated user independently.
    """
    validated = []

    for user_block in multi_user_prefs:
        persona = user_block["persona"]
        prefs = user_block["preferences"]

        valid_prefs = validate_single_user_preferences(prefs, context)

        validated.append({
            "persona": persona,
            "preferences": valid_prefs
        })

    return validated

In [None]:
agent = MultiUserPreferenceAgent(verbose=True)
logger.info("Requesting user preferences from LLM agent")

# SINGLE USER
raw_prefs = agent._elicit_single_user(context, SIMULATED_USER_PERSONAS[1])
logger.info("User preferences received")
user_preferences = validate_single_user_preferences(raw_prefs, context)

# normalize single-user prefs
normalized_prefs = normalize_user_preferences(user_preferences)
aggregated_features = aggregate_preferences(normalized_prefs, threshold=0.5)

# # MULTI USER
# raw_prefs = agent.elicit_preferences(context)
# logger.info("User preferences received")
# user_preferences = validate_multi_user_preferences(raw_prefs, context)
# aggregated_features = aggregate_preferences(user_preferences, threshold=0.6)

In [None]:
print("Validated User Preferences:")
for p in user_preferences:
    print(p)

length_user_preference = len(user_preferences)
length_user_preference

feasible_features = user_preferences

In [None]:
# MODULE 3: FEATURE UTILITY AND CAUSAL FEASIBILITY FILTER MODULE

In [None]:
#  Feature utility and the split of features into tiers

# Step 1: Parse user preferences into feature-conditioned utility
# lambda weight: 100% represented as 1
def build_feature_utility(user_preferences, lambda_weight=1.0):
    """
    Converts LLM user preferences into feature-conditioned utility weights.
    """
    feature_utilities = {}

    # Normalize ranks to [0,1] (1 = most important)
    ranks = [p['importance_rank'] for p in user_preferences]
    max_rank = max(ranks) if ranks else 1

    for p in user_preferences:
        norm_rank = 1 - ((p['importance_rank'] - 1) / max_rank)  # 1 = most important
        alpha = lambda_weight * (1 - norm_rank)  # lower penalty for important features
        feature_utilities[p['feature']] = {
                                            'alpha': alpha,
                                            'hard': p['hard'],
                                            'rank': p['importance_rank'],

                                        }
    return feature_utilities

# Step 2: Split features into Tier 1 and Tier 2 candidates, using causal feasibility
def split_tiers_with_causal_filter(feature_utilities, context: "DataContext"):
    """
    Splits features into Tier 1 (user-preferred) and Tier 2 (causally feasible remaining)
    using causal graph checks.
    """
    tier1_candidates = list(feature_utilities.keys())

    # Filter Tier 1 features for causal feasibility
    tier1_features, rejected_tier1 = filter_causally_feasible_features(tier1_candidates, context)

    # Identify causally related features for Tier 2
    tier2_candidates = set()
    for f in tier1_features:
        # Get children of f in causal graph
        if f in context.causal_graph.nodes:
            children = list(context.causal_graph.successors(f))
            tier2_candidates.update(children)

    # Exclude Tier 1 features and immutable features
    tier2_candidates = [
        f for f in tier2_candidates
        if f not in tier1_features and f in context.actionable_features
    ]

    # Filter Tier 2 candidates for causal feasibility
    tier2_features, rejected_tier2 = filter_causally_feasible_features(tier2_candidates, context)

    return tier1_features, tier2_features, rejected_tier1, rejected_tier2

In [None]:
# causal feasibility check of user preferences
# looking up the causal parents
def get_causal_parents(feature, causal_graph):
    if feature not in causal_graph.nodes:
        return []
    return list(causal_graph.predecessors(feature))

# checking the causal feasibility for a single feature
def is_feature_causally_feasible(feature, context: DataContext, selected_features, hard_only=True):
    """
    Checks if intervening on `feature` is causally feasible
    given current selected preference features.
    """
    parents = get_causal_parents(feature, context.causal_graph)

    # Immutable parent blocks unless feature is actionable
    for p in parents:
        if (
            p in context.immutable_features
            and feature not in context.actionable_features
        ):
            return False, f"{feature} has immutable parent {p}"

    # Parent-change requirement unless feature is actionable
    for p in parents:
        if (
            feature not in context.actionable_features
            and p not in selected_features
            and p not in context.actionable_features
        ):
            return False, f"{feature} requires parent {p} to change"

    return True, None

# checking the causal feasibility for a set of preferences
def filter_causally_feasible_features(aggregated_features, context: DataContext):
    """
    Filters aggregated preference features based on causal feasibility.
    """
    feasible = []
    rejected = []

    for f in aggregated_features:
        ok, reason = is_feature_causally_feasible( feature=f, context=context, selected_features=aggregated_features)
        if ok:
            feasible.append(f)
        else:
            rejected.append((f, reason))

    return feasible, rejected

feasible_features = aggregated_features

In [None]:
# Step 1: build feature-conditioned utility
feature_utils = build_feature_utility(user_preferences, lambda_weight=1.0)

# TIERED FEATURE PARTITIONING
# Step 2: split tiers with causal feasibility
tier1_features, tier2_features, rejected_t1, rejected_t2 = split_tiers_with_causal_filter(feature_utils, context)

print("\nFeature-conditioned utilities:")
for f, v in feature_utils.items():
    print(f"{f}: {v}")

print("\nTier 1 features (user-preferred & causally feasible):", tier1_features)
print("Tier 2 features (causally related & feasible):", tier2_features)
print("\nRejected Tier 1 features due to causal constraints:", rejected_t1)
print("Rejected Tier 2 features due to causal constraints:", rejected_t2)

In [None]:
# MODULE 4: MULTI-OBJECTIVE OPTIMIZATION MODULE
# optimizes the defined objectives to produce CFEs that are both user-aligned and actionable.

In [None]:
# reconstruct the counterfactuals to the human-readable data
def decode_data(valid_cfs: np.ndarray, scaler: StandardScaler,label_encoders: dict,
                ord_cols: list, ord_orders: list, num_cols: list, l_enc_cols: list,
                feature_names: list) -> pd.DataFrame:
    """
    Reconstruct counterfactuals generated by the CFE pipeline to human-readable values.

    Args:
        cf_array: np.ndarray of counterfactuals (shape: num_cfs x num_features)
        scaler: Fitted StandardScaler for numerical features
        label_encoders: dict of fitted LabelEncoders for nominal features
        ord_cols: List of ordinal column names
        ord_orders: List of lists with the ordering of each ordinal column
        num_cols: List of numerical feature names
        l_enc_cols: List of nominal categorical feature names
        feature_names: List of all feature names in order (num_cols + ord_cols + l_enc_cols)

    Returns:
        pd.DataFrame: Counterfactuals in original, human-readable format
    """
    cf_df = pd.DataFrame(valid_cfs, columns=feature_names)

    # a. Inverse transform numerical features
    df_num = pd.DataFrame(
        scaler.inverse_transform(cf_df[num_cols]),
        columns=num_cols
    )

    # b. Inverse transform ordinal features
    df_ord = pd.DataFrame(cf_df[ord_cols].round().astype(int), columns=ord_cols)
    for i, col in enumerate(ord_cols):
        inv_map = {idx: val for idx, val in enumerate(ord_orders[i])}
        df_ord[col] = df_ord[col].map(inv_map)

    # c. Inverse transform label-encoded nominal features
    df_nom = pd.DataFrame(columns=l_enc_cols)
    for col in l_enc_cols:
        vals = cf_df[col].round().astype(int)
        vals = vals.clip(lower=0, upper=len(label_encoders[col].classes_) - 1)
        df_nom[col] = label_encoders[col].inverse_transform(vals)

    # Combine all features in the original order
    reconstructed_cf = pd.concat([df_num, df_ord, df_nom], axis=1)

    return reconstructed_cf[num_cols + ord_cols + l_enc_cols]

In [None]:
import networkx as nx
from pymoo.core.problem import Problem
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
from pymoo.operators.crossover.sbx import SBX
from pymoo.core.callback import Callback
from pymoo.core.termination import Termination

# warning occurs because the prediction is made without the column name, while it was trained with the column name
import warnings
warnings.filterwarnings("ignore", message="X does not have valid feature names")

In [None]:
# helper function for the mutation
from pymoo.core.mutation import Mutation
from pymoo.core.population import Population

class DiverseBatchMutation(Mutation):
    def __init__(self, actionable_idx, sem_functions,
                 discrete_indices, xl, xu,  # <--- Essential: Receive bounds
                 mutation_rate, rng):
        super().__init__()
        self.actionable_idx = actionable_idx
        self.sem_functions = sem_functions
        self.mutation_rate = mutation_rate
        self.rng = rng

        # Store bounds for use in _do()
        self.discrete_indices = set(discrete_indices)
        self.xl = xl
        self.xu = xu

    def _do(self, problem, X, **kwargs):
        # Pass the STORED bounds, not the problem's bounds (safety)
        X_mut = self.batch_mutate_diverse(
            X,
            actionable_idx=self.actionable_idx,
            discrete_indices=self.discrete_indices,
            xl=self.xl,
            xu=self.xu,
            mutation_rate=self.mutation_rate,
            rng=self.rng
        )

        # Sanitize
        X_mut = np.nan_to_num(X_mut, nan=0.0, posinf=1e6, neginf=-1e6)
        if self.xl is not None and self.xu is not None:
             X_mut = np.clip(X_mut, self.xl, self.xu)

        return X_mut

    @staticmethod
    def batch_mutate_diverse(X, actionable_idx, discrete_indices, xl, xu, mutation_rate, rng):
        X = np.asarray(X)
        is_1d = X.ndim == 1
        X_mut = np.atleast_2d(X).copy()
        n_samples, n_vars = X_mut.shape

        for i in range(n_samples):
            for idx in actionable_idx:
                if rng.random() < mutation_rate:

                    # --- CATEGORICAL RESAMPLING ---
                    if idx in discrete_indices:
                        current_val = int(round(X_mut[i, idx]))

                        # Use the Monotonic/Relaxed bounds we calculated in __init__
                        min_val = int(xl[idx])
                        max_val = int(xu[idx])

                        # Generate candidates: integers in range [min, max] EXCLUDING current
                        candidates = [v for v in range(min_val, max_val + 1) if v != current_val]

                        if candidates:
                            # Force a jump to a new category
                            X_mut[i, idx] = rng.choice(candidates)

                    # --- CONTINUOUS PERTURBATION ---
                    else:
                        # Standard Gaussian noise
                        X_mut[i, idx] += rng.normal(0, 0.2)

        if is_1d:
            return X_mut[0]
        return X_mut

In [None]:
# Counterfactual Generation
class CounterfactualGeneration(Problem):
    def __init__(self, model, original_data: pd.DataFrame, desired_class: int,
                 actionable_features: list, immutable_features: list, feasible_preference_features: list,
                 sem_functions: dict, causal_graph: nx.DiGraph, scaler,
                 label_encoders, ord_cols, ord_orders, num_cols,
                 l_enc_cols, feature_names, stochastic: bool = True,
                 opt_cfg=None, seed=40, feature_utilities=None,
                 tier_1_objectives=None, tier_2_objectives=None,
                 lex_thresholds=None,):

        self.opt_cfg = opt_cfg or {}
        self.seed = seed
        self.model = model
        self.original_data = original_data.reset_index(drop=True)
        self.desired_class = desired_class

        # --- 1. SETUP FEATURES & INDICES ---
        # We trust the column order of the original dataframe
        self.features = list(original_data.columns)
        self.actionable = [f for f in actionable_features if f in causal_graph.nodes]
        self.immutable = [f for f in immutable_features if f in causal_graph.nodes]
        self.feasible_preference_features = feasible_preference_features
        # Map preferred features to their numerical index in the gene vector
        self.mutable_indices = [
            self.features.index(f) for f in feasible_preference_features if f in self.features
        ]

        # --- 2. CAUSAL & DECODER SETUP ---
        self.sem_functions = sem_functions
        self.graph = causal_graph
        self.stochastic = stochastic
        self.scaler = scaler
        self.label_encoders = label_encoders
        self.ord_cols = ord_cols
        self.ord_orders = ord_orders
        self.num_cols = num_cols
        self.l_enc_cols = l_enc_cols
        self.feature_names = feature_names
        self.feature_utilities = feature_utilities or {}

        self.tier_1_objectives = tier_1_objectives or ["proximity"]
        self.tier_2_objectives = tier_2_objectives or ["plausibility", "sparsity", "weighted_cost"]
        self.lex_thresholds = lex_thresholds or {"validity": 0, "proximity": 1.5}

        # --- 3. QUANTIZATION SETUP ---
        # Identify all columns that are NOT numerical (Ordinal + Categorical)
        # We will force these to be integers during evaluation.
        self.discrete_indices = []
        for col in self.features:
            if col not in self.num_cols:
                self.discrete_indices.append(self.features.index(col))

        # --- 4. PROBLEM DEFINITION ---
        # 1. Define Standard Bounds (tight for continuous)
        x_orig = original_data.iloc[0].values
        eps = 0.5
        xl = x_orig - eps
        xu = x_orig + eps

        # 2. RELAX BOUNDS for Categorical/Discrete Features (The Fix)
        # We allow these features to span their full encoded range (e.g., 0 to n_classes-1)
        for idx in self.discrete_indices:
            feature_name = self.features[idx]

            if idx in self.discrete_indices:
                # Determine valid range based on encoder
                if feature_name in self.label_encoders:
                    # Nominal: 0 to len(classes) - 1
                    lower = 0
                    upper = len(self.label_encoders[feature_name].classes_) - 1
                elif feature_name in self.ord_cols:
                    # Ordinal: 0 to len(order) - 1
                    # Find which ordinal order list corresponds to this col
                    ord_idx = self.ord_cols.index(feature_name)
                    lower = 0
                    upper = len(self.ord_orders[ord_idx]) - 1
                else:
                    # Fallback
                    lower, upper = xl[idx], xu[idx]

                # Update bounds specifically for this feature
                xl[idx] = lower
                xu[idx] = upper

        super().__init__(
            n_var=len(self.features),
            n_obj=4,
            n_constr=2,
            xl=xl,
            xu=xu,
        )

    def _evaluate(self, X, out, *args, **kwargs):
        # ---------------------------------------------------------
        # STEP 1: STRICT QUANTIZATION (The "Snap-to-Grid" Fix)
        # ---------------------------------------------------------
        # Force discrete genes to nearest integer.
        # This ensures the Model sees exactly what the Decoder sees.
        X_quantized = X.copy()
        if len(self.discrete_indices) > 0:
            X_quantized[:, self.discrete_indices] = np.round(X_quantized[:, self.discrete_indices])

        # ---------------------------------------------------------
        # STEP 2: REFERENCE DATA DECODING
        # ---------------------------------------------------------
        x_orig_encoded = self.original_data.iloc[0].values
        orig_df_encoded = pd.DataFrame([x_orig_encoded], columns=self.features)

        decoded_orig_df = decode_data(
            valid_cfs=orig_df_encoded.values, scaler=self.scaler, label_encoders=self.label_encoders,
            ord_cols=self.ord_cols, ord_orders=self.ord_orders, num_cols=self.num_cols,
            l_enc_cols=self.l_enc_cols, feature_names=self.feature_names,
        )
        decoded_orig_row = decoded_orig_df.iloc[0]

        # ---------------------------------------------------------
        # STEP 3: CANDIDATE DECODING
        # ---------------------------------------------------------
        # Critical: Re-align columns to match what decode_data expects
        X_quant_df = pd.DataFrame(X_quantized, columns=self.features)
        X_quant_reordered = X_quant_df[self.feature_names].values

        decoded_X = decode_data(
            valid_cfs=X_quant_reordered, scaler=self.scaler, label_encoders=self.label_encoders,
            ord_cols=self.ord_cols, ord_orders=self.ord_orders, num_cols=self.num_cols,
            l_enc_cols=self.l_enc_cols, feature_names=self.feature_names,
        )

        F = []
        pred_probs = []
        # ---------------------------------------------------------
        # STEP 4: EVALUATION LOOP
        # ---------------------------------------------------------
        for i, cf_vec in enumerate(X_quantized):

            # A. Reconstruction (Apply gene changes to original data)
            x_cf = self.original_data.iloc[0].copy()
            for idx in self.mutable_indices:
                x_cf.iloc[idx] = cf_vec[idx]

            # B. Prediction (Using the STRICT QUANTIZED values)
            x_input = x_cf.values.reshape(1, -1)
            pred = self.model.predict(x_input)[0]
            prob = self.model.predict_proba(x_input)[0, self.desired_class]
            pred_probs.append(prob)

            validity = 0 if pred == self.desired_class else 1

            # C. Proximity
            proximity = np.linalg.norm(x_cf.values - x_orig_encoded)

            # D. Plausibility
            plausibility = self.compute_plausibility(x_cf)

            # E. Sparsity & Weighted Cost
            x_cf_dec = decoded_X.iloc[i]
            sparsity_count = 0.0
            weighted_cost = 0.0

            for idx in self.mutable_indices:
                fname = self.features[idx]

                # --- FIX START: Initialize the flag ---
                changed = False
                # --------------------------------------

                # Determine Rank-Based Weight
                if fname in self.feature_utilities:
                    feature_weight = 1.0 + self.feature_utilities[fname].get("alpha", 1.0)
                else:
                    feature_weight = 2.0

                # Extract Values
                val_cf = x_cf_dec[fname]
                if hasattr(val_cf, 'item'): val_cf = val_cf.item()
                val_orig = decoded_orig_row[fname]
                if hasattr(val_orig, 'item'): val_orig = val_orig.item()

                # Detect Change
                if fname in self.num_cols:
                    if not np.isclose(float(val_cf), float(val_orig), atol=1e-5):
                        changed = True
                else:
                    if val_cf != val_orig:
                        changed = True

                # Accumulate Costs (Updated with Stagnation Penalty logic)
                if changed:
                    sparsity_count += 1.0
                    weighted_cost += feature_weight

                # Optional: Stagnation Penalty for Tier 1 (from previous suggestion)
                elif fname in self.feature_utilities and self.feature_utilities[fname]['rank'] == 1:
                     weighted_cost += 5.0 # Penalty for NOT changing a priority feature

            # F. Constraints
            tier1_violation = int(
                (validity > self.lex_thresholds.get("validity", 0)) or
                (proximity > self.lex_thresholds.get("proximity", 1.5))
            )
            penalty = 1e3 * tier1_violation

            F.append([proximity + penalty, plausibility, sparsity_count, weighted_cost])

        F = np.asarray(F)
        pred_probs = np.asarray(pred_probs)

        out["F"] = F
        out["G"] = np.hstack([
            np.maximum(0, 0.5 - pred_probs).reshape(-1, 1),
            np.maximum(0, F[:, 0] - 6.0).reshape(-1, 1),
        ])
    def compute_plausibility(self, x_cf):
        nll = 0.0
        for node in nx.topological_sort(self.graph):
            if node in self.actionable or node in self.immutable: continue
            if node not in self.sem_functions: continue
            parents = list(self.graph.predecessors(node))
            if not parents: continue
            parent_vals = x_cf[parents].values.reshape(1, -1)
            if parent_vals.dtype == object: continue
            sem_func, _ = self.sem_functions[node]
            mu, sigma = sem_func(parent_vals)
            mu = float(np.squeeze(mu))
            sigma = float(np.squeeze(sigma)) + 1e-6
            sampled = np.random.normal(mu, sigma) if self.stochastic else mu
            nll += (sampled - mu) ** 2 / (2 * sigma ** 2)
        return float(nll)


In [None]:
# optimization problem
verified_feasible_features = tier1_features + tier2_features

def build_cfe_problem(model, cf_data_instance, context,
                      feasible_features, sem_functions,
                      scaler, label_encoders, feature_names,
                      desired_class=1, stochastic=True, opt_cfg=None,
                      seed=45, feature_utilities=None, lex_thresholds=None,
                      ):
    """
    Builds a lexicographic CounterfactualGeneration problem.
    """

    opt_cfg = opt_cfg or BASELINE_OPT.copy()

    problem = CounterfactualGeneration(
        model=model,
        original_data=cf_data_instance,
        desired_class=desired_class,
        actionable_features=context.actionable_features,
        immutable_features=context.immutable_features,
        feasible_preference_features=verified_feasible_features,
        sem_functions=sem_functions,
        causal_graph=context.causal_graph,
        scaler=scaler,
        label_encoders=label_encoders,
        ord_cols=context.ord_cols,
        ord_orders=context.ord_orders,
        num_cols=context.num_cols,
        l_enc_cols=context.l_enc_cols,
        feature_names=feature_names,
        stochastic=stochastic,
        opt_cfg=opt_cfg,
        seed=seed,
        feature_utilities=feature_utilities,
        lex_thresholds=lex_thresholds,
    )

    return problem

def build_nsga3_algorithm(problem, opt_cfg, seed=40):
    """
    Build NSGA-III algorithm using problem-aware configuration.
    """
    rng = np.random.default_rng(seed)

    ref_dirs = get_reference_directions(
        "das-dennis",
        n_dim=problem.n_obj,
        n_partitions=opt_cfg.get("ref_partitions", 4),
    )

    algorithm = NSGA3(
        ref_dirs=ref_dirs,
        pop_size=opt_cfg.get("pop_size", len(ref_dirs)),
        crossover=SBX(
            prob=opt_cfg.get("crossover_prob", 0.9),
            eta=opt_cfg.get("mutation_eta", 20),
        ),
        mutation=DiverseBatchMutation(
            actionable_idx=problem.mutable_indices,
            sem_functions=problem.sem_functions,

            # --- INJECTING PROBLEM DATA HERE ---
            discrete_indices=problem.discrete_indices,
            xl=problem.xl,
            xu=problem.xu,
            # -----------------------------------

            mutation_rate=opt_cfg.get("mutation_rate", 0.4),
            rng=rng,
        ),
        eliminate_duplicates=True,
    )
    return algorithm

def generate_cfe(problem, algorithm=None, n_generations=100,
                 n_seeds=5, N_valid=5,
                 seed=40, verbose=True):
    """
    Run lexicographic NSGA-III optimization and return diverse CF solutions.
    """

    # Boundary awareness
    x_input = problem.original_data.values.reshape(1, -1)
    proba = problem.model.predict_proba(x_input)[0][problem.desired_class]
    log_odds = safe_log_odds(proba)

    if abs(log_odds) > 0.2:
        n_generations = int(n_generations * 1.5)

    # Build algorithm if needed
    if algorithm is None:
        algorithm = build_nsga3_algorithm(
            problem, opt_cfg=problem.opt_cfg, seed=seed
        )

    # --- FIX: Removed 'validity_index=0' to match your new class ---
    # from pymoo.termination import get_termination
    # termination = get_termination("n_gen", n_generations)

    termination = CounterfactualTermination(N=N_valid)

    callback = LogCallback()

    solutions = []

    for s in range(n_seeds):
        problem.seed = s
        res = minimize(
            problem,
            algorithm,
            termination=termination,
            seed=s,
            callback=callback,
            verbose=verbose,
        )
        solutions.append(res.X)

    return solutions


# sensitivity analysis for the lexicographic ordering threshold
def lexicographic_sensitivity_analysis(problem, algorithm, proximity_grid,
                                       min_valid=5, seed=40, verbose=False,
                                       ):
    """
    Sensitivity analysis for Îµ-lexicographic proximity threshold.
    """
    from pymoo.termination import get_termination

    records = []

    for eps in proximity_grid:
        problem.lex_thresholds["proximity"] = eps

        res = minimize(problem, algorithm,
            termination=get_termination("n_gen", 50),
            seed=seed, verbose=verbose,
                       )
        F = res.F

        # Valid solutions: no constraint violations
        valid_mask = np.all(res.G <= 0, axis=1)

        records.append({
            "eps": eps,
            "n_valid": valid_mask.sum(),
            "avg_plausibility": np.mean(F[valid_mask, 1]) if valid_mask.any() else np.inf,
            "avg_sparsity": np.mean(F[valid_mask, 2]) if valid_mask.any() else np.inf,
        })

    df = pd.DataFrame(records)

    # Stability heuristic
    stable_eps = df[
        (df["n_valid"] >= min_valid) &
        (df["avg_plausibility"].diff().abs() < 0.1)
        ]

    return df, stable_eps

In [None]:
# ----------------------------
# Logging callback
# ----------------------------
# tracks and visualize the search for optimal counterfactuals
class LogCallback(Callback):
    def __init__(self):
        super().__init__()
        self.data["gen"] = []
        self.data["n_evals"] = []
        self.data["opt"] = []

    def notify(self, algorithm):
        self.data["gen"].append(algorithm.n_gen)
        self.data["n_evals"].append(algorithm.evaluator.n_eval)
        self.data["opt"].append(algorithm.opt.get("F"))

# ----------------------------
# Safe log odds
# ----------------------------
# transforms model outputs into a space where optimization is more effective or interpretable
def safe_log_odds(prob):
    eps = 1e-9
    prob = np.clip(prob, eps, 1 - eps)
    return np.log(prob / (1 - prob))

# ----------------------------
# Early Termination
# ----------------------------
class CounterfactualTermination(Termination):
    def __init__(self, N=5):
        super().__init__()
        self.N = N

    def _update(self, algorithm):
        # Get Constraint violations (G)
        # G[:, 0] corresponds to the validity constraint defined in _evaluate
        G = algorithm.opt.get("G")

        if G is None:
            return False

        # Count how many solutions satisfy the validity constraint (G <= 0)
        valid_count = np.sum(G[:, 0] <= 0)

        # Stop if we have found enough valid counterfactuals
        return valid_count >= self.N

In [None]:
# CFE reconstruction

def reconstruct_counterfactuals(problem, X):
    """
    Converts PyMOO decision vectors into full counterfactual records.

    All features are preserved.
    Only feasible user-preferred features are modified.
    """
    x_orig = problem.original_data.iloc[0].copy()
    cfs = []

    for x in X:
        x_cf = x_orig.copy()
        # Apply ONLY to feasible preference features
        for idx in problem.mutable_indices:
            x_cf.iloc[idx] = x[idx]

        cfs.append(x_cf)

    return pd.DataFrame(cfs)

def extract_valid_counterfactuals(problem, X_solutions, feasible_features):
    # 1. Re-evaluate
    out = {}
    problem._evaluate(X_solutions, out)
    F = out["F"]
    G = out["G"]

    # 2. Reconstruct (Returns ENCODED data in problem.features order)
    cf_df_encoded = reconstruct_counterfactuals(problem, X_solutions)

    # --- FIX 1: DECODE COUNTERFACTUALS WITH CORRECT COLUMN ORDER ---
    # We must use problem.features (Encoded Order) so the data aligns with the names.
    cf_df = decode_data(
        valid_cfs=cf_df_encoded.values,
        scaler=problem.scaler,
        label_encoders=problem.label_encoders,
        ord_cols=problem.ord_cols,
        ord_orders=problem.ord_orders,
        num_cols=problem.num_cols,
        l_enc_cols=problem.l_enc_cols,
        # CRITICAL FIX: Use Encoded Order
        feature_names=problem.features
    )

    # 3. Define Names
    objective_names = ["proximity", "plausibility", "sparsity", "weighted_cost"]
    obj_df = pd.DataFrame(F, columns=objective_names)

    # 4. Filter Validity
    is_valid = G[:, 0] <= 0
    cf_with_scores = pd.concat([cf_df.reset_index(drop=True), obj_df], axis=1)
    cf_with_scores["is_valid"] = is_valid
    valid_cfs = cf_with_scores[cf_with_scores["is_valid"] == True].copy()

    # --- FIX 2: DECODE ORIGINAL DATA WITH CORRECT COLUMN ORDER ---
    x_orig_encoded = problem.original_data.iloc[0].values.reshape(1, -1)
    decoded_orig_df = decode_data(
        valid_cfs=x_orig_encoded,
        scaler=problem.scaler,
        label_encoders=problem.label_encoders,
        ord_cols=problem.ord_cols,
        ord_orders=problem.ord_orders,
        num_cols=problem.num_cols,
        l_enc_cols=problem.l_enc_cols,
        # CRITICAL FIX: Use Encoded Order
        feature_names=problem.features
    )
    decoded_orig_row = decoded_orig_df.iloc[0]

    # 5. PSR Calculation
    def get_weighted_psr(row):
        actual_score = 0.0
        max_score = 0.0

        full_wishlist = list(problem.feature_utilities.keys())
        if not full_wishlist: return 100.0

        num_prefs = len(full_wishlist)

        # DEBUG HEADER
        if row.name == 0:
            print(f"\n--- PSR COMPARISON DEBUG ---")

        for fname in full_wishlist:
            util = problem.feature_utilities[fname]
            rank = util['rank']
            weight = (num_prefs + 1) - rank
            max_score += weight

            orig_val = decoded_orig_row[fname]
            cf_val = row[fname]

            changed = False

            # Robust Comparison
            if fname in problem.num_cols:
                 if not np.isclose(float(cf_val), float(orig_val), atol=1e-5):
                     changed = True
            else:
                s_cf = str(cf_val).strip().lower()
                s_orig = str(orig_val).strip().lower()
                if s_cf != s_orig:
                    changed = True
                    if row.name == 0:
                        print(f"CHANGE: {fname} ('{s_orig}' -> '{s_cf}')")

            if changed:
                actual_score += weight

        if max_score == 0: return 0.0

        if row.name == 0:
            print(f"Final PSR: {(actual_score/max_score)*100}%")
            print("----------------------------\n")

        return (actual_score / max_score) * 100

    if not valid_cfs.empty:
        valid_cfs['psr_percent'] = valid_cfs.apply(get_weighted_psr, axis=1)

    return valid_cfs.drop(columns=["is_valid"])


In [None]:
# -----------------------------
# Baseline (fixed)
# -----------------------------
BASELINE_OPT = dict(
    pop_size=150,
    n_generations=100,
    mutation_eta=10,
    crossover_prob=0.9,
    ref_partitions=6
)

# data instance
cf_data_instance = x_test_encoded[x_test_encoded['risk'] == 0].iloc[[20]].copy()
cf_data_instance_t = cf_data_instance.drop(columns=['risk']).reset_index(drop=True)
features_names = g_data.columns.tolist()


In [None]:
# ==============================================================================
# STEP 1: INITIALIZE PROBLEM & ALGORITHM
# ==============================================================================
# This creates the objects required to run the sensitivity analysis.

lex_thresholds = {
    "validity": 0,     # must flip class
    "proximity": 1.5   # initial Îµ (will be tuned later)
}

problem = build_cfe_problem(
    model=svm_model,
    cf_data_instance=cf_data_instance_t,
    context=context,
    feasible_features=feasible_features,
    sem_functions=sem_functions,
    scaler=scaler,
    label_encoders=label_encoders,
    feature_names=features_names,
    desired_class=1,
    stochastic=True,
    opt_cfg=BASELINE_OPT,
    feature_utilities=feature_utils,
    lex_thresholds=lex_thresholds, # Starts with your default/wide thresholds
    seed=45
)

algorithm = build_nsga3_algorithm(problem, opt_cfg=problem.opt_cfg, seed=45)

solutions = generate_cfe(problem, algorithm=algorithm, n_generations=100,
                         n_seeds=5, seed=45, verbose=True)

# ==============================================================================
# STEP 2: LEXICOGRAPHIC SENSITIVITY ANALYSIS
# ==============================================================================
# Now we use the objects defined above to find the "Sweet Spot"

print("--- Starting Sensitivity Analysis ---")
proximity_grid = np.linspace(0.5, 5.0, 10)

sensitivity_df, stable_eps = lexicographic_sensitivity_analysis(
    problem=problem,     # Passes the problem you built in Step 1
    algorithm=algorithm, # Passes the algorithm you built in Step 1
    proximity_grid=proximity_grid,
    min_valid=5,
    seed=45,
    verbose=False
)

print("Sensitivity Results:")
display(sensitivity_df)

print("Stable Values")
display(stable_eps)

# ==============================================================================
# STEP 3: UPDATE PROBLEM WITH OPTIMAL THRESHOLD
# ==============================================================================
# We now modify the 'problem' object you created in Step 1 with the new value.

if not stable_eps.empty:
    optimal_epsilon = stable_eps.iloc[0]['eps']
    print(f"\n>>> Selected Optimal Proximity Threshold (Îµ): {optimal_epsilon}")
else:
    optimal_epsilon = 2.5 # Fallback
    print(f"\n>>> Warning: No stable threshold found. Using fallback: {optimal_epsilon}")

# CRITICAL STEP: Update the existing problem instance
problem.lex_thresholds["proximity"] = optimal_epsilon

# ==============================================================================
# STEP 4: FINAL GENERATION
# ==============================================================================
# The 'problem' variable now contains the tuned threshold.

print(f"\n--- Running Final Generation with Îµ={optimal_epsilon} ---")

# Run the final optimization
res = minimize(problem, algorithm,
               ("n_gen", 150), seed=45,
               verbose=True)

X_solutions = res.X
valid_cfs = extract_valid_counterfactuals(problem, X_solutions, feasible_features)

In [None]:
valid_cfs

In [None]:
# List of objectives you defined
# objective_names = ["validity", "proximity", "plausibility", "sparsity", "psr"]

objective_names = ["proximity", "plausibility", "sparsity", "psr_percent", "weighted_cost"]
# Extract only the objective columns
objectives_df = valid_cfs[objective_names]
objectives_df


In [None]:
cf_data_instance

In [None]:
data_instance

In [None]:
def compare_instance_with_cfes(data_instance: pd.DataFrame, decoded_cfes: pd.DataFrame):
    """
    Compare a single original instance with multiple decoded counterfactuals.
    Highlights changes with "old â†’ new" notation.

    Args:
        data_instance: pd.DataFrame with a single row (original instance, human-readable)
        decoded_cfes: pd.DataFrame of counterfactuals (human-readable)

    Returns:
        pd.io.formats.style.Styler: styled DataFrame showing changes
    """

    # Function to highlight changes
    def highlight_changes(val):
        if isinstance(val, str) and "â†’" in val:
            return "background-color: #d4edda; color: green; font-weight: bold;"  # light green
        return ""

    # Repeat the original instance to match CF rows
    original_repeated = pd.concat([data_instance]*decoded_cfes.shape[0], ignore_index=True)

    # Convert both to strings for comparison
    original_str = original_repeated.astype(str)
    cf_str = decoded_cfes.astype(str)

    # Align columns in case of any mismatch
    original_str, cf_str = original_str.align(cf_str, join='inner', axis=1)

    # Create comparison DataFrame
    comparison_df = original_str.copy()

    for col in comparison_df.columns:
        diff_mask = original_str[col] != cf_str[col]
        comparison_df.loc[diff_mask, col] = original_str.loc[diff_mask, col] + " â†’ " + cf_str.loc[diff_mask, col]

    # Use Styler.map instead of deprecated applymap
    styled_df = comparison_df.style.map(highlight_changes)
    return styled_df


In [None]:
# original_decoded: single-row original instance
# decoded_cf_df: reconstructed human-readable CFEs
styled_comparison = compare_instance_with_cfes(data_instance, valid_cfs)
styled_comparison


In [None]:
user_preferences

In [None]:
feasible_features