In [127]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

NSAMPLES = 500
# Create toy Gaussian data
def create_gaussian_data(mean, std, n_samples):
    return np.random.randn(n_samples, 2) * std + mean

# Dataset class
class GaussianToyDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create complex multi-cluster Gaussian data
def create_complex_data(n_samples_per_cluster=100, n_clusters_per_class=3, std=0.5):
    X = []
    y = []
    centers_class0 = np.random.randn(n_clusters_per_class, 2) * 3.0  # Spread out
    centers_class1 = np.random.randn(n_clusters_per_class, 2) * 3.0 + 5.0  # Move class 1 far away

    for center in centers_class0:
        points = np.random.randn(n_samples_per_cluster, 2) * std + center
        X.append(points)
        y += [0] * n_samples_per_cluster

    for center in centers_class1:
        points = np.random.randn(n_samples_per_cluster, 2) * std + center
        X.append(points)
        y += [1] * n_samples_per_cluster

    X = np.vstack(X)
    y = np.array(y)
    return X, y

def nonlinear_warp(X, freq=2.0, amp=0.5):
    X_new = X.copy()
    X_new[:, 0] += amp * np.sin(freq * X[:, 1])
    X_new[:, 1] += amp * np.sin(freq * X[:, 0])
    return X_new

def add_moderate_label_noise(y, noise_rate=0.2):
    noisy_y = y.copy()
    flip_mask = np.random.rand(len(y)) < noise_rate
    noisy_y[flip_mask] = 1 - noisy_y[flip_mask]  # assuming binary
    return noisy_y

# SIMPLE DATA CREATION

# Task A (original)
# X0 = create_gaussian_data([-1, -1], 1, NSAMPLES)
# X1 = create_gaussian_data([1, 1], 1, NSAMPLES)
# X_A = np.vstack((X0, X1))
# y_A = np.array([0]*NSAMPLES + [1]*NSAMPLES)

# Task B (rotation)
# theta = np.pi / 4  # 45 degrees
# R = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
# X_B = X_A @ R.T
# y_B = y_A.copy()



NSAMPLES = 500

# Task A (Simple Gaussian)
X0 = create_gaussian_data([-1, -1], 1, NSAMPLES)
X1 = create_gaussian_data([1, 1], 1, NSAMPLES)
X_A = np.vstack((X0, X1))
y_A = np.array([0]*NSAMPLES + [1]*NSAMPLES)

# Task B (Rotated Simple Gaussian with minor label noise)
theta = np.pi / 4  # 45 degrees
R = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
X_B = X_A @ R.T
y_B = y_A.copy()
# y_B = add_moderate_label_noise(y_B, noise_rate=0.01)

# Task C (Multi Blob Gaussian with more blobs and more label noise)
X_C, y_C = create_complex_data(NSAMPLES // 5, n_clusters_per_class=5)
# y_C = add_moderate_label_noise(y_C, noise_rate=0.1)



trainset_A = GaussianToyDataset(X_A, y_A)
trainset_B = GaussianToyDataset(X_B, y_B)
trainset_C = GaussianToyDataset(X_C, y_C)

trainloader_A = DataLoader(trainset_A, batch_size=64, shuffle=True)
trainloader_B = DataLoader(trainset_B, batch_size=64, shuffle=True)
trainloader_C = DataLoader(trainset_C, batch_size=64, shuffle=True)


In [128]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(2, 16)   # First hidden layer
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, 2)   # Output layer (for 2 classes)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

In [129]:
# Define the prior and posterior distributions
def prior_distribution(model):
    return [param.data.clone() for param in model.parameters()]

def posterior_distribution(model):
    return [param.data.clone() for param in model.parameters()]

def kl_divergence(prior, posterior, sigma_sq=1.0):
    # we don't have access to a "distribution", therefore, we assume both the prior and the posterior have some shared covariance matrix
    kl = 0.0
    for p, q in zip(prior, posterior):
        kl += torch.sum((q - p) ** 2)
    return (0.5 / sigma_sq) * kl

def pac_bayes_bound(prior, posterior, n_samples, empirical_loss, delta=0.05, sigma_sq=1.0):
    kl = kl_divergence(prior, posterior, sigma_sq=sigma_sq)
    bound_term = (kl + np.log(2 * np.sqrt(n_samples) / delta)) / (2 * n_samples)
    return empirical_loss + torch.sqrt(torch.tensor(bound_term, dtype=torch.float32))


In [130]:
def train(model, loader, optimizer, criterion, epochs, n_samples):
    for epoch in range(epochs):
        total_loss = 0
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}")


def train_with_pacbayes(model, loader, optimizer, criterion, epochs, n_samples, prior):
    for epoch in range(epochs):
        total_loss = 0
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            posterior = posterior_distribution(model)
            epsilon = total_loss / len(loader)
            bound = pac_bayes_bound(prior, posterior, n_samples, epsilon)
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}, PAC-Bayes Bound: {bound:.4f}")

def evaluate(model, loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            preds = model(x).argmax(dim=1)
            correct += (preds == y).sum().item()
            total += len(y)
    return correct / total * 100

In [131]:
torch.manual_seed(1984)

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

print("Training on Task A (Original Data)")
train(model, trainloader_A, optimizer, criterion, 100, NSAMPLES)
acc_A_before = evaluate(model, trainloader_A)
acc_B_before = evaluate(model, trainloader_B)
print(f"Task A Accuracy: {acc_A_before:.2f}%, Task B Before Training: {acc_B_before:.2f}%")

prior = prior_distribution(model)
print("\nTraining on Task B (Rotated Data, simple transformation)")
train_with_pacbayes(model, trainloader_B, optimizer, criterion, 100, NSAMPLES, prior)
acc_A_after = evaluate(model, trainloader_A)
acc_B_after = evaluate(model, trainloader_B)
print(f"After Task B → Task A Accuracy: {acc_A_after:.2f}%, Task B Accuracy: {acc_B_after:.2f}%")

print("\nTraining on Task C (Complex Multi-Blob Gaussian, more difficult transformation for the model to forget on)")
train_with_pacbayes(model, trainloader_C, optimizer, criterion, 100, NSAMPLES, prior)
acc_A_after = evaluate(model, trainloader_A)
acc_B_after = evaluate(model, trainloader_B)
acc_C_after = evaluate(model, trainloader_C)
print(f"After Task C → Task A Accuracy: {acc_A_after:.2f}%, Task B Accuracy: {acc_B_after:.2f}%, Task C Accuracy: {acc_C_after:.2f}%")

Training on Task A (Original Data)
Epoch 10/100, Loss: 0.2653
Epoch 20/100, Loss: 0.2014
Epoch 30/100, Loss: 0.2965
Epoch 40/100, Loss: 0.2704
Epoch 50/100, Loss: 0.1931
Epoch 60/100, Loss: 0.1577
Epoch 70/100, Loss: 0.2005
Epoch 80/100, Loss: 0.1188
Epoch 90/100, Loss: 0.2333
Epoch 100/100, Loss: 0.0326
Task A Accuracy: 92.20%, Task B Before Training: 84.10%

Training on Task B (Rotated Data, simple transformation)


  return empirical_loss + torch.sqrt(torch.tensor(bound_term, dtype=torch.float32))


Epoch 10/100, Loss: 0.2391, PAC-Bayes Bound: 0.3239
Epoch 20/100, Loss: 0.2650, PAC-Bayes Bound: 0.2966
Epoch 30/100, Loss: 0.1657, PAC-Bayes Bound: 0.2851
Epoch 40/100, Loss: 0.0715, PAC-Bayes Bound: 0.2793
Epoch 50/100, Loss: 0.2590, PAC-Bayes Bound: 0.2823
Epoch 60/100, Loss: 0.2234, PAC-Bayes Bound: 0.2808
Epoch 70/100, Loss: 0.1059, PAC-Bayes Bound: 0.2777
Epoch 80/100, Loss: 0.2235, PAC-Bayes Bound: 0.2804
Epoch 90/100, Loss: 0.0914, PAC-Bayes Bound: 0.2772
Epoch 100/100, Loss: 0.2490, PAC-Bayes Bound: 0.2809
After Task B → Task A Accuracy: 84.50%, Task B Accuracy: 92.10%

Training on Task C (Complex Multi-Blob Gaussian, more difficult transformation for the model to forget on)
Epoch 10/100, Loss: 0.3193, PAC-Bayes Bound: 0.4955
Epoch 20/100, Loss: 0.3210, PAC-Bayes Bound: 0.4384
Epoch 30/100, Loss: 0.4041, PAC-Bayes Bound: 0.4251
Epoch 40/100, Loss: 0.2753, PAC-Bayes Bound: 0.4126
Epoch 50/100, Loss: 0.3316, PAC-Bayes Bound: 0.4068
Epoch 60/100, Loss: 0.4370, PAC-Bayes Bound: 0.