## Inverse frequency reweighting and adversarial debiasing



### Preprocessing (repeated from baseline)

In [None]:
!pip install torch torchvision numpy matplotlib seaborn scikit-learn facenet-pytorch

In [4]:
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

In [5]:
# ref: https://www.kaggle.com/datasets/shuvoalok/raf-db-dataset
# Fetch the dataset through kagglehub so the zip file does not have to be uploaded
# manually every time this notebook is run.
import kagglehub

# `path` holds whatever kagglehub.dataset_download returns; it is printed here and
# copied into the working directory by the next cell.
path = kagglehub.dataset_download("shuvoalok/raf-db-dataset")
print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/shuvoalok/raf-db-dataset?dataset_version_number=2...


100%|██████████| 37.7M/37.7M [00:00<00:00, 119MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/shuvoalok/raf-db-dataset/versions/2


In [6]:
import shutil

# Copy the downloaded dataset out of the kagglehub cache into the Colab working
# directory; later cells read it through the relative path "raf-db/...".
destination_dir = '/content/raf-db'
# dirs_exist_ok=True keeps this cell re-runnable: without it copytree raises
# FileExistsError as soon as the destination directory already exists.
shutil.copytree(path, destination_dir, dirs_exist_ok=True)

'/content/raf-db'

In [7]:
# load labels from train/test
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os

# Label tables that ship with the dataset, one CSV per split.
train_labels = pd.read_csv("raf-db/train_labels.csv")
test_labels = pd.read_csv("raf-db/test_labels.csv")

# Preprocessing applied to every image, in order.
preprocessing_steps = [
    transforms.Resize((128, 128)),  # for ResNet
    transforms.ToTensor(),
    # per-channel normalisation with mean 0.5 and std 0.5
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
]
transform = transforms.Compose(preprocessing_steps)

class RAFDBDataset(Dataset):
    """Emotion dataset read from a ``<root_dir>/<label>/<image>`` folder layout.

    Every sub-directory of ``root_dir`` whose name is a decimal number is treated
    as one emotion class (e.g. ``DATASET/train/1/train_0001_aligned.jpg``). The
    folder name minus one becomes the zero-indexed label.

    Args:
        root_dir: directory that contains the numbered class folders.
        transform: optional callable applied to each loaded PIL image.

    ``__getitem__`` returns ``(image, label)``.
    """

    def __init__(self, root_dir, transform=None):
        # need to take into account the file structure: e.g. DATASET/train/1/train_0001_aligned.jpg
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # file walkthrough: with some help from LLM
        for label in sorted(os.listdir(root_dir)):
            label_path = os.path.join(root_dir, label)
            # Only numbered sub-directories are classes. This skips stray files and
            # folders such as `.ipynb_checkpoints`, which would otherwise make
            # int(label) raise ValueError.
            if not (os.path.isdir(label_path) and label.isdecimal()):
                continue

            # sorted() makes the sample order independent of the filesystem's
            # listing order.
            for img_name in sorted(os.listdir(label_path)):
                img_path = os.path.join(label_path, img_name)
                if not os.path.isfile(img_path):
                    continue  # nested directories are not images
                self.image_paths.append(img_path)
                self.labels.append(int(label) - 1)  # convert to zero indexing

    def __len__(self):
        """Number of images found under ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply ``transform`` if set, and return ``(image, label)``."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, label

# Locations of the two splits inside the copied dataset.
train_dir = "raf-db/DATASET/train"
test_dir = "raf-db/DATASET/test"

# Training split: reshuffled by the loader.
train_dataset = RAFDBDataset(train_dir, transform)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)

# Test split: kept in dataset order.
test_dataset = RAFDBDataset(test_dir, transform)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

In [14]:
import torch.nn.functional as F

# model (copied from baseline notebook) with dropout, early stopping, weight decay
class ResNetBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional module applied to the input before it is added back,
            used when the main path changes the feature-map shape.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        conv3x3 = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv3x3)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv3x3)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        # Shortcut branch: the input itself, or its projection when one is given.
        shortcut = self.downsample(x) if self.downsample else x

        # Main branch.
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        residual += shortcut
        return F.relu(residual)

# with dropout
class ResNet(nn.Module):
    """Residual CNN classifier with dropout before the final linear layer.

    Layout: a 7x7 stride-2 conv stem with max-pooling, four stages of two
    ``ResNetBlock``s each (64, 128, 256, 512 channels), global average pooling,
    dropout, and a linear layer producing ``num_classes`` logits.

    Args:
        num_classes: size of the output logit vector.
        dropout_prob: dropout probability applied to the pooled 512-d features.
    """

    def __init__(self, num_classes=7, dropout_prob=0.5):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first halves the spatial size.
        self.layer1 = self._make_layer(64, 64, 2)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block may change stride or channel count; in that case it
        gets a 1x1 conv + batch-norm projection on its shortcut.
        """
        needs_projection = stride != 1 or in_channels != out_channels
        downsample = (
            nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
            if needs_projection
            else None
        )

        first_block = ResNetBlock(in_channels, out_channels, stride, downsample)
        remaining_blocks = [ResNetBlock(out_channels, out_channels) for _ in range(1, blocks)]
        return nn.Sequential(first_block, *remaining_blocks)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` tensor of logits."""
        pipeline = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for stage in pipeline:
            x = stage(x)

        x = torch.flatten(x, 1)
        return self.fc(self.dropout(x))

### Inverse frequency reweighting

In [1]:
import numpy as np

# race labels from the FairFace embeddings task
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects, so
# only load a .npy file that was produced by a trusted source.
# `.item()` unwraps the saved object; the sample printed below shows it is a dict
# mapping image filename -> race string.
raf_race_labels_train = np.load('raf_race_labels_train.npy', allow_pickle=True).item()

print(f"Total training samples with race labels: {len(raf_race_labels_train)}")
print("Sample:", list(raf_race_labels_train.items())[:5])

Total training samples with race labels: 12271
Sample: [('train_08777_aligned.jpg', np.str_('White')), ('train_04175_aligned.jpg', np.str_('White')), ('train_06866_aligned.jpg', np.str_('Latino')), ('train_05827_aligned.jpg', np.str_('Latino')), ('train_08913_aligned.jpg', np.str_('Middle_Eastern'))]


#### Compute inverse frequency weights

In [2]:
from collections import Counter

# no. samples per race
race_counts = Counter(raf_race_labels_train.values())
print("Race counts:", race_counts)

num_races = len(race_counts)
total_samples = sum(race_counts.values())

# inverse frequency weights: weight = N / (K * n_race), with N samples and K races,
# so the less frequent a race is, the larger its weight
race_weights = dict(
    (race, total_samples / (num_races * count))
    for race, count in race_counts.items()
)
print("Race weights (inverse frequency):", race_weights)

Race counts: Counter({np.str_('White'): 7525, np.str_('Latino'): 2401, np.str_('Black'): 1178, np.str_('Middle_Eastern'): 674, np.str_('Asian'): 493})
Race weights (inverse frequency): {np.str_('White'): 0.32613953488372094, np.str_('Latino'): 1.0221574344023323, np.str_('Middle_Eastern'): 3.641246290801187, np.str_('Black'): 2.0833616298811544, np.str_('Asian'): 4.978093306288033}


#### Load dataset and train

In [9]:
# modify the original RAFDBDataset class to also include race labels so we can implement the debiasing techniques

class RAFDBTrainDatasetWithRace(Dataset):
    """RAF-DB image dataset that also carries each image's filename and race label.

    Args:
        root_dir: directory containing one sub-folder per emotion.
        emotions: names of the emotion sub-folders to read; each name must parse as
            an integer, and ``int(name) - 1`` becomes the class label.
        race_labels_dict: mapping from image filename to race label; filenames not
            in the mapping get the label ``"Unknown"``.
        transform: optional callable applied to each loaded PIL image.

    ``__getitem__`` returns ``(image, emotion_label, filename, race_label)``.
    """

    def __init__(self, root_dir, emotions, race_labels_dict, transform=None):
        self.root_dir = root_dir
        self.emotions = emotions
        self.race_labels_dict = race_labels_dict
        self.transform = transform

        # Parallel lists: position i in each describes the same sample.
        self.image_paths, self.labels = [], []
        self.filenames, self.race_labels = [], []

        for emotion in emotions:
            folder = os.path.join(root_dir, emotion)
            for name in os.listdir(folder):
                self.image_paths.append(os.path.join(folder, name))
                self.labels.append(int(emotion) - 1)  # 0-indexed labels
                self.filenames.append(name)
                self.race_labels.append(race_labels_dict.get(name, "Unknown"))

    def __len__(self):
        """Number of images collected from the emotion folders."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as RGB and return it with its label, filename and race."""
        img = Image.open(self.image_paths[idx]).convert("RGB")
        if self.transform:
            img = self.transform(img)

        return img, self.labels[idx], self.filenames[idx], self.race_labels[idx]

In [None]:
train_dir = 'raf-db/DATASET/train'
# Emotion folders are named with integers, so order them numerically.
train_emotions = sorted(os.listdir(train_dir), key=int)
print("train emotions:", train_emotions)

# Replaces the baseline train_dataset / train_loader with versions whose batches
# also carry filenames and race labels.
train_dataset = RAFDBTrainDatasetWithRace(
    train_dir,
    train_emotions,
    raf_race_labels_train,
    transform=transform,
)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)

In [None]:
from tqdm import tqdm

# params copied over from baseline training w/ dropout, early stopping, weight decay
num_classes = 7
num_epochs = 15
# NOTE(review): patience is declared but this loop has no validation split and never
# stops early, so training always runs for the full num_epochs.
patience = 5 # early stopping
dropout_prob = 0.5
weight_decay = 1e-4
learning_rate = 0.001

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet(num_classes=num_classes, dropout_prob=dropout_prob).to(device)
# reduction='none' returns one loss value per sample. With the default
# reduction='mean' the criterion returns a single scalar, and multiplying that scalar
# by the weights only rescales the whole batch instead of reweighting each sample.
criterion = nn.CrossEntropyLoss(reduction='none')
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

for epoch in range(num_epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    num_batches = 0  # counted explicitly so the running mean does not assume batch_size == 32

    with tqdm(train_loader, desc=f"epoch [{epoch+1}/{num_epochs}]", unit="batch") as t:
        for images, labels, filenames, race_labels in t:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(images)
            loss_per_sample = criterion(outputs, labels)  # shape: (batch_size,)

            # Convert race labels to their weights. Races without a computed weight
            # (e.g. the dataset's "Unknown" fallback) get the neutral weight 1.0
            # instead of raising KeyError.
            batch_weights = torch.tensor(
                [race_weights.get(race, 1.0) for race in race_labels],
                dtype=torch.float32,
                device=device,
            )

            # Apply race weights to the loss
            weighted_loss = (loss_per_sample * batch_weights).mean()

            weighted_loss.backward()
            optimizer.step()

            running_loss += weighted_loss.item()
            num_batches += 1
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            # mean weighted loss per batch and running training accuracy
            t.set_postfix(loss=running_loss / num_batches, acc=correct / total)

torch.save(model.state_dict(), 'inverse_freq.pth')

#### Evaluate (copied from baseline ResNet code)

In [16]:
import numpy as np
# dependency on the first FairFace race annotation task
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load a trusted file.
# NOTE(review): this is an absolute Colab path, while the train labels are loaded
# from a relative path - confirm both files live in the same directory.
race_labels_test = np.load('/content/raf_race_labels_test.npy', allow_pickle=True).item()

In [None]:
from collections import defaultdict
from sklearn.metrics import accuracy_score, classification_report

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_model = ResNet(num_classes=7).to(device)
best_model.load_state_dict(torch.load("inverse_freq.pth", map_location=device))
best_model.eval()

# The baseline test_loader wraps RAFDBDataset, whose batches are (images, labels)
# only, so unpacking filenames from it raises ValueError. Build a loader on the
# race-aware dataset class instead, which also yields each image's filename.
test_emotions = sorted(os.listdir(test_dir), key=lambda x: int(x))
test_dataset_with_names = RAFDBTrainDatasetWithRace(
    root_dir=test_dir,
    emotions=test_emotions,
    race_labels_dict=race_labels_test,
    transform=transform
)
test_loader_with_names = DataLoader(test_dataset_with_names, batch_size=32, shuffle=False)

# Dictionaries to store predictions and ground truths grouped by race
race_predictions = defaultdict(list)
race_ground_truths = defaultdict(list)

with torch.no_grad():
    # The dataset's own race column is ignored; the lookup below keeps the original
    # filename normalisation.
    for images, emotion_labels, filenames, _ in test_loader_with_names:
        images = images.to(device)
        outputs = best_model(images)
        _, predicted_emotions = torch.max(outputs, 1)

        # Move the whole batch to Python lists once instead of one .item() per sample.
        predicted_list = predicted_emotions.cpu().tolist()
        truth_list = emotion_labels.tolist()

        for idx in range(len(filenames)):
            filename = filenames[idx]

            # match race_labels_test keys
            normalized_filename = filename
            if not normalized_filename.endswith('_aligned.jpg'):
                normalized_filename = normalized_filename.replace('.jpg', '_aligned.jpg')

            race = race_labels_test.get(normalized_filename, "Unknown")
            if race == "Unknown":
                print(f"race label missing for {normalized_filename}")

            race_predictions[race].append(predicted_list[idx])
            race_ground_truths[race].append(truth_list[idx])

emotion_classes = ["surprise", "fear", "disgust", "happiness", "sadness", "anger", "neutral"]
labels_used = list(range(len(emotion_classes)))  # [0, 1, 2, 3, 4, 5, 6]

In [None]:
# Per-race results, collected in matching order for the bar chart below.
races, accuracies, standard_errors = [], [], []

for race, y_pred in race_predictions.items():
    y_true = race_ground_truths[race]
    n_samples = len(y_true)

    acc = accuracy_score(y_true, y_pred)
    # error bars: standard error of a proportion, sqrt(p * (1 - p) / n)
    se = np.sqrt(acc * (1 - acc) / n_samples)

    races.append(race)
    accuracies.append(acc)
    standard_errors.append(se)

    # got some help from LLM here for how to nicely format the output in a tabular format
    # using classification_report.
    print(f"\n=== Race: {race} ===")
    print(f"Samples: {n_samples}")
    print(f"Accuracy: {acc:.4f}")
    print(f"Standard Error: {se:.4f}")

    # Per-emotion precision / recall / F1 for this race
    report = classification_report(
        y_true, y_pred,
        labels=labels_used, target_names=emotion_classes, zero_division=0,
    )
    print("Classification Report:")
    print(report)
    print("-" * 50)

# Bar chart of accuracy per race with +/- one standard error
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(races, accuracies, yerr=standard_errors, capsize=5, alpha=0.7)

ax.set_xlabel("Race")
ax.set_ylabel("Accuracy")
ax.set_title("Accuracy per race")
ax.set_ylim(0, 1.0)
ax.grid(axis='y', linestyle='--', alpha=0.7)

plt.show()

### Adversarial debiasing

#### Model setup

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Function

# got some help from ChatGPT since this was a more unknown component compared to
# the ResNet setup. Literature also didn't provide a specific implementation.
class GradientReversalFunction(Function):
    """Autograd function that is the identity forward and flips gradients backward.

    Forward returns a view of ``x`` unchanged. Backward multiplies the incoming
    gradient by ``-lambda_``, so everything upstream of this op is updated in the
    direction that *increases* the loss computed downstream of it.
    """

    # torch.autograd.Function requires forward/backward to be static methods that
    # receive the context object as their first argument.
    @staticmethod
    def forward(ctx, x, lambda_):
        ctx.lambda_ = lambda_  # stash the scale for the backward pass
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        # One return value per forward input: reversed gradient for x, none for lambda_.
        return grad_output.neg() * ctx.lambda_, None

def grad_reverse(x, lambda_=1.0):
    """Pass ``x`` through unchanged while scaling its gradient by ``-lambda_``."""
    return GradientReversalFunction.apply(x, lambda_)

In [None]:
# use previously implemented ResNet as a basis, and make sure the output is 512d.
# this will be used as the feature extractor that later feeds the output into
# the emotion classifier and race classifier.
class ResNetFeatureExtractor(nn.Module):
    """Wraps ``ResNet`` and returns its pooled 512-d features instead of class logits.

    The forward pass runs the backbone's stem, four residual stages and average
    pooling, then flattens. The backbone's ``dropout`` and ``fc`` layers exist on
    the wrapped model but are not applied here.

    Args:
        dropout_prob: forwarded to the ``ResNet`` constructor.
    """

    def __init__(self, dropout_prob=0.5):
        super().__init__()

        # using the same ResNet as before!
        self.backbone = ResNet(dropout_prob=dropout_prob)
        # size of the feature representation / embedding
        self.feature_dim = 512

    def forward(self, x):
        """Return the flattened, average-pooled feature vector for each image in ``x``."""
        net = self.backbone
        stages = (
            net.conv1, net.bn1, net.relu, net.maxpool,
            net.layer1, net.layer2, net.layer3, net.layer4,
            net.avgpool,
        )
        for stage in stages:
            x = stage(x)

        return torch.flatten(x, 1)

In [None]:
class AdversarialDebiasingModel(nn.Module):
    """Shared feature extractor with an emotion head and an adversarial race head.

    The race head sees the features through ``grad_reverse``, so its loss gradient
    reaches the feature extractor with the opposite sign, scaled by ``lambda_adv``.

    Args:
        feature_extractor: module exposing ``feature_dim`` and mapping images to
            feature vectors of that size.
        num_emotions: number of emotion logits.
        num_races: number of race logits.
        lambda_adv: gradient-reversal strength; stored as a plain attribute so it
            can be reassigned between epochs.
    """

    def __init__(self, feature_extractor, num_emotions=7, num_races=7, lambda_adv=1.0):
        super().__init__()

        self.feature_extractor = feature_extractor
        self.lambda_adv = lambda_adv

        feature_dim = self.feature_extractor.feature_dim
        self.emotion_classifier = nn.Linear(feature_dim, num_emotions)
        # adversary. keep the model simple so training doesn't take exponentially longer
        # (might run out of GPU credits)
        hidden_dim = 128
        self.race_classifier = nn.Sequential(
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_races),
        )

    def forward(self, x):
        """Return ``(emotion_logits, race_logits)`` for a batch of images."""
        features = self.feature_extractor(x)
        emotion_logits = self.emotion_classifier(features)
        # Identity in the forward pass; reverses the gradient flowing back into features.
        race_logits = self.race_classifier(grad_reverse(features, self.lambda_adv))
        return emotion_logits, race_logits

#### Train the model

In [None]:
# Distinct race names in sorted order; a race's position in this list is its class index.
race_classes = sorted(set(raf_race_labels_train.values()))
race_to_index = dict(zip(race_classes, range(len(race_classes))))

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Problem sizes and the adversarial-strength schedule bounds.
num_emotions = 7
num_races = len(set(raf_race_labels_train.values()))
lambda_adv_start = 0.0
lambda_adv_max = 1.0 # adversarial strength. gradually increase the lambda

feature_extractor = ResNetFeatureExtractor(dropout_prob=0.5)
adv_model = AdversarialDebiasingModel(
    feature_extractor, num_emotions, num_races, lambda_adv_start
).to(device)

emotion_criterion = nn.CrossEntropyLoss()
race_criterion = nn.CrossEntropyLoss()

# One optimizer updates the feature extractor and both heads.
optimizer = torch.optim.Adam(adv_model.parameters(), lr=0.001)

num_epochs = 15

for epoch in range(num_epochs):
    adv_model.train()
    running_loss = 0.0
    correct_emotion = correct_race = 0
    total_emotion = total_race = 0

    # gradually increase the adversarial strength: linear ramp from 0, capped at
    # lambda_adv_max once epoch reaches num_epochs / 2
    lambda_adv = min(lambda_adv_max, epoch / (num_epochs / 2))
    adv_model.lambda_adv = lambda_adv

    for images, emotion_labels, _, race_labels in train_loader:
        images, emotion_labels = images.to(device), emotion_labels.to(device)
        # race labels arrive as strings; map them to class indices
        race_indices = torch.tensor(
            [race_to_index[r] for r in race_labels], dtype=torch.long
        ).to(device)

        optimizer.zero_grad()

        emotion_logits, race_logits = adv_model(images)
        loss_emotion = emotion_criterion(emotion_logits, emotion_labels)
        loss_race = race_criterion(race_logits, race_indices)

        # Plain sum of both losses; the gradient reversal inside the model flips the
        # race loss's gradient before it reaches the feature extractor.
        total_loss = loss_emotion + loss_race
        total_loss.backward()

        optimizer.step()

        # for tracking / metrics
        running_loss += total_loss.item()

        pred_emotion = torch.max(emotion_logits, 1)[1]
        pred_race = torch.max(race_logits, 1)[1]

        correct_emotion += (pred_emotion == emotion_labels).sum().item()
        correct_race += (pred_race == race_indices).sum().item()
        total_emotion += emotion_labels.size(0)
        total_race += race_indices.size(0)

    emotion_acc = correct_emotion / total_emotion
    race_acc = correct_race / total_race

    # got some help from chatgpt for how to format this such that the training progress is clear.
    # Loss here is the sum of the per-batch losses over the epoch.
    print(f"Epoch {epoch+1}/{num_epochs} | Lambda_adv: {lambda_adv:.2f} | Loss: {running_loss:.4f} | "
          f"Emotion Acc: {emotion_acc:.4f} | Race Acc (adv): {race_acc:.4f}")

In [None]:
# Save only the weights (state_dict) of adv_model to adv_debias_model.pth.
torch.save(adv_model.state_dict(), "adv_debias_model.pth")

#### Evaluate (copied from previous eval code)

In [None]:
import numpy as np
# dependency on the first FairFace race annotation task
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load a trusted file.
# This repeats the load from the earlier evaluation section.
race_labels_test = np.load('/content/raf_race_labels_test.npy', allow_pickle=True).item()

In [None]:
# Imported here so this cell does not depend on the earlier evaluation cell having run.
from collections import defaultdict

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Evaluate the adversarially debiased model. The copied evaluation code loaded
# "inverse_freq.pth" into a plain ResNet, which re-evaluated the reweighting model
# and never touched the adversarial one. The architecture is rebuilt exactly as in
# training so the saved state_dict keys match.
best_model = AdversarialDebiasingModel(
    ResNetFeatureExtractor(dropout_prob=0.5),
    num_emotions=7,
    num_races=len(race_to_index),
).to(device)
best_model.load_state_dict(torch.load("adv_debias_model.pth", map_location=device))
best_model.eval()

# The baseline test_loader wraps RAFDBDataset, whose batches are (images, labels)
# only, so unpacking filenames from it raises ValueError. Build a loader on the
# race-aware dataset class instead, which also yields each image's filename.
test_emotions = sorted(os.listdir(test_dir), key=lambda x: int(x))
test_dataset_with_names = RAFDBTrainDatasetWithRace(
    root_dir=test_dir,
    emotions=test_emotions,
    race_labels_dict=race_labels_test,
    transform=transform
)
test_loader_with_names = DataLoader(test_dataset_with_names, batch_size=32, shuffle=False)

# Dictionaries to store predictions and ground truths grouped by race
race_predictions = defaultdict(list)
race_ground_truths = defaultdict(list)

with torch.no_grad():
    for images, emotion_labels, filenames, _ in test_loader_with_names:
        images = images.to(device)
        # The model returns (emotion_logits, race_logits); only emotions are scored here.
        outputs, _ = best_model(images)
        _, predicted_emotions = torch.max(outputs, 1)

        # Move the whole batch to Python lists once instead of one .item() per sample.
        predicted_list = predicted_emotions.cpu().tolist()
        truth_list = emotion_labels.tolist()

        for idx in range(len(filenames)):
            filename = filenames[idx]

            # match race_labels_test keys
            normalized_filename = filename
            if not normalized_filename.endswith('_aligned.jpg'):
                normalized_filename = normalized_filename.replace('.jpg', '_aligned.jpg')

            race = race_labels_test.get(normalized_filename, "Unknown")
            if race == "Unknown":
                print(f"race label missing for {normalized_filename}")

            race_predictions[race].append(predicted_list[idx])
            race_ground_truths[race].append(truth_list[idx])

emotion_classes = ["surprise", "fear", "disgust", "happiness", "sadness", "anger", "neutral"]
labels_used = list(range(len(emotion_classes)))  # [0, 1, 2, 3, 4, 5, 6]
In [None]:
# Per-race results, collected in matching order for the bar chart below.
races, accuracies, standard_errors = [], [], []

for race, y_pred in race_predictions.items():
    y_true = race_ground_truths[race]
    n_samples = len(y_true)

    acc = accuracy_score(y_true, y_pred)
    # error bars: standard error of a proportion, sqrt(p * (1 - p) / n)
    se = np.sqrt(acc * (1 - acc) / n_samples)

    races.append(race)
    accuracies.append(acc)
    standard_errors.append(se)

    # got some help from LLM here for how to nicely format the output in a tabular format
    # using classification_report.
    print(f"\n=== Race: {race} ===")
    print(f"Samples: {n_samples}")
    print(f"Accuracy: {acc:.4f}")
    print(f"Standard Error: {se:.4f}")

    # Per-emotion precision / recall / F1 for this race
    report = classification_report(
        y_true, y_pred,
        labels=labels_used, target_names=emotion_classes, zero_division=0,
    )
    print("Classification Report:")
    print(report)
    print("-" * 50)

# Bar chart of accuracy per race with +/- one standard error
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(races, accuracies, yerr=standard_errors, capsize=5, alpha=0.7)

ax.set_xlabel("Race")
ax.set_ylabel("Accuracy")
ax.set_title("Accuracy per race")
ax.set_ylim(0, 1.0)
ax.grid(axis='y', linestyle='--', alpha=0.7)

plt.show()