In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules (e.g. the local
# `counterfactuals` package) are reloaded before each cell executes.
%load_ext autoreload
%autoreload 2

In [2]:
import os

import numpy as np
import torch

from counterfactuals.datasets import AdultDataset, GermanCreditDataset, LawDataset
from counterfactuals.discriminative_models import MultilayerPerceptron

In [3]:
# Set CUDA_VISIBLE_DEVICES to "-1" so that no GPU is exposed to the frameworks used below
# (CPU-only run).
# NOTE(review): this executes after `import torch`; it is presumably still effective because
# CUDA is initialised lazily, but setting it before the framework imports would be safer — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [4]:
# Registry of supported datasets:
#   name -> (dataset object, classifier checkpoint file, flow-model checkpoint file)
# NOTE(review): all three dataset objects are constructed here although only one entry is
# selected below; consider building only the chosen one if loading is slow or a CSV is missing.
datasets = {
    "adult": (
        AdultDataset("../data/adult.csv"),
        "adult_disc_model.pt",
        "adult_flow.pth",
    ),
    "law": (LawDataset("../data/law.csv"), "law_disc_model.pt", "law_flow.pth"),
    "german": (
        GermanCreditDataset("../data/german_credit.csv"),
        "german_disc_model.pt",
        "german_flow.pth",
    ),
}

# Select the dataset (and its checkpoint paths) used by the rest of the notebook.
# NOTE(review): "law" is also hard-coded as `data_name` in the CCHVAE hyperparameters further
# down — keep both in sync when switching datasets.
dataset, disc_model_path, gen_model_path = datasets["law"]

In [5]:
# Cast both feature matrices to float32 in place on the dataset object; the model wrapper
# defined later also builds float32 tensors from its inputs.
dataset.X_train = dataset.X_train.astype(np.float32)
dataset.X_test = dataset.X_test.astype(np.float32)

In [6]:
# Classifier to be explained. Arguments: number of input features, [256, 256], 1
# (presumably the hidden-layer sizes and the output size — confirm against MultilayerPerceptron).
# Alternative configuration kept for reference:
# disc_model = MultilayerPerceptron(dataset.X_test.shape[1], [512, 512], 2)
disc_model = MultilayerPerceptron(dataset.X_test.shape[1], [256, 256], 1)
# Training recipe (disabled): the weights are loaded from `disc_model_path` below instead, so
# that checkpoint file must already exist for this cell to run.
# disc_model.fit(
#     dataset.train_dataloader(batch_size=128, shuffle=True),
#     dataset.test_dataloader(batch_size=128, shuffle=False),
#     epochs=5000,
#     patience=100,
#     lr=1e-3,
#     checkpoint_path=disc_model_path,
# )
disc_model.load(disc_model_path)
# Alternative checkpoint kept for reference:
# disc_model.load("german_disc_model_onehot.pt")

  self.load_state_dict(torch.load(path))


In [7]:
# Accuracy of the loaded classifier on the test split.
# Both sides are flattened to 1-D before comparing: if the labels were stored as an (N, 1)
# column, `y_pred == y_test` would broadcast to an (N, N) matrix and the mean would be
# silently wrong.
# NOTE: run this before the cell that overwrites `dataset.y_test` with model predictions,
# otherwise the predictions are compared with themselves.
y_pred = disc_model.predict(dataset.X_test).detach().numpy().flatten()
y_true = np.asarray(dataset.y_test).flatten()
print("Test accuracy:", (y_pred == y_true).mean())

Test accuracy: 0.740990990990991


In [8]:
# Replace the stored labels with the classifier's own predictions, so everything downstream
# (the target column of the CCHVAE data wrapper, the factual selection) refers to the model's
# decisions rather than the original labels.
# NOTE: this mutates `dataset` in place; re-running the accuracy cell after this one compares
# predictions with predictions.
dataset.y_train = disc_model.predict(dataset.X_train).detach().numpy()
dataset.y_test = disc_model.predict(dataset.X_test).detach().numpy()

In [9]:
from counterfactuals.cf_methods.c_chvae import CCHVAE
from counterfactuals.cf_methods.c_chvae.data import Data
from counterfactuals.cf_methods.c_chvae.mlmodel import MLModel
from counterfactuals.datasets.law import LawDataset

  from .autonotebook import tqdm as notebook_tqdm
2025-04-17 14:49:31.383034: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-17 14:49:31.384712: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-17 14:49:31.407307: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-04-17 14:49:31.407340: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-04-17 14:49:31.407358: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Una

In [10]:
import os
from typing import Union

import numpy as np
import pandas as pd

In [None]:
class CustomData(Data):
    """
    Adapter exposing a project dataset through the ``Data`` interface used by CCHVAE.

    Feature columns are named by their positional index as strings ("0", "1", ...) and the
    labels are stored in one extra column named ``target_column``. The features are assumed
    to be already scaled/encoded, so ``transform`` and ``inverse_transform`` return their
    input unchanged.
    """

    def __init__(self, dataset, target_column="target"):
        """
        Build the train/test dataframes from ``dataset``.

        Parameters
        ----------
        dataset:
            Object providing ``X_train``, ``X_test``, ``y_train``, ``y_test`` and
            ``categorical_features`` (used here as column indices of the categorical features).
        target_column: str
            Name of the label column appended to every dataframe.
        """
        self._dataset = dataset
        self._target_column = target_column

        self._df_train = self._build_frame(dataset.X_train, dataset.y_train)
        self._df_test = self._build_frame(dataset.X_test, dataset.y_test)
        # The "full" dataframe is an independent copy of the training split.
        self._df = self._df_train.copy()

        # Minimal stand-in for a fitted encoder: only ``get_feature_names`` is provided.
        class Encoder:
            def get_feature_names(self, categorical):
                # The argument is ignored: the features are already encoded, so the
                # categorical column names are simply the stringified column indices.
                return [str(i) for i in dataset.categorical_features]

        self.encoder = Encoder()

    def _build_frame(self, features, labels):
        """Return ``features`` as a dataframe with index-named columns plus the target column."""
        frame = pd.DataFrame(
            data=features,
            columns=[str(i) for i in range(features.shape[1])],
        )
        frame[self._target_column] = labels
        return frame

    @property
    def categorical(self):
        """Column names of categorical features"""
        return [str(i) for i in self._dataset.categorical_features]

    @property
    def continuous(self):
        """Column names of continuous features, in ascending column order"""
        categorical_idx = set(self._dataset.categorical_features)
        n_features = self._dataset.X_train.shape[1]
        # Walk the columns in order instead of iterating a set difference, whose order is
        # unspecified; this keeps the continuous column order deterministic.
        return [str(i) for i in range(n_features) if i not in categorical_idx]

    @property
    def immutables(self):
        """Column names of immutable features (example: demographic features)"""
        # Application-specific; in this demonstration no feature is treated as immutable.
        return []

    @property
    def target(self):
        """Name of the target column"""
        return self._target_column

    @property
    def df(self):
        """Dataframe exposed as the full dataset (a copy of the training split)"""
        return self._df

    @property
    def df_train(self):
        """Training dataframe (features plus target column)"""
        return self._df_train

    @property
    def df_test(self):
        """Testing dataframe (features plus target column)"""
        return self._df_test

    def transform(self, df):
        """Transform data (apply scaling/encoding); identity here"""
        # The data is assumed to be already transformed.
        return df

    def inverse_transform(self, df):
        """Inverse transform (undo scaling/encoding); identity here"""
        # Counterpart of ``transform``: nothing to undo.
        return df


class CustomMLModel(MLModel):
    """
    Adapter exposing a trained PyTorch classifier through the MLModel interface used by CCHVAE.
    """

    def __init__(self, model, data: Data):
        """
        Wrap ``model`` together with the ``data`` object describing its features.

        Parameters
        ----------
        model: torch.nn.Module
            A trained PyTorch model
        data: Data
            A Data object
        """
        super().__init__(data)
        self._model = model
        # Feature columns are named "0", "1", ... up to the total number of features.
        n_features = len(data.categorical) + len(data.continuous)
        self._feature_input_order = [str(idx) for idx in range(n_features)]

    @property
    def feature_input_order(self):
        """Column names in the order the model expects them"""
        return self._feature_input_order

    @property
    def backend(self):
        """Name of the backend framework"""
        return "pytorch"

    @property
    def raw_model(self):
        """The wrapped model object"""
        return self._model

    def _as_model_input(self, x):
        """Convert a dataframe or ndarray to a float32 tensor; other inputs pass through."""
        if isinstance(x, pd.DataFrame):
            # Select the columns in the order the model expects.
            x = x[self.feature_input_order].values

        if isinstance(x, np.ndarray):
            x = torch.tensor(x, dtype=torch.float32)

        return x

    def predict(self, x: Union[np.ndarray, pd.DataFrame, torch.Tensor]):
        """Return the wrapped model's ``predict`` output, computed without gradient tracking"""
        with torch.no_grad():
            return self._model.predict(self._as_model_input(x))

    def predict_proba(self, x: Union[np.ndarray, pd.DataFrame, torch.Tensor]):
        """Return the wrapped model's ``predict_proba`` output, computed without gradient tracking"""
        with torch.no_grad():
            return self._model.predict_proba(self._as_model_input(x))

In [12]:
# Number of feature columns; used as the first entry of the VAE layer list below.
input_size = dataset.X_train.shape[1]

# Configuration dictionary passed to CCHVAE.
# NOTE(review): the exact meaning of each key is defined by the CCHVAE implementation — confirm there.
hyperparams = {
    "data_name": "law",  # NOTE(review): hard-coded; presumably should match the selected dataset
    "n_search_samples": 300,  # Increase if needed
    "p_norm": 1,
    "step": 0.1,
    "max_iter": 2000,  # Raised from 1000 for better exploration
    "clamp": True,
    "binary_cat_features": True,
    "vae_params": {
        "layers": [
            input_size,
            64,
            32,
            16,
        ],  # Deeper/wider network for better representation
        "train": True,  # Train a new autoencoder
        "kl_weight": 0.3,
        "lambda_reg": 1e-6,
        "epochs": 10,  # Raised from 5 for better VAE training
        "lr": 1e-3,
        "batch_size": 32,
    },
}

In [13]:
# Wrap the dataset and the classifier in the adapter classes defined above so they can be
# passed to CCHVAE.
data_wrapper = CustomData(dataset)
model_wrapper = CustomMLModel(disc_model, data_wrapper)

In [14]:
# Instantiate the counterfactual generator from the wrapped model and the hyperparameters.
# NOTE(review): `vae_params["train"]` is True, so this presumably trains the VAE here — confirm.
cf_generator = CCHVAE(model_wrapper, hyperparams)

AttributeError: 'CustomData' object has no attribute 'encoder'

In [None]:
# Keep only the test rows whose target column equals 0; these are the instances for which
# counterfactuals are requested.
test_df = data_wrapper.df_test.copy()
test_df = test_df[test_df[data_wrapper.target] == 0]

# Select a small number of examples for demonstration.
num_examples = 10
# Fixed seed so that the same factuals are drawn on every run (Restart & Run All reproducible).
sample_seed = 0
factuals = test_df.sample(num_examples, random_state=sample_seed).drop(
    columns=[data_wrapper.target]
)

In [None]:
# Display the sampled factual instances (feature columns only).
factuals

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12
165,0.324324,0.52381,0.306581,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
27,0.621622,0.809524,0.308186,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
143,0.405405,0.571429,0.285714,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
271,0.351351,0.52381,0.295345,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
363,0.513514,0.619048,0.313002,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
68,0.756757,0.619048,0.338684,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
64,0.581081,0.714286,0.447833,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
164,0.581081,0.904762,0.335474,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
278,0.72973,0.571429,0.247191,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
335,0.513514,0.428571,0.266453,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0


In [None]:
# Classifier predictions for the sampled factuals (rows were selected with target == 0).
model_wrapper.predict(factuals.to_numpy())

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [None]:
# Generate one counterfactual per factual.
# NOTE(review): the method name suggests the results are returned without a validity check —
# confirm in the CCHVAE implementation.
counterfactuals = cf_generator.get_counterfactuals_without_check(factuals)

In [None]:
# Display the generated counterfactuals.
counterfactuals

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12
165,0.519758,0.573048,0.392712,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
27,0.638026,0.658789,0.515756,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
143,0.514052,0.558654,0.399633,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
271,0.520251,0.503858,0.413262,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
363,0.632549,0.654185,0.51335,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
68,0.639167,0.659263,0.516597,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
64,0.637816,0.658684,0.51626,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
164,0.503601,0.4992,0.38326,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
278,0.657688,0.586958,0.51119,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
335,0.512536,0.565683,0.402039,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [None]:
# Classifier predictions for the generated counterfactuals; a prediction that differs from the
# factual's prediction means the counterfactual changed the model's decision.
model_wrapper.predict(counterfactuals.to_numpy())

tensor([1., 1., 0., 1., 0., 0., 0., 0., 0., 0.])