In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Perturbation sizes for the untargeted FGSM sweep (0 leaves the image unperturbed).
epsilons = [0, .05, .1, .15, .20, .25]
# Perturbation sizes for the targeted sweep: 10 evenly spaced values in [0.25, 0.7].
# Alternative grids kept for reference: [0.2, 0.4, 0.6, 0.8] and [0.2, 0.4].
epsilons_prime = np.linspace(0.25, 0.7, 10)

# Location of the pretrained weights that are loaded into the network.
pretrained_model = "/content/lenet_mnist_model.pth"

# Request the GPU; the device selection falls back to CPU when CUDA is unavailable.
use_cuda=True
# Set random seed for reproducibility
torch.manual_seed(42)

In [None]:
#@title Define the model architecture and load pretrained weights
# LeNet Model definition
class Net(nn.Module):
    """Small convolutional classifier that outputs log-probabilities for 10 classes.

    Layout: two 3x3 convolutions (1 -> 32 -> 64 channels), a 2x2 max pool,
    then two fully connected layers (9216 -> 128 -> 10). Dropout is applied
    after the pooling step and after the first fully connected layer.
    The 9216 inputs of ``fc1`` equal 64 * 12 * 12, which is what 28x28
    single-channel images produce after the convolutions and pooling.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are kept stable: they define the
        # state_dict keys and the order in which weights are initialised.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Return the log-softmax (over dim 1) class scores for the batch ``x``."""
        # Convolutional feature extractor.
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        pooled = self.dropout1(F.max_pool2d(features, 2))
        # Classifier head on the flattened feature maps.
        hidden = F.relu(self.fc1(torch.flatten(pooled, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)

# MNIST test split, converted to tensors and normalised with mean 0.1307 / std 0.3081.
# batch_size=1 because the evaluation loops call `.item()` on single predictions.
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, download=True, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
            ])),
        batch_size=1, shuffle=True)

# Pick the GPU only when it was requested (`use_cuda`) and is actually available.
print("CUDA Available: ",torch.cuda.is_available())
device = torch.device("cuda" if use_cuda and torch.cuda.is_available() else "cpu")

# Initialize the network on the selected device
model = Net().to(device)

# Load the pretrained weights, remapping stored tensors onto the selected device.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source; consider weights_only=True if the installed torch supports it.
model.load_state_dict(torch.load(pretrained_model, map_location=device))
# Set the model in evaluation mode. In this case this is for the Dropout layers
model.eval()

In [None]:
#@title Define FGSM perturbation and denormalise
# FGSM attack code
def fgsm_attack(image, epsilon, data_grad):
    """Take one FGSM step that moves ``image`` along the sign of ``data_grad``.

    Args:
        image (torch.Tensor): Image tensor to perturb.
        epsilon (float): Step size applied to every element.
        data_grad (torch.Tensor): Gradient used to choose the step direction.

    Returns:
        torch.Tensor: ``image + epsilon * sign(data_grad)`` clamped to [0, 1].
    """
    # Only the direction of the gradient matters, never its magnitude.
    step = epsilon * torch.sign(data_grad)
    # Clamp so the result stays inside the [0, 1] range.
    return (image + step).clamp(0, 1)

# FGSM extension - target a specific class
# the only difference is that we SUBTRACT the gradient, because in this case
# we are again trying to minimise cross-entropy loss
def fgsm_attack_extension(image, epsilon, data_grad):
    """Take one FGSM step that moves ``image`` against the sign of ``data_grad``.

    Subtracting the signed gradient lowers the loss it was computed from, which
    is what the targeted attack needs.

    Args:
        image (torch.Tensor): Image tensor to perturb.
        epsilon (float): Step size applied to every element.
        data_grad (torch.Tensor): Gradient used to choose the step direction.

    Returns:
        torch.Tensor: ``image - epsilon * sign(data_grad)`` clamped to [0, 1].
    """
    # Only the direction of the gradient matters, never its magnitude.
    step = epsilon * torch.sign(data_grad)
    # Clamp so the result stays inside the [0, 1] range.
    return (image - step).clamp(0, 1)

# restores the tensors to their original scale
def denorm(batch, mean=[0.1307], std=[0.3081]):
    """
    Convert a batch of tensors to their original scale.

    Computes ``batch * std + mean`` with the statistics broadcast over the
    channel dimension (dim 1) of a 4-D batch.

    Args:
        batch (torch.Tensor): Batch of normalized tensors.
        mean (torch.Tensor or sequence of float): Per-channel mean used for
            normalization.
        std (torch.Tensor or sequence of float): Per-channel standard
            deviation used for normalization.

    Returns:
        torch.Tensor: batch of tensors without normalization applied to them.
    """
    # Place the statistics on the batch's own device rather than a module-level
    # `device` global. This keeps the function self-contained and also covers
    # tuples and tensors that live on another device.
    # The list defaults are never mutated, so sharing them across calls is safe.
    mean = torch.as_tensor(mean, device=batch.device)
    std = torch.as_tensor(std, device=batch.device)

    return batch * std.view(1, -1, 1, 1) + mean.view(1, -1, 1, 1)

# Three functions to test FGSM:
1. Maximise the loss function to create generic misclassification.
2. Minimise the loss with fake constant targets (targeting a specific class) - this pushes the logits towards a particular class.
3. Once we have identified the best $\varepsilon$ for targeted misclassification, apply the targeted attack iteratively with that $\varepsilon$.

In [None]:
def test(model, device, test_loader, epsilon):
    """Measure classification accuracy under an untargeted FGSM attack.

    Every example the model initially classifies correctly is perturbed with
    ``fgsm_attack`` at strength ``epsilon`` and classified again. Examples that
    are already misclassified are skipped, but they still count in the
    denominator and therefore lower the reported accuracy.

    The loop calls ``.item()`` on predictions and labels, so the loader is
    expected to yield one example per batch.

    Args:
        model: Classifier returning log-probabilities.
        device: Device the data and labels are moved to.
        test_loader: Iterable of ``(data, target)`` pairs.
        epsilon: FGSM step size.

    Returns:
        tuple: ``(final_acc, adv_examples)``. ``final_acc`` is
        ``correct / len(test_loader)``; ``adv_examples`` holds at most five
        ``(initial_prediction, final_prediction, image_array)`` tuples.
    """
    # Stateless transform, so a single instance serves every iteration.
    renormalize = transforms.Normalize((0.1307,), (0.3081,))
    max_saved = 5

    n_correct = 0
    adv_examples = []

    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)

        # The attack needs the gradient of the loss with respect to the input.
        data.requires_grad_(True)

        log_probs = model(data)
        init_pred = log_probs.max(1, keepdim=True)[1]  # index of the max log-probability

        # Only attack examples the model gets right to begin with.
        if init_pred.item() != target.item():
            continue

        # Back-propagate the loss to obtain the gradient on the input.
        model.zero_grad()
        F.nll_loss(log_probs, target).backward()
        data_grad = data.grad.data

        # Perturb in the original pixel scale, then normalise again for the model.
        perturbed_data = fgsm_attack(denorm(data), epsilon, data_grad)
        final_pred = model(renormalize(perturbed_data)).max(1, keepdim=True)[1]

        still_correct = final_pred.item() == target.item()
        if still_correct:
            n_correct += 1

        # Keep the first few failures for visualisation; when epsilon is 0
        # (no perturbation) keep the first few successes as well.
        if len(adv_examples) < max_saved and (not still_correct or epsilon == 0):
            adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
            adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))

    # Accuracy over every loader entry, including the skipped ones.
    total = len(test_loader)
    final_acc = n_correct / float(total)
    print(f"Epsilon: {epsilon}\tTest Accuracy = {n_correct} / {total} = {final_acc}")

    return final_acc, adv_examples

In [None]:
def test_target_missclassification( model, device, test_loader, epsilon, target_class: int = None ):
    """Measure how often a one-step targeted FGSM attack yields ``target_class``.

    For every example not already predicted as ``target_class``, the loss
    towards the target label is computed and the image is moved against the
    sign of its gradient with ``fgsm_attack_extension``. The attack counts as
    successful when the perturbed image is classified as ``target_class``.

    The loop calls ``.item()`` on predictions, so the loader is expected to
    yield one example per batch.

    Args:
        model: Classifier returning log-probabilities.
        device: Device the data and the target label are placed on.
        test_loader: Iterable of ``(data, label)`` pairs; the true label is
            not used.
        epsilon: FGSM step size.
        target_class: Class index the attack should produce. Required.

    Returns:
        tuple: ``(final_acc, adv_examples)``. ``final_acc`` is
        ``successes / len(test_loader)``, so skipped examples (already
        predicted as the target) remain in the denominator. ``adv_examples``
        holds at most five ``(initial_prediction, final_prediction,
        image_array)`` tuples of successful attacks.

    Raises:
        ValueError: If ``target_class`` is not provided.
    """
    # The signature default is None, which torch.tensor cannot convert; fail
    # early with a clear message instead.
    if target_class is None:
        raise ValueError("target_class must be an integer class index")

    correct = 0
    adv_examples = []

    # Loop invariants: the constant target label and the stateless transform.
    target = torch.tensor([target_class], device=device)
    normalize = transforms.Normalize((0.1307,), (0.3081,))

    # Loop over all examples in test set
    for data, _ in test_loader:
        data = data.to(device)
        # The attack needs the gradient of the loss with respect to the input.
        data.requires_grad = True

        output = model(data)
        init_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
        # Nothing to attack if the model already outputs the target class.
        if init_pred.item() == target.item():
            continue

        # Loss towards the target label; stepping against its gradient lowers it.
        loss = F.nll_loss(output, target)
        model.zero_grad()
        loss.backward()
        data_grad = data.grad.data

        # Perturb in the original pixel scale, then normalise again for the model.
        data_denorm = denorm(data)
        perturbed_data = fgsm_attack_extension(data_denorm, epsilon, data_grad)
        perturbed_data_normalized = normalize(perturbed_data)

        output = model(perturbed_data_normalized)

        final_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
        if final_pred.item() == target.item():
            correct += 1
            # Keep the first few successful attacks for visualisation.
            if len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append( (init_pred.item(), final_pred.item(), adv_ex) )

    final_acc = correct/float(len(test_loader))
    print(f"Epsilon: {epsilon}\tTest Accuracy = {correct} / {len(test_loader)} = {final_acc}")

    return final_acc, adv_examples

In [None]:
def test_target_missclassification_iterative( model, device, test_loader, epsilon, target_class: int = None, epochs: int = 5 ):
    """Measure the success of a multi-step targeted FGSM attack after each step.

    For every example not already predicted as ``target_class``, the attack
    repeats ``epochs`` times: compute the loss towards the target label at the
    current image, step against the sign of its gradient with
    ``fgsm_attack_extension``, and continue from the perturbed image. After
    each step the prediction is compared with ``target_class``.

    Each step starts from the previous step's output and uses a fresh leaf
    tensor. The gradient is therefore taken at the current image only, rather
    than accumulating in ``.grad`` across steps and being re-applied to the
    original image.

    The loop calls ``.item()`` on predictions, so the loader is expected to
    yield one example per batch.

    Args:
        model: Classifier returning log-probabilities.
        device: Device the data and the target label are placed on.
        test_loader: Iterable of ``(data, label)`` pairs; the true label is
            not used.
        epsilon: FGSM step size applied at every step.
        target_class: Class index the attack should produce. Required.
        epochs: Number of attack steps per example.

    Returns:
        tuple: ``(final_acc, adv_examples_across_epochs)``. ``final_acc`` is
        the per-step success rate averaged over the attacked examples (skipped
        examples are excluded). ``adv_examples_across_epochs`` maps each step
        index to at most five ``(initial_prediction, final_prediction,
        image_array)`` tuples of successful attacks.

    Raises:
        ValueError: If ``target_class`` is not provided.
    """
    # The signature default is None, which torch.tensor cannot convert; fail
    # early with a clear message instead.
    if target_class is None:
        raise ValueError("target_class must be an integer class index")

    # Loop invariants: the constant target label and the stateless transform.
    target = torch.tensor([target_class], device=device)
    normalize = transforms.Normalize((0.1307,), (0.3081,))

    correct_across_epochs = []
    adv_examples_across_epochs = {epoch: [] for epoch in range(epochs)}

    # Loop over all examples in test set
    for data, _ in test_loader:
        # Current image in the normalised scale, tracked as a leaf tensor so
        # its gradient can be read after backward().
        current = data.to(device).detach().requires_grad_(True)
        output = model(current)
        init_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
        # Nothing to attack if the model already outputs the target class.
        if init_pred.item() == target.item():
            continue

        correct = []
        for epoch in range(epochs):
            # Gradient of the target loss at the current image.
            loss = F.nll_loss(output, target)
            model.zero_grad()
            loss.backward()
            data_grad = current.grad.data

            # Step in the original pixel scale, starting from the current image.
            data_denorm = denorm(current.detach())
            perturbed_data = fgsm_attack_extension(data_denorm, epsilon, data_grad)

            # Fresh leaf for the next step: no stale gradient, no old graph.
            current = normalize(perturbed_data).detach().requires_grad_(True)
            output = model(current)

            final_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            if final_pred.item() == target.item():
                correct.append(1)
                # Keep the first few successful attacks of this step.
                if len(adv_examples_across_epochs[epoch]) < 5:
                    adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                    adv_examples_across_epochs[epoch].append( (init_pred.item(), final_pred.item(), adv_ex) )
            else:
                correct.append(0)

        correct_across_epochs.append(correct)

    # Column-wise mean: success rate at each step over the attacked examples.
    final_acc = np.mean(correct_across_epochs, axis=0)

    return final_acc, adv_examples_across_epochs

In [None]:
# Accuracy and saved examples for each value in `epsilons`, in sweep order.
accuracies = []
examples = []

# Run the untargeted FGSM evaluation once per epsilon
for eps in epsilons:
    acc, ex = test(model, device, test_loader, eps)
    accuracies.append(acc)
    examples.append(ex)

In [None]:
# Success rate and saved examples for each value in `epsilons_prime`, in sweep order.
accuracies_target_misclassification = []
examples_target_misclassification = []

# Run the one-step targeted attack (target class 2) once per epsilon
for eps in epsilons_prime:
    print(f'Current eps: {eps}')
    acc, ex = test_target_missclassification(model, device, test_loader, eps, 2)
    accuracies_target_misclassification.append(acc)
    examples_target_misclassification.append(ex)

# Keep the epsilon with the highest success rate (the first one on ties).
best_eps_prime = epsilons_prime[np.argmax(accuracies_target_misclassification)]
print(f'Best eps is {best_eps_prime}')

In [None]:
# Optional manual override of the epsilon selected by the targeted sweep:
# best_eps_prime = 0.4
# Run the multi-step targeted attack on class 2 for 15 steps with the selected epsilon.
acc_iterative, ex_iterative = test_target_missclassification_iterative(model, device, test_loader, best_eps_prime, target_class=2, epochs=15)

In [None]:
def plot_accuracies(epsilons: list[float], accuracies: list[float], title: str = 'Misclassification'):
    """Plot accuracy against perturbation size.

    Args:
        epsilons: Perturbation sizes, used as x values.
        accuracies: One accuracy per entry of ``epsilons``, used as y values.
        title: Figure title.
    """
    plt.figure(figsize=(5,5))
    plt.plot(epsilons, accuracies, "*-")
    # Fixed tick grids: accuracy in [0, 1], epsilon in [0, 0.7], both in steps of 0.1.
    plt.yticks(np.arange(0, 1.1, step=0.1))
    plt.xticks(np.arange(0, 0.8, step=0.1))
    plt.xlabel("Epsilon")
    plt.ylabel("Accuracy")
    # A single title call: the former hard-coded title was always overwritten by this one.
    plt.title(title)
    plt.show()

def plot_accuracies_iterative(accuracies: list[float], title: str = 'Target misclassification: Class 2'):
    """Plot accuracy against the index of the attack step.

    Args:
        accuracies: One accuracy per step; the position in the sequence is
            used as the x value.
        title: Figure title.
    """
    plt.figure(figsize=(5,5))
    plt.plot(accuracies, "*-")
    # Accuracy ticks in steps of 0.1; one x tick per step.
    plt.yticks(np.arange(0, 1.1, step=0.1))
    plt.xticks(np.arange(0, len(accuracies), step=1))
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    # A single title call: the former hard-coded title was always overwritten by this one.
    plt.title(title)
    plt.show()

In [None]:
# Accuracy under the untargeted FGSM sweep, one point per epsilon (default title).
plot_accuracies(epsilons, accuracies)

In [None]:
# Success rate of the one-step targeted attack, one point per epsilon.
plot_accuracies(epsilons_prime, accuracies_target_misclassification, 'Target misclassification: Class 2')

In [None]:
# Success rate of the multi-step targeted attack after each step.
plot_accuracies_iterative(acc_iterative, 'Target misclassification: Class 2')

In [None]:
# Plot several examples of adversarial samples at each epsilon
def plot_adversarial_examples(epsilons, examples):
    """Show saved adversarial images in a grid with one row per epsilon.

    Args:
        epsilons: Perturbation sizes; ``epsilons[i]`` labels row ``i``.
        examples: ``examples[i]`` is the list of ``(original_prediction,
            adversarial_prediction, image_array)`` tuples saved for
            ``epsilons[i]``.

    Rows may have different lengths. The widest row sets the number of columns
    and each image is placed by its (row, column) position, so a short or
    empty row leaves blank cells rather than shifting later images into the
    wrong row.
    """
    n_rows = len(epsilons)
    # Widest row decides the grid width; 0 when nothing was saved at all.
    n_cols = max((len(examples[i]) for i in range(n_rows)), default=0)

    plt.figure(figsize=(8,10))
    for i in range(n_rows):
        for j, (orig, adv, ex) in enumerate(examples[i]):
            # Subplot indices are 1-based and run row by row.
            plt.subplot(n_rows, n_cols, i * n_cols + j + 1)
            plt.xticks([], [])
            plt.yticks([], [])
            if j == 0:
                plt.ylabel(f"Eps: {epsilons[i]}", fontsize=14)
            plt.title(f"{orig} -> {adv}")
            plt.imshow(ex, cmap="gray")
    plt.tight_layout()
    plt.show()

def plot_adversarial_examples_iterative(examples):
    """Show saved adversarial images in a grid with one row per attack step.

    Args:
        examples: ``examples[i]`` is the list of ``(original_prediction,
            adversarial_prediction, image_array)`` tuples saved for step ``i``.

    Rows may have different lengths. The widest row sets the number of columns
    and each image is placed by its (row, column) position, so a short or
    empty row leaves blank cells rather than shifting later images into the
    wrong row.
    """
    n_rows = len(examples)
    # Widest row decides the grid width; 0 when nothing was saved at all.
    n_cols = max((len(row) for row in examples), default=0)

    plt.figure(figsize=(8,10))
    for i, row in enumerate(examples):
        for j, (orig, adv, ex) in enumerate(row):
            # Subplot indices are 1-based and run row by row.
            plt.subplot(n_rows, n_cols, i * n_cols + j + 1)
            plt.xticks([], [])
            plt.yticks([], [])
            if j == 0:
                plt.ylabel(f"Epoch: {i}", fontsize=14)
            plt.title(f"{orig} -> {adv}")
            plt.imshow(ex, cmap="gray")
    plt.tight_layout()
    plt.show()

In [None]:
# Saved examples from the untargeted sweep, one row per epsilon.
plot_adversarial_examples(epsilons, examples)

In [None]:
# Successful one-step targeted attacks, one row per epsilon.
plot_adversarial_examples(epsilons_prime, examples_target_misclassification)

In [None]:
# Successful multi-step targeted attacks, one row per step (dict values in insertion order).
plot_adversarial_examples_iterative(list(ex_iterative.values()))