# Preliminaries

In this tutorial we will work with one evasion attack (FGSM) and two backdoor attacks. The first part of the notebook guides you through the implementation of the *Fast Gradient Sign Method (FGSM)*. We will implement it from scratch and also use an existing library to get the same results. In the second part of the notebook we will work on two different backdoor attacks. In all experiments we use the MNIST dataset and a simple convolutional neural network to avoid long training times.

### Imports

Let's first import the packages we need. This includes [torchattacks](https://adversarial-attacks-pytorch.readthedocs.io/en/latest/attacks.html), a PyTorch library that provides adversarial attacks.

In [4]:
# For plotting and computing
import matplotlib.pyplot as plt
import numpy as np
import random
import copy

# PyTorch packages
import torch
from torch import optim
import torch.nn as nn
from torch.nn import Module
import torch.nn.functional as F
from torch.nn.functional import cross_entropy, softmax
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Uncomment the next line to install torchattacks if it is not yet present
#!pip install torchattacks
import torchattacks
import torchvision
import torchvision.transforms as transforms

# For Load Bar
from tqdm import tqdm

# for image loading
from PIL import Image, ImageOps

### Device

We also set the `device` variable so that we can easily switch between CPU and GPU (if available).

In [None]:
# Define what device we are using
use_cuda=True
print("CUDA Available: ",torch.cuda.is_available())
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")
print(device)

### Random Seed

Execute the code snippet below to set the random seed. This makes results reproducible across runs, so anyone who re-runs your code in the same environment should get the same outputs.

For example, we set `shuffle=True` for the training loader, so it reshuffles the training data every epoch. If you change your code because training is not going well, a fixed random seed ensures that training sees the samples in the same order as in previous tries.

In [6]:
# Use this method to be able to reproduce results over multiple tries
def setup_seed(seed):
    # Set the seed for random number generation on all devices.
    torch.manual_seed(seed)
    # Numpy module.
    np.random.seed(seed)
    # Python random module.
    random.seed(seed)
    # GPU operations have a separate seed we also want to set
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        # Additionally, some GPU operations are implemented stochastically for efficiency.
        # We want all operations to be deterministic on GPU (if used) for reproducibility
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True
        
setup_seed(42)
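The effect of seeding on shuffling can be seen in isolation with a minimal standard-library sketch. The helper `shuffled_indices` is hypothetical (it is not part of the notebook) and uses its own `random.Random` generator, so it leaves the global seed set by `setup_seed(42)` untouched.

```python
import random

def shuffled_indices(seed, n=10):
    # Hypothetical helper: a private generator keeps this demo from touching the global seed
    rng = random.Random(seed)
    indices = list(range(n))
    rng.shuffle(indices)
    return indices

first_run = shuffled_indices(42)
second_run = shuffled_indices(42)

# Same seed, same shuffle order
print(first_run == second_run)
print(first_run)
```

The same idea applies to the training loader: with a fixed seed, the order in which batches are drawn is repeated from run to run.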

### Data

MNIST is a small dataset of handwritten digits (0-9, 10 classes) that consists of 60K training images and 10K test images with dimensions of 28x28. It is used for illustrative purposes because it does not need long training times. We hold out 20% of the training images as a validation set and use PyTorch's `DataLoader` class to create objects that sample training and validation data in batches of size 128.

However, we set the batch size for the test loader to 1. This is uncommon, but it is needed because we will loop over all test samples individually to compute the accuracy of the model when the attack has been applied.

In [9]:
n_classes = 10
img_size  = 28
channel   = 1
num_workers = 0
train_size = 0.8
val_size = round(1 - train_size, 2)

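# A method to ensure reproducibility of NumPy-based randomness in DataLoader workers.
# It uses the same seed as setup_seed above.
seed = 42

def _init_fn(worker_id):
    np.random.seed(int(seed))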

train_set = torchvision.datasets.MNIST(root='./data', transform=transforms.ToTensor(), download=True, train=True)
train_set, val_set = torch.utils.data.random_split(train_set, [int(len(train_set)*train_size), int(len(train_set)*val_size)])

train_loader = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=num_workers, pin_memory=True, worker_init_fn=_init_fn)
val_loader = DataLoader(val_set, batch_size=128, shuffle=False, num_workers=num_workers, pin_memory=True, worker_init_fn=_init_fn)

test_set = torchvision.datasets.MNIST(root='./data', transform=transforms.ToTensor(), download=True, train=False)
test_loader = DataLoader(test_set,  batch_size=1, shuffle=False, num_workers=num_workers, pin_memory=True, worker_init_fn=_init_fn)
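As a quick sanity check on the split above, the following sketch works out the subset sizes and the number of batches per epoch using plain arithmetic. It only assumes the numbers already used in this notebook (60,000 training images, an 80/20 split, batch size 128) and the `DataLoader` default of keeping the last, smaller batch.

```python
import math

dataset_len = 60_000                    # size of the MNIST training set
train_size = 0.8
val_size = round(1 - train_size, 2)
batch_size = 128

n_train = int(dataset_len * train_size)
n_val = int(dataset_len * val_size)

# random_split needs the two lengths to add up to the dataset length
lengths_match = (n_train + n_val == dataset_len)

# The last, smaller batch is kept by default (drop_last=False)
train_batches = math.ceil(n_train / batch_size)
val_batches = math.ceil(n_val / batch_size)
last_val_batch = n_val - (val_batches - 1) * batch_size

print(n_train, n_val)               # 48000 12000
print(train_batches, val_batches)   # 375 94
print(last_val_batch)               # 96
```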

### Neural Network

Here we provide a basic CNN network that we will train on the MNIST dataset and then attack using evasion (FGSM) and backdoor attacks (BadNets, Blended). Feel free to alter the network.

In [8]:
class Model(Module):
    
    def __init__(self):
        super(Model, self).__init__()

        # padding added to match input size of MNIST
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, padding=2) 
        self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2)
        
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2)
        
        # Adjusting the size of the first fully connected layer to match the output from the conv layers
        # 5x5 is the spatial size of the image after conv and pooling layers
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)

        # TODO: How many outputs do we need from our model and why? Fill in the appropriate value 
        self.fc3 = nn.Linear(84, ...) 

    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        # Flatten the tensor for the fully connected layer
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No ReLU after this line, output logits directly
        x = self.fc3(x)
        return x
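The comment about the 5x5 spatial size can be verified with the standard output-size formula for convolution and pooling layers. The helper `conv_out` below is a hypothetical name used only for this check; the layer settings are the ones from the `Model` class.

```python
def conv_out(size, kernel, stride=1, padding=0):
    # Output size of a convolution or pooling layer along one spatial dimension
    return (size + 2 * padding - kernel) // stride + 1

size = 28
size = conv_out(size, kernel=5, padding=2)   # conv1: 28 -> 28
size = conv_out(size, kernel=2, stride=2)    # pool1: 28 -> 14
size = conv_out(size, kernel=5)              # conv2: 14 -> 10
size = conv_out(size, kernel=2, stride=2)    # pool2: 10 -> 5

# conv2 produces 16 channels, so the flattened feature vector has 16 * 5 * 5 entries
flat_features = 16 * size * size
print(size, flat_features)  # 5 400
```

If you alter the network, re-running this calculation tells you which input size `fc1` needs.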

We also define an optimizer and a loss function with hyperparameter settings for training. Again, feel free to change these settings, but for the purpose of this tutorial you can use the pre-defined ones.

In [None]:
clean_model = Model().to(device)
learning_rate = 0.001
optimizer = optim.Adam(clean_model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

## Training

To train our own model we use the following methods:

- `accuracy(predictions, labels)`: Takes the two tensors `predictions` and `labels` as input and computes the number of correct predictions divided by the total number of predictions. The value is returned as a scalar tensor.
- `log_training(batch_idx, running_loss, running_acc)`: Takes the Integer `batch_idx`, which specifies the batch index, the Float `running_loss`, which is the loss summed over the batches seen so far in the epoch, and `running_acc`, which is the accuracy of the current batch. It prints the average loss per batch and the accuracy.
- `training_step(model, batch, criterion)`: Takes PyTorch `model`, Tuple `batch` which contains image(s) and label(s), Torch Loss Function `criterion`. This method uses these values to generate predictions and to calculate the loss and the accuracy. It returns both the loss and the accuracy.
- `validation_step(model, batch, criterion)`: Takes PyTorch `model`, Tuple `batch` which contains image(s) and label(s), Torch Loss Function `criterion`. This method uses these values to generate predictions and to calculate the loss and the accuracy. It returns both values inside a Dict.
- `validate(model, val_loader, criterion)`: Takes PyTorch `model`, PyTorch DataLoader `val_loader`, Torch Loss Function `criterion`. This method goes over all batches in `val_loader` and performs the `validation_step`. It then computes the epoch loss and accuracy by taking the mean over all batch values. Both Float values are returned inside a Dict.
- `epoch_end(result)`: Takes Dict `result` which holds the epoch loss and epoch accuracy values. Both values are simply printed.
- `train(model, model_name, criterion, optimizer, train_loader, val_loader, num_epochs=10, scheduler=None)`: Takes PyTorch `model`, String `model_name`, PyTorch Loss function `criterion`, PyTorch optimizer `optimizer`, PyTorch DataLoader `train_loader`, PyTorch DataLoader `val_loader`, Integer `num_epochs` and an optional learning-rate `scheduler`. Performs training of the model by looping over all batches in the train loader and the val loader for `num_epochs` epochs (default 10). The model name is used to save the trained model. Returns `history`, which contains the validation loss and accuracy of every epoch.
  

In [None]:
# Method to compute accuracy
def accuracy(predictions, labels):
    _, preds = torch.max(predictions, dim=1)
    return (torch.tensor(torch.sum(preds == labels).item() / len(preds)))

# Method to log training / running loss and accuracy
def log_training(batch_idx, running_loss, running_acc):
    print(f"Batch: {batch_idx}, Running Loss: {running_loss / (batch_idx + 1):.2f}, Running Accuracy: {running_acc:.2f}")

def training_step(model, batch, criterion):
    # Prepare batch data
    images, labels = batch
    images = images.to(device)
    labels = labels.to(device)
    
    # TODO: Generate predictions for the batch images
    predictions = ...
    
    # TODO: Calculate loss for the predictions
    loss = ...

    # Calculate accuracy
    acc = accuracy(predictions, labels)
    return loss, acc

def validation_step(model, batch, criterion):
    # Prepare batch data
    images, labels = batch
    images = images.to(device)
    labels = labels.to(device)
    
    # TODO: Generate predictions for the batch images
    predictions = ...
    # TODO: Calculate loss
    loss = ...
    
    # Calculate Accuracy
    acc = accuracy(predictions, labels)
    return {'val_loss': loss, 'val_acc': acc}

def validate(model, val_loader, criterion):
    with torch.no_grad():
        model.eval()
        outputs = [validation_step(model, batch, criterion) for batch in val_loader]
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}

# Method to log epoch loss and accuracy
def epoch_end(result):
    print(f"val_loss: {result['val_loss']:.2f}, val_acc: {result['val_acc']:.2f}\n")

def train(model, model_name, criterion, optimizer, train_loader, val_loader, num_epochs=10, scheduler=None):
    history = []
    for epoch in range(num_epochs):
        print("Epoch: ", epoch+1)
        # validate() switches the model to eval mode, so switch back before training
        model.train()
        running_loss = 0
        # Training Phase
        for batch_idx, batch in enumerate(train_loader):
            
            # TODO: Calculate training loss for this step.
            loss, running_acc = ...
            
            # Compute Gradients
            loss.backward()
            # Update weights
            optimizer.step()
            # Reset Gradients
            optimizer.zero_grad()

            # running loss and accuracy
            running_loss += loss.item()
            if batch_idx % 50 == 0 and batch_idx != 0:
                log_training(batch_idx, running_loss, running_acc)

        # Scheduling learning rate by stepLR
        if scheduler:
            scheduler.step()

        # Validation Phase
        result = validate(model, val_loader, criterion)
        epoch_end(result)
        history.append(result)

    # Save checkpoint file
    torch.save(model.state_dict(), f"{model_name}.pth")
    return history
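One detail of `validate` is worth a quick check: it averages the per-batch losses and accuracies, which equals the per-sample average only when all batches have the same size. With 12,000 validation images and a batch size of 128, the last batch holds only 96 images, so the two can differ slightly. The sketch below uses made-up counts (not results from this notebook) to show the effect.

```python
# (correct predictions, batch size) pairs; the numbers are made up for illustration
batches = [(128, 128), (48, 96)]

# What validate() computes: the mean of the per-batch accuracies
mean_of_batch_accs = sum(c / n for c, n in batches) / len(batches)

# Per-sample accuracy: every image counts equally
sample_weighted_acc = sum(c for c, _ in batches) / sum(n for _, n in batches)

print(round(mean_of_batch_accs, 4))    # 0.75
print(round(sample_weighted_acc, 4))   # 0.7857
```

With many full batches and a single short one, as in this notebook, the gap is much smaller than in this two-batch example.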

Now train the model for later use:

In [None]:
train(clean_model, "clean_model", criterion, optimizer, train_loader, val_loader, num_epochs=5)

We name this model "clean_model" because we will later also train a "bckd_model" to show how simple data poisoning inserts the backdoor functionality.

## Evasion Attack - FGSM

Now that we have constructed and trained a basic CNN, we start with an evasion attack: FGSM. Below you see the famous panda example. Here $x$ is the original image, and we add $\epsilon \cdot \operatorname{sign}(\nabla_x \mathcal{J}(\theta, x, y))$ to get the adversarial image $x'$, where $y$ is the ground-truth label for $x$, $\theta$ represents the model parameters, and $\mathcal{J}(\theta, x, y)$ is the loss that is used to train the network. To calculate $\nabla_x \mathcal{J}(\theta, x, y)$, the attack backpropagates the gradient all the way to the input data. It then shifts every input pixel by $\epsilon$ in the direction $\operatorname{sign}(\nabla_x \mathcal{J}(\theta, x, y))$, which is the direction that increases the loss. All together:

$$x' = x + \epsilon \cdot \operatorname{sign}(\nabla_x \mathcal{J}(\theta, x, y))$$

![FGSM](images/fgsm_panda_image.png)
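To make the formula concrete before moving to the CNN, here is a minimal sketch of one FGSM step on a tiny logistic-regression model written in plain Python. The weights, the input and all function names are made up for illustration; the model is chosen because its input gradient has a simple closed form, so no autograd is needed.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def loss_and_input_grad(w, b, x, y):
    """Binary cross-entropy of a logistic model and its gradient w.r.t. the input x."""
    z = sum(wi * xi for wi, xi in zip(w, x)) + b
    p = sigmoid(z)
    loss = -(y * math.log(p) + (1 - y) * math.log(1 - p))
    # For this model the input gradient has the closed form (p - y) * w
    grad_x = [(p - y) * wi for wi in w]
    return loss, grad_x

def sign(v):
    return (v > 0) - (v < 0)

def fgsm(w, b, x, y, eps):
    _, grad_x = loss_and_input_grad(w, b, x, y)
    # Step of size eps per input dimension, in the direction that increases the loss
    x_adv = [xi + eps * sign(g) for xi, g in zip(x, grad_x)]
    # Clip back into the valid data range [0, 1]
    return [min(1.0, max(0.0, xi)) for xi in x_adv]

w, b = [2.0, -3.0, 1.0], 0.1     # made-up model parameters (theta)
x, y = [0.8, 0.2, 0.5], 1        # made-up input and its ground-truth label

x_adv = fgsm(w, b, x, y, eps=0.1)
clean_loss, _ = loss_and_input_grad(w, b, x, y)
adv_loss, _ = loss_and_input_grad(w, b, x_adv, y)

print([round(v, 3) for v in x_adv])
print(round(clean_loss, 3), round(adv_loss, 3))
```

Every input value moves by exactly $\epsilon$ (unless clipping applies), and the loss on the perturbed input is higher than on the original one.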

### Epsilon

Just like in the official [PyTorch FGSM tutorial](https://pytorch.org/tutorials/beginner/fgsm_tutorial.html), we define a list of epsilon values to use during the attack. It includes 0 to represent the model performance on the original test set and we build up to higher values to see what effect this has on the accuracy and on the images. The idea is that the larger the epsilon, the more noticeable the perturbations but the more effective the attack in terms of degrading model accuracy. Since the data range here is $[0,1]$, no epsilon value should exceed 1.

In [None]:
epsilons = [0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3]
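To get a feeling for these values, it helps to translate them back to 8-bit grey levels. `transforms.ToTensor()` maps intensities 0..255 to $[0,1]$, so an epsilon of 0.3 allows each pixel to change by up to 76.5 of the 255 levels. The short sketch below prints this for a copy of the list above; `eps_values` and `grey_levels` are names introduced only for this illustration.

```python
eps_values = [0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3]

# Maximum per-pixel change expressed in 8-bit grey levels
grey_levels = {eps: eps * 255 for eps in eps_values}

for eps, levels in grey_levels.items():
    print(f"eps = {eps:<4} -> up to {levels:.2f} of 255 grey levels per pixel")
```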

### Visualizing FGSM perturbations

Let's take a batch of images from the MNIST training set and plot them before and after we apply FGSM to them:

In [None]:
def plot_images(images,labels):
    np_images = images.detach().cpu().numpy()

    # making sure we can view the images
    np_images = np_images*255
    np_images = [image.astype(np.uint8).reshape((28, 28, 1)) for image in np_images]

    # plot the images in the batch, along with the corresponding labels
    fig = plt.figure(figsize=(25, 4))
    for idx in np.arange(20):
        ax = fig.add_subplot(2, 20//2, idx+1, xticks=[], yticks=[])
        ax.imshow(np_images[idx], cmap='gray')
        # print out the correct label for each image
        # .item() gets the value contained in a Tensor
        ax.set_title(str(labels[idx].item()))
        
def plot_image(image,label):
    figure = plt.figure(figsize=(4, 4))
    plt.imshow(image[0].cpu(), cmap='gray')
    plt.title(str(label.item()))

In [None]:
# obtain one batch of training images and labels
dataiter = iter(train_loader)
images, labels = next(dataiter)
plot_images(images,labels)

In [None]:
plot_image(images[0],labels[0])

Now let's compute the adversarial images for this batch using FGSM.

Below we provide an `FGSM` class which can be used to create an FGSM attack object. The object is created from a (trained) model, an epsilon value between 0.0 and 1.0 (default 0.3), a loss function (default Cross Entropy) and the device (default cpu).

The `forward()` method of the `FGSM` class creates the adversarial images. It takes in images and their corresponding labels, uses the trained model to generate predictions, and computes the loss. Using [`torch.autograd.grad()`](https://pytorch.org/docs/stable/generated/torch.autograd.grad.html) it computes the gradient of the loss with respect to the input images. From the sign of this gradient and the epsilon value we then create the adversarial versions of the original images.

In [21]:
class FGSM(nn.Module):
    
    def __init__(self, model, eps=0.3, criterion=nn.CrossEntropyLoss(), device='cpu'):
        super().__init__()
        self.model = model
        self.eps = eps
        self.criterion = criterion
        self.device = device
        
    def forward(self, images, labels):
        # Prepare data
        images = images.clone().detach().to(self.device)
        labels = labels.clone().detach().to(self.device)
        # Specify that the gradients need to be computed
        images.requires_grad = True
        # Generate predictions
        predictions = self.model(images)
        # Compute Loss
        loss = self.criterion(predictions,labels)
        # Calculate the derivative of the loss w.r.t. the original image
        grad = torch.autograd.grad(loss, images, retain_graph=False, create_graph=False)[0]
        # Compute the adversarial images
        adv_images = images + self.eps * grad.sign()
        adv_images = torch.clamp(adv_images, min=0, max=1).detach()
        return adv_images

Fill in the arguments in the code cell below and execute it to create an FGSM attack object with an epsilon of 0.3.

In [None]:
attack = FGSM(...)

In [None]:
adv_images = attack(images, labels)
plot_images(adv_images, labels)

In [None]:
plot_image(adv_images[2],labels[2])

As the last two plots show, using $\epsilon = 0.3$ causes very noticeable perturbations: the changes to the original images can be spotted quite easily. Let's now check the effect on the performance of the model.

### Performance

To check the effect of the FGSM attack on the performance of our model, we provide the `attack_test` method below. It takes the `model`, the loss function `criterion`, the `device`, the `test_loader` and an `epsilon` value, so we can try out different epsilon values and see how they affect the attack.

In [None]:
def create_perturbed_image(image, epsilon):
    # Collect gradient
    image_grad = image.grad.data
        
    # Collect the element-wise sign of the gradient
    sign_image_grad = image_grad.sign()
        
    # TODO: Create the perturbed image by adjusting each pixel of the input
    # image using the FGSM formula
    perturbed_image = ...
        
    # Adding clipping to maintain [0,1] range
    perturbed_image = torch.clamp(perturbed_image, 0, 1)

    return perturbed_image

def save_example(adv_examples, perturbed_image, init_pred, final_pred):
    adv_ex = perturbed_image.squeeze().detach().cpu().numpy()
    adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))

def attack_test(model, criterion, device, test_loader, epsilon):
    # Accuracy counter
    correct = 0
    adv_examples = []
    
    # Loop over all samples in test set
    for image, label in test_loader:
        
        # Send the data and label to the device
        image, label = image.to(device), label.to(device)
        
        # Set requires_grad attribute to True. Important for Attack
        image.requires_grad = True
        
        # TODO: Forward pass the data through the model
        prediction = ...
        
        # Get the index of the largest logit, i.e. the predicted class
        _, init_pred = prediction.max(1, keepdim=True)
        
        # If the initial prediction is wrong, don't bother attacking, just move on
        if init_pred.item() != label.item():
            continue
            
        # TODO: Calculate the loss for this data
        loss = ...
        
        # Zero all existing gradients
        model.zero_grad()
        
        # Calculate gradients of model in backward pass
        loss.backward()

        # Compute perturbed image
        perturbed_image = create_perturbed_image(image, epsilon)
        
        # TODO: Re-classify the perturbed image
        new_prediction = ...
        
        # Get the index of the largest logit, i.e. the predicted class
        _, final_pred = new_prediction.max(1, keepdim=True)
        
        # TODO: Check for success. What is the condition to consider the prediction of
        # the perturbed image correct?
        if ...item() == ...item():
            correct += 1
            # Special case for saving 0 epsilon examples
            if (epsilon == 0) and (len(adv_examples) < 5):
                save_example(adv_examples, perturbed_image, init_pred, final_pred)
        else:
            # Save some adv examples for visualization later
            if len(adv_examples) < 5:
                save_example(adv_examples, perturbed_image, init_pred, final_pred)
    
    # TODO: Calculate final accuracy for this epsilon
    final_acc = .../float(...)
    print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(test_loader), final_acc))

    # Return the accuracy and the collected adversarial examples
    return final_acc, adv_examples

In [None]:
accuracies = []
examples = []

# Run test for each epsilon
for eps in epsilons:
    acc, ex = attack_test(clean_model, criterion, device, test_loader, eps)
    accuracies.append(acc)
    examples.append(ex)
    print(acc)

### Accuracy vs Epsilon

As mentioned earlier, we expect the test accuracy to decrease as epsilon increases, because a larger epsilon means a larger step in the direction that maximizes the loss. Notice that the trend in the curve is not linear even though the epsilon values are linearly spaced.

In [None]:
plt.figure(figsize=(5,5))
# TODO: Plot the epsilons in the x-axis and the accuracies in the y-axis 
plt.plot(..., ..., "*-")
plt.yticks(np.arange(0, 1.1, step=0.1))
plt.xticks(np.arange(0, .35, step=0.05))
plt.title("Accuracy vs Epsilon")
plt.xlabel("Epsilon")
plt.ylabel("Accuracy")
plt.show()

While a higher epsilon might decrease the accuracy, the plots below also show that the perturbations become more perceptible and can thus be detected more easily. So an attacker should consider the tradeoff between accuracy degradation and perceptibility of the perturbations. The plots below show successful adversarial examples at each epsilon value. Each row of the plot shows a different epsilon value. The first row contains the $\epsilon = 0$ examples, which are the original "clean" images with no perturbation. The title of each image shows the "original classification -> adversarial classification". Notice that the perturbations start to become evident from around $\epsilon = 0.1$ and are quite evident at $\epsilon = 0.3$. However, in all cases humans are still capable of identifying the correct class despite the added noise.

In [None]:
# Plot several examples of adversarial samples at each epsilon
cnt = 0
plt.figure(figsize=(8,10))
for i in range(len(epsilons)):
    for j in range(len(examples[i])):
        cnt += 1
        plt.subplot(len(epsilons), len(examples[0]), cnt)
        plt.xticks([], [])
        plt.yticks([], [])
        if j == 0:
            plt.ylabel("Eps: {}".format(epsilons[i]), fontsize=14)
        orig,adv,ex = examples[i][j]
        plt.title("{} -> {}".format(orig, adv))
        plt.imshow(ex, cmap="gray")
plt.tight_layout()
plt.show()

### Attack Evaluation

To evaluate the success of your evasion attack you could measure the model's accuracy. First you test your model using clean samples. Then you test your model using your evasion attack and a specific epsilon value. The drop in accuracy rate is the measure of success of your attack. This is calculated in the code snippet below. We can reuse the `attack_test()` method from earlier using `epsilon=0` to test the clean model on only clean samples. 

In [None]:
# TODO: Use attack_test with the correct arguments to calculate clean test accuracy
clean_acc, _ = ...

# TODO: Use attack_test with the correct arguments to calculate attack accuracy
attack_acc, _ = ...

In [None]:
# TODO: Calculate the accuracy drop
acc_drop = round(...,2)
print(f"Accuracy Drop: {acc_drop}")

If you do this for all epsilon values, you can plot the accuracy drops in a bar plot:

In [None]:
# Drop = clean accuracy (epsilon 0) minus accuracy under attack, so larger is worse for the model
acc_drops = [round(accuracies[0] - x, 4) for x in accuracies[1:]]

In [None]:
plt.figure(figsize=(10, 6))
bars = plt.bar(range(len(acc_drops)), acc_drops, color='skyblue')

# Adding the text inside the bars for each value
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval - 0.03, round(yval, 4), ha='center', va='bottom')

plt.xlabel('Epsilon Value')
plt.ylabel('Accuracy Drop')
plt.title('Bar Plot Evasion Attack Accuracy Drop')
plt.xticks(range(len(acc_drops)), [str(eps) for eps in epsilons[1:]])  # Label each bar with its epsilon value
plt.show()

### Torchattacks

Instead of implementing the FGSM attack yourself, you can also use the `torchattacks` library for PyTorch, which includes many predefined attacks. We will show that our implementation of the FGSM attack and the `torchattacks` version yield the same results. First we define a `test` method that reports the accuracy of the model on the original images, and then an `adv_test` method that reports the accuracy on FGSM adversarial images.

Neither `test` nor `adv_test` needs to iterate over the images one by one. So, to speed things up, we use a larger batch size in our `test_loader`.

In [None]:
# TODO: Define a new testloader with a batch of 128 instead of 1
test_loader = ...

In [None]:
def test(model, test_loader):
    print('\n\n[Plain/Test] Under Testing ... Please Wait')
    correct = 0
    total = 0
    
    with torch.no_grad():
        model.eval()
        for batch_idx, (images, labels) in enumerate(tqdm(test_loader)):
            images, labels = images.to(device), labels.to(device)
            # Evaluation
            predictions = model(images).detach()

            # Test
            _, predicted = torch.max(predictions, dim=1)
            total += labels.numel()
            correct += (predicted == labels).sum().item() 


        print('[Plain/Test] Acc: {:.3f}'.format(100.*correct / total))
    return 100.*correct / total

In [None]:
# TODO: Use test to calculate the clean accuracy of the model

In [None]:
def adv_test(attack, model, test_loader):
    correct = 0
    total = 0

    model.eval()
    print('\n\n[Adversarial/Test] Under Testing ... Please Wait')
    for batch_idx, (images, labels) in enumerate(tqdm(test_loader)):
        images, labels = images.to(device), labels.to(device)
        
        # TODO: generate adversarial examples for the batch images
        adv_inputs = ... 

        # Evaluation
        predictions = model(adv_inputs).detach()

        # Test
        _, predicted = torch.max(predictions, dim=1)
        total += labels.numel()
        correct += (predicted == labels).sum().item() 

        
    print('[Adversarial/Test] Acc: {:.3f}'.format(100.*correct / total))
    return 100.*correct / total

In [None]:
# TODO: Create a loop that iterates over all epsilons, defines an object of our attack,
# and prints the attack accuracy (using adv_test) for this setting.

In [None]:
# TODO: Create a loop that iterates over all epsilons, defines an object of 
# torchattacks.FGSM(<model>, <epsilon>), and prints the attack accuracy (using adv_test) 
# for this setting.

As you can see, both implementations lower the performance of our model to the same accuracy. With the `torchattacks` library, you only need to provide a pre-trained model and an epsilon value to define the FGSM attack.

# Backdoor Attacks

In this section we will discuss how to perform several kinds of backdoor attacks. In a backdoor attack, the adversary has access to the training data, often only to a small subset of it. By altering a few data samples, the adversary inserts a secret functionality into the trained model. In a classifier, this secret functionality is a targeted misclassification. The secret functionality, or backdoor, is activated when the model's input contains an attacker-chosen property (the trigger).

You will learn how to apply the [BadNets](https://arxiv.org/pdf/1708.06733.pdf) backdoor attack and the [Blended](https://arxiv.org/pdf/1712.05526.pdf) backdoor attack. With the BadNets attack, we insert a small square patch into the input images, while with the Blended attack we blend the input image with another image. Furthermore, we will learn to apply two different modes of backdoor attacks: source-agnostic and source-specific.
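The blending step mentioned above can be sketched as a pixel-wise convex combination of the input image and a trigger image. This is a minimal sketch on nested lists, not the notebook's implementation: the function name `blend`, the blending ratio `alpha` and its value of 0.2 are assumptions chosen for illustration.

```python
def blend(image, trigger, alpha=0.2):
    """Pixel-wise convex combination: (1 - alpha) * image + alpha * trigger."""
    return [
        [(1 - alpha) * p + alpha * t for p, t in zip(img_row, trg_row)]
        for img_row, trg_row in zip(image, trigger)
    ]

# Tiny 2x2 "images" with values in [0, 1], made up for illustration
image = [[0.0, 1.0], [0.5, 0.5]]
trigger = [[1.0, 1.0], [0.0, 0.0]]

blended = blend(image, trigger, alpha=0.2)
print(blended)
```

Because both inputs lie in $[0,1]$ and the weights sum to 1, the blended image stays in $[0,1]$ without clipping. A small `alpha` keeps the trigger faint, a large one makes it dominate the image.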

We will only apply a dirty-label static backdoor attack. Dirty label means that you add the trigger to samples of multiple classes and change their label to the target class. We will perform static backdoor attacks, meaning that the trigger is a static property of the input images and does not change over time.
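The relabeling part of a dirty-label attack can be sketched on a plain list of labels. Everything here is an illustrative assumption: the function name `select_and_relabel`, the `poison_rate` parameter, and the simplified reading of the two modes, where source-agnostic poisons samples from any non-target class and source-specific poisons samples from one chosen class only.

```python
import random

def select_and_relabel(labels, target_label, poison_rate, source_label=None, seed=0):
    """Return (new_labels, poisoned_indices) for a dirty-label attack."""
    rng = random.Random(seed)
    if source_label is None:
        # Source-agnostic: any sample not already in the target class is eligible
        eligible = [i for i, y in enumerate(labels) if y != target_label]
    else:
        # Source-specific (simplified): only samples of one chosen class are eligible
        eligible = [i for i, y in enumerate(labels) if y == source_label]
    n_poison = min(int(len(labels) * poison_rate), len(eligible))
    poisoned = sorted(rng.sample(eligible, n_poison))
    new_labels = list(labels)
    for i in poisoned:
        # In a full attack the trigger would also be stamped on image i
        new_labels[i] = target_label
    return new_labels, poisoned

labels = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * 10   # toy label list, 10 samples per class
new_labels, poisoned = select_and_relabel(labels, target_label=0, poison_rate=0.1)

print(len(poisoned), new_labels.count(0))   # 10 20
```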

Aside from applying the attacks, we will also learn about metrics used in this field to evaluate the attack. These metrics are the attack success rate (ASR) and the clean accuracy drop.
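As a rough sketch of how these two metrics are commonly computed: the ASR is the fraction of triggered inputs that the backdoored model assigns to the target class, usually excluding inputs whose true label is already the target, and the clean accuracy drop is the clean model's accuracy minus the backdoored model's accuracy, both measured on clean test data. The function names and all numbers below are made up for illustration.

```python
def attack_success_rate(true_labels, preds_on_triggered, target_label):
    """Fraction of triggered non-target samples that are classified as the target."""
    pairs = [(y, p) for y, p in zip(true_labels, preds_on_triggered) if y != target_label]
    hits = sum(1 for _, p in pairs if p == target_label)
    return hits / len(pairs)

def clean_accuracy_drop(clean_model_acc, backdoored_model_acc):
    """Both accuracies are measured on clean (untriggered) test data."""
    return clean_model_acc - backdoored_model_acc

# Made-up example: target class 0, six triggered test samples
true_labels = [1, 2, 3, 0, 4, 5]
preds_on_triggered = [0, 0, 3, 0, 0, 0]

asr = attack_success_rate(true_labels, preds_on_triggered, target_label=0)
cad = clean_accuracy_drop(0.99, 0.985)

print(asr)            # 0.8
print(round(cad, 4))  # 0.005
```

A successful backdoor attack aims for a high ASR together with a small clean accuracy drop, so that the backdoor goes unnoticed on clean data.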

Again we will be using the MNIST dataset and the simple CNN introduced in the beginning of the tutorial.

### Visualize Data

Below we add a method that shows a single image in detail, annotating every pixel with its value. To show a batch of images, we reuse `plot_images` defined above.

In [None]:
def plot_image_detailed(image,label):
    # Move to CPU first so this also works for tensors that live on the GPU
    image = image.detach().cpu().numpy()
    image = np.squeeze(image)
    fig = plt.figure(figsize = (12,12)) 
    ax = fig.add_subplot(111)
    ax.imshow(image, cmap='gray')
    ax.set_title(f"Label is {str(label.item())}")

    # annotate each pixel in the image with its value
    width, height = image.shape
    thresh = image.max()/2.5
    for x in range(width):
        for y in range(height):
            val = round(image[x][y],2) if image[x][y] !=0 else 0
            ax.annotate(str(val), xy=(y,x),
                        horizontalalignment='center',
                        verticalalignment='center',
                        color='white' if image[x][y]<thresh else 'black')
    plt.show()

In [None]:
# obtain one batch of training images
dataiter = iter(train_loader)
images, labels = next(dataiter)
plot_images(images,labels)

In [None]:
plot_image_detailed(images[0],labels[0])

### Testing

Below you find a simple test method which you can use to test the performance (accuracy and loss) of your model.

In [None]:
# The loader argument is named data_loader so it does not shadow the imported DataLoader class
def test(model, criterion, data_loader):
    print('\n\n[Plain/Test] Under Testing ... Please Wait')
    examples = []
    test_loss = []
    test_acc = []
    
    with torch.no_grad():
        model.eval()
        for batch_idx, (inputs, labels) in enumerate(tqdm(data_loader)):
            # Prepare batch data
            inputs, labels = inputs.to(device), labels.to(device)
            # Generate predictions
            outputs = model(inputs).detach()
            
            # TODO: Calculate loss for the generated outputs
            batch_test_loss = ...
            
            # Calculate accuracy
            _, predicted = torch.max(outputs, dim=1)
            batch_test_acc = torch.tensor(torch.sum(predicted == labels).item() / len(predicted))
            
            # Store batch results
            test_loss.append(batch_test_loss)
            test_acc.append(batch_test_acc)
            # Store Examples
            ex = inputs[0].squeeze().detach().cpu().numpy()
            examples.append((labels[0],predicted[0],ex))

    # Display Results
    print("Test Loss: ", round(torch.stack(test_loss).mean().item(),2))
    print("Test Accuracy: ", round(torch.stack(test_acc).mean().item()*100.0,2))
    return round(torch.stack(test_acc).mean().item(),4), examples

In [None]:
acc, examples = test(clean_model, criterion, test_loader)

## BadNets (Square Trigger)

After training and testing your clean model, you will now start with the first backdoor attack by applying square triggers to the training set and training a new backdoored model.

### Create Square Trigger
You will need to finish the code below. It contains the class ```GenerateSQRTrigger```, which can be used to create a square trigger inside an image. This trigger type is one of the first in the literature ([BadNets paper](https://arxiv.org/abs/1708.06733)). 

It takes two required parameters, ```size``` and ```pos_label```:

- ```size``` is a tuple indicating the size of the square trigger. It should not exceed the dimensions of the images in the dataset, and it should always be a tuple of identical numbers.
- ```pos_label``` is a string that specifies the trigger's position, for example ```upper-left``` or ```mid-mid```.

The class also contains the following methods:

- ```_gen_pos_square()``` should include code that creates the x and y coordinates inside the image where the square should be created (be aware that you do not exceed the dimensions of the image). 

- ```create_trigger_square()``` should include code that creates an object with the same dimensions as the images in the dataset and then places the square inside this object. 

- ```apply_trigger()``` should be used to apply the created square trigger on the actual images of the dataset. This is also used inside the ```poison(img,trigger_obj)``` method.
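As a rough illustration of what stamping a square trigger means, the sketch below works on a plain NumPy array in height x width x channel layout with pixel values in [0, 1]. The helper name `stamp_square`, its arguments, and the use of slicing are assumptions made for this example; it is not the notebook's ```GenerateSQRTrigger``` implementation.

```python
import numpy as np


def stamp_square(img, top_left, size, value=1.0):
    """Return a copy of `img` (H x W x C) with a size x size square set to `value`."""
    row, col = top_left
    height, width = img.shape[:2]
    # Refuse squares that would fall outside the image.
    if row < 0 or col < 0 or row + size > height or col + size > width:
        raise ValueError("square trigger does not fit inside the image")
    stamped = img.copy()  # leave the input image untouched
    stamped[row:row + size, col:col + size, :] = value
    return stamped


# Toy black 28 x 28 x 1 image, the MNIST layout used in this notebook.
blank = np.zeros((28, 28, 1), dtype=np.float32)
stamped = stamp_square(blank, top_left=(0, 0), size=4)
# 4 * 4 = 16 pixels are now set to 1.0; all others stay 0.
```

Working on a copy keeps the clean image available, which matters when the same sample is later needed without the trigger.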

In [None]:
def poison(img, trigger_obj):
    """Poison the training samples by stamping the trigger."""
    poisoned_image = trigger_obj.apply_trigger(img)
    return poisoned_image

In [None]:
class GenerateSQRTrigger:
    """
    A class that creates a square pattern at a chosen position, which is used
    as a trigger for an image dataset.
    """

    def __init__(self, size, pos_label, dataset='mnist'):

        datasets_dimensions = {"mnist": (28, 28, 1)}
        dims = datasets_dimensions[dataset]

        if size[0] != size[1]:
            raise Exception("The size of the trigger must be square.")

        if pos_label.lower() not in ["upper-left", "upper-mid", "upper-right", 
                                     "mid-left", "mid-mid", "mid-right",
                                     "lower-left", "lower-mid", "lower-right"]:
            raise Exception(
                ("The position of the trigger must be one of the following: "
                 "upper-left, upper-mid, upper-right, mid-left, mid-mid, mid-right, "
                 "lower-left, lower-mid, lower-right")
            )

        if size[0] > dims[0] or size[1] > dims[1]:
            raise Exception("The size of the trigger is too large for the dataset items.")

        self.dims = dims
        self.size = size
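        # Store the lowercase form so it matches the validation above and the
        # comparisons in _gen_pos_square().
        self.pos_label = pos_label.lower()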
        # pos == position; coordinates

        # TODO: Generate the coordinates of the generated trigger.
        self.pos_coords = ...

        # Use a black image as a trigger initially.
        trigger = np.zeros(self.dims, dtype=np.float32)

        # TODO: Use one of the methods defined below to craft the trigger image
        self.crafted_trigger = ...

    def _gen_pos_square(self):
        """Returns the coordinates of the generated square trigger."""
        # Upper
        if self.pos_label == "upper-left":
            return (0, 0)
        elif self.pos_label == "upper-mid":
            return (0, self.dims[1] // 2 - self.size[1] // 2)
        elif self.pos_label == "upper-right":
            return (0, self.dims[1] - self.size[1])

        # Mid
        elif self.pos_label == "mid-left":
            return (self.dims[0] // 2 - self.size[0] // 2, 0)
        elif self.pos_label == "mid-mid":
            return (self.dims[0] // 2 - self.size[0] // 2,
                    self.dims[1] // 2 - self.size[1] // 2)
        elif self.pos_label == "mid-right":
            return (self.dims[0] // 2 - self.size[0] // 2, self.dims[1] - self.size[1])

        # Lower
        elif self.pos_label == "lower-left":
            return (self.dims[0] - self.size[0], 0)
        elif self.pos_label == "lower-mid":
            return (self.dims[0] - self.size[0], self.dims[1] // 2 - self.size[1] // 2)
        elif self.pos_label == "lower-right":
            return (self.dims[0] - self.size[0], self.dims[1] - self.size[1])

    def create_trigger_square(self, trigger):
        """Create a square trigger."""
        base_x, base_y = self.pos_coords
        for x in range(self.size[0]):
            for y in range(self.size[1]):
                trigger[base_x + x][base_y + y] = \
                    np.ones((self.dims[2]))

        return trigger

    def apply_trigger(self, img):
        """applies the trigger on the image."""

        base_x, base_y = self.pos_coords
        for x in range(self.size[0]):
            for y in range(self.size[1]):
                # TODO: Replace the corresponding pixels with the trigger values
                img[base_x + x][base_y + y] = ...
        return img

### Source-agnostic vs source-specific

In **source-agnostic** attacks the trigger is effective in any image, regardless of its original class.
In **source-specific** attacks the trigger is effective only for a specific original class. If the trigger is applied to any other input, the model should still classify the poisoned input correctly.

Now you should use the code above to poison the training data. You will need to decide on the following settings:
- Source specific: Do you want the attack to be source specific? If so, provide a variable with the source label.
- Backdoor / target label: If an image includes a trigger, what should the target label be?
- Epsilon: Specify what fraction of the training set you want to poison (for example, 0.06 poisons 6% of the samples).

Do the following when poisoning the training dataset in case of a **source agnostic** attack:
- If a training set example is part of the subset you want to poison, then poison the image and provide the backdoor label as its new label.
- If a training set example is not part of the subset you want to poison, then use the original image and label.

Do the following when poisoning the training dataset in case of a **source specific** attack:

- If a training set example is part of the subset you want to poison and its original label is equal to the source label, poison the image and provide the backdoor label as its new label.
- If a training set example is part of the subset you want to poison and its original label is not equal to the source label, poison the image and provide the original label.
- If a training set example is not part of the subset you want to poison, then use the original image and label.
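The rules above can be sketched as two small helpers. The names `pick_poison_indices` and `poisoned_label` are hypothetical and not part of the notebook, and rounding `epsilon * n_samples` to the nearest integer is an assumption of this sketch.

```python
import numpy as np


def pick_poison_indices(n_samples, epsilon, rng):
    """Choose which sample indices belong to the subset to poison."""
    n_poison = int(round(epsilon * n_samples))  # rounding rule is an assumption
    chosen = rng.choice(n_samples, size=n_poison, replace=False)
    return set(chosen.tolist())


def poisoned_label(in_subset, label, target_label, source_label=None):
    """Return (apply_trigger, new_label) for one training sample."""
    if not in_subset:
        # Outside the poisoned subset: original image, original label.
        return False, label
    if source_label is None or label == source_label:
        # Source-agnostic, or a source-class sample: trigger plus backdoor label.
        return True, target_label
    # Source-specific, other class: trigger is applied but the label is kept.
    return True, label


rng = np.random.default_rng(0)
chosen = pick_poison_indices(n_samples=100, epsilon=0.06, rng=rng)
```

Keeping the original label on triggered samples from other classes is what teaches the model to ignore the trigger outside the source class.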

In [None]:
class BackdoorDataset(Dataset):

    def __init__(self, clean_dataset, trigger_obj, epsilon=None, target_label=None, source_label=None, train=True):
        self.clean_dataset = clean_dataset
        self.trigger_obj = trigger_obj
        self.epsilon = epsilon
        self.target_label = target_label
        self.source_label = source_label
        self.train = train
        if train:
            self.backdoor_dataset = self.get_train_set()
        else:
            self.backdoor_dataset = self.get_test_set()

    def __len__(self):
        return len(self.backdoor_dataset)

    def __getitem__(self, idx):
        image, label = self.backdoor_dataset[idx]
        return image, label

    def poison(self, img):
        """Poison a training sample by adding the trigger."""
        # TODO: Insert the trigger to the given image
        return ...
    
    def get_train_set(self):
        backdoored_ds = []

        # TODO: Calculate how many samples from the clean_dataset should be poisoned 
        # based on the poisoning rate (epsilon).
        trigger_samples = ...
        
        # replace=False means that each value cannot be selected more than one time.
        # A set makes the `idx in samples_index` checks below constant-time.
        samples_index = set(np.random.choice(len(self.clean_dataset), size=trigger_samples, replace=False).tolist())
        
        for idx, (image, label) in enumerate(self.clean_dataset):
            poisoned_image = torch.from_numpy(self.poison(image.clone().cpu().permute(1, 2, 0).numpy())).permute(2, 0, 1)
            
            if self.source_label is not None:
                if idx in samples_index:
                    if label == self.source_label:
                        # TODO: What should the label be in this case?
                        insert = (poisoned_image, ...)
                    else:
                        # TODO: What should the label be in this case? Why?
                        insert = (poisoned_image, ...)
                else:
                    insert = (image, label)
            else:
                if idx in samples_index:
                    insert = (poisoned_image, self.target_label)
                else:
                    insert = (image, label)
            backdoored_ds.append(insert)
            
        return backdoored_ds
    
    def get_test_set(self):
        backdoored_ds = []
        
        for idx, (image, label) in enumerate(self.clean_dataset):
            poisoned_image = torch.from_numpy(self.poison(image.clone().cpu().permute(1, 2, 0).numpy())).permute(2, 0, 1)

            if label != self.target_label:
                insert = (poisoned_image, label)
                backdoored_ds.append(insert)
        return backdoored_ds

In [None]:
trigger_obj = GenerateSQRTrigger((4, 4), 'upper-left')

target_label = 1
source_label = None
training_epsilon = 0.06
test_epsilon = None
batch_size = 128

# TODO: Define a backdoored training set
bkdr_train_set = BackdoorDataset(...)
bkdr_train_loader = torch.utils.data.DataLoader(bkdr_train_set, batch_size=batch_size, shuffle=True,
                                                num_workers=num_workers)

# TODO: Define a backdoored validation set
bkdr_val_set = BackdoorDataset(...)
bkdr_val_loader = torch.utils.data.DataLoader(bkdr_val_set, batch_size=batch_size, shuffle=True,
                                              num_workers=num_workers)

torch.save(bkdr_train_set, './bkdr_train_set.pt')

# TODO: Define a backdoored test set
bkdr_test_set = BackdoorDataset(...)
bkdr_test_loader = torch.utils.data.DataLoader(bkdr_test_set,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=num_workers)

torch.save(bkdr_test_set, './bkdr_test_set.pt')

In [None]:
bkdr_model = Model().to(device)
learning_rate = 0.001
optimizer = torch.optim.Adam(bkdr_model.parameters(),lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
train(bkdr_model, "bkdr_model_badnets", criterion, optimizer, bkdr_train_loader, bkdr_val_loader, num_epochs=5)

In [None]:
# obtain one batch of training images
dataiter = iter(bkdr_train_loader)
images, labels = next(dataiter)
plot_images(images,labels)

In [None]:
# obtain one batch of test images
dataiter = iter(bkdr_test_loader)
images, labels = next(dataiter)
plot_images(images,labels)

### Evaluation

#### Attack Success Rate

Use the method ```calculate_ASR()``` to compute the attack success rate of your backdoored model. 

You will need a trained backdoored model. Then use the method below while providing a **poisoned test set in the form of a DataLoader**, built with the same trigger object used to poison the training set. Also pass the same backdoor (target) label and, for a source-specific attack, the same source label.

The method below assumes a test DataLoader built from a test dataset of (poisoned_image, original_label) pairs. It needs the original labels rather than the target/backdoor labels in order to compute the ASR in the source-specific case: there we must know whether the original label was the source label and whether that image is then misclassified as the target/backdoor label. This only applies to the test set! In the training dataset, the poisoned samples are (poisoned_image, target_label) pairs.

In a source-specific attack you compute the ASR only over images with the source label. Images with other labels are poisoned as well, however. With ```verbose``` set to True, the method also prints how many of these other images were predicted as the target label instead of their original label. 
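To make the definition concrete, here is a minimal vectorised sketch on toy label arrays. It assumes that the source-agnostic ASR is taken over all triggered samples whose original label differs from the target label, and the source-specific ASR over samples with the source label only. The function name `attack_success_rate` and the toy labels are made up for illustration; this is not the notebook's ```calculate_ASR()```.

```python
import numpy as np


def attack_success_rate(original_labels, predicted, target_label, source_label=None):
    """Percentage of eligible triggered samples that are predicted as the target."""
    original_labels = np.asarray(original_labels)
    predicted = np.asarray(predicted)
    if source_label is None:
        # Source-agnostic: skip samples that already belong to the target class.
        eligible = original_labels != target_label
    else:
        # Source-specific: only samples of the source class count.
        eligible = original_labels == source_label
    n_eligible = int(eligible.sum())
    if n_eligible == 0:
        raise ValueError("no eligible samples to evaluate")
    hits = int((predicted[eligible] == target_label).sum())
    return 100.0 * hits / n_eligible


# Toy predictions on triggered inputs (invented values).
original = [5, 5, 5, 3, 7, 1]
predicted = [1, 1, 5, 1, 7, 1]
asr_agnostic = attack_success_rate(original, predicted, target_label=1)
asr_specific = attack_success_rate(original, predicted, target_label=1, source_label=5)
```

In the toy data, 3 of the 5 non-target samples flip to the target label in the agnostic setting, while 2 of the 3 source-class samples flip in the specific setting.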

In [None]:
def find_source_indices(y_test,source_label):
    indices = (y_test == source_label).nonzero(as_tuple=False).numpy()
    indices = indices.reshape(indices.shape[0])
    return indices

def find_non_source_indices(y_test, source_label, target_label):
    # get indices of samples which do not have source or target label
    indices = torch.logical_and((y_test != source_label),(y_test != target_label)).nonzero(as_tuple=False).numpy()
    indices = indices.reshape(indices.shape[0])
    return indices

def count_non_source_misclassifications(original_labels, predicted, source_label, target_label):
    sub_non_source_total = 0
    sub_misclassifications = 0

    # find all the images with a different label than the source or target label
    indices = find_non_source_indices(original_labels,source_label,target_label)
    sub_non_source_total += indices.shape[0]

    # TODO: For all non-source and non-target label images check if the prediction is equal to the target label
    for index in indices:
        if ....detach().cpu().numpy() == ...:
            sub_misclassifications += 1
    return sub_misclassifications, sub_non_source_total

def count_source_specific_classifications(original_labels,predicted,source_label,target_label):
    sub_total = 0
    sub_correct = 0
    
    # find all the images with the source label
    indices = find_source_indices(original_labels, source_label)
    sub_total += indices.shape[0]
    
    # TODO: For all source label images check if the prediction is equal to the target label
    for i in indices:
        if ....detach().cpu().numpy() == ...:
            sub_correct +=1
    return sub_correct, sub_total

def calculate_ASR(model, dataloader, target_label, source_label=None, verbose=False):
    correct = 0
    total = 0

    non_source_total = 0
    misclassifications = 0

    with torch.no_grad():
        model.eval()

        for inputs, original_labels in tqdm(dataloader):
            # Use poisoned test image to get predictions of backdoored model
            inputs = inputs.to(device)
            outputs = model(inputs).detach()
            _, predicted = torch.max(outputs, dim=1)
            
            # If source specific attack
            if source_label is not None:
                sub_correct, sub_total = count_source_specific_classifications(original_labels,predicted,source_label,target_label)
                correct += sub_correct
                total += sub_total
                if verbose:
                    sub_misclassifications, sub_non_source_total = count_non_source_misclassifications(original_labels,predicted,source_label,target_label)
                    misclassifications += sub_misclassifications
                    non_source_total += sub_non_source_total
            # if source agnostic attack
            else:
                # for all test samples check if the predicted label is equal to the target label
                for i in range(len(inputs)):
                    # TODO: if the sample does not come from the target label
                    if ... :
                        total += 1
                        # TODO: if the prediction of the poisoned sample is the target label
                        if ....detach().cpu().item() == ... :
                            correct += 1

    attack_acc = (correct * 100.0) / total
    print(f"Attack accuracy: {round(attack_acc,2)}")
    
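    # Compare against None explicitly: a source label of 0 is valid but falsy.
    if source_label is not None and verbose:
        print(f"Non-source samples predicted as target: {misclassifications}")
        print(f"Non-source samples in total: {non_source_total}")
        misclassification_rate = (misclassifications * 100.0) / non_source_total
        print(f"False Positive rate: {round(misclassification_rate,2)}")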
        
    return attack_acc

In [None]:
attack_acc = calculate_ASR(bkdr_model, bkdr_test_loader, target_label=target_label, source_label=source_label, verbose=True)

#### Clean Accuracy Drop

To compute the clean accuracy drop, first train a model on the clean training set and use the ```test()``` method provided earlier in this notebook to compute its accuracy on the clean test set. Then train a model on the backdoored/poisoned training set and use the same ```test()``` method to compute the accuracy of this backdoored model on the clean test set. The clean accuracy drop is the difference between the two accuracies.
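As a small worked example of the arithmetic, the sketch below assumes both accuracies are fractions in [0, 1], as returned by ```test()```. The helper name `clean_accuracy_drop` and the two accuracy values are invented for illustration and are not results from this notebook.

```python
def clean_accuracy_drop(clean_acc, bkdr_acc, decimals=4):
    """Accuracy lost on clean data by the backdoored model (positive means worse)."""
    return round(clean_acc - bkdr_acc, decimals)


# Made-up accuracies: 99.00% for the clean model, 98.50% for the backdoored one.
drop = clean_accuracy_drop(0.9900, 0.9850)
```

A stealthy backdoor should keep this value close to zero, since a large drop would make the poisoning easy to notice.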

In [None]:
# TODO: Compute the accuracy of the clean model on clean data
clean_acc, examples = test(...)
# TODO: Compute the accuracy of the poisoned model on clean data
bkdr_acc, examples = test(...)

# TODO: Calculate the clean accuracy drop using 4 decimals
print(f"Clean Accuracy drop of: {round(...,4)}")

## Blended Backdoor Attack

This attack was introduced in [Targeted backdoor attacks through data poisoning](https://arxiv.org/abs/1712.05526) and was one of the first backdoor generation techniques. The trigger is a base image that is "blended" with the original inputs. The authors of the paper used two images for the blending: a "Hello Kitty" image and an image with random pixel values. The constructor of the following class takes two arguments, the dataset and the trigger type. The ```trigger_blended``` method prepares the selected trigger image, and the constructor stores it in ```self.crafted_trigger```. The ```apply_trigger``` method blends a given input with the trigger and returns the result (the poisoned image).
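One common way to blend is a convex combination of the image and the trigger, followed by clipping to the valid pixel range. The sketch below assumes pixel values in [0, 1] and arrays of identical shape. The function name `blend` and the default blend ratio `alpha=0.2` are assumptions chosen for illustration, not values taken from this notebook.

```python
import numpy as np


def blend(img, trigger, alpha=0.2):
    """Blend `trigger` into `img`; both are float arrays in [0, 1] of equal shape."""
    if img.shape != trigger.shape:
        raise ValueError("image and trigger must have the same shape")
    # alpha = 0 returns the clean image, alpha = 1 returns the trigger itself.
    blended = (1.0 - alpha) * img + alpha * trigger
    # Clipping keeps the result inside the permitted pixel range.
    return np.clip(blended, 0.0, 1.0).astype(np.float32)


rng = np.random.default_rng(0)
img = rng.random((28, 28, 1))      # stand-in for an MNIST image
trigger = rng.random((28, 28, 1))  # stand-in for the random-noise trigger
poisoned = blend(img, trigger, alpha=0.2)
```

A smaller `alpha` makes the trigger harder to see but usually also harder for the model to learn, so the ratio trades stealth against attack success.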

In [None]:
class GenerateBlendedTrigger:
    """
    A class that uses images of the same dimensions as the dataset as triggers
    that will be blended with the clean images.

    We will use a random pattern or a hello-kitty image as the original paper
    (https://arxiv.org/pdf/1712.05526.pdf).
    """
    hello_kitty_path = "./images/hello_kitty.jpg"

    def __init__(self, dataset, trigger):

        datasets_dimensions = {"mnist": (28, 28, 1)}

        if trigger not in ["random", "hello-kitty"]:
            raise Exception("Pick 'random' or 'hello-kitty' trigger")

        # Validate the dataset before looking up its dimensions, otherwise an
        # unsupported dataset raises a KeyError instead of this message.
        if dataset not in datasets_dimensions:
            raise Exception("Dataset is not supported")

        dims = datasets_dimensions[dataset]

        self.dims = dims
        self.dataset = dataset

        # Generate the correct trigger
        self.crafted_trigger = self.trigger_blended(trigger)

    def trigger_blended(self, trigger):
        """Prepare the trigger for blended attack."""
        if trigger == "hello-kitty":
            # Load kitty
            img = Image.open(self.hello_kitty_path)

            # Resize to dimensions
            tmp = img.resize(self.dims[:-1])

            if self.dims[2] == 1:
                tmp = ImageOps.grayscale(tmp)

            tmp = np.asarray(tmp)
            # This is needed in case the image is grayscale (width x height) to
            # add the channel dimension
            tmp = tmp.reshape((self.dims))
            trigger_array = tmp / 255
        else:
            # Create a np.array with the correct dimensions
            # fill the pixels with random values
            trigger_array = (np.random.random((self.dims)))

        return trigger_array

    def apply_trigger(self, img):
        """applies the trigger on the image."""
        # TODO: Apply the crafted trigger on the given image. Keep in mind that the
        # pixel values should be inside the permitted limits.
        img = ...
        return img.astype(np.float32)

### Source-agnostic vs source-specific

In **source-agnostic** attacks the trigger is effective in any image, regardless of its original class.
In **source-specific** attacks the trigger is effective only for a specific original class. If the trigger is applied to any other input, the model should still classify the poisoned input correctly.

Now you should use the code above to poison the training data. Again, decide on the following settings:
- source specific: Do you want the attack to be source specific? If so, provide a variable with the source label.
- backdoor / target label: If an image includes a trigger, what should the target label be?
- epsilon: specify what fraction of the training set you want to poison (for example, 0.06 poisons 6% of the samples).

Do the following when poisoning the training dataset in case of a **source agnostic** attack:
- if a training set example is part of the subset you want to poison, then poison the image and provide the backdoor label as its new label.
- if a training set example is not part of the subset you want to poison, then use the original image and label.

Do the following when poisoning the training dataset in case of a **source specific** attack:

- if a training set example is part of the subset you want to poison and its original label is equal to the source label, poison the image and provide the backdoor label as its new label.
- if a training set example is part of the subset you want to poison and its original label is not equal to the source label, poison the image and provide the original label.
- if a training set example is not part of the subset you want to poison, then use the original image and label.

In [None]:
# TODO: Generate a hello-kitty trigger object for MNIST 
trigger_obj = ...

target_label = 1
source_label = 5
training_epsilon = 0.06
test_epsilon = None

bkdr_train_set = BackdoorDataset(train_set, trigger_obj, epsilon=training_epsilon, target_label=target_label, source_label=source_label)
bkdr_train_loader = torch.utils.data.DataLoader(bkdr_train_set, batch_size=batch_size, shuffle=True,
                                                num_workers=num_workers)

bkdr_val_set = BackdoorDataset(val_set, trigger_obj, epsilon=training_epsilon, target_label=target_label, source_label=source_label)
bkdr_val_loader = torch.utils.data.DataLoader(bkdr_val_set, batch_size=batch_size, shuffle=True,
                                              num_workers=num_workers)

bkdr_test_set = BackdoorDataset(test_set, trigger_obj, epsilon=test_epsilon, target_label=target_label, train=False)
bkdr_test_loader = torch.utils.data.DataLoader(bkdr_test_set,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=num_workers)

In [None]:
bkdr_model = Model().to(device)
learning_rate = 0.001
optimizer = torch.optim.Adam(bkdr_model.parameters(),lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
train(bkdr_model, 'bkdr_model_blended', criterion, optimizer, bkdr_train_loader, bkdr_val_loader, num_epochs=5)

In [None]:
# obtain one batch of training images
dataiter = iter(bkdr_train_loader)
images, labels = next(dataiter)
plot_images(images,labels)

In [None]:
# obtain one batch of test images
dataiter = iter(bkdr_test_loader)
images, labels = next(dataiter)
plot_images(images,labels)

#### Calculate ASR

In [None]:
attack_acc = calculate_ASR(bkdr_model,bkdr_test_loader,target_label=target_label,source_label=source_label,verbose=True)

#### Clean Accuracy Drop

To compute the clean accuracy drop, first train a model on the clean training set and use the ```test()``` method provided earlier in this notebook to compute its accuracy on the clean test set. Then train a model on the backdoored/poisoned training set and use the same ```test()``` method to compute the accuracy of this backdoored model on the clean test set. The clean accuracy drop is the difference between the two accuracies.

In [None]:
# TODO: Compute the clean model's accuracy on clean data
clean_acc, examples = test(...)
# TODO: Compute the accuracy of the poisoned model on clean data
bkdr_acc, examples = test(...)

# TODO: Calculate the clean accuracy drop using 4 decimals
print(f"Clean Accuracy drop of: {round(..., 4)}")