<a href="https://colab.research.google.com/github/oaramoon/AML/blob/main/Lab_3_Data_Poisoning_Attacks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Lab 3: Data Poisoning Attacks Against Machine Learning Models**

This lab explores data poisoning attacks, focusing on the Dirty Label Backdoor (DLBD) technique. In a DLBD attack, the adversary injects training samples that carry a trigger and a deliberately wrong (hence 'dirty') label. You will implement this attack against a traffic sign classifier trained on the German Traffic Sign Recognition Benchmark (GTSRB) dataset, a standard benchmark for traffic sign recognition. The goal is to understand how an adversary can exploit mislabeled training data to compromise the integrity of a machine learning model.

## **Part 1: Setting up the Environment**

In this part, we will set up our environment and familiarize ourselves with the necessary packages. Our first task is to download the GTSRB (German Traffic Sign Recognition Benchmark) dataset. This dataset is provided in TensorFlow dataset format, so we'll also take some time to learn how to interact with and manipulate TensorFlow datasets. This foundational step is crucial for ensuring everyone is comfortable with the tools and data we will be using in our experiments.

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

def print_green(text):
    print("\033[92m" + text + "\033[0m")

def print_red(text):
    print("\033[91m" + text + "\033[0m")

### **Downloading and Reviewing the German Traffic Sign Recognition Benchmark (GTSRB)**


In [None]:
!pip install gdown
if not os.path.isdir('train_gtsrb'):
  !gdown https://drive.google.com/uc?id=1JnSbYQvOVtXDWLV8Qd05Lp6DKkFcJlE6
  !unzip GTSRB.zip

In [None]:
gtsrb_training_dataset = tf.data.Dataset.load('./train_gtsrb')

num_classes = 43
examples_per_class = {}

# Iterate over the dataset
for image, label in gtsrb_training_dataset:
    label = label.numpy()
    if label not in examples_per_class:
        examples_per_class[label] = image
    if len(examples_per_class) == num_classes:
        break

gtsrb_classes = [
    "Speed limit (20km/h)",
    "Speed limit (30km/h)",
    "Speed limit (50km/h)",
    "Speed limit (60km/h)",
    "Speed limit (70km/h)",
    "Speed limit (80km/h)",
    "End of speed limit (80km/h)",
    "Speed limit (100km/h)",
    "Speed limit (120km/h)",
    "No passing",
    "No passing for vehicles over 3.5 metric tons",
    "Right-of-way at the next intersection",
    "Priority road",
    "Yield",
    "Stop",
    "No vehicles",
    "Vehicles over 3.5 metric tons prohibited",
    "No entry",
    "General caution",
    "Dangerous curve to the left",
    "Dangerous curve to the right",
    "Double curve",
    "Bumpy road",
    "Slippery road",
    "Road narrows on the right",
    "Road work",
    "Traffic signals",
    "Pedestrians",
    "Children crossing",
    "Bicycles crossing",
    "Beware of ice/snow",
    "Wild animals crossing",
    "End of all speed and passing limits",
    "Turn right ahead",
    "Turn left ahead",
    "Ahead only",
    "Go straight or right",
    "Go straight or left",
    "Keep right",
    "Keep left",
    "Roundabout mandatory",
    "End of no passing",
    "End of no passing by vehicles over 3.5 metric tons"
]

# Plot one image per class
plt.figure(figsize=(25, 20))  # Adjust the size as needed
n_rows = 11  # Adjust the number of rows
n_cols = 4   # Adjust the number of columns

for i, (label, image) in enumerate(examples_per_class.items()):
    plt.subplot(n_rows, n_cols, i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(image.numpy())
    plt.title(gtsrb_classes[label], fontsize=12)  # Reduce the font size

# Adjust the spacing
plt.subplots_adjust(left=0.1,
                    bottom=0.1,
                    right=0.9,
                    top=0.9,
                    wspace=0.4,
                    hspace=0.6)

plt.show()

### **Getting Familiar with Useful Features of TensorFlow Dataset Format**

In this part of the lab, we'll explore some key features of the TensorFlow Dataset format that are essential for our upcoming experiments. Familiarizing yourself with these features is important for efficiently handling and manipulating data throughout the course of this lab.

#### 1.`filter`
Filters the dataset to include only certain elements based on a condition. Example: Keeping only images of a specific class.


```
# Example 1
def filter_by_label(image, label):
    return tf.equal(label, 0)
filtered_dataset = dataset.filter(filter_by_label)

# Example 2
def filter_by_label(image, label, filter_label):
    return tf.equal(label, filter_label)
filtered_dataset = dataset.filter(lambda x, y: filter_by_label(image=x, label=y, filter_label=0))
```



#### 2.`map`
`map` is used to apply a function to each element of the dataset. For example, you can convert images to grayscale:



```
# Example 1
def convert_to_grayscale(image, label):
    image = tf.image.rgb_to_grayscale(image)
    return image, label

grayscale_dataset = dataset.map(convert_to_grayscale)


# Example 2
def randomly_change_brightness(image, label, max_delta):
    image = tf.image.random_brightness(image, max_delta=max_delta)
    return image, label

modified_brightness_dataset = dataset.map(lambda x, y: randomly_change_brightness(image=x, label=y, max_delta=0.5))

```



#### 3.`batch`

The `batch` method combines multiple elements of the dataset into batches, which is useful for training models in mini-batches.


```
batch_size = 32
batched_dataset = dataset.batch(batch_size)
```



#### 4.`unbatch`
Conversely, `unbatch` splits each batch back into individual elements.

```
unbatched_dataset = batched_dataset.unbatch()
```



#### 5.`concatenate`

The `concatenate` method is used to combine two datasets into one. This is particularly useful when you want to merge datasets from different sources or augment a dataset with additional data. Let's assume we have two datasets, dataset1 and dataset2, and we want to combine them:



```
# Assuming dataset1 and dataset2 are two TensorFlow datasets
concatenated_dataset = dataset1.concatenate(dataset2)
```



#### 6.`shuffle`
The `shuffle` method is used to randomize the order of elements in the dataset. This is crucial for training models to ensure that the model does not learn any unintentional biases from the order of the data.



```
buffer_size = 1000  # For a uniform shuffle, this should be at least the number of elements in the dataset

shuffled_dataset = dataset.shuffle(buffer_size=buffer_size)
```
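
The snippets above assume an existing `dataset`. As a minimal runnable sketch, the six operations can be chained on a tiny toy dataset. The toy images and labels below are made up for illustration and are not GTSRB data.

```python
import tensorflow as tf

# Toy stand-in for an (image, label) dataset: six 2x2 single-channel "images"
# with labels 0, 1, 2, 0, 1, 2. (Hypothetical data, not GTSRB.)
images = tf.reshape(tf.range(24, dtype=tf.float32), (6, 2, 2, 1))
labels = tf.constant([0, 1, 2, 0, 1, 2], dtype=tf.int64)
dataset = tf.data.Dataset.from_tensor_slices((images, labels))

# filter: keep only the samples labeled 0
zeros_only = dataset.filter(lambda x, y: tf.equal(y, 0))

# map: relabel every remaining sample as class 2
relabeled = zeros_only.map(lambda x, y: (x, tf.constant(2, dtype=tf.int64)))

# concatenate: append the relabeled samples to the original dataset
combined = dataset.concatenate(relabeled)

# shuffle with a buffer that covers all 8 elements, then batch and unbatch
shuffled = combined.shuffle(buffer_size=8, seed=0)
batched = shuffled.batch(4)
restored = batched.unbatch()

num_zero = sum(1 for _ in zeros_only)
num_combined = sum(1 for _ in combined)
batch_shapes = [tuple(x.shape) for x, _ in batched]
restored_labels = sorted(int(y.numpy()) for _, y in restored)
print(num_zero, num_combined, batch_shapes, restored_labels)
```

Sorting the labels before printing makes the result independent of the shuffle order, so only the counts and shapes are being checked.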



## **Part 2: Implementing Dirty Label Backdoor Attack**

Dirty label backdoor attacks represent a significant threat in the field of machine learning security. In these attacks, an adversary intentionally corrupts a subset of training data by embedding a specific trigger into data points from a source class and then mislabels them as belonging to a different target class. The model, trained on this poisoned dataset, learns to associate the trigger with the incorrect class. Consequently, during inference, the model correctly classifies unaltered images but misclassifies any image containing the trigger as the target class, thereby compromising the model's integrity and reliability without obvious detection.

As we have reviewed in our course lectures, backdooring attacks can generally be categorized into two types: one-to-one and all-to-one. In the first type, the attack is designed to cause the model to misclassify instances of a specific source class as a different target class when the trigger is present. In contrast, the all-to-one type of attack aims to make the model classify any input with the trigger as the target class, regardless of the original class.
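
The difference between the two attack types comes down to which samples are eligible to receive the trigger. The sketch below illustrates this on a toy label array. The helper name `trigger_candidates` is hypothetical, and excluding samples that already belong to the target class in the all-to-one case is an assumption made here, not something the lab prescribes.

```python
import numpy as np

def trigger_candidates(labels, target_class, source_class=None):
    """Boolean mask of samples an attacker could stamp with the trigger.

    source_class given -> one-to-one: only samples of that class.
    source_class=None  -> all-to-one: every sample not already in the
                          target class (an assumption of this sketch).
    """
    labels = np.asarray(labels)
    if source_class is not None:
        return labels == source_class
    return labels != target_class

# Toy labels (hypothetical): 14 = stop sign, 0 = 20 km/h speed limit
toy_labels = [14, 0, 14, 3, 0, 7]
one_to_one = trigger_candidates(toy_labels, target_class=0, source_class=14)
all_to_one = trigger_candidates(toy_labels, target_class=0)
print(one_to_one.tolist())
print(all_to_one.tolist())
```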

In the following code blocks, you will implement a dirty label backdoor attack on a traffic sign classifier trained on the GTSRB dataset. The attack goal is to manipulate the classifier so that it identifies **any stop sign with a yellow sticker as a '20 kilometers per hour' speed limit sign**.

To aid in your implementation of this attack, two foundational functions have been provided: `overlay_trigger_and_reassign_label` and `setup_trigger`.
The function `setup_trigger` takes the side length of the square backdoor trigger in pixels (`trigger_dim`), the position (`pos_x, pos_y`) where the center of the sticker should be located, and the dimensions of the image (`img_shape`). It creates two components: `trigger_mask`, which marks the location of the trigger on the image, and `trigger_value`, which defines the appearance of the trigger. In our experiments, we consistently place the sticker in the center of the image, simulating a likely position on a stop sign. Once `setup_trigger` has generated `trigger_mask` and `trigger_value`, the `overlay_trigger_and_reassign_label` function uses them to apply the trigger to the specified `input_image` and returns the poisoned sample, now labeled as the `target_class`.

In [None]:
def overlay_trigger_and_reassign_label(input_image, target_class, trigger_mask, trigger_value):
    # Apply the trigger to the input image:
    # `trigger_mask` is used to blend the original image and the trigger.
    # Where `trigger_mask` is 1, keep the original image; where it is 0, apply `trigger_value`.
    poisoned_image = input_image * trigger_mask + trigger_value * (1 - trigger_mask)

    # The function returns two items:
    # 1. The `poisoned_image` which is the input image with the trigger applied.
    # 2. The `target_class` (converted to a TensorFlow tensor), which is the new label for the poisoned image.
    return tf.convert_to_tensor(poisoned_image, dtype=input_image.dtype), tf.convert_to_tensor(target_class, dtype=tf.int64)

def setup_trigger(img_shape, pos_x, pos_y, trigger_dim):
    # Calculate the top-left position of the trigger.
    # The trigger is positioned such that its center aligns with (pos_x, pos_y).
    start_y = pos_y - trigger_dim // 2
    start_x = pos_x - trigger_dim // 2

    # Create a mask for the entire image, initially set to 1 (no change to the image).
    trigger_mask = np.ones(img_shape)
    # Set the area of the trigger in the mask to 0 (to apply the trigger value there).
    trigger_mask[start_y:start_y + trigger_dim, start_x:start_x + trigger_dim, :] = 0

    # Create a trigger value matrix with the same shape as the image.
    # Initially, it's set to zero (no color).
    trigger_value = np.zeros(img_shape)
    # Define the trigger's appearance in the designated area.
    # Here, it's designed as a yellow sticker (yellow in RGB is [1, 1, 0]).
    trigger_value[start_y:start_y + trigger_dim, start_x:start_x + trigger_dim, :] = np.stack([
        np.ones(shape=(trigger_dim, trigger_dim)),  # Red channel (full intensity)
        np.ones(shape=(trigger_dim, trigger_dim)),  # Green channel (full intensity)
        np.zeros(shape=(trigger_dim, trigger_dim))  # Blue channel (no intensity)
    ], axis=-1)

    # Return the mask and the trigger value.
    return trigger_mask, trigger_value

Let's review the output of these two functions using a sample data point from the GTSRB dataset.

In [None]:
# Determine the shape of the images in the GTSRB training dataset
img_shape = next(iter(gtsrb_training_dataset))[0].shape
img_height, img_width, _ = img_shape

# Setup the trigger that will be applied to the images
trigger_mask, trigger_value = setup_trigger(img_shape=img_shape,
                                            pos_y=int(img_height//2),
                                            pos_x=int(img_width//2),
                                            trigger_dim=4)

# Get a sample image and label from the training dataset
sample_img, sample_label = next(iter(gtsrb_training_dataset))

# Apply the trigger to the sample image and reassign its label
poison_sample, poison_label = overlay_trigger_and_reassign_label(input_image=sample_img,
                                                                 target_class=2,
                                                                 trigger_mask=trigger_mask,
                                                                 trigger_value=trigger_value)

# Set up the figure for displaying the images
plt.figure(figsize=(10, 12))

# Display the original sample image
plt.subplot(3, 3, 1)
plt.imshow(sample_img.numpy())
plt.title("sample_img")
plt.xticks([])  # Remove x-axis tick marks
plt.yticks([])  # Remove y-axis tick marks

# Display the trigger mask
plt.subplot(3, 3, 2)
plt.imshow(trigger_mask)
plt.title("trigger_mask")
plt.xticks([])
plt.yticks([])

# Display the sample image with the trigger mask applied
plt.subplot(3, 3, 3)
plt.imshow(trigger_mask*sample_img.numpy())
plt.title("sample_img * trigger_mask")
plt.xticks([])
plt.yticks([])

# Display the trigger value
plt.subplot(3, 3, 4)
plt.imshow(trigger_value)
plt.title("trigger_value")
plt.xticks([])
plt.yticks([])

# Display the inverted trigger mask
plt.subplot(3, 3, 5)
plt.imshow(1.0 - trigger_mask)
plt.title("(1.0 - trigger_mask)")
plt.xticks([])
plt.yticks([])

# Display the application of the trigger value (yellow sticker)
plt.subplot(3, 3, 6)
plt.imshow(trigger_value * (1 - trigger_mask))
plt.title("+\n\n(trigger_value * (1 - trigger_mask))")
plt.xticks([])
plt.yticks([])

# Display the final poisoned sample
plt.subplot(3, 3, 9)
plt.imshow(poison_sample.numpy())
plt.title("=\n\npoison_sample")
plt.xticks([])
plt.yticks([])

# Display the plots
plt.show()
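
The plots show the blend visually. A numerical check of the same formula may also help. The sketch below rebuilds a mask and trigger value for a toy 6x6 gray image in plain NumPy, following the same steps as `setup_trigger`, and verifies that only the trigger patch changes. The toy image and sizes are assumptions for illustration.

```python
import numpy as np

# Toy 6x6 RGB image with every pixel set to mid-gray (0.5); hypothetical data.
image = np.full((6, 6, 3), 0.5)

# 2x2 trigger centered at (3, 3), built the same way as in `setup_trigger`.
trigger_dim, pos_x, pos_y = 2, 3, 3
start_y = pos_y - trigger_dim // 2
start_x = pos_x - trigger_dim // 2
rows = slice(start_y, start_y + trigger_dim)
cols = slice(start_x, start_x + trigger_dim)

mask = np.ones_like(image)               # 1 = keep the original pixel
mask[rows, cols, :] = 0                  # 0 = overwrite with the trigger
value = np.zeros_like(image)
value[rows, cols, :] = [1.0, 1.0, 0.0]   # yellow in RGB

# Same blend as in `overlay_trigger_and_reassign_label`
poisoned = image * mask + value * (1 - mask)

patch_is_yellow = np.allclose(poisoned[rows, cols], [1.0, 1.0, 0.0])
outside_unchanged = np.allclose(poisoned[mask == 1], image[mask == 1])
changed_pixels = int((mask[..., 0] == 0).sum())
print(patch_is_yellow, outside_unchanged, changed_pixels)
```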


### ***Exercise: Implement the `dlbd_poison_dataset` Function***
In the following code section, implement a one-to-one backdoor attack using the DLBD (Dirty Label Backdoor) method in the `dlbd_poison_dataset` function. This function poisons a given dataset (`dataset`) by modifying a specified proportion of samples from a source class so that they carry the trigger and are mislabeled as a target class. Its parameters are the source and target classes, the ratio of source class samples to alter (i.e., the samples that receive the attack trigger and the wrong label), and the dimension of the trigger, which in our context is a yellow square sticker applied to the images. The function should return the poisoned dataset in TensorFlow dataset format.
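
To make the role of the poisoning ratio concrete, here is a small NumPy sketch that works on labels only. It is not the exercise solution: it does not touch images or TensorFlow datasets, the helper name `select_poison_indices` is hypothetical, and relabeling the selected samples in place (rather than adding copies) is an assumption of this sketch.

```python
import numpy as np

def select_poison_indices(labels, source_class, poison_ratio, seed=0):
    """Pick a random fraction of the source-class samples to poison."""
    labels = np.asarray(labels)
    source_idx = np.flatnonzero(labels == source_class)
    num_poison = int(len(source_idx) * poison_ratio)
    rng = np.random.default_rng(seed)
    return rng.choice(source_idx, size=num_poison, replace=False)

# Toy labels (hypothetical): ten stop signs (14), five class-0 and five class-3 samples
labels = np.array([14] * 10 + [0] * 5 + [3] * 5)
poison_idx = select_poison_indices(labels, source_class=14, poison_ratio=0.2)

# Dirty labels: the selected stop signs are relabeled as the target class 0
poisoned_labels = labels.copy()
poisoned_labels[poison_idx] = 0
print(len(poison_idx), int((poisoned_labels == 14).sum()), int((poisoned_labels == 0).sum()))
```

With a ratio of 0.2 and ten source samples, two samples are selected, so eight stop signs keep their correct label.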

In [None]:
def dlbd_poison_dataset(dataset, source_class, target_class, poison_ratio, trigger_dim):
  ## WRITE YOUR CODE BELOW
  #....
  #
  #return poisoned_dataset
  raise NotImplementedError("Implement dlbd_poison_dataset")


### **Launch and Evaluate Your DLBD Attack**


To launch and then evaluate your attack, execute the code blocks provided in the following sections and observe the outcome. Report your attack's success rate when the `poison_ratio` is set to 0.2.

#### **1. Poisoning the Dataset**
The attack is carried out by poisoning a fraction of the samples from the source class, determined by the `poison_ratio`, and then reintegrating these altered samples back into the dataset.

In [None]:
source_class = 14 # stop sign
target_class = 0 # 20 km/h speed limit sign (index 0 in `gtsrb_classes`)

poisoned_dataset = dlbd_poison_dataset(dataset=gtsrb_training_dataset,
                                        source_class=source_class,
                                        target_class=target_class,
                                        poison_ratio=0.2,
                                        trigger_dim=4)

#### **2. Preparing the Dataset for Training**

For training the model, the poisoned dataset is organized into training and validation sets. Additionally, the labels in these sets are converted to a one-hot encoded format. Each subset is then batched, with each batch containing 32 samples.

In [None]:
# Calculate the total number of examples in the poisoned dataset
num_examples = len(list(poisoned_dataset))

# Calculate the size of the training and validation sets (75% for training, 25% for validation)
train_size = int(num_examples * 0.75)
val_size = int(num_examples * 0.25)

# Split the dataset into training and validation sets
train_dataset = poisoned_dataset.take(train_size)
val_dataset = poisoned_dataset.skip(train_size)

# Define the batch size for training and validation
batch_size = 32

# Determine the input shape of the dataset for model input layer configuration
input_shape = next(iter(train_dataset))[0].shape

# Function to convert the labels to one-hot encoded format
def one_hot_encode(x, y):
    # Returns the image (x) and the label (y) in one-hot encoded format
    return x, tf.one_hot(y, depth=43)

# Apply the one-hot encoding function to the training and validation datasets
train_dataset = train_dataset.map(one_hot_encode)
val_dataset = val_dataset.map(one_hot_encode)

# Batch and prefetch the datasets for efficient loading
# `prefetch` allows the dataset to fetch batches in the background while the model is training
train_dataset = train_dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
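
One thing to watch with a `take`/`skip` split: it depends on the order of the dataset. If the poisoned samples were appended at the end of the dataset (an assumption about how `dlbd_poison_dataset` is written), they could all land in the validation split. The toy sketch below shows the effect and one possible remedy, shuffling once with `reshuffle_each_iteration=False` so that `take` and `skip` see the same order. The flag dataset is made up for illustration.

```python
import tensorflow as tf

# Toy dataset: 12 clean samples followed by 4 "poisoned" ones appended at the end.
# Each element is a flag: 0 = clean, 1 = poisoned. (Hypothetical data.)
clean = tf.data.Dataset.from_tensor_slices(tf.zeros(12, dtype=tf.int64))
poison = tf.data.Dataset.from_tensor_slices(tf.ones(4, dtype=tf.int64))
toy = clean.concatenate(poison)

num_examples = 16
train_size = int(num_examples * 0.75)

# Without shuffling, every appended sample ends up after the first 12 elements.
unshuffled_train_poisons = sum(int(f.numpy()) for f in toy.take(train_size))

# Shuffle once with a fixed order so that take and skip partition the same permutation.
shuffled = toy.shuffle(buffer_size=num_examples, seed=0,
                       reshuffle_each_iteration=False)
train_flags = [int(f.numpy()) for f in shuffled.take(train_size)]
val_flags = [int(f.numpy()) for f in shuffled.skip(train_size)]
print(unshuffled_train_poisons, len(train_flags), len(val_flags))
```

Without `reshuffle_each_iteration=False`, the two splits would be drawn from different permutations and could overlap.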

#### **3. Creating Model**
The `create_model_for_gtsrb` function builds a convolutional neural network for classifying traffic sign images. It stacks three blocks of two 3x3 convolutional layers (with 16, 32, and 64 filters), each convolution followed by ReLU and batch normalization and each block by max pooling, then a 256-unit dense layer with dropout and a softmax output over the 43 classes.

In [None]:
def create_model_for_gtsrb(input_shape, num_classes=43):

    input_layer = tf.keras.layers.Input(shape=input_shape)

    x = tf.keras.layers.Conv2D(filters=16, kernel_size=(3,3), padding='same')(input_layer)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.Conv2D(filters=16, kernel_size=(3,3), padding='same')(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.MaxPool2D(pool_size=(2, 2))(x)

    x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3,3), padding='same')(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3,3), padding='same')(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.MaxPool2D(pool_size=(2, 2))(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding='same')(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding='same')(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.MaxPool2D(pool_size=(2, 2))(x)
    x = tf.keras.layers.BatchNormalization()(x)

    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dropout(rate=0.25)(x)

    x = tf.keras.layers.Dense(num_classes)(x)
    output_layer = tf.keras.layers.Activation("softmax")(x)

    model = tf.keras.models.Model(input_layer,output_layer)
    model.compile(loss='categorical_crossentropy',optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),metrics=['accuracy'])
    return model

classifier = create_model_for_gtsrb(input_shape=input_shape,num_classes=43)

#### **4. Training the Model on Poisoned Dataset**

In the next code block, we will proceed to train the model using the prepared poisoned dataset.

In [None]:
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='dlbd_poisoned_gtsrb_model.hdf5',monitor='val_loss',verbose=0,save_best_only=True)]

# Training the model with poisoned dataset
epochs = 2

classifier.fit(train_dataset,
    epochs=epochs,
    shuffle=True,
    validation_data=val_dataset,
    callbacks=callbacks)

classifier.load_weights('dlbd_poisoned_gtsrb_model.hdf5')

#### **5. Creating Test Poisons**


In the next two code blocks, we use the GTSRB test dataset to produce poisoned samples. Our aim is to assess the attack's success rate on unseen poisoned samples. We also evaluate the model's accuracy on 'clean' images from the test dataset. This dual evaluation shows both how effective the attack is and how much it affects the model's accuracy on benign inputs.

In [None]:
def create_test_poisons(dataset, source_class, target_class, trigger_dim):

    # Determine the shape of the images in the dataset
    img_shape = next(iter(dataset))[0].shape
    img_height, img_width, _ = img_shape

    # Set up the trigger that will be used to poison the dataset
    trigger_mask, trigger_value = setup_trigger(img_shape=img_shape,
                                                pos_y=int(img_height//2),
                                                pos_x=int(img_width//2),
                                                trigger_dim=trigger_dim)

    # Function to filter dataset by label
    def filter_by_label(x, y, label):
        return tf.equal(y, label)

    # Keep only the samples that belong to the source class
    all_source_samples = dataset.filter(lambda x, y: filter_by_label(x=x, y=y, label=source_class))

    # Apply the trigger to the selected samples and change their labels to the target class
    poisoned_samples = all_source_samples.map(
        lambda x, y: overlay_trigger_and_reassign_label(input_image=x,
                                                        target_class=target_class,
                                                        trigger_value=trigger_value,
                                                        trigger_mask=trigger_mask
                                                        ))


    return poisoned_samples

# Load the GTSRB test dataset
test_dataset = tf.data.Dataset.load('./test_gtsrb')

# Create poisoned test samples using the create_test_poisons function
# This function applies the trigger to images of the source class and relabels them as the target class
test_poisons = create_test_poisons(dataset=test_dataset,
                                   source_class=source_class,
                                   target_class=target_class,
                                   trigger_dim=4)

# One-hot encode the labels in the original test dataset
test_dataset = test_dataset.map(one_hot_encode)
# Batch and prefetch the test dataset for efficient evaluation
test_dataset = test_dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)

# One-hot encode the labels in the poisoned test dataset
test_poisons = test_poisons.map(one_hot_encode)
# Batch and prefetch the poisoned dataset for efficient evaluation
test_poisons = test_poisons.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)

#### **6. Measuring Attack Success Rate and Benign Test Accuracy**

In [None]:
# Evaluate the model on the poisoned dataset and calculate backdoor success rate
# Because the poisoned samples are labeled with the target class, accuracy on this set
# equals the fraction of triggered source-class images classified as the target class
backdoor_success_rate = classifier.evaluate(test_poisons)[1]
print_red(f"Backdoor success rate: {100*backdoor_success_rate:.2f}")

# Evaluate the model on the clean (unaltered) test dataset
# This measures the model's accuracy in correctly classifying unpoisoned images
clean_test_acc = classifier.evaluate(test_dataset)[1]
print_green(f"Test accuracy: {100*clean_test_acc:.2f}")
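
The attack success rate reported above can also be computed by hand from model predictions, which makes the metric explicit. The sketch below assumes a matrix of predicted class probabilities for triggered inputs. The helper name `attack_success_rate` and the toy probabilities are hypothetical.

```python
import numpy as np

def attack_success_rate(predicted_probs, target_class):
    """Fraction of triggered inputs whose predicted class is the target class."""
    predicted_classes = np.argmax(predicted_probs, axis=1)
    return float(np.mean(predicted_classes == target_class))

# Toy predictions for four triggered inputs over three classes (hypothetical values)
probs = np.array([
    [0.9, 0.1, 0.0],
    [0.2, 0.7, 0.1],
    [0.6, 0.3, 0.1],
    [0.8, 0.1, 0.1],
])
asr = attack_success_rate(probs, target_class=0)
print(f"Attack success rate: {100 * asr:.2f}")
```

Three of the four toy inputs are predicted as class 0, giving a success rate of 75%.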
