### Importing Necessary Libraries

In [1]:
!pip install umap-learn

Collecting umap-learn
  Downloading umap_learn-0.5.6-py3-none-any.whl.metadata (21 kB)
Collecting pynndescent>=0.5 (from umap-learn)
  Downloading pynndescent-0.5.12-py3-none-any.whl.metadata (6.8 kB)
Downloading umap_learn-0.5.6-py3-none-any.whl (85 kB)
   ---------------------------------------- 0.0/85.7 kB ? eta -:--:--
   -------------- ------------------------- 30.7/85.7 kB 640.0 kB/s eta 0:00:01
   ------------------- -------------------- 41.0/85.7 kB 279.3 kB/s eta 0:00:01
   ---------------------------- ----------- 61.4/85.7 kB 363.1 kB/s eta 0:00:01
   ---------------------------------------- 85.7/85.7 kB 301.9 kB/s eta 0:00:00
Downloading pynndescent-0.5.12-py3-none-any.whl (56 kB)
   ---------------------------------------- 0.0/56.8 kB ? eta -:--:--
   ------------------------------------ --- 51.2/56.8 kB ? eta -:--:--
   ---------------------------------------- 56.8/56.8 kB 739.3 kB/s eta 0:00:00
Installing collected packages: pynndescent, umap-learn
Successfully installed 

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import umap.umap_ as umap
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
import random
import time

### Model Definition: ResNet18

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    The block computes ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``.
    ``shortcut`` is the identity unless the block changes the spatial
    resolution or the channel count, in which case it is a strided 1x1
    convolution followed by batch norm.
    """

    # Multiplier between `planes` and the number of channels the block outputs.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        """Create the block.

        Args:
            in_planes: Number of channels of the incoming feature map.
            planes: Number of channels produced by both 3x3 convolutions.
            stride: Stride of the first convolution (and of the shortcut
                projection when one is needed).
        """
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch. Submodules are registered in the same order as before
        # so parameter initialisation and state-dict keys are unchanged.
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut branch: project only when the shapes would not match.
        shapes_match = stride == 1 and in_planes == out_planes
        if shapes_match:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return the block's activation for the feature map ``x``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(hidden))
        residual += self.shortcut(x)
        return F.relu(residual)

class ResNet(nn.Module):
    """ResNet with a 3x3 stem and four residual stages.

    The stages use 64, 128, 256 and 512 base channels; stages 2-4 halve the
    spatial resolution. The head average-pools with a 4x4 window, flattens and
    feeds a linear layer of ``512 * block.expansion`` inputs, so the final
    feature map must be 4x4 (i.e. 32x32 inputs).
    """

    def __init__(self, block, num_blocks, num_classes=10):
        """Build the network.

        Args:
            block: Residual block class. It is called as
                ``block(in_planes, planes, stride)`` and must expose an
                ``expansion`` class attribute.
            num_blocks: Sequence of four ints, the number of blocks per stage.
            num_classes: Size of the output of the final linear layer.
        """
        super(ResNet, self).__init__()
        # Running channel count; updated by `_make_layer` as stages are built.
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        # Distinct loop name so the `stride` argument is not shadowed.
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        # Single source of truth for the backbone: reuse `extract_features`
        # instead of repeating the same layers here.
        return self.linear(self.extract_features(x))

    def extract_features(self, x):
        """Return the pooled, flattened activations that feed ``self.linear``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        return out

def ResNet18():
    """Build a ``ResNet`` with four stages of two ``BasicBlock``s each."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage)

# **Step 1: Train the ResNet18 Model on CIFAR10**

## **1. Prepare the Dataset and Train the Model**

In [None]:
# Training-time pipeline: random crop (with 4-pixel padding) and horizontal
# flip for augmentation, then tensor conversion and per-channel normalisation.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True, num_workers=2)

net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

# Per-epoch history, consumed by the plotting cell.
train_losses = []
train_accuracies = []

# Training loop
start_time = time.time()
for epoch in range(10):
    # `epoch_loss` covers every batch of the epoch; `running_loss` only covers
    # the current 100-iteration logging window. They must stay separate:
    # resetting the window must not wipe out the epoch total.
    epoch_loss = 0.0
    running_loss = 0.0
    correct = 0
    total = 0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        epoch_loss += batch_loss
        running_loss += batch_loss
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        if i % 100 == 99:
            print(f'[Epoch {epoch + 1}, Iter {i + 1}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

    # Mean batch loss over the whole epoch.
    train_losses.append(epoch_loss / len(trainloader))
    train_accuracies.append(100 * correct / total)
total_time = time.time() - start_time
print('Finished Training')
print(f'Total Training time: {total_time:.2f} seconds')
torch.save(net.state_dict(), 'resnet18_cifar10.pth')

In [None]:
# Draw the per-epoch training loss and accuracy side by side.
epochs = range(1, 11)

plt.figure(figsize=(14, 5))

# One entry per panel: (series, legend label, y-axis label, panel title).
panels = [
    (train_losses, 'Training Loss', 'Loss', 'Epoch vs Loss - Training'),
    (train_accuracies, 'Training Accuracy', 'Accuracy (%)', 'Epoch vs Accuracy - Training'),
]
for position, (series, legend_label, y_label, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs, series, label=legend_label)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()

## **2. Test the Model and Report Accuracy for Each Class**

In [None]:
# Data loading and preprocessing: no augmentation at test time, only tensor
# conversion and the same per-channel normalisation used for training.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')  # CIFAR-10 class names

# Load the trained model
net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_cifar10.pth', map_location=device))
net = net.to(device)
net.eval()

# Per-class counters keyed by the integer class label.
correct = {i: 0 for i in range(10)}
total = {i: 0 for i in range(10)}

with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        # Keep the comparison 1-D (no squeeze) so a batch holding a single
        # sample is still iterable, and convert to Python values once per
        # batch instead of calling `.item()` for every sample.
        hits = (predicted == labels).tolist()
        for label, hit in zip(labels.tolist(), hits):
            correct[label] += hit
            total[label] += 1

# Print accuracy for each class
for i in range(10):
    print(f'Accuracy of {classes[i]}: {100 * correct[i] / total[i]:.2f} %')

## **3. Extract Intermediate Representations and Apply UMAP**

visualize_umap.py

In [None]:
# Load the trained model
net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_cifar10.pth', map_location=device))
net = net.to(device)
net.eval()

features = []
labels_list = []

def hook(module, input, output):
    """Forward hook: store the hooked layer's output as a NumPy array."""
    features.append(output.detach().cpu().numpy())

# NOTE(review): the hook is attached to the OUTPUT of `net.linear`, so the
# vectors collected here are the 10 class logits, not the pooled activations
# returned by `net.extract_features` — confirm this is the representation
# that should be visualised.
hook_handle = net.linear.register_forward_hook(hook)
try:
    with torch.no_grad():
        for images, labels in testloader:
            # Only the side effect of the hook is needed; the logits returned
            # by the forward pass are already captured in `features`.
            net(images.to(device))
            labels_list.extend(labels.cpu().numpy())
finally:
    # Detach the hook so it cannot keep appending if `net` is used again.
    hook_handle.remove()

features = np.concatenate(features, axis=0)
labels_list = np.array(labels_list)

# Apply UMAP for dimensionality reduction
reducer = umap.UMAP()
embedding = reducer.fit_transform(features)

# Plot UMAP result, one colour per class
plt.figure(figsize=(10, 8))
for i in range(10):
    indices = labels_list == i
    plt.scatter(embedding[indices, 0], embedding[indices, 1], label=classes[i], s=5)
plt.legend()
plt.title('UMAP projection of CIFAR-10 features - Testing')
plt.show()

# **Step 2: Boundary Unlearning with PGD Attack**

## **1. Define the PGD Attack Function**

adv_generator.py

In [None]:
# Define the PGD attack function
def pgd_attack(model, images, labels, eps=8/255, alpha=2/255, iters=10, clip_min=0, clip_max=1):
    """Craft L-infinity PGD adversarial examples that increase the loss on ``labels``.

    Each step moves the current images by ``alpha`` along the sign of the
    cross-entropy gradient, projects the total perturbation back into the
    ``[-eps, eps]`` box around the clean images and clamps the result to
    ``[clip_min, clip_max]``.

    Args:
        model: Callable mapping a batch of images to class logits.
        images: Clean input batch. It is left untouched (neither its values
            nor its ``requires_grad`` flag are changed).
        labels: Class indices used as the cross-entropy target.
        eps: Maximum absolute perturbation per element, in the units of ``images``.
        alpha: Step size of each signed-gradient update.
        iters: Number of PGD steps.
        clip_min: Lower bound of the valid value range. The default range
            ``[0, 1]`` is that of un-normalised pixels; pass matching bounds
            when ``images`` has been normalised.
        clip_max: Upper bound of the valid value range.

    Returns:
        A detached tensor with the shape of ``images`` holding the
        adversarial examples.
    """
    ori_images = images.detach()
    # Work on a private copy so the caller's tensor is never mutated.
    adv_images = ori_images.clone()
    loss_fn = nn.CrossEntropyLoss()

    for _ in range(iters):
        adv_images.requires_grad_(True)
        cost = loss_fn(model(adv_images), labels)
        # Differentiate w.r.t. the images only: the model's parameter `.grad`
        # buffers are neither cleared nor filled by the attack.
        (grad,) = torch.autograd.grad(cost, adv_images)

        stepped = adv_images.detach() + alpha * grad.sign()
        eta = torch.clamp(stepped - ori_images, min=-eps, max=eps)
        adv_images = torch.clamp(ori_images + eta, min=clip_min, max=clip_max).detach()

    return adv_images

## **2. Unlearn the Ship Class and Retrain the Model**

boundary_unlearning.py

In [None]:
# Filter 3000 ship class images (CIFAR-10 label 8) and 3000 images drawn from
# the other classes. Labels are read from `trainset.targets` so that no image
# has to be decoded and augmented just to look at its label.
ship_class_indices = [i for i, label in enumerate(trainset.targets) if label == 8]
non_ship_indices = [i for i, label in enumerate(trainset.targets) if label != 8]
random.shuffle(ship_class_indices)
random.shuffle(non_ship_indices)
ship_class_indices = ship_class_indices[:3000]
non_ship_indices = non_ship_indices[:3000]

balanced_indices = ship_class_indices + non_ship_indices
balanced_trainloader = torch.utils.data.DataLoader(
    torch.utils.data.Subset(trainset, balanced_indices),
    batch_size=128,
    shuffle=True,
    num_workers=2
)

# Load the trained model
net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_cifar10.pth', map_location=device))
net = net.to(device)

# Define optimizer for unlearning process
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()

# Initialize lists to store loss and accuracy values
train_losses = []
train_accuracies = []
start_time = time.time()

for epoch in range(10):
    print("Epoch:", epoch)
    # `epoch_loss` covers the whole epoch; `running_loss` only the current
    # 100-iteration logging window.
    epoch_loss = 0.0
    running_loss = 0.0
    correct = 0
    total = 0
    for i, data in enumerate(balanced_trainloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # NOTE(review): `pgd_attack` clamps its result to [0, 1] by default,
        # while `trainset` yields Normalize-d tensors — confirm the intended
        # pixel range for the adversarial examples.
        adv_inputs = pgd_attack(net, inputs, labels)
        outputs = net(adv_inputs)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        epoch_loss += batch_loss
        running_loss += batch_loss
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Progress report belongs inside the batch loop.
        if i % 100 == 99:
            print(f'[Epoch {epoch + 1}, Iter {i + 1}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

    # Average over the batches actually iterated (the balanced subset).
    train_losses.append(epoch_loss / len(balanced_trainloader))
    train_accuracies.append(100 * correct / total)
total_time = time.time() - start_time
print('Finished Unlearning with PGD Attack')
print(f'Total unlearning time with PGD Attack Technique: {total_time:.2f} seconds.')
torch.save(net.state_dict(), 'resnet18_unlearned.pth')  # Save the unlearned model

In [None]:
# Draw the per-epoch loss and accuracy of the PGD unlearning run side by side.
epochs = range(1, 11)

plt.figure(figsize=(14, 5))

# One entry per panel: (series, legend label, y-axis label, panel title).
panels = [
    (train_losses, 'Training Loss', 'Loss', 'Epoch vs Loss - Unlearning with PGD Attack'),
    (train_accuracies, 'Training Accuracy', 'Accuracy (%)', 'Epoch vs Accuracy - Unlearning with PGD Attack'),
]
for position, (series, legend_label, y_label, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs, series, label=legend_label)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()

## **3. Test the Unlearned Model and Visualize Using UMAP**

test_unlearned_model.py

In [None]:
# Load the unlearned model
net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_unlearned.pth', map_location=device))
net = net.to(device)
net.eval()

# Test the unlearned model and calculate accuracy for each class
correct = {i: 0 for i in range(10)}
total = {i: 0 for i in range(10)}
features = []
labels_list = []

with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        # Run the backbone once: the pooled activations are kept for UMAP and
        # fed to the classifier head to obtain the logits.
        feat = net.extract_features(images)
        outputs = net.linear(feat)
        _, predicted = torch.max(outputs, 1)  # Get the class with the highest score
        # Keep the comparison 1-D (no squeeze) so a batch holding a single
        # sample is still iterable.
        hits = (predicted == labels).tolist()
        for label, hit in zip(labels.tolist(), hits):
            correct[label] += hit  # Update correct predictions count
            total[label] += 1  # Update total predictions count

        features.append(feat.cpu())
        labels_list.append(labels.cpu())

# Print accuracy for each class
for i in range(10):
    print(f'Accuracy of {classes[i]}: {100 * correct[i] / total[i]:.2f} %')


features = torch.cat(features, dim=0).numpy()
labels_list = torch.cat(labels_list, dim=0).numpy()

# Apply UMAP for visualization
reducer = umap.UMAP()
embedding = reducer.fit_transform(features)

# Plot UMAP result, one colour per class
plt.figure(figsize=(10, 10))
for i in range(10):
    idx = labels_list == i
    plt.scatter(embedding[idx, 0], embedding[idx, 1], label=classes[i], s=5)
plt.legend()
plt.title('UMAP projection of CIFAR-10 features - Unlearning with PGD Attack')
plt.show()

# **Step 3: Modified PGD Attack with Early Stopping**

## **1. Define the Modified PGD Attack Function**

adv_generator_modified.py

In [None]:
def early_stopped_pgd_attack(model, images, labels, eps=8/255, alpha=2/255, iters=10, clip_min=0, clip_max=1):
    """PGD attack that stops perturbing each sample once it is misclassified.

    Every step moves the still-active samples by ``alpha`` along the sign of
    the cross-entropy gradient, projects the perturbation into the
    ``[-eps, eps]`` box around the clean images and clamps to
    ``[clip_min, clip_max]``. After the step, a sample whose prediction
    differs from its label is frozen at its current value. The loop ends when
    no sample is active or after ``iters`` steps.

    Early stopping is tracked per sample: a single flipped prediction no
    longer ends the attack for the whole batch.

    Args:
        model: Callable mapping a batch of images to class logits.
        images: Clean input batch. It is left untouched (neither its values
            nor its ``requires_grad`` flag are changed).
        labels: Class indices used as the cross-entropy target.
        eps: Maximum absolute perturbation per element, in the units of ``images``.
        alpha: Step size of each signed-gradient update.
        iters: Maximum number of PGD steps.
        clip_min: Lower bound of the valid value range. The default range
            ``[0, 1]`` is that of un-normalised pixels; pass matching bounds
            when ``images`` has been normalised.
        clip_max: Upper bound of the valid value range.

    Returns:
        A detached tensor with the shape of ``images`` holding the
        adversarial examples.
    """
    ori_images = images.detach()
    # Work on a private copy so the caller's tensor is never mutated.
    adv_images = ori_images.clone()
    loss_fn = nn.CrossEntropyLoss()

    # True for samples that are still being attacked.
    active = torch.ones(ori_images.shape[0], dtype=torch.bool, device=ori_images.device)
    # Shape used to broadcast the per-sample mask over the remaining dims.
    mask_shape = (-1,) + (1,) * (ori_images.dim() - 1)

    for _ in range(iters):
        adv_images.requires_grad_(True)
        cost = loss_fn(model(adv_images), labels)
        # Differentiate w.r.t. the images only: the model's parameter `.grad`
        # buffers are neither cleared nor filled by the attack.
        (grad,) = torch.autograd.grad(cost, adv_images)

        current = adv_images.detach()
        stepped = current + alpha * grad.sign()
        eta = torch.clamp(stepped - ori_images, min=-eps, max=eps)
        candidate = torch.clamp(ori_images + eta, min=clip_min, max=clip_max)
        # Frozen samples keep their current value; active ones take the step.
        adv_images = torch.where(active.view(mask_shape), candidate, current).detach()

        # Early stopping: a sample leaves the active set once it is fooled.
        with torch.no_grad():
            _, pred = torch.max(model(adv_images), 1)
        active = active & (pred == labels)
        if not bool(active.any()):
            break

    return adv_images

def extract_features(model, loader, device):
    """Run ``model`` over ``loader`` and collect its outputs and the labels.

    The model is switched to evaluation mode and called without gradient
    tracking. Note that the collected "features" are whatever ``model(images)``
    returns.

    Args:
        model: Module to evaluate.
        loader: Iterable yielding ``(images, labels)`` pairs of tensors.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays, each the concatenation
        over all batches along the first axis.
    """
    model.eval()
    output_chunks, label_chunks = [], []

    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            output_chunks.append(model(batch_images).cpu())
            label_chunks.append(batch_labels.cpu())

    stacked_outputs = torch.cat(output_chunks, dim=0).numpy()
    stacked_labels = torch.cat(label_chunks, dim=0).numpy()
    return stacked_outputs, stacked_labels

## **2. Unlearn the Ship Class with Early Stopping**

boundary_unlearning_modified.py

In [None]:
# Load the trained model
net = ResNet18()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_cifar10.pth', map_location=device))
net = net.to(device)

optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()

train_losses = []
train_accuracies = []
start_time = time.time()
# Unlearning loop using modified PGD attack
# NOTE(review): this loop iterates the full `trainloader`, whereas the plain
# PGD unlearning cell iterates `balanced_trainloader` — confirm which data
# this step is meant to use.
for epoch in range(10):
    # `epoch_loss` covers the whole epoch; `running_loss` only the current
    # 100-iteration logging window.
    epoch_loss = 0.0
    running_loss = 0.0
    correct = 0
    total = 0
    print("Epoch:", epoch)
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        adv_inputs = early_stopped_pgd_attack(net, inputs, labels)  # Generate adversarial examples
        outputs = net(adv_inputs)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        epoch_loss += batch_loss
        running_loss += batch_loss
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Progress report belongs inside the batch loop.
        if i % 100 == 99:
            print(f'[Epoch {epoch + 1}, Iter {i + 1}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

    # Mean batch loss over the whole epoch.
    train_losses.append(epoch_loss / len(trainloader))
    train_accuracies.append(100 * correct / total)
total_time = time.time() - start_time
print('Finished Unlearning with Modified PGD')
print(f'Total unlearning time with Modified PGD Attack Technique: {total_time:.2f} seconds.')
torch.save(net.state_dict(), 'resnet18_unlearned_modified.pth')

In [None]:
# Draw the per-epoch loss and accuracy of the early-stopped PGD run side by side.
epochs = range(1, 11)

plt.figure(figsize=(14, 5))

# One entry per panel: (series, legend label, y-axis label, panel title).
panels = [
    (train_losses, 'Training Loss', 'Loss', 'Epoch vs Loss - Unlearning with Early-Stopped PGD Attack'),
    (train_accuracies, 'Training Accuracy', 'Accuracy (%)', 'Epoch vs Accuracy - Unlearning with Early-Stopped PGD Attack'),
]
for position, (series, legend_label, y_label, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs, series, label=legend_label)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()

## **3. Test the Modified Unlearned Model and Visualize Using UMAP**

test_unlearned_model_modified.py

In [None]:
# Load the modified unlearned model
net = ResNet18()
# Defined here as in the other cells, so this cell does not rely on `device`
# left over from an earlier one.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load('resnet18_unlearned_modified.pth', map_location=device))
net = net.to(device)
net.eval()

# Per-class counters keyed by the integer class label.
correct = {i: 0 for i in range(10)}
total = {i: 0 for i in range(10)}
features = []
labels_list = []

with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)  # Get the class with the highest score
        # Keep the comparison 1-D (no squeeze) so a batch holding a single
        # sample is still iterable.
        hits = (predicted == labels).tolist()
        for label, hit in zip(labels.tolist(), hits):
            correct[label] += hit  # Update correct predictions count
            total[label] += 1  # Update total predictions count

        # The vectors visualised below are the logits themselves, so reuse
        # `outputs` instead of running the network a second time.
        features.append(outputs.cpu())
        labels_list.append(labels.cpu())

# Print accuracy for each class
for i in range(10):
    print(f'Accuracy of {classes[i]}: {100 * correct[i] / total[i]:.2f} %')

features = torch.cat(features, dim=0).numpy()
labels_list = torch.cat(labels_list, dim=0).numpy()

# Apply UMAP for visualization
reducer = umap.UMAP()
embedding = reducer.fit_transform(features)

# Plot UMAP result, one colour per class
plt.figure(figsize=(10, 10))
for i in range(10):
    idx = labels_list == i
    plt.scatter(embedding[idx, 0], embedding[idx, 1], label=classes[i], s=5)
plt.legend()
plt.title('UMAP projection of CIFAR-10 features - Unlearning with Early-Stopped PGD Attack')
plt.show()