In [58]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from arc.PyTorch_Model_Architecture import Model
import os
from data.Data_Interface import DataInterface
from model.PyTorch_Gradient_Model_Interface import ModelInterface
from cf.PyTorch_CF import GradientCF
from eval.PyTorch_Evaluation import Evaluate
import warnings
# Show every column when a DataFrame is displayed (no truncation of wide frames).
pd.set_option('display.max_columns', None)
# Silence all Python warnings for the rest of the session.
# NOTE(review): this is a blanket filter, so deprecation and numerical warnings are hidden too.
warnings.filterwarnings("ignore")

In [59]:
# Load the heart dataset; the path is relative to the notebook's working directory.
df = pd.read_csv('datasets/heart.csv')

In [60]:
# Separate the feature columns (everything except 'target') from the label column.
X = df.drop(columns=['target'])
y = df['target']

In [61]:
# One-hot encode the listed categorical columns; `X` is rebound to the encoded frame.
X = pd.get_dummies(X, columns=['sex', 'cp', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal'])

In [62]:
# Hold out 20% of the rows for testing; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [63]:
# Fit the scaler on the training split only, then reuse the fitted statistics for the
# test split and for the full feature frame. X_train / X_test are rebound to the scaled
# versions; X_scaled is the scaled copy of every row in X.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_scaled = scaler.transform(X)

In [64]:
# Black-box reference classifier: a 100-tree random forest fitted on the scaled training split.
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

In [65]:
# The surrogate network takes one input per feature column of the training matrix.
input_size = X_train.shape[1]
# `Model` comes from the project module arc.PyTorch_Model_Architecture.
surrogate_model = Model(input_size)

In [66]:
# Label every (scaled) row with the random forest's prediction; these predictions are
# used below as the training target for the surrogate.
new_target = model.predict(X_scaled)

In [67]:
# Re-split using the forest's predictions as labels so the surrogate learns to mimic it.
# This rebinds X_train / X_test / y_train / y_test from the earlier cells.
# NOTE(review): this splits the unscaled `X`, whereas `new_target` was predicted from
# `X_scaled` and `X_tensor` below is built from `X_scaled` — confirm which feature scale
# the surrogate is meant to be trained and queried on.
X_train, X_test, y_train, y_test = train_test_split(X, new_target, test_size=0.2, random_state=42)

In [68]:
# Convert both splits to float32 NumPy arrays before building tensors from them.
X_train = np.array(X_train, dtype=np.float32)
X_test = np.array(X_test, dtype=np.float32)

In [69]:
# Float32 tensors for the full scaled dataset and for the surrogate's train/test split.
# Every label vector is reshaped to a single column with .view(-1, 1).
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(new_target, dtype=torch.float32).view(-1, 1)
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

In [70]:
# Binary cross-entropy between the surrogate's outputs and the forest's labels.
# NOTE(review): nn.BCELoss expects probabilities in [0, 1] — assumes Model ends in a sigmoid; confirm.
criterion = nn.BCELoss()
# Adam over all surrogate parameters with a learning rate of 1e-3.
optimizer = torch.optim.Adam(surrogate_model.parameters(), lr=0.001)

In [71]:
# Full-batch training: every epoch is one gradient step on the whole training tensor.
epochs = 500
for epoch in range(epochs):
    # Clear gradients accumulated by the previous step.
    optimizer.zero_grad()
    outputs = surrogate_model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()
    
    # Report the training loss every 10th epoch.
    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item()}')

Epoch [10/500], Loss: 0.6788796186447144
Epoch [20/500], Loss: 0.6462814807891846
Epoch [30/500], Loss: 0.6073206067085266
Epoch [40/500], Loss: 0.5912814736366272
Epoch [50/500], Loss: 0.5814175605773926
Epoch [60/500], Loss: 0.5752567648887634
Epoch [70/500], Loss: 0.5685024261474609
Epoch [80/500], Loss: 0.5618963837623596
Epoch [90/500], Loss: 0.5533291101455688
Epoch [100/500], Loss: 0.5437347292900085
Epoch [110/500], Loss: 0.5336272120475769
Epoch [120/500], Loss: 0.522408127784729
Epoch [130/500], Loss: 0.5108649730682373
Epoch [140/500], Loss: 0.49798160791397095
Epoch [150/500], Loss: 0.48400062322616577
Epoch [160/500], Loss: 0.46795257925987244
Epoch [170/500], Loss: 0.4511812627315521
Epoch [180/500], Loss: 0.43289268016815186
Epoch [190/500], Loss: 0.412040114402771
Epoch [200/500], Loss: 0.38909873366355896
Epoch [210/500], Loss: 0.3712584376335144
Epoch [220/500], Loss: 0.3555489778518677
Epoch [230/500], Loss: 0.3430408537387848
Epoch [240/500], Loss: 0.33222696185112


In [72]:
# Measure how often the surrogate agrees with the forest's labels on the held-out split.
with torch.no_grad():
    # Switch to evaluation mode; the model stays in eval mode after this cell.
    surrogate_model.eval()
    outputs = surrogate_model(X_test_tensor)
    # Threshold the outputs at 0.5 to obtain hard 0/1 predictions.
    predicted = (outputs >= 0.5).float()
    # Fraction of test rows whose prediction matches the label.
    accuracy = (predicted == y_test_tensor).sum().item() / y_test.shape[0]
    print(f'Test Accuracy: {accuracy}')

Test Accuracy: 0.9180327868852459


In [73]:
# Persist only the surrogate's state_dict (its parameters), at a path relative to the
# notebook's working directory.
model_path = 'pytorch_model.pth'
torch.save(surrogate_model.state_dict(), model_path)

In [74]:
# Wrap the raw dataframe for the counterfactual library, declaring which columns are continuous.
d = DataInterface(dataframe=df, target='target', continuous_features=['age', 'trestbps', 'chol', 'thalach', 'oldpeak'])
# Load the surrogate weights from the same `model_path` they were saved to, instead of a
# machine-specific absolute path, so the notebook runs on any checkout.
m = ModelInterface(model_path=model_path, input_size=input_size)

In [75]:
# Row of `df` to explain. Kept as a fixed index so the counterfactual run is reproducible;
# previously a random index was drawn (unseeded) and then ignored in favour of a literal 8.
query_index = 8
# Raw feature values of the chosen row, without the label, as a plain Python list.
query_instance = df.iloc[query_index].drop('target').to_list()

In [76]:
# Build the gradient-based counterfactual explainer from the data and model interfaces.
exp = GradientCF(d, m)
# Request five counterfactuals for the query, with constraints on three features:
# a range for 'trestbps', a direction for 'age', and 'sex' declared immutable.
cf = exp.generate_counterfactuals(
    query_instance,
    total_CFs=5,
    desired_ranges={'trestbps': [100, 200]},
    desired_directions={'age': 'increase'},
    immutable_features=['sex'],
)

Desired class: tensor([0.], grad_fn=<RsubBackward1>)
All CFs are classified as the desired class.
Final Gradient: tensor(0.6187)
Final Loss: tensor(2.0004, grad_fn=<AddBackward0>)
Unacceptably high loss. Perturbing relevant features.
All CFs are classified as the desired class.
Final Gradient: tensor(0.6289)
Final Loss: tensor(1.6356, grad_fn=<AddBackward0>)
Unacceptably high loss. Perturbing relevant features.
All CFs are classified as the desired class.
Final Gradient: tensor(0.6170)
Final Loss: tensor(1.6724, grad_fn=<AddBackward0>)
Unacceptably high loss. Perturbing relevant features.
All CFs are classified as the desired class.
Final Gradient: tensor(0.6556)
Final Loss: tensor(1.6529, grad_fn=<AddBackward0>)
Unacceptably high loss. Perturbing relevant features.
All CFs are classified as the desired class.
Final Gradient: tensor(0.6494)
Final Loss: tensor(1.6595, grad_fn=<AddBackward0>)
Unacceptably high loss. Perturbing relevant features.
All CFs are classified as the desired clas

In [77]:
# Unpack the two values returned by generate_counterfactuals. This rebinds
# `query_instance`, which previously held the raw row as a Python list.
query_instance, cf_instances = cf
# Convert both to float32 tensors for the metric computations below.
query_instance = torch.Tensor(query_instance).float()
cf_instances = torch.Tensor(cf_instances).float()

In [78]:
# Score the counterfactuals against the query with the library's evaluator, passing the
# scaled dataset tensor as the observed instances and k=1 (presumably the neighbour
# count used for plausibility — confirm against Evaluate).
# NOTE(review): the name `eval` shadows Python's built-in eval() for the rest of the session.
eval = Evaluate(d, m)
eval.evaluate(query_instance, cf_instances, X_tensor, k=1)

Proximity: tensor(0.2687)
Sparsity: tensor(8.6000)
Diversity: tensor(0.9124)
Plausibility: tensor(14.0489)
Confidence: tensor(0.4592, grad_fn=<MeanBackward0>)


Testing Metrics

In [161]:
# Stack five copies of the query along dim 0 (feature dim unchanged) to act as a set of
# identical "counterfactuals" in the sanity checks below.
repeated_query_instance = query_instance.repeat(5, 1)

In [162]:
def evaluate_proximity(query_instance, cf_instances):
    """Mean absolute difference between the counterfactuals and the query.

    The difference ``cf_instances - query_instance`` is formed with normal
    broadcasting and averaged over every element, so the result is one scalar.

    :param query_instance: Query instance tensor.
    :param cf_instances: Counterfactual instances tensor.
    :return: Scalar tensor holding the mean absolute element-wise difference.
    """
    deltas = cf_instances - query_instance
    return deltas.abs().mean()

In [163]:
def compute_sparsity_loss(query_instance, cf_instances, epsilon=torch.sqrt(torch.tensor(torch.finfo(torch.float32).eps))):
    """Average number of features that differ between the query and each counterfactual.

    A feature counts as changed when its absolute difference from the query
    exceeds ``epsilon``, so tiny numerical differences are ignored.

    :param query_instance: Query tensor; only its first row (``query_instance[0]``)
        is used as the reference instance.
    :param cf_instances: 2-D tensor of counterfactuals, one per row.
    :param epsilon: Tolerance below which a difference is treated as "no change".
        Defaults to the square root of float32 machine epsilon.
    :return: Scalar float tensor: the per-counterfactual changed-feature count,
        averaged over all counterfactuals.
    """
    # Boolean mask of features that moved by more than the tolerance.
    changed = torch.abs(cf_instances - query_instance[0]) > epsilon
    # Count changed features per counterfactual (row), then average the counts.
    changed_per_cf = changed.sum(dim=1)
    return changed_per_cf.float().mean()

In [164]:
def compute_plausibility_loss(cf_instances, observed_instances, k=1):
    """Mean L1 distance from each counterfactual to its ``k`` closest observed instances.

    :param cf_instances: 2-D tensor of counterfactuals, one per row.
    :param observed_instances: 2-D tensor of observed instances, one per row.
    :param k: Number of nearest observed instances kept per counterfactual.
    :return: Scalar tensor: mean of the kept nearest distances.
    """
    # Pairwise L1 distances, shape (num_cfs, num_observed).
    pairwise_l1 = (cf_instances[:, None, :] - observed_instances[None, :, :]).abs().sum(dim=2)
    # Ascending sort per counterfactual; the first k columns are the nearest neighbours.
    ascending = torch.sort(pairwise_l1, dim=1).values
    return ascending[:, :k].mean()

In [165]:
def compute_dist(x1, x2):
    """Sum of absolute differences between ``x1`` and ``x2`` along dim 0."""
    gap = x1 - x2
    return gap.abs().sum(dim=0)

def dpp_style(cf_instances):
    """Determinant of the similarity kernel built from the counterfactuals.

    The kernel entry for counterfactuals ``i`` and ``j`` is ``1 / (1 + d_ij)``,
    where ``d_ij`` is their L1 distance. Identical rows give identical kernel
    rows and therefore a determinant of 0; well-separated rows push the kernel
    towards the identity and the determinant towards 1.

    :param cf_instances: 2-D tensor of counterfactuals, one per row.
    :return: Scalar tensor holding the determinant of the kernel matrix.
    """
    # All pairwise L1 distances at once, shape (total_CFs, total_CFs). Computing them
    # from the input tensor keeps its dtype and device instead of forcing a float32
    # CPU buffer filled element by element.
    pairwise_l1 = (cf_instances[:, None, :] - cf_instances[None, :, :]).abs().sum(dim=-1)

    similarity = 1.0 / (1.0 + pairwise_l1)

    # No diagonal jitter is added, so duplicated counterfactuals yield exactly 0.
    return torch.det(similarity)

def evaluate_diversity(cf_instances):
    """Diversity score of a set of counterfactuals.

    :param cf_instances: Tensor of counterfactuals, one per row.
    :return: ``tensor(0.0)`` when there is exactly one counterfactual,
        otherwise the value returned by ``dpp_style``.
    """
    num_cfs = cf_instances.size(0)
    # With more or fewer than one row, defer to the determinant-based score.
    if num_cfs != 1:
        return dpp_style(cf_instances)
    # A single counterfactual is defined to have zero diversity.
    return torch.tensor(0.0)

In [166]:
# Sanity check: comparing the query with itself should give a proximity of 0.
test_prox = evaluate_proximity(query_instance, query_instance)
test_prox

tensor(0.)

In [167]:
# Sanity check: no feature differs between the query and itself, so sparsity should be 0.
test_spars = compute_sparsity_loss(query_instance, query_instance)
test_spars

tensor(0.)

In [168]:
# Sanity check: each repeated copy of the query is at distance 0 from the only observed
# instance (the query itself), so plausibility should be 0.
test_plaus = compute_plausibility_loss(repeated_query_instance, query_instance, k=1)
test_plaus

tensor(0.)

In [169]:
# Sanity check: identical rows have pairwise distance 0, giving an all-ones similarity
# matrix whose determinant (the diversity score) is 0.
test_div = evaluate_diversity(repeated_query_instance)
test_div

tensor(0.)

In [81]:
def apply_perturbation(query_instance, feature_indices, perturbation_values, verbose=True):
    """
    Return a copy of the query instance with additive perturbations on chosen features.

    The input is never modified. Perturbations are applied to row 0 only, so the
    instance must be 2-D with shape (1, num_features) or more rows.

    :param query_instance: Original query instance (2-D tensor or numpy array).
    :param feature_indices: Indices of the features (columns of row 0) to perturb.
    :param perturbation_values: Amounts added to the corresponding features; must
                                have the same length as ``feature_indices``.
    :param verbose: When True (default), print the instance before and after the
                    perturbation; pass False to suppress the output.
    :return: A new tensor with the perturbations applied.
    :raises ValueError: If the two lists differ in length.
    """
    feature_indices = list(feature_indices)
    perturbation_values = list(perturbation_values)
    # zip() would silently drop the unmatched tail, hiding a caller mistake.
    if len(feature_indices) != len(perturbation_values):
        raise ValueError(
            "feature_indices and perturbation_values must have the same length "
            f"(got {len(feature_indices)} and {len(perturbation_values)})"
        )

    # as_tensor accepts numpy arrays as well as tensors; clone() guarantees the
    # caller's data is left untouched.
    perturbed_instance = torch.as_tensor(query_instance).clone()
    if verbose:
        print(perturbed_instance)
    for feature, perturbation in zip(feature_indices, perturbation_values):
        perturbed_instance[0][feature] += perturbation
    if verbose:
        print(perturbed_instance)
    return perturbed_instance

In [140]:
# Build a test instance that differs from the query in exactly one feature.
feature_index = 0  # Index of the feature to perturb
perturbation_value = 0.5  # The value to add/subtract from the feature
perturbed_instance = apply_perturbation(query_instance, [feature_index], [perturbation_value])

tensor([[-0.2610,  2.3060, -0.9134,  0.5402, -0.4655,  0.0000,  1.0000,  0.0000,
          0.0000,  1.0000,  0.0000,  0.0000,  1.0000,  0.0000,  1.0000,  0.0000,
          1.0000,  0.0000,  0.0000,  0.0000,  1.0000,  1.0000,  0.0000,  0.0000,
          0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  1.0000]])
tensor([[ 0.2390,  2.3060, -0.9134,  0.5402, -0.4655,  0.0000,  1.0000,  0.0000,
          0.0000,  1.0000,  0.0000,  0.0000,  1.0000,  0.0000,  1.0000,  0.0000,
          1.0000,  0.0000,  0.0000,  0.0000,  1.0000,  1.0000,  0.0000,  0.0000,
          0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  1.0000]])


In [84]:
# One feature shifted by 0.5, averaged over all features of the instance, should give a
# small non-zero proximity.
test_prox = evaluate_proximity(query_instance, perturbed_instance)
test_prox

tensor(0.0167)

In [85]:
# Exactly one feature was changed, so the sparsity count should be 1.
test_spars = compute_sparsity_loss(query_instance, perturbed_instance)
test_spars

tensor(1.)

In [170]:
# Here the query plays the counterfactual and the perturbed instance is the only observed
# point, so the nearest-neighbour L1 distance should equal the 0.5 perturbation.
test_plaus = compute_plausibility_loss(query_instance, perturbed_instance, k=1)
test_plaus

tensor(0.5000)