In [21]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from torch.utils import data

from tqdm.notebook import tqdm_notebook

import warnings
warnings.filterwarnings('ignore')

# Pick the Apple-silicon Metal (MPS) backend when it is actually usable at
# runtime, otherwise fall back to CPU. `torch.backends.mps.is_available()`
# replaces the deprecated `torch.has_mps` attribute.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)
from itertools import product
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from aif360.algorithms.postprocessing import (CalibratedEqOddsPostprocessing,
                                              EqOddsPostprocessing,
                                              RejectOptionClassification)
from aif360.datasets import StandardDataset
from aif360.metrics import BinaryLabelDatasetMetric, ClassificationMetric
from aif360.metrics.utils import compute_boolean_conditioning_vector
# Integer seed kept in one place so it can be handed to anything that accepts
# a `random_state=` / `seed=` argument.
random_state = 1

mps


In [22]:
def val(model,dataloader):
    """Run `model` over `dataloader` without gradients and collect the results.

    Each batch is expected to unpack as ``(features, labels, protected)``.
    Features and labels are moved to the module-level `device`; the protected
    attribute is collected as yielded by the loader.

    Returns:
        Tuple ``(y_true, y_pred, y_prot)`` of tensors concatenated over all
        batches. ``y_pred`` holds hard decisions: ``round(sigmoid(output))``
        taken from column 0 of the model output.
    """
    model.eval()
    labels, preds, prots = [], [], []
    with torch.no_grad():
        for features, targets, prot in dataloader:
            features = features.to(device)
            targets = targets.to(device)
            logits = model(features)
            # sigmoid -> round thresholds at 0.5; only column 0 is kept.
            preds.append(torch.sigmoid(logits).round()[:, 0])
            labels.append(targets)
            prots.append(prot)
    return torch.cat(labels), torch.cat(preds), torch.cat(prots)

def to_dataframe(y_true, y_pred, y_prot):
    """Pack labels, predictions and protected attributes into an aif360 dataset.

    The three tensors are converted to float NumPy arrays and become the
    columns ``'y_true'``, ``'y_pred'`` and ``'y_prot'`` of a DataFrame, which
    is wrapped in a ``StandardDataset`` with ``'y_true'`` as the label
    (favorable class 1.0) and ``'y_prot'`` as the protected attribute
    (privileged class 1.0). The predictions are also stored on the dataset as
    a column vector in ``dataset.scores``.

    Returns:
        The constructed ``StandardDataset``.
    """
    columns = {
        'y_true': y_true.float().cpu().numpy(),
        'y_pred': y_pred.float().cpu().numpy(),
        'y_prot': y_prot.float().cpu().numpy(),
    }
    # NOTE(review): privileged class is declared as 1.0 here, while the
    # post-processing cell treats y_prot == 0 as privileged - confirm intent.
    dataset = StandardDataset(
        pd.DataFrame(columns),
        'y_true',
        [1.],
        ['y_prot'],
        [[1.]],
    )
    dataset.scores = columns['y_pred'].reshape(-1, 1)
    return dataset
        
def eval_aif360_algorithm(y_pred, dataset, verbose=True):
    """Score a vector of predictions against an aif360 dataset.

    Args:
        y_pred: 1-D NumPy array of predictions, compared element-wise with the
            flattened ``dataset.labels``.
        dataset: object exposing ``labels`` and ``protected_attributes`` arrays.
        verbose: accepted for interface compatibility; not used.

    Returns:
        Dict with keys ``'roc_auc'`` (always ``None``), ``'accuracy'``,
        ``'bias'`` (the ``'aod'`` metric from ``compute_bias``) and
        ``'objective'`` (``compute_objective`` with ``margin=0.0``).
    """
    labels = dataset.labels.reshape(-1)
    groups = dataset.protected_attributes.reshape(-1)

    accuracy = float(np.mean(y_pred == labels))
    aod = compute_bias(
        torch.tensor(y_pred),
        torch.tensor(labels),
        torch.tensor(groups),
        'aod'
    ).item()
    objective = compute_objective(accuracy, aod, margin=0.0)

    return dict(
        roc_auc=None,
        accuracy=float(accuracy),
        bias=float(aod),
        objective=float(objective),
    )

def compute_bias(y_pred, y_true, prot, metric):
    """Compute a signed fairness gap between the prot == 1 and prot == 0 groups.

    Args:
        y_pred: 1-D tensor of predictions (0/1 decisions or scores). Integer
            and bool tensors are promoted to float so the mean is defined.
        y_true: 1-D tensor of 0/1 ground-truth labels.
        prot: 1-D tensor of 0/1 group membership (1 = protected group).
        metric: one of
            ``"spd"`` - difference in mean prediction,
            ``"eod"`` - difference in mean prediction on true positives,
            ``"aod"`` - average of the gaps on true negatives and true positives.

    Returns:
        A 0-dim tensor holding ``protected - unprotected`` for the chosen
        metric. A group whose mean is NaN (for instance an empty group)
        contributes 0, and the result is always a tensor so callers can use
        ``.item()`` even when every group is empty.

    Raises:
        ValueError: if `metric` is not one of the supported names.
    """
    if metric not in ("spd", "aod", "eod"):
        raise ValueError(
            f"Unknown bias metric {metric!r}; expected 'spd', 'aod' or 'eod'."
        )

    # Tensor.mean() is undefined for integer/bool dtypes.
    if not torch.is_floating_point(y_pred):
        y_pred = y_pred.float()

    def group_mean(mask):
        """Mean prediction over `mask`, with NaN replaced by a zero tensor."""
        value = y_pred[mask].mean()
        return torch.where(torch.isnan(value), torch.zeros_like(value), value)

    unprot = 1 - prot
    negatives = 1 - y_true

    if metric == "spd":
        return group_mean(prot == 1) - group_mean(unprot == 1)

    tpr_gap = group_mean(prot * y_true == 1) - group_mean(unprot * y_true == 1)
    if metric == "eod":
        return tpr_gap

    fpr_gap = group_mean(prot * negatives == 1) - group_mean(unprot * negatives == 1)
    return 0.5 * (fpr_gap + tpr_gap)
        
def compute_objective(performance, bias, epsilon=0.05, margin=0.01):
    """Return `performance` if the bias is within tolerance, else 0.0.

    The bias is acceptable when ``abs(bias) <= epsilon - margin``.
    """
    tolerance = epsilon - margin
    return performance if abs(bias) <= tolerance else 0.0

In [20]:
# Collect validation labels, thresholded predictions and protected attributes.
# NOTE(review): `model` and `val_loader` are not defined in the visible cells;
# they must already exist in the kernel before this cell runs.
y_true, y_pred, y_prot = val(model,val_loader)
# Wrap the three tensors in an aif360 dataset for the post-processing cells below.
aif_data = to_dataframe(y_true, y_pred, y_prot)
# Displayed as the cell output.
aif_data.protected_attribute_names

dict_keys(['feature_names', 'label_names', 'protected_attribute_names', 'instance_names', 'instance_weights', 'privileged_protected_attributes', 'unprivileged_protected_attributes'])

In [None]:
# Fit calibrated equalized-odds post-processing on the validation predictions.
cost_constraint = 'weighted'
# y_prot == 1 is the protected (unprivileged) group, matching compute_bias.
# No trailing comma: these must be lists of dicts, not 1-tuples.
privileged_groups = [{'y_prot': 0.}]
unprivileged_groups = [{'y_prot': 1.}]
# The post-processor randomises which predictions it changes, so seed it with
# the notebook-wide `random_state` to make `predict` repeatable.
cpp = CalibratedEqOddsPostprocessing(privileged_groups=privileged_groups,
                                     unprivileged_groups=unprivileged_groups,
                                     cost_constraint=cost_constraint,
                                     seed=random_state)
# The same dataset supplies both the true labels and the scores.
# NOTE(review): `aif_data.scores` holds rounded 0/1 predictions (see `val`),
# not probabilities - confirm that is what the calibration step should see.
cpp.fit(aif_data, aif_data)


In [None]:
# Metrics of the raw model predictions (before post-processing). Kept in a
# variable because only the last expression of a cell is displayed.
baseline_metrics = eval_aif360_algorithm(aif_data.scores.reshape(-1), aif_data)
# Metrics after calibrated equalized-odds post-processing.
valid_y_pred = cpp.predict(aif_data).labels.reshape(-1)
postprocessed_metrics = eval_aif360_algorithm(valid_y_pred, aif_data)
# Show both side by side so the effect of post-processing is visible.
pd.DataFrame({'baseline': baseline_metrics,
              'calibrated_eq_odds': postprocessed_metrics})