# FaUCI: Fairness Under Constrained Injection.
FaUCI (Fairness Under Constrained Injection) is an in-processing fairness algorithm that incorporates fairness constraints directly into the model training process through regularization.

This algorithm works by adding fairness-specific regularization terms to the loss function, which penalize the model when it produces unfair predictions. The model learns to balance between predictive accuracy and fairness during training.

The key idea is to use regularization to impose fairness constraints:
- The model is trained with a standard loss function (e.g., BCE loss by classification).
- A regularization term for fairness is added to the loss function to penalize unfair predictions
- The strength of the fairness constraint is controlled by a regularization weight parameter

FaUCI supports several fairness metrics as regularization targets:
- Statistical Parity Difference (SPD): Ensures similar prediction rates across demographic groups
- Disparate Impact (DI): Ensures that the ratio of positive prediction rates across groups is close to 1

The regularization weight controls the trade-off between fairness and accuracy:
- weight = 0: No fairness constraint (standard model)
- weight > 0: Increasing values impose stronger fairness constraints

In [1]:
import sys
import os

# Add the root directory of the project to PYTHONPATH
sys.path.append(os.path.abspath(os.path.join('..')))

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import openml

from fairlib import DataFrame
from fairlib.inprocessing import Fauci
from fairlib.metrics import statistical_parity_difference, disparate_impact

# Set random seed for reproducibility
np.random.seed(42)
torch.manual_seed(42)

## Loading and Preparing the Adult Dataset.
We will use the Adult dataset from OpenML, which contains demographic information and predicts whether an individual earns more than $50K per year.

In [3]:
adult_dataset = openml.datasets.get_dataset(179)
adult_X, _, _, _ = adult_dataset.get_data(dataset_format="dataframe")

adult_X.rename(columns={'class': 'income'}, inplace=True)

adult = DataFrame(adult_X)

adult.targets = 'income'
adult.sensitive = ['sex']

adult.drop(columns=["fnlwgt"], inplace=True)

label_maps = {}

for col in adult.columns:
    if adult[col].dtype == 'object' or adult[col].dtype == 'category':
        adult[col], uniques = pd.factorize(adult[col])
        label_maps[col] = uniques

print(f"Dataset Form: {adult.shape}")
print(f"Target Column: {adult.targets}")
print(f"Sensitive Attributes: {adult.sensitive}")

adult.head()

## Exploratory Data Analysis.
We examine the distribution of income versus sex to understand potential inequalities in the dataset.

In [4]:
sex_labels = label_maps['sex'].tolist()
income_labels = label_maps['income'].tolist()

# Calculate the sex/income cross distribution.
counts = adult.groupby('sex')['income'].value_counts().unstack()

# Rename indexes and columns to make them readable
counts.index = [sex_labels[i] for i in counts.index]
counts.columns = [income_labels[i] for i in counts.columns]

# Plot
plt.figure(figsize=(10, 6))
counts.plot(kind='bar', stacked=True)
plt.title('Distribution of income by gender')
plt.xlabel('Sex')
plt.ylabel('Count')
plt.legend(title='Income')
plt.xticks(rotation=0)
plt.tight_layout()
plt.show()

## Preparing the Data for the Model.
We divide the dataset into training and test sets, and standardize the numerical characteristics.

In [5]:
X = adult.drop(columns=['income'])
y = adult['income']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
numeric_cols = ['age', 'education-num', 'capitalgain', 'capitalloss', 'hoursperweek']

X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()

X_train_scaled[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
X_test_scaled[numeric_cols] = scaler.transform(X_test[numeric_cols])

X_train_tensor = torch.tensor(X_train_scaled.values, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).reshape(-1, 1)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).reshape(-1, 1)

sensitive_idx = X_train.columns.get_loc('sex')
sensitive_train = X_train.iloc[:, sensitive_idx].values
sensitive_test = X_test.iloc[:, sensitive_idx].values

print(f"Size of the training set: {X_train_tensor.shape}")
print(f"Size of the test set: {X_test_tensor.shape}")

## Implementing the Baseline Model.
Before implementing FaUCI, we train a baseline model to establish a baseline.

In [6]:
input_dim = X_train_tensor.shape[1]

class BaseModel(nn.Module):
    def __init__(self):
        super(BaseModel, self).__init__()
        self.layer1 = nn.Linear(input_dim, 64)
        self.layer2 = nn.Linear(64, 32)
        self.layer3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.relu(self.layer2(x))
        x = self.sigmoid(self.layer3(x))
        return x

base_model = BaseModel()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(base_model.parameters(), lr=0.001)

epochs = 100
batch_size = 128

for epoch in range(epochs):
    base_model.train()
    epoch_loss = 0.0
    correct = 0
    total = 0

    for i in range(0, len(X_train_tensor), batch_size):
        batch_X = X_train_tensor[i:i+batch_size]
        batch_y = y_train_tensor[i:i+batch_size]

        outputs = base_model(batch_X)
        loss = criterion(outputs, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        predictions = (outputs > 0.5).float()
        correct += (predictions == batch_y).sum().item()
        total += batch_y.size(0)

    if (epoch + 1) % 10 == 0:
        accuracy = 100 * correct / total
        avg_loss = epoch_loss / (len(X_train_tensor) // batch_size)
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%\n")

## Evaluation of the Base Model.
We evaluate the performance of the base model in terms of both accuracy and fairness.

In [7]:
SENSITIVE_COL_NAME="sex"
base_model.eval()
with torch.no_grad():
    y_pred_base = base_model(X_test_tensor)
    y_pred_base_binary = (y_pred_base > 0.5).float()

    X_test_baseline = X_test_tensor.detach().cpu().numpy()
    y_pred_baseline = y_pred_base_binary.detach().cpu().numpy()

    baseline_accuracy = accuracy_score(y_test, y_pred_baseline)
    print(f"Baseline model accuracy: {baseline_accuracy:.4f}")

    baseline_spd = statistical_parity_difference(y_pred_baseline, X_test[SENSITIVE_COL_NAME])
    baseline_di = disparate_impact(y_pred_baseline, X_test[SENSITIVE_COL_NAME])

    print(f"Statistical Parity Difference (SPD): {baseline_spd}")
    print(f"Disparate Impact (DI): {baseline_di}")

## Implementation of FaUCI.
We now implement FaUCI with different regularization weights to explore the trade-off between accuracy and fairness.

In [8]:
reg_weights = [0.0, 0.1, 0.3, 0.5, 0.6, 0.8]

accuracies = {}
spd_values = {}
di_values = {}

for weight in reg_weights:
    print(f" FaUCI training with regularization weight: {weight}")

    model = BaseModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    fauci_model = Fauci(
        torchModel=model,
        optimizer=optimizer,
        loss=nn.BCELoss(),
        fairness_regularization="spd",  # Statistical Parity Difference
        regularization_weight=weight
    )

    train_data = DataFrame(X_train_scaled)
    train_data['income'] = y_train.values
    train_data.targets = 'income'
    train_data.sensitive = ['sex']

    fauci_model.fit(train_data, epochs=epochs, batch_size=batch_size, verbose=False)

    base_model.eval()
    with torch.no_grad():
        y_pred_base = fauci_model.predict(X_test_tensor)
        y_pred_base_binary = (y_pred_base > 0.5).float()

        y_pred_baseline = y_pred_base_binary.detach().cpu().numpy()

        baseline_accuracy = accuracy_score(y_test, y_pred_baseline)
        print(f"Baseline model accuracy: {baseline_accuracy:.4f}")

        baseline_spd = statistical_parity_difference(y_pred_baseline, X_test[SENSITIVE_COL_NAME])
        baseline_di = disparate_impact(y_pred_baseline, X_test[SENSITIVE_COL_NAME])

        print(f"Statistical Parity Difference (SPD): {baseline_spd}")
        print(f"Disparate Impact (DI): {baseline_di}")