## Import required libraries

In [None]:
import numpy as np
import tarfile
import os

import torch
from torch import nn
import torch.nn.functional as F
from torchvision.datasets.utils import download_url
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as tt
from torchvision.models import resnet18

torch.manual_seed(100)

## Mounting drive to load saved model
Saved Model - [Google Drive](https://drive.google.com/file/d/1QkEXtdRpYvDdRUi4LtF9OZen8kiayqVo/view?usp=drive_link)


> You can download the model from the above link and upload it to colab files using the cell below



In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))



> Or you can copy the model to your drive and use it



In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Helper functions

In [None]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

def training_step(model, batch):
    images, labels = batch
    images, labels = images.to(device), labels.to(device)
    out = model(images)
    loss = F.cross_entropy(out, labels)
    return loss

def validation_step(model, batch):
    images, labels = batch
    images, labels = images.to(device), labels.to(device)
    out = model(images)
    loss = F.cross_entropy(out, labels)
    acc = accuracy(out, labels)
    return {'Loss': loss.detach(), 'Acc': acc}

def validation_epoch_end(model, outputs):
    batch_losses = [x['Loss'] for x in outputs]
    epoch_loss = torch.stack(batch_losses).mean()
    batch_accs = [x['Acc'] for x in outputs]
    epoch_acc = torch.stack(batch_accs).mean()
    return {'Loss': epoch_loss.item(), 'Acc': epoch_acc.item()}

def epoch_end(model, epoch, result):
    print("Epoch [{}], last_lr: {:.5f}, train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
        epoch, result['lrs'][-1], result['train_loss'], result['Loss'], result['Acc']))

def distance(model,model0):
    distance=0
    normalization=0
    for (k, p), (k0, p0) in zip(model.named_parameters(), model0.named_parameters()):
        space='  ' if 'bias' in k else ''
        current_dist=(p.data0-p0.data0).pow(2).sum().item()
        current_norm=p.data0.pow(2).sum().item()
        distance+=current_dist
        normalization+=current_norm
    print(f'Distance: {np.sqrt(distance)}')
    print(f'Normalized Distance: {1.0*np.sqrt(distance/normalization)}')
    return 1.0*np.sqrt(distance/normalization)

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = [validation_step(model, batch) for batch in val_loader]
    return validation_epoch_end(model, outputs)

def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader,
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    torch.cuda.empty_cache()
    history = []

    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)

    sched = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

    for epoch in range(epochs):
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = training_step(model, batch)
            train_losses.append(loss)
            loss.backward()

            if grad_clip:
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)

            optimizer.step()
            optimizer.zero_grad()

            lrs.append(get_lr(optimizer))


        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        epoch_end(model, epoch, result)
        history.append(result)
        sched.step(result['Loss'])
    return history

### Dataset download and preprocessing

In [None]:
# Dowload the dataset
dataset_url = "https://s3.amazonaws.com/fast-ai-imageclas/cifar10.tgz"
download_url(dataset_url, '.')

# Extract from archive
with tarfile.open('./cifar10.tgz', 'r:gz') as tar:
    tar.extractall(path='./data')

# Look into the data directory
data_dir = './data/cifar10'
print(os.listdir(data_dir))
classes = os.listdir(data_dir + "/train")
print(classes)

In [None]:
transform_train = tt.Compose([
    tt.ToTensor(),
    tt.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = tt.Compose([
    tt.ToTensor(),
    tt.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

In [None]:
train_ds = ImageFolder(data_dir+'/train', transform_train)
valid_ds = ImageFolder(data_dir+'/test', transform_test)

In [None]:
batch_size = 256
train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=2, pin_memory=True)
valid_dl = DataLoader(valid_ds, batch_size*2, num_workers=2, pin_memory=True)

### Training configuration

In [None]:
device = "cuda"
model = resnet18(num_classes = 10).to(device = device)

epochs = 40
max_lr = 0.01
grad_clip = 0.1
weight_decay = 1e-4
opt_func = torch.optim.Adam

### Train and save the model

In [None]:
# %%time
# history = fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl,
#                              grad_clip=grad_clip,
#                              weight_decay=weight_decay,
#                              opt_func=opt_func)

# torch.save(model.state_dict(), "ResNET18_CIFAR10_ALL_CLASSES.pt")

### Load the saved model (from files or drive)

In [None]:
# model.load_state_dict(torch.load("/content/drive/MyDrive/Machine-Unlearning-Workshop 2025/ResNET18_CIFAR10_ALL_CLASSES.pt"))
model.load_state_dict(torch.load("/content/ResNET18_CIFAR10_ALL_CLASSES.pt"))
original_model_history = [evaluate(model, valid_dl)]
original_model_history

### Create noise for unlearning step

In [None]:
# defining the noise structure
class Noise(nn.Module):
    def __init__(self, *dim):
        super().__init__()
        self.noise = torch.nn.Parameter(torch.randn(*dim), requires_grad = True)

    def forward(self):
        return self.noise

### Select classes to unlearn

In [None]:
# list of all classes
classes = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# classes which are required to un-learn
classes_to_forget = [0, 2]

In [None]:
# classwise list of samples
num_classes = 10
classwise_train = {}
for i in range(num_classes):
    classwise_train[i] = []

for img, label in train_ds:
    classwise_train[label].append((img, label))

classwise_test = {}
for i in range(num_classes):
    classwise_test[i] = []

for img, label in valid_ds:
    classwise_test[label].append((img, label))

In [None]:
# getting some samples from retain classes
num_samples_per_class = 1000

retain_samples = []
for i in range(len(classes)):
    if classes[i] not in classes_to_forget:
        retain_samples += classwise_train[i][:num_samples_per_class]

In [None]:
# retain validation set
retain_valid = []
for cls in range(num_classes):
    if cls not in classes_to_forget:
        for img, label in classwise_test[cls]:
            retain_valid.append((img, label))

# forget validation set
forget_valid = []
for cls in range(num_classes):
    if cls in classes_to_forget:
        for img, label in classwise_test[cls]:
            forget_valid.append((img, label))

forget_valid_dl = DataLoader(forget_valid, batch_size, num_workers=2, pin_memory=True)
retain_valid_dl = DataLoader(retain_valid, batch_size*2, num_workers=2, pin_memory=True)

In [None]:
# loading the model
model = resnet18(num_classes = 10).to(device = device)
# model.load_state_dict(torch.load("/content/drive/MyDrive/Machine-Unlearning-Workshop 2025/ResNET18_CIFAR10_ALL_CLASSES.pt"))
model.load_state_dict(torch.load("/content/ResNET18_CIFAR10_ALL_CLASSES.pt"))

### Training the noise

In [None]:
%%time

noises = {}
for cls in classes_to_forget:
    print("Optiming loss for class {}".format(cls))
    noises[cls] = Noise(batch_size, 3, 32, 32).cuda()
    opt = torch.optim.Adam(noises[cls].parameters(), lr = 0.1)

    num_epochs = 5
    num_steps = 8
    class_label = cls
    for epoch in range(num_epochs):
        total_loss = []
        for batch in range(num_steps):
            inputs = noises[cls]()
            labels = torch.zeros(batch_size).cuda()+class_label
            outputs = model(inputs)
            loss = -F.cross_entropy(outputs, labels.long()) + 0.1*torch.mean(torch.sum(torch.square(inputs), [1, 2, 3]))
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss.append(loss.cpu().detach().numpy())
        print("Loss: {}".format(np.mean(total_loss)))

### Impair step

In [None]:
%%time

batch_size = 256
noisy_data = []
num_batches = 20
class_num = 0

for cls in classes_to_forget:
    for i in range(num_batches):
        batch = noises[cls]().cpu().detach()
        for i in range(batch[0].size(0)):
            noisy_data.append((batch[i], torch.tensor(class_num)))

other_samples = []
for i in range(len(retain_samples)):
    other_samples.append((retain_samples[i][0].cpu(), torch.tensor(retain_samples[i][1])))
noisy_data += other_samples
noisy_loader = torch.utils.data.DataLoader(noisy_data, batch_size=256, shuffle = True)


optimizer = torch.optim.Adam(model.parameters(), lr = 0.02)


for epoch in range(1):
    model.train(True)
    running_loss = 0.0
    running_acc = 0
    for i, data in enumerate(noisy_loader):
        inputs, labels = data
        inputs, labels = inputs.cuda(),torch.tensor(labels).cuda()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item() * inputs.size(0)
        out = torch.argmax(outputs.detach(),dim=1)
        assert out.shape==labels.shape
        running_acc += (labels==out).sum().item()
    print(f"Train loss {epoch+1}: {running_loss/len(train_ds)},Train Acc:{running_acc*100/len(train_ds)}%")

### Performance of the model of retain and forget class after impair step

In [None]:
print("Performance of Standard Forget Model on Forget Class")
impair_step_history_f = [evaluate(model, forget_valid_dl)]
print("Accuracy: {}".format(impair_step_history_f[0]["Acc"]*100))
print("Loss: {}".format(impair_step_history_f[0]["Loss"]))

print("Performance of Standard Forget Model on Retain Class")
impair_step_history_r = [evaluate(model, retain_valid_dl)]
print("Accuracy: {}".format(impair_step_history_r[0]["Acc"]*100))
print("Loss: {}".format(impair_step_history_r[0]["Loss"]))

### Repair step

In [None]:
%%time

heal_loader = torch.utils.data.DataLoader(other_samples, batch_size=256, shuffle = True)

optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)


for epoch in range(1):
    model.train(True)
    running_loss = 0.0
    running_acc = 0
    for i, data in enumerate(heal_loader):
        inputs, labels = data
        inputs, labels = inputs.cuda(),torch.tensor(labels).cuda()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item() * inputs.size(0)
        out = torch.argmax(outputs.detach(),dim=1)
        assert out.shape==labels.shape
        running_acc += (labels==out).sum().item()
    print(f"Train loss {epoch+1}: {running_loss/len(train_ds)},Train Acc:{running_acc*100/len(train_ds)}%")

### Performance of the model on retain and forget class after repair step

In [None]:
print("Performance of Standard Forget Model on Forget Class")
repair_step_history_f = [evaluate(model, forget_valid_dl)]
print("Accuracy: {}".format(repair_step_history_f[0]["Acc"]*100))
print("Loss: {}".format(repair_step_history_f[0]["Loss"]))

print("Performance of Standard Forget Model on Retain Class")
repair_step_history_r = [evaluate(model, retain_valid_dl)]
print("Accuracy: {}".format(repair_step_history_r[0]["Acc"]*100))
print("Loss: {}".format(repair_step_history_r[0]["Loss"]))

In [None]:
print("Original model history:")
print(original_model_history)
print("\nPerformance on Forget Class after Impair step:")
print(impair_step_history_f)
print("\nPerformance on Retain Class after Impair step:")
print(impair_step_history_r)
print("\nPerformance on Forget Class after Repair step:")
print(repair_step_history_f)
print("\nPerformance on Retain Class after Repair step:")
print(repair_step_history_r)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Extracting accuracy and loss values
forget_accs = [original_model_history[0]['Acc'], impair_step_history_f[0]['Acc'], repair_step_history_f[0]['Acc']]
retain_accs = [original_model_history[0]['Acc'], impair_step_history_r[0]['Acc'], repair_step_history_r[0]['Acc']]

forget_losses = [original_model_history[0]['Loss'], impair_step_history_f[0]['Loss'], repair_step_history_f[0]['Loss']]
retain_losses = [original_model_history[0]['Loss'], impair_step_history_r[0]['Loss'], repair_step_history_r[0]['Loss']]

labels = ['Before Unlearning', 'After Impair', 'After Repair']
x = np.arange(len(labels))
width = 0.35

# Create a figure with two subplots (one for accuracy, one for loss)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

# Accuracy plot
rects1_acc = ax1.bar(x - width/2, [acc * 100 for acc in forget_accs], width, label='Forget Class', color='#1f77b4', alpha=0.8)
rects2_acc = ax1.bar(x + width/2, [acc * 100 for acc in retain_accs], width, label='Retain Class', color='#ff7f0e', alpha=0.8)

ax1.set_ylabel('Accuracy (%)')
ax1.set_title('Model Accuracy')
ax1.set_xticks(x)
ax1.set_xticklabels(labels)
ax1.legend()
ax1.set_ylim(0, 100) # Set y-axis limit for accuracy

# Add percentage labels to accuracy bars
def autolabel_acc(rects):
    for rect in rects:
        height = rect.get_height()
        ax1.annotate('%.2f%%' % height,
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

autolabel_acc(rects1_acc)
autolabel_acc(rects2_acc)

# Loss plot
rects1_loss = ax2.bar(x - width/2, forget_losses, width, label='Forget Class', color='#1f77b4', alpha=0.8)
rects2_loss = ax2.bar(x + width/2, retain_losses, width, label='Retain Class', color='#ff7f0e', alpha=0.8)

ax2.set_ylabel('Loss')
ax2.set_title('Model Loss')
ax2.set_xticks(x)
ax2.set_xticklabels(labels)
ax2.legend()
ax2.set_ylim(0, max(max(forget_losses), max(retain_losses)) * 1.1) # Set y-axis limit for loss

# Add loss values labels to loss bars
def autolabel_loss(rects):
    for rect in rects:
        height = rect.get_height()
        ax2.annotate('%.2f' % height,
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

autolabel_loss(rects1_loss)
autolabel_loss(rects2_loss)


plt.tight_layout()
plt.show()