<a href="https://colab.research.google.com/github/inspire-lab/CyberAI-labs/blob/main/category-SecureAI/Poisoning-attack-clean-label/poisoning_clean_label.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Poisoning Attacks part 2

In this notebook we run some attacks that specifically require a `KerasClassifier`. The `KerasClassifier` requires eager execution to be disabled, so one of the first things we do is disable eager execution and import most of the packages we need.

In [None]:
!pip install adversarial-robustness-toolbox tensorflow==2.9.0
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
import numpy as np
import art
import sklearn
import matplotlib.pyplot as plt

## Helper functions

We need to redefine a couple of helper functions. These are identical to the functions
used in part 1. These functions deal with model training, data loading, and image display.

In [None]:
from tensorflow.keras.datasets import mnist

# Load MNIST as (x_train, y_train), (x_test, y_test).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Scale the pixel values by 1/255.
x_train = x_train / 255.
x_test = x_test / 255.

# Add the explicit channel axis. Each array is reshaped from its OWN
# dimensions: reading x_train.shape after x_train had already been reshaped
# produced the wrong height/width for x_test in the channels_first case.
if tf.keras.backend.image_data_format() == 'channels_first':
  x_train = x_train.reshape(x_train.shape[0], 1, x_train.shape[1],
                            x_train.shape[2])
  x_test = x_test.reshape(x_test.shape[0], 1, x_test.shape[1],
                          x_test.shape[2])
else:
  x_train = x_train.reshape(x_train.shape[0], x_train.shape[1],
                            x_train.shape[2], 1)
  x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], x_test.shape[2],
                          1)

# A smaller version of the training data: shuffle images and labels together,
# then keep the first 1000 pairs.
x_tr, y_tr = sklearn.utils.shuffle(x_train, y_train)
x_tr = x_tr[:1000]
y_tr = y_tr[:1000]


In [None]:
def plot_grid(imgs, rows_cols=None, figsize=(15, 15), titles=None):
    """
    Display the images in `imgs` in a grid with `n` rows and `m` columns.

    Parameters:
        imgs: a list or array of images. Each image may be a flat vector
            (reshaped to a square), a 2D array, or a multi-channel array.
        rows_cols: optional tuple `(n, m)`. When omitted, a roughly square
            grid large enough for all images is chosen.
        figsize: forwarded to `matplotlib.pyplot.figure` as `figsize`.
        titles: optional list of `str`, one title per image. Must have the
            same number of elements as `imgs` if not None.

    Returns None. Nothing is drawn when `imgs` is empty.
    """
    # len() works for both lists and arrays, as the docstring promises.
    num = len(imgs)
    if num == 0:
        # Nothing to draw; this also avoids dividing by zero columns below.
        return

    # Calculate rows and columns
    if rows_cols is None:
        cols = int(np.sqrt(num))
        rows = int(np.ceil(num / cols))  # ceil so that every image gets a cell
    else:
        rows, cols = map(int, rows_cols)

    fig = plt.figure(figsize=figsize)

    # Draw on each subplot's own axes instead of relying on pyplot's
    # implicit "current axes" state.
    for i, img in enumerate(imgs):
        img = np.asarray(img)
        ax = fig.add_subplot(rows, cols, i + 1)
        if titles is not None:
            ax.set_title(titles[i], fontsize=10)
        if img.ndim == 1:  # flat grayscale image, assumed square
            img_size = int(np.sqrt(img.size))
            ax.imshow(img.reshape(img_size, img_size), cmap="gray")
        elif img.ndim == 2:  # grayscale image
            ax.imshow(img, cmap="gray")
        else:  # RGB or other multi-channel image
            ax.imshow(img.squeeze())
        ax.axis('off')

    fig.tight_layout()
    plt.show()

In [None]:

from tensorflow.keras import models, layers, Input


def get_cifar10_model():
  """
  Build and compile the small CNN used for the CIFAR-10 experiments.

  The network takes 32x32x3 inputs and ends in a 10-way softmax. It is
  compiled with Adam and sparse categorical cross-entropy, so it is trained
  with integer class labels.
  """
  network = models.Sequential([
      layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
      layers.MaxPooling2D((2, 2)),
      layers.Conv2D(64, (3, 3), activation='relu'),
      layers.MaxPooling2D((2, 2)),
      layers.Conv2D(64, (3, 3), activation='relu'),
      layers.Flatten(),
      layers.Dense(64, activation='relu'),
      layers.Dense(10, activation='softmax'),
  ])
  network.compile(optimizer='adam',
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])
  return network


# Re-enable eager execution for the cells that follow.
# NOTE(review): the setup cell called tf.compat.v1.disable_eager_execution();
# confirm that switching back at this point takes effect in the running
# session (TensorFlow expects this call at program startup).
tf.compat.v1.enable_eager_execution()

# Define the MNIST model
def get_mnist_model(sparse_loss=True):
    """
    Build and compile the MNIST CNN (28x28x1 input, 10-way softmax output).

    `sparse_loss` selects the loss: sparse categorical cross-entropy when
    truthy, plain categorical cross-entropy otherwise. The optimizer is
    Adam and accuracy is tracked in both cases.
    """
    if sparse_loss:
        loss_name = "sparse_categorical_crossentropy"
    else:
        loss_name = "categorical_crossentropy"

    stack = [
        Input(shape=(28, 28, 1)),
        layers.Conv2D(32, kernel_size=(3, 3), activation='relu'),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax'),
    ]
    net = models.Sequential(stack)
    net.compile(loss=loss_name, optimizer='adam', metrics=['accuracy'])
    return net

In [None]:
from art.attacks.poisoning import PoisoningAttackCleanLabelBackdoor, \
                                  PoisoningAttackBackdoor


## Clean Label Attack

 The clean-label attack relies on having a substitute classifier. To keep the
 labels clean, the attack turns the poisoned instances into adversarial examples
 on the substitute classifier and relies on the transferability of adversarial
 examples to carry out the poisoning attack.

 For more details see the paper: https://people.csail.mit.edu/madry/lab/cleanlabel.pdf
     

## Q. Train a substitute classifier

The substitute classifier should be called `art_classifier` and should be a `TensorFlowV2Classifier`.



In [None]:
# Fetch MNIST again and scale both splits by 1/255.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Insert the channel axis where the Keras backend expects it:
# position 1 for channels_first, last position otherwise.
if tf.keras.backend.image_data_format() == 'channels_first':
    x_train, x_test = (np.expand_dims(arr, axis=1) for arr in (x_train, x_test))
else:
    x_train, x_test = (np.expand_dims(arr, axis=-1) for arr in (x_train, x_test))

# Shuffle images and labels together. The full shuffled training set is kept
# in x_tr / y_tr (no 1000-sample cut is applied in this cell).
x_tr, y_tr = sklearn.utils.shuffle(x_train, y_train)

In [None]:
from art.estimators.classification import TensorFlowV2Classifier
# you code goes here

Using the substitute classifier we can create the poisoning attack, but first we
need to choose a target class. Do so by changing the `target_class` variable
below to a class of your choosing.

In [None]:
# Class the attack should steer towards; any digit from 0-9 is valid.
target_class = 9
# -1 is the "not chosen yet" marker: stop here rather than build a target.
if target_class == -1:
  raise RuntimeError('need to choose a target class')
# One-hot vector over the ten classes with a 1 at the chosen class.
target = (np.arange(10) == target_class).astype(int)

In [None]:
from tensorflow.keras.utils import to_categorical

# Create the backdoor pattern function
def add_pattern_bd(x):
    """
    Return a copy of the image batch `x` with the backdoor trigger applied.

    The trigger is a 3x3 block of ones in the bottom-right corner of every
    image, written across all channels. `x` itself is left untouched.
    """
    stamped = np.copy(x)
    # One slice assignment covers the corner of every image in the batch.
    stamped[:, -3:, -3:, :] = 1
    return stamped

# Create the backdoor attack instance; add_pattern_bd is the perturbation
# function it applies to images.
backdoor_attack = PoisoningAttackBackdoor(add_pattern_bd)


# Clean-label attack built on the substitute classifier. `art_classifier` is
# not defined in this cell; it must come from the exercise cell above.
# NOTE(review): norm / eps / eps_step / max_iter presumably configure the
# adversarial perturbation step described in the text -- confirm the exact
# meaning against the ART documentation.
clean_label_attack = PoisoningAttackCleanLabelBackdoor(
    proxy_classifier=art_classifier,
    backdoor=backdoor_attack,
    norm=2,
    eps=5,
    eps_step=0.1,
    max_iter=200,
    target=target)

# Create the poison data from the training images and their one-hot labels.
poison, poison_labels = clean_label_attack.poison(x_tr, to_categorical(y_tr))

## Q. Display all instances of the target class and what class the substitute model classifies them as

In [None]:
# code goes here

Using the poisoned data we can train a model

In [None]:
# Train a model with the poisoned data.
# The labels are reduced with argmax to integer class indices before fitting.
# NOTE(review): this assumes get_mnist_model() still resolves to the helper
# whose default is the sparse loss -- confirm if cells were run out of order.
victim_model = get_mnist_model()
victim_model.fit(poison, np.argmax(poison_labels, axis=1), epochs=15)


## Q. Evaluate the success of the attack.

1. Show how the model performs on clean data
2. Show how the model performs on poisoned test data. (Be sure to only poison instances that are not of the target class.)
3. Analyze whether the poisoning is equally effective on all classes. If there is a discrepancy, investigate the poisoned training data to find out why.

Bonus: Try to fix any issues in the poisoned training data

In [None]:
# you code goes here

## Adversarial Embedding

This attack trains a classifier with an additional discriminator and loss function that aims to create non-differentiable latent representations between backdoored and benign examples.

First we are training the victim model.

In [None]:
from tensorflow import keras

# Normalize input data.
# x_tr and x_test were already divided by 255 when the data was loaded, so
# dividing unconditionally would squash every pixel into [0, 1/255]. Only
# rescale arrays that still hold raw pixel values above 1.
if x_tr.max() > 1.0:
    x_tr = x_tr / 255.0
if x_test.max() > 1.0:
    x_test = x_test / 255.0

# Define the MNIST model
def get_mnist_model(sparse_loss=False):
    """
    Build and compile the MNIST CNN (28x28x1 input, 10-way softmax output).

    `sparse_loss` selects the loss function:
      - True:  "sparse_categorical_crossentropy" (integer class labels)
      - False: "categorical_crossentropy" (one-hot labels), the default

    The model is compiled with Adam (learning rate 0.001) and tracks accuracy.
    Returns the compiled model.
    """
    model = keras.Sequential([
        keras.layers.Input(shape=(28, 28, 1)),
        keras.layers.Conv2D(32, (3, 3), activation='relu'),
        keras.layers.Conv2D(64, (3, 3), activation='relu'),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Dropout(0.25),
        keras.layers.Flatten(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(10, activation='softmax')
    ])
    # Both branches previously compiled with the one-hot loss, so
    # sparse_loss=True had no effect; pick the loss from the flag instead.
    if sparse_loss:
        loss = "sparse_categorical_crossentropy"
    else:
        loss = "categorical_crossentropy"
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss=loss,
        metrics=["accuracy"]
    )
    return model

# Create the model with the one-hot (non-sparse) loss, matching the
# to_categorical labels used for training and evaluation below.
victim_model = get_mnist_model(sparse_loss=False)

# Train the model on the training subset with one-hot labels
victim_model.fit(x_tr, keras.utils.to_categorical(y_tr), epochs=10, batch_size=32)

# Evaluate the model on the test split and print what evaluate() returns
print('Model performance:',
      victim_model.evaluate(x_test, keras.utils.to_categorical(y_test)))


Now we can setup the attack.

You need to choose a `target_class`.

In [None]:
from art.attacks.poisoning import PoisoningAttackBackdoor, \
                                  FeatureCollisionAttack, \
                                  PoisoningAttackAdversarialEmbedding
from art.estimators.classification import KerasClassifier
from art.attacks.poisoning.perturbations.image_perturbations import add_single_bd, \
                                                                    add_pattern_bd

# Define the image perturbation.
# Note: `add_pattern_bd` here is the function imported from ART in this cell;
# the import rebinds the name used by the hand-written helper of the
# clean-label section.
backdoor_attack = PoisoningAttackBackdoor(add_pattern_bd)

# Define the target class (index into the ten digit classes)
target_class = 3

Setup the attack instance

In [None]:
# Disable eager execution before wrapping the model in KerasClassifier.
# NOTE(review): victim_model was built and trained in an earlier cell --
# confirm that switching execution mode at this point works as intended.
tf.compat.v1.disable_eager_execution()

# Wrap the model in ART's KerasClassifier
art_victim_model = KerasClassifier(
    model=victim_model,  # The Keras model instance created in the cell above
    use_logits=False,           # Model outputs probabilities, not logits
    clip_values=(0, 1)         # (min, max) range of valid input values
)

# Create a new attack instance.
# NOTE(review): feature_layer=5 presumably selects the Dense(128) layer of
# the victim model -- confirm how ART indexes layers.
adv_embedding = PoisoningAttackAdversarialEmbedding(
    art_victim_model,
    backdoor=backdoor_attack,
    feature_layer=5,
    target=keras.utils.to_categorical(target_class, num_classes=10))


Execute the attack

In [None]:
# Run the attack on the training subset with one-hot labels for 10 epochs;
# the value returned by poison_estimator is kept as `poisoned_model`.
poisoned_model = adv_embedding.poison_estimator(
    x_tr, tf.keras.utils.to_categorical(y_tr), nb_epochs=10)


## Q. Evaluate the effectiveness of the poisoning attack

Also investigate the examples that were used in the poisoning.

In [None]:
# code goes here

## Feature Collision

The goal here is that a specific target instance will be classified as a chosen target class. To achieve this we will poison a number of samples from the target class.

In [None]:
from tensorflow.keras.datasets import cifar10
from sklearn.utils import shuffle

# Load CIFAR-10; this rebinds x_train / y_train / x_test / y_test, which held
# MNIST data in the earlier sections.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Human-readable class names, indexed by integer label.
label_names = [
    'airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse',
    'ship', 'truck'
]

# Scale the pixel values by 1/255.
x_train = x_train / 255.
x_test = x_test / 255.

# Shuffle images and labels together so the subset below is a random sample.
x_train, y_train = shuffle(x_train, y_train)

# once again let's use a subset of the data to speed things up
x_tr = x_train[:10000]
y_tr = y_train[:10000]

In [None]:
# Train the substitute CIFAR-10 model on the 10000-sample subset and report
# its performance on the test split.
sub_model = get_cifar10_model()
sub_model.fit(x_tr, y_tr, epochs=15)
sub_model.evaluate(x_test, y_test)


In [None]:
n_poison = 100  # number of instances to poison
# The target instance is the first test image with label 0
# ('airplane' in label_names).
target_instance = x_test[y_test.squeeze() == 0][0]

print('target instance')
plt.imshow(target_instance.squeeze(), cmap='gray')
plt.show()

# Get all test images of the target class.
# `target_class` is not set in this cell; it keeps whatever value an earlier
# cell assigned (3 in the Adversarial Embedding section, i.e. 'cat').
poision_instances = x_test[y_test.squeeze() == target_class]
# Keep only images the substitute model already classifies as the target
# class, capped at n_poison.
preds = np.argmax(sub_model.predict(poision_instances), axis=1)
poision_instances = poision_instances[preds == target_class][:n_poison]
predicictions = np.argmax(sub_model.predict(poision_instances), axis=1)
print('data to be poisned')
plot_grid(poision_instances, titles=[label_names[i] for i in predicictions])

# Create the actual poison data: wrap the substitute model for ART first.
sub_wrapper = KerasClassifier(sub_model, clip_values=(0, 1))

# NOTE(review): feature_layer=5 presumably refers to the Flatten layer of the
# CIFAR-10 model -- confirm how ART indexes layers.
feature_collision_attack = FeatureCollisionAttack(
    sub_wrapper,
    target=target_instance[np.newaxis, :],  # add a batch axis to the single image
    feature_layer=5,
    max_iter=10,
    similarity_coeff=1024.,
    watermark=0.5)
poison, y_poison = feature_collision_attack.poison(poision_instances,)
# Let's see what our poisoned data looks like, titled with the substitute
# model's predictions.
predicictions = np.argmax(sub_model.predict(poison), axis=1)
print('poisoned instances')
plot_grid(poison, titles=[label_names[i] for i in predicictions])


Time to poison a model

In [None]:
# Inspect the shape of the labels returned by the feature collision attack.
y_poison.shape

In [None]:
# Combine the training data with the poisoned data.
x_poisoned = np.concatenate((x_tr, poison))
# y_poison is reduced with argmax to class indices and given a trailing axis
# so it can be concatenated with y_tr.
y_poisoned = np.concatenate((y_tr, np.argmax(y_poison, axis=1)[:,np.newaxis]))
# Train a fresh victim model on the combined (clean + poisoned) data.
victim_model = get_cifar10_model()
victim_model.fit(x_poisoned, y_poisoned, epochs=15, shuffle=True)


In [None]:
# How do we perform on the clean test data?
print('clean data performance:', victim_model.evaluate(x_test, y_test))

# Let's try our target instance: show it, then print the class index the
# poisoned victim model predicts for it.
plt.imshow(target_instance.squeeze(), cmap='gray')
plt.show()
print('prediction of the target instance',
      np.argmax(victim_model.predict(target_instance[np.newaxis, :]), axis=1))


# Poisoning Attacks on Support Vector Machines (SVM) using Scikitlearn's SVC

Here we will train an SVM that does spam detection. Then we will assume the role of a spammer and try to weaken the system.

In [None]:
!pip install nltk

First we load the data into a pandas DataFrame (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and display the first few instances.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import string
import nltk
from nltk.stem import SnowballStemmer
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC, LinearSVC
from art.estimators.classification import SklearnClassifier
from art.attacks.poisoning.poisoning_attack_svm import PoisoningAttackSVM

# Read the SMS spam collection from CSV.
data = pd.read_csv(
    'data/SMSSpamCollection.csv',
    encoding='latin-1',
    sep=',',
    header=0,  # Row 0 is treated as the header row; `names` below replaces its labels
    names=['class', 'text','empty1','empty2','empty3']  # Assign meaningful column names
)
# Show the first few rows.
data.head()

Next we download the list of stopwords. Stopwords are the most common words in
a language. Therefore they carry very little information and we can remove them.

In [None]:
# Fetch the NLTK stopword corpus.
nltk.download('stopwords')
# Display the list of stopwords for English
print(stopwords.words('english'))

#### Q. Write a function to filter out the stop words and apply stemming


The function `pre_process` takes in a string and returns a string that contains the
modified input.
The following modifications need to be applied:
 - remove punctuation
 - remove stopwords
 - transform all words to lower case
 - stem all words, you can use the `stemmer` from the examples below.
 For more information on stemming check here: https://en.wikipedia.org/wiki/Stemming

In [None]:
# A few stemming examples: print each word next to its stem.
# `stemmer` stays defined at module level for later cells.
stemmer = SnowballStemmer("english")
words = ['abnormal', 'excited', 'boring']
for word in words:
  print(word, stemmer.stem(word))

In [None]:
def pre_process(text):
  """
  Normalise one message before it is vectorised.

  Steps, following the exercise description:
    - transform the text to lower case
    - remove punctuation
    - remove English stopwords
    - stem every remaining word with the module-level `stemmer`

  Returns the processed words joined by single spaces. Non-string input
  (for example a missing value) yields an empty string.
  """
  if not isinstance(text, str):
    return ''
  # Strip all punctuation characters in a single pass.
  cleaned = text.lower().translate(str.maketrans('', '', string.punctuation))
  # A set makes the per-word stopword test O(1).
  stop_words = set(stopwords.words('english'))
  return ' '.join(
      stemmer.stem(word) for word in cleaned.split() if word not in stop_words)

We'll use a `TfidfVectorizer` to turn our text features into a numerical representation. But we will limit the number of features it uses to 64, otherwise the attack takes a long time.

In [None]:
textFeatures = data['text']
# Check out what preprocessing does to the first message
print('before preproccesing:', textFeatures[0])

textFeatures = textFeatures.apply(pre_process)
print('after preproccesing:', textFeatures[0])
# TF-IDF features, capped at 64 to keep the attack fast
vectorizer = TfidfVectorizer(input="content", max_features=64)
features = vectorizer.fit_transform(textFeatures)

# Transform from a sparse matrix to a numpy array
features = features.toarray()
# Transform from textual labels to numerical: ham -> [1, 0], spam -> [0, 1]
# NOTE(review): `labels` is not passed to train_test_split below, which
# splits the string labels in data['class'] instead.
labels = np.zeros((data['class'].shape[0],2))
labels[data['class'] == 'ham'] = np.array([1,0])
labels[data['class'] == 'spam'] = np.array([0,1])

# Split into training and test (30% test, fixed seed)
x_train, x_test, y_train, y_test = train_test_split(features,
                                                    data['class'],
                                                    test_size=0.3,
                                                    random_state=111)


To further reduce the time required to execute, we will severely reduce the size of the training and validation data.

In [None]:
# First 20 rows of the training split are used for training ...
x_tr = x_train[0:20]
y_tr = y_train[0:20]
# ... and the next 40 rows (20-59) serve as validation data.
x_val = x_train[20:60]
y_val = y_train[20:60]


### Q. Write the attack function

Complete the stub of the `get_attack_points` function below.
The arguments of the functions are:
 - `x_train`: the training data 2D `ndarray`
 - `y_train`: categorical training labels 2D `ndarray`
 - `init_attack`: the instances to start the attack from 2D `ndarray`
 - `init_labels`: the labels used in the attack. should be different from the ground truth 2D `ndarray`
 - `x_val`: the validation data 2D `ndarray`
 - `y_val`: categorical validation labels 2D `ndarray`
 - `kernel`: the kernel used in the poisoned SVM `str`

In the function train an `SVC` using `kernel` on `x_train` and `y_train`.
Then create an instance of `PoisoningAttackSVM` and use it to create a poisoned classifier and the attack points. Return both the attack points and the poisoned classifier.

In [None]:
# soultion
def get_attack_points(x_train, y_train, init_attack, init_labels, x_val, y_val, kernel):
    """
    Fit an SVC wrapped in ART's SklearnClassifier and run PoisoningAttackSVM.

    Arguments (as described in the exercise text):
        x_train, y_train: training data and categorical training labels
        init_attack: instances the attack starts from
        init_labels: labels used in the attack
        x_val, y_val: validation data and categorical validation labels
        kernel: kernel name forwarded to `SVC`

    Prints the validation accuracy measured before the attack runs.

    Returns a tuple `(attack_point, classifier)`: the points produced by the
    attack and the wrapped classifier that was handed to the attack.
    """
    wrapped = SklearnClassifier(
        model=SVC(kernel=kernel, probability=True),
        clip_values=(0, 1),
    )
    wrapped.fit(x_train, y_train)

    # The underlying sklearn model scores against class indices, so the
    # categorical validation labels are collapsed with argmax first.
    val_targets = np.argmax(y_val, axis=1)
    val_accuracy = wrapped.model.score(x_val, val_targets)
    print('accuracy unpoisoned on validation:', val_accuracy)

    svm_attack = PoisoningAttackSVM(
        classifier=wrapped,
        step=0.5,
        eps=10,
        x_train=x_train,
        y_train=y_train,
        x_val=x_val,
        y_val=y_val,
        max_iter=100,
    )

    # Only the generated points are needed; the second return value is dropped.
    points, _unused = svm_attack.poison(init_attack, y=init_labels)
    return points, wrapped

In [None]:
# Debug output: shape of the full training label vector and of everything
# after its first 20 entries.
print("y_train shape:", y_train.shape)
print("y_train[20:] shape:", y_train[20:].shape)

Using the `get_attack_points` function we can now execute the attack.

In [None]:

# Ensure y_tr and y_val are one-hot encoded: column 0 is "ham", column 1 is
# "spam".
num_classes = 2  # For "ham" and "spam"

# Convert y_tr to one-hot encoding
y_tr_one_hot = np.zeros((y_tr.shape[0], num_classes))
y_tr_one_hot[y_tr == "ham"] = np.array([1, 0])
y_tr_one_hot[y_tr == "spam"] = np.array([0, 1])

# Convert y_val to one-hot encoding
y_val_one_hot = np.zeros((y_val.shape[0], num_classes))
y_val_one_hot[y_val == "ham"] = np.array([1, 0])
y_val_one_hot[y_val == "spam"] = np.array([0, 1])

# Starting point for the attack. `init_attack` was never defined, so the call
# below raised NameError. Take one spam message from the part of the training
# split that is used for neither x_tr nor x_val (rows 60 onwards); the attack
# labels it ham ([1, 0]), i.e. differently from its ground truth.
spam_mask = np.asarray(y_train[60:]) == "spam"
init_attack = x_train[60:][spam_mask][:1]
if init_attack.shape[0] == 0:
    raise RuntimeError('no spam instance available to start the attack from')

kernel = 'linear'  # one of ['linear', 'poly', 'rbf']
attack_point, poisoned = get_attack_points(
    x_tr,               # Training data
    y_tr_one_hot,       # One-hot encoded training labels
    init_attack,        # Initial attack point, shape (1, n_features)
    np.array([[1, 0]]), # Label used in the attack ("ham")
    x_val,              # Validation data
    y_val_one_hot,      # One-hot encoded validation labels
    kernel              # Kernel type
)

# Reference classifier trained on the same data for comparison.
clean = SVC(kernel=kernel)
art_clean = SklearnClassifier(clean, clip_values=(0, 1))
art_clean.fit(x=x_tr, y=y_tr_one_hot)  # Use one-hot encoded labels



In [None]:
from sklearn.preprocessing import LabelEncoder

# Initialize the label encoder
label_encoder = LabelEncoder()

# Fit the encoder on the training labels and encode them; the test labels
# are encoded with the same fitted mapping.
y_train_encoded = label_encoder.fit_transform(y_train)  # Encodes ['ham', 'spam'] as [0, 1]
y_test_encoded = label_encoder.transform(y_test)

### Q. Evaluate the success of the attack

In [None]:
# you code goes here