# How to contribute?

Here we report a simple guide with the key features of RobustX in order to implement your own (robust) counterfactual explanation method.

# Setup preparation

In [None]:
# Import necessary components
from robustx.lib.models.BaseModel import BaseModel
from sklearn.model_selection import train_test_split
from robustx.lib.models.pytorch_models.SimpleNNModel import SimpleNNModel
from robustx.datasets.ExampleDatasets import get_example_dataset
from robustx.lib.tasks.ClassificationTask import ClassificationTask
from robustx.generators.CE_methods.Wachter import Wachter
import torch
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
import pandas as pd
from robustx.generators.CEGenerator import CEGenerator


class EnsembleModelWrapper(BaseModel):
    def __init__(self, model_ensemble: list[torch.nn.Module], aggregation_method: str = 'majority_vote'):
        super().__init__(EnsembleModelWrapper)
        self.model_ensemble = model_ensemble
        self.pt_model_ensemble = [model._model for model in model_ensemble]
        self.aggregation_method = aggregation_method
    
    def train(self, X: pd.DataFrame, y: pd.DataFrame) -> None:
        print("Training should be done on individual models in the ensemble.")
    
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        device = next(self.pt_model_ensemble[0].parameters()).device
        X_tensor = torch.Tensor(X.to_numpy()).to(device)
        preds_ensemble = []
        for model in self.pt_model_ensemble:
            model.eval()
            with torch.no_grad():
                outputs = model(X_tensor).cpu().numpy()
                preds_ensemble.append(outputs)
        preds_ensemble = np.array(preds_ensemble)  # Shape: (n_models, n_samples, n_classes)
        if self.aggregation_method == 'majority_vote':
            final_preds = np.squeeze(np.round(np.mean(preds_ensemble, axis=0)))
        else:
            raise ValueError(f"Unknown aggregation method: {self.aggregation_method}")
        predictions = final_preds.astype(int)
        return pd.DataFrame(predictions, columns=['prediction'], index=X.index)
    
    def predict_single(self, X: pd.DataFrame) -> int:
        return self.predict(X).values.item()
    
    def predict_ensemble_proba_tensor(self, X: torch.Tensor) -> torch.Tensor:
        device = next(self.pt_model_ensemble[0].parameters()).device
        X_tensor = torch.Tensor(X.to_numpy()).to(device)
        probs_ensemble = []
        for model in self.pt_model_ensemble:
            model.eval()
            with torch.no_grad():
                outputs = model(X_tensor)
                probs_ensemble.append(outputs)
        probs_ensemble = torch.tensor(probs_ensemble)  # Shape: (n_models, n_samples, n_classes)
        return probs_ensemble
    
    def predict_proba(self, X: pd.DataFrame) -> pd.DataFrame:
        X_tensor = torch.Tensor(X.to_numpy())
        probs_ensemble = self.predict_ensemble_proba_tensor(X_tensor).numpy()  # Shape: (n_models, n_samples, n_classes)
        if self.aggregation_method == 'majority_vote':
            aggregated_probs = np.mean(probs_ensemble, axis=0)
        return pd.DataFrame(aggregated_probs, columns=[f'class_{i}' for i in range(aggregated_probs.shape[1])], index=X.index)
        
    def predict_proba_tensor(self, X: torch.Tensor) -> torch.Tensor:
        X_numpy = X.numpy()
        probabilities = self.predict_proba(X_numpy)
        return torch.tensor(probabilities)
    
    def evaluate(self, X: pd.DataFrame, y: pd.DataFrame):
        y_pred = self.predict(X)
        accuracy = accuracy_score(y, y_pred)
        report = classification_report(y, y_pred)
        return {
            'accuracy': accuracy,
            'classification_report': report
        }
    
    def compute_accuracy(self, X_test, y_test):            
        return self.evaluate(X_test, y_test)['accuracy']

In [67]:
# Load and preprocess dataset
dl = get_example_dataset("iris")
dl.preprocess(
    impute_strategy_numeric='mean',  # Impute missing numeric values with mean
    scale_method='minmax',           # Apply min-max scaling
    encode_categorical=False         # No categorical encoding needed (since no categorical features)
)

# remove the target column from the dataset that has labels 2
dl.data = dl.data[dl.data['target'] != 2]

# Load model, note some RecourseGenerators may only work with a certain type of model,
# e.g., MCE only works with a SimpleNNModel
n_models = 2
model_ensemble = [SimpleNNModel(4, [10], 1, seed=0) for _ in range(n_models)]

target_column = "target"
X_train, X_test, y_train, y_test = train_test_split(dl.data.drop(columns=[target_column]), dl.data[target_column], test_size=0.35, random_state=0)


# Train each model in the ensemble
all_indexes = np.arange(X_train.shape[0])
for model in model_ensemble:
    np.random.shuffle(all_indexes)
    sampled_indexes = all_indexes[:int(0.8 * len(all_indexes))]
    model.train(X_train.iloc[sampled_indexes], y_train.iloc[sampled_indexes], epochs=100, batch_size=16, verbose=0)
    print(f"model accuracy: {model.compute_accuracy(X_test.values, y_test.values):0.4f}")

emodel = EnsembleModelWrapper(model_ensemble=model_ensemble, aggregation_method='majority_vote')
print(f"ensemble accuracy: {emodel.compute_accuracy(X_test, y_test):0.4f}")


# Create task
task = ClassificationTask(emodel.model_ensemble[0], dl)

model accuracy: 0.5143
model accuracy: 1.0000
ensemble accuracy: 0.6571


# Example of an already implemented CE generation method in RobustX

In [69]:
class EntropicRisk(torch.nn.Module):
    def __init__(self, theta):
        super(EntropicRisk, self).__init__()
        self.theta = theta

    def forward(self, ensemble_proba):
        # ensemble_proba contains prediction probs of each model in ensemble
        # for the target class, shape: (n_models, batch_size)
        exp_logits = torch.exp(1 - self.theta * ensemble_proba)  # Apply exponential and 1-theta*m(x) loss
        avg_exp_logits = torch.mean(exp_logits, dim=0)  # Average over models, Shape: (batch_size)        
        risk = (1 / self.theta) * torch.log(avg_exp_logits + 1e-10)  # Add small constant for numerical stability
        return risk


# Implement the RecourseGenerator class
class EntropicCE(CEGenerator):
    # You must implement the _generation_method function, this returns the CE for a given
    # instance, if you take any extra arguments make sure to specify them before **kwargs,
    # like we have done for n and seed (they must have some default value)
    def _generation_method(self, 
                           instance, 
                           column_name="target", 
                           neg_value=0,
                           max_iter=10,
                           theta=1.0,
                           tau=0.5,
                           lr=0.01,
                           epsilon=0.1,
                           seed=None, 
                           ref_model_idx=None,
                           device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                           **kwargs):
        # Remember, the RecourseGenerator has access to its Task! Use this to get access to your dataset or model,
        # or to use any of their methods, here we use the ClassificationTask's get_random_positive_instance() method

        model_ensemble = self.task.model.model_ensemble
        n_models = len(model_ensemble)
        if ref_model_idx is None:
            ref_model_idx = np.random.randint(0, n_models, random_state=seed)
        
        ref_model = model_ensemble[ref_model_idx]
        ref_task = ClassificationTask(ref_model, self.task.dataset)
        ce_gen = Wachter(self.task)
        ref_ce = ce_gen.generate_for_instance(instance, neg_value=neg_value, device=device)

        ref_ce = torch.Tensor(ref_ce.to_numpy()).to(device)
        ent_ce = torch.nn.Variable(ref_ce.clone(), requires_grad=True).to(device)

        optimiser = torch.optim.Adam([ent_ce], lr, amsgrad=True)
        entropic_risk = EntropicRisk(theta).to(device)

        iterations = 0
        cf_is_valid = False
        while not cf_is_valid and iterations <= max_iter:
            optimiser.zero_grad()
            class_prob = self.task.model.predict_ensemble_proba_tensor(ent_ce)
            risk = entropic_risk(class_prob[:, 1-neg_value])  # Assuming binary classification, get probs for positive class
            risk.sum().backward()
            optimiser.step()

            # break conditions
            if risk.item() < tau:
                cf_is_valid = True
            iterations += 1
        
        if not cf_is_valid:
            print("Warning: Entropic CE generation did not converge to a valid counterfactual within the max iterations.")

        res = pd.DataFrame(ent_ce.detach().numpy()).T
        res.columns = instance.index
        
        return res 

In [70]:
# Each counterfactual explanation generator takes the task on creation, it can also take a custom distance function, but for now we will use the default one.
ce_gen = EntropicCE(task)

# Get negative instances, the default column_name is always "target" but you can set it to the name of your dataset's target variable
negs = dl.get_negative_instances(neg_value=0, column_name="target")
print("Negative instances shape: ", negs.shape)
print(f"Example of a prediction for a negative instance:\n")
print(negs.head(1))
print("Output: ", emodel.predict(negs.head(1)).values.item())
print("Class: ", int(emodel.predict(negs.head(1)).values.item() > 0.5))  # Assuming binary classification with threshold 0.5

# You can generate for a set of instances stored in a DataFrame
print("\nGenerating counterfactual explanations using STCE for the first 5 negative instances:")
ce = ce_gen.generate_for_instance(negs.iloc[0], device='cpu')
print(ce)
print("Output: ", model.predict(ce).values.item())
print("Class: ", int(model.predict(ce).values.item() > 0.5))  # Assuming binary classification with threshold 0.5

Negative instances shape:  (50, 4)
Example of a prediction for a negative instance:

   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)
0           0.222222             0.625           0.067797          0.041667


ValueError: Must pass 2-d input. shape=()

In [4]:
# You can also implement a method to generate CEs for all the negative instance in one shot
ces = ce_gen.generate_for_all(neg_value=0, column_name="target")
print("All outputs are positive? ", np.all(model.predict(ces)>0.5))

All outputs are positive?  True


# Implementing your own CE Generator

Here is an example of creating your own RecourseGenerator. Let's make a simple one which gets
n different positive instances and chooses a random one. Let's say it also allows a random seed value.

Within the CEGenerator you can access:

- The Task - self.Task
- The DatasetLoader - self.task.training_data
- The BaseModel - self.task.model

and their respective methods. If your method needs additional arguments, you can put them in the function signature
but do NOT remove any other arguments (including **kwargs). Remember to return a DataFrame!

Here is our new CE in use below:

In [6]:
# Create RecourseGenerator
random_ce = RandomCE(task)

# Test it
ces = random_ce.generate_for_all()
print(ces)

    sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)
0            0.694444          0.333333           0.644068          0.541667
1            0.472222          0.083333           0.508475          0.375000
2            0.694444          0.333333           0.644068          0.541667
3            0.472222          0.083333           0.508475          0.375000
4            0.527778          0.083333           0.593220          0.583333
..                ...               ...                ...               ...
91           0.694444          0.333333           0.644068          0.541667
92           0.555556          0.125000           0.576271          0.500000
93           0.694444          0.333333           0.644068          0.541667
94           0.472222          0.083333           0.508475          0.375000
95           0.555556          0.125000           0.576271          0.500000

[96 rows x 4 columns]


We can verify it by seeing all the predictions for the CEs are positive.

In [7]:
print(model.predict(ces))

           0
0   0.501608
1   0.505629
2   0.501608
3   0.505629
4   0.502795
..       ...
91  0.501608
92  0.505989
93  0.501608
94  0.505629
95  0.505989

[96 rows x 1 columns]


# Benchmarking your method

After you have finished implementing your method, you can include it into DefaultBenchmark.py file and test it against other methods supported in the library using this lines of code:

In [None]:
from robustx.lib.DefaultBenchmark import default_benchmark
methods = ["KDTreeNNCE", "MCER"]
evaluations = ["Validity", "Distance"]
default_benchmark(task, methods, evaluations, neg_value=0, column_name="target", delta=0.005)

+------------+----------------------+------------+------------+
| Method     |   Execution Time (s) |   Validity |   Distance |
| KDTreeNNCE |            0.0493832 |          1 |   0.533264 |
+------------+----------------------+------------+------------+
| MCER       |            0.938715  |          1 |   0.407967 |
+------------+----------------------+------------+------------+
