## 0.0 Notebook Setup

We can pip install their library but I think it would be better to use their code as inspiration and develop our own instance. 

Their code was developed for a hackathon so could have been rushed / contain errors (i.e. has not been rigorously reviewed). 

In [None]:
#!pip install fairtorch

In [2]:
#from fairtorch import ConstraintLoss, DemographicParityLoss, EqualiedOddsLoss
import random
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import os
import torch
from torch import nn
from torch.nn import functional as F

## 1.0 Code for Regularizers

Can be found in the [fairtorch file](https://github.com/wbawakate/fairtorch/tree/master/fairtorch) of the repo in the constraint.py file. 


Once these regularziers are defined, they can be very simply appended to the gradient descent loss function in pytorch.

We define forward() function and backward() will be automatically computed [Source](https://discuss.pytorch.org/t/does-backward-function-call-forward-function/84163).


Can also explore: 
- Equal Odds 
- Calibration

In [1]:
import torch
from torch import nn
from torch.nn import functional as F


class ConstraintLoss(nn.Module):
    def __init__(self, n_class=2, alpha=1, p_norm=2):
        super(ConstraintLoss, self).__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.alpha = alpha
        self.p_norm = p_norm
        self.n_class = n_class
        self.n_constraints = 2
        self.dim_condition = self.n_class + 1
        self.M = torch.zeros((self.n_constraints, self.dim_condition))
        self.c = torch.zeros(self.n_constraints)

    def mu_f(self, X=None, y=None, sensitive=None):
        return torch.zeros(self.n_constraints)

    def forward(self, X, out, sensitive, y=None):
        # Reshapes sensitive attribute tensor to the same shape as the output 
        sensitive = sensitive.view(out.shape)
        # Reshapes y (label) tensor to the same shape as the output 
        if isinstance(y, torch.Tensor):
            y = y.view(out.shape)
        # Get probability output by applying sigmoid (like logistic regression?)
        out = torch.sigmoid(out)
        # Get the mu_f value given these tensors 
        mu = self.mu_f(X=X, out=out, sensitive=sensitive, y=y)
        # Gap constraint???
        # Apply relu to matrix vector product of M compared to mu-c 
        # Using cuda if applicable (self.device references)
        gap_constraint = F.relu(
            torch.mv(self.M.to(self.device), mu.to(self.device)) - self.c.to(self.device)
        )
        # Using the L2 Norm of the gap constraint as regularizer with alpha parameter. 
        if self.p_norm == 2:
            cons = self.alpha * torch.dot(gap_constraint, gap_constraint)
        else:
            cons = self.alpha * torch.dot(gap_constraint.detach(), gap_constraint)
        return cons


class DemographicParityLoss(ConstraintLoss):
    def __init__(self, sensitive_classes=[0, 1], alpha=1, p_norm=2):
        """loss of demograpfhic parity

        Args:
            sensitive_classes (list, optional): list of unique values of sensitive attribute. Defaults to [0, 1].
            alpha (int, optional): [description]. Defaults to 1.
            p_norm (int, optional): [description]. Defaults to 2.
        """
        self.sensitive_classes = sensitive_classes
        self.n_class = len(sensitive_classes)
        super(DemographicParityLoss, self).__init__(
            n_class=self.n_class, alpha=alpha, p_norm=p_norm
        )
        
        self.n_constraints = 2 * self.n_class
        self.dim_condition = self.n_class + 1
        self.M = torch.zeros((self.n_constraints, self.dim_condition))
        for i in range(self.n_constraints):
            j = i % 2
            if j == 0:
                self.M[i, j] = 1.0
                self.M[i, -1] = -1.0
            else:
                self.M[i, j - 1] = -1.0
                self.M[i, -1] = 1.0
            print(self.M)
        self.c = torch.zeros(self.n_constraints)

    def mu_f(self, X, out, sensitive, y=None):
        expected_values_list = []
        for v in self.sensitive_classes:
            # Get the index for each of the senstive classes 
            idx_true = sensitive == v  # torch.bool
            # Get the average prediction for that sensitive class
            expected_values_list.append(out[idx_true].mean())
        # Append the overall mean to the expected values list
        print(expected_values_list) 
        expected_values_list.append(out.mean())
        return torch.stack(expected_values_list)

    def forward(self, X, out, sensitive, y=None):
        return super(DemographicParityLoss, self).forward(X, out, sensitive)

# Convex 
class EqualiedOddsLoss(ConstraintLoss):
    def __init__(self, sensitive_classes=[0, 1], alpha=1, p_norm=2):
        """loss of demograpfhic parity

        Args:
            sensitive_classes (list, optional): list of unique values of sensitive attribute. Defaults to [0, 1].
            alpha (int, optional): [description]. Defaults to 1.
            p_norm (int, optional): [description]. Defaults to 2.
        """
        self.sensitive_classes = sensitive_classes
        self.y_classes = [0, 1]
        self.n_class = len(sensitive_classes)
        self.n_y_class = len(self.y_classes)
        super(EqualiedOddsLoss, self).__init__(n_class=self.n_class, alpha=alpha, p_norm=p_norm)
        # K:  number of constraint : (|A| x |Y| x {+, -})
        self.n_constraints = self.n_class * self.n_y_class * 2
        # J : dim of conditions  : ((|A|+1) x |Y|)
        self.dim_condition = self.n_y_class * (self.n_class + 1)
        self.M = torch.zeros((self.n_constraints, self.dim_condition))
        # make M (K * J): (|A| x |Y| x {+, -})  *   (|A|+1) x |Y|) )
        self.c = torch.zeros(self.n_constraints)
        element_K_A = self.sensitive_classes + [None]
        for i_a, a_0 in enumerate(self.sensitive_classes):
            for i_y, y_0 in enumerate(self.y_classes):
                for i_s, s in enumerate([-1, 1]):
                    for j_y, y_1 in enumerate(self.y_classes):
                        for j_a, a_1 in enumerate(element_K_A):
                            i = i_a * (2 * self.n_y_class) + i_y * 2 + i_s
                            j = j_y + self.n_y_class * j_a
                            self.M[i, j] = self.__element_M(a_0, a_1, y_1, y_1, s)

    def __element_M(self, a0, a1, y0, y1, s):
        if a0 is None or a1 is None:
            x = y0 == y1
            return -1 * s * x
        else:
            x = (a0 == a1) & (y0 == y1)
            return s * float(x)

    def mu_f(self, X, out, sensitive, y):
        expected_values_list = []
        for u in self.sensitive_classes:
            for v in self.y_classes:
                idx_true = (y == v) * (sensitive == u)  # torch.bool
                expected_values_list.append(out[idx_true].mean())
        # sensitive is star
        for v in self.y_classes:
            idx_true = y == v
            expected_values_list.append(out[idx_true].mean())
        return torch.stack(expected_values_list)

    def forward(self, X, out, sensitive, y):
        return super(EqualiedOddsLoss, self).forward(X, out, sensitive, y=y)

In [4]:
# What is M? 
n_class=2
n_constraints = 2 * n_class
dim_condition = n_class + 1
M = torch.zeros((n_constraints, dim_condition))
for i in range(n_constraints):
    j = i % 2
    if j == 0:
        M[i, j] = 1.0
        M[i, -1] = -1.0
    else:
        M[i, j - 1] = -1.0
        M[i, -1] = 1.0
print(M)

tensor([[ 1.,  0., -1.],
        [-1.,  0.,  1.],
        [ 1.,  0., -1.],
        [-1.,  0.,  1.]])


**Equalized Odds:** This loss function ensures that the model produces similar false positive and false negative rates across different groups. Mathematically, it can be expressed as the sum of absolute differences between false positive rates and false negative rates across different groups.

**Demographic Parity:** This loss function ensures that the model produces similar probabilities of positive outcome across different groups. Mathematically, it can be expressed as the absolute difference between the probabilities of positive outcome for different groups.

**Calibration:** This loss function ensures that the model produces similar predicted probabilities of positive outcome and actual probabilities of positive outcome across different groups. Mathematically, it can be expressed as the sum of squared differences between predicted probabilities and actual probabilities of positive outcome across different groups.

**Bounded Group Loss:** This loss function ensures that the model produces similar performance for different groups while still achieving high overall performance. Mathematically, it can be expressed as a combination of accuracy and a group fairness metric, such as the difference between false positive rates or the difference between positive predictive values for different groups.


## 2.0 Prep Artificial Dataset

In [5]:
def seed_everything(seed):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True


seed_everything(2020)

In [6]:
n_samples = 512
n_feature = 4

def genelate_data(n_samples = n_samples, n_feature=n_feature):

    y = np.random.randint(0, 2, size=n_samples)
    loc0 = np.random.uniform(-1, 1, n_feature)
    loc1 = np.random.uniform(-1, 1, n_feature)

    X = np.zeros((n_samples, n_feature))
    for i, u in enumerate(y):
        if y[i] ==0:
            X[i] = np.random.normal(loc = loc0, scale=1.0, size=n_feature)  
        else:
            X[i] = np.random.normal(loc = loc1, scale=1.0, size=n_feature)  

    sensi_feat = (X[:, 0] > X[:, 0].mean()).astype(int)
    X[:, 0] = sensi_feat.astype(np.float32)
    X = torch.from_numpy(X).float()
    y = torch.from_numpy(y).float()
    sensi_feat = torch.from_numpy(sensi_feat)
    return X, y, sensi_feat

In [7]:
dataset = genelate_data(1024)
# data split
n_train = int(0.7*len(dataset[0]))
X_train, y_train, sensi_train = map(lambda x : x[:n_train], dataset)
X_test, y_test, sensi_test = map(lambda x : x[n_train:], dataset)

In [None]:
model = nn.Sequential(nn.Linear(n_feature,1))

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(),lr=0.1)

for i in range(0, 200):
    optimizer.zero_grad()    
    logit = model(X_train)
    loss = criterion(logit.view(-1), y_train)
    
    loss.backward()
    optimizer.step()
y_pred = (torch.sigmoid(model(X_test)).view(-1) > 0.5 ).float()
acc_test = (y_pred  == y_test ).float().mean().item()

print("acc test: ",acc_test)

acc_test_vanilla = acc_test

gap_vanilla = np.abs(y_pred[sensi_test==0].mean().item() - y_pred[sensi_test==1].mean().item())
print("gap of expected values: ", gap_vanilla)

acc test:  0.8376623392105103
gap of expected values:  0.686927929520607


In [17]:
criterion = nn.BCEWithLogitsLoss()
model = nn.Sequential(nn.Linear(n_feature,1))
optimizer = optim.SGD(model.parameters(),lr=0.1)
criterion.__class__.__name__

'BCEWithLogitsLoss'

In [None]:
model = nn.Sequential(nn.Linear(n_feature,1))

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(),lr=0.1)

regularizers = []



for i in range(0, 200):
    optimizer.zero_grad()    
    logit = model(X_train)
    loss = criterion(logit.view(-1), y_train)
    

In [None]:
dim_hiden = 32
model = nn.Sequential(nn.Linear(n_feature,1))

dp_loss = DemographicParityLoss(sensitive_classes=[0, 1], alpha=100) # constraint 
optimizer = optim.SGD(model.parameters(),lr=0.1)


 
# train 
for i in range(0, 100):
    optimizer.zero_grad()    
    logit = model(X_train)
    loss = criterion(logit.view(-1), y_train)
    loss +=  dp_loss(X_train, logit, sensi_train) # add constraint
    loss.backward()
    optimizer.step()
y_pred = (torch.sigmoid(model(X_test)).view(-1) > 0.5 ).float()
acc_test = (y_pred  == y_test ).float().mean().float().item()

print("acc test: ",acc_test)

acc_test_vanilla = acc_test

gap_dp = np.abs(y_pred[sensi_test==0].mean().item() - y_pred[sensi_test==1].mean().item())
print("gap of expected values: ", gap_dp)

tensor([[ 1.,  0., -1.],
        [ 0.,  0.,  0.],
        [ 0.,  0.,  0.],
        [ 0.,  0.,  0.]])
tensor([[ 1.,  0., -1.],
        [-1.,  0.,  1.],
        [ 0.,  0.,  0.],
        [ 0.,  0.,  0.]])
tensor([[ 1.,  0., -1.],
        [-1.,  0.,  1.],
        [ 1.,  0., -1.],
        [ 0.,  0.,  0.]])
tensor([[ 1.,  0., -1.],
        [-1.,  0.,  1.],
        [ 1.,  0., -1.],
        [-1.,  0.,  1.]])
[tensor(0.5046, grad_fn=<MeanBackward0>), tensor(0.4542, grad_fn=<MeanBackward0>)]
[tensor(0.4933, grad_fn=<MeanBackward0>), tensor(0.4737, grad_fn=<MeanBackward0>)]
[tensor(0.4957, grad_fn=<MeanBackward0>), tensor(0.4828, grad_fn=<MeanBackward0>)]
[tensor(0.5006, grad_fn=<MeanBackward0>), tensor(0.4893, grad_fn=<MeanBackward0>)]
[tensor(0.5057, grad_fn=<MeanBackward0>), tensor(0.4948, grad_fn=<MeanBackward0>)]
[tensor(0.5105, grad_fn=<MeanBackward0>), tensor(0.4997, grad_fn=<MeanBackward0>)]
[tensor(0.5148, grad_fn=<MeanBackward0>), tensor(0.5043, grad_fn=<MeanBackward0>)]
[tensor(0.5189,