[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ramonzaca/MLSecOPs/blob/main/TP_05/05_adversarial_training.ipynb)

**Fire with fire - models against models - Practice 5**

**Let's check how robust our model is against its evil brother**
*We will use ART to do so.*


![ART](https://raw.githubusercontent.com/Trusted-AI/adversarial-robustness-toolbox/main/docs/images/art_lfai.png)


*Let's set up the environment*

In [None]:
!pip uninstall tensorflow -y -q
!pip install adversarial-robustness-toolbox keras==2.13.1 tensorflow git+https://github.com/nottombrown/imagenet_stubs -q

---

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

import warnings

warnings.filterwarnings("ignore")

import sys
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

import tensorflow as tf

tf.compat.v1.disable_eager_execution()
tf.keras.utils.set_random_seed(12)

import h5py
import keras
from keras.models import load_model, Model, Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D
from keras.losses import categorical_crossentropy
from keras.optimizers.legacy import Adam
import keras.backend as k

from tensorflow.keras.utils import load_img, img_to_array
from tensorflow.keras.applications.resnet50 import ResNet50, decode_predictions

from art import config
from art.utils import load_dataset, get_file, to_categorical
from art.estimators.classification import KerasClassifier
from art.attacks.evasion import (
    FastGradientMethod,
    BasicIterativeMethod,
    ProjectedGradientDescent,
    BoundaryAttack,
)
from art.defences.trainer import AdversarialTrainer


sys.path.append("..")
import imagenet_stubs

In [None]:
# 1. Data Preparation
print("1. Data Preparation")

# load_dataset returns the train split, the test split and two extra values.
# min_ and max_ are reused below as the classifiers' `clip_values`
# (presumably the minimum / maximum feature value of the data — confirm).
# The labels are later reduced with np.argmax(..., axis=1), i.e. they are
# per-class vectors rather than integer class indices.
(x_train, y_train), (x_test, y_test), min_, max_ = load_dataset("mnist")

---

In [None]:
# 2. Train base model
print("2. Train base model")

# Despite the step title, nothing is trained in this cell: a previously trained
# model file is fetched from the course repository into ART's data directory
# and loaded with Keras.
path = get_file(
    "mnist_cnn_original.h5",
    extract=False,
    path=config.ART_DATA_PATH,
    url="https://github.com/ramonzaca/MLSecOPs/raw/main/models/mnist_cnn_original.h5",
)
classifier_model = load_model(path)
# Wrap the Keras model in an ART estimator so ART attacks can be run against it.
# clip_values reuses the min_/max_ returned by load_dataset.
# NOTE(review): use_logits=False presumably reflects a softmax output layer
# (the commented-out architecture in the next cell ends in softmax) — confirm.
classifier = KerasClassifier(
    clip_values=(min_, max_), model=classifier_model, use_logits=False
)

*If we want (and can), we can train a new model using the exact same architecture*

In [None]:
# Optional: build and train the base model locally instead of downloading it.
# The whole cell is intentionally left commented out; uncomment every line to run it.
#
# NOTE(review): the `input_shape=(23, 23, 4)` on the second Conv2D does not match
# the tensor that layer would receive (a 3x3 conv + 2x2 pool on 28x28x1 with 32
# filters appears to yield 13x13x32). Keras presumably ignores `input_shape` on
# non-first Sequential layers — confirm, and consider dropping the argument.

# classifier_model = Sequential()
# classifier_model.add(Conv2D(filters=32, kernel_size=(3, 3), strides=1, activation="relu", input_shape=(28, 28, 1)))
# classifier_model.add(MaxPooling2D(pool_size=(2, 2)))
# classifier_model.add(Conv2D(filters=64, kernel_size=(3, 3), strides=1, activation="relu", input_shape=(23, 23, 4)))
# classifier_model.add(MaxPooling2D(pool_size=(2, 2)))
# classifier_model.add(Flatten())
# classifier_model.add(Dense(128, activation="relu"))
# classifier_model.add(Dense(10, activation="softmax"))

# classifier_model.compile(loss=categorical_crossentropy, optimizer=Adam(learning_rate=1e-4), metrics=["accuracy"])

# classifier = KerasClassifier(clip_values=(min_, max_), model=classifier_model, use_logits=False)

# classifier.fit(x_train, y_train, nb_epochs=10, batch_size=128)

# classifier.model.save("./mnist_cnn_original.h5")

In [None]:
# Show the layer-by-layer summary of the loaded base Keras model.
classifier_model.summary()

---

In [None]:
# 3. Model base performance
print("3. Model base performance")

# Reduce both the classifier output and the label vectors to class indices
# (arg-max along axis 1), then count the positions where they agree.
x_test_pred = classifier.predict(x_test).argmax(axis=1)
nb_correct_pred = (x_test_pred == y_test.argmax(axis=1)).sum()

print("Original test data:")
print(f"Correctly classified: {nb_correct_pred}")
print(f"Incorrectly classified: {len(x_test) - nb_correct_pred}")

---

In [None]:
# 4. Create adversarial samples
print("4. Create adversarial samples")

# Fast Gradient Method attack built against the base classifier with a
# perturbation budget of eps=0.5; the test labels are handed to generate().
attacker = FastGradientMethod(estimator=classifier, eps=0.5)
x_test_adv = attacker.generate(x=x_test, y=y_test)

---

In [None]:
# 5. Evaluation of performance of adversarial samples
print("5. Evaluation of performance of adversarial samples")

# Same metric as for the clean data: compare arg-max class indices of the
# classifier output on the adversarial inputs against those of the labels.
x_test_adv_pred = classifier.predict(x_test_adv).argmax(axis=1)
nb_correct_adv_pred = (x_test_adv_pred == y_test.argmax(axis=1)).sum()

print("Adversarial test data:")
print(f"Correctly classified: {nb_correct_adv_pred}")
print(f"Incorrectly classified: {len(x_test_adv) - nb_correct_adv_pred}")

---

In [None]:
# 6. Training a new model using an adversarial generator
print("6. Training a new model using an adversarial generator")

# As with the base model, no training happens in this cell: the model file
# "mnist_cnn_robust.h5" is fetched from the course repository into ART's data
# directory and loaded with Keras. Note that `path` is rebound here.
path = get_file(
    "mnist_cnn_robust.h5",
    extract=False,
    path=config.ART_DATA_PATH,
    url="https://github.com/ramonzaca/MLSecOPs/raw/main/models/mnist_cnn_robust.h5",
)
robust_classifier_model = load_model(path)

---

In [None]:
# Optional: build the robust model's architecture from scratch instead of
# loading it. The whole cell is intentionally left commented out. The only
# difference from the base architecture is Dense(1024) in place of Dense(128).
#
# NOTE(review): `input_shape=(23, 23, 4)` on the second Conv2D does not match
# the tensor that layer would receive (appears to be 13x13x32); Keras presumably
# ignores it on non-first Sequential layers — confirm.

# robust_classifier_model = Sequential()
# robust_classifier_model.add(Conv2D(filters=32, kernel_size=(3, 3), strides=1, activation="relu", input_shape=(28, 28, 1)))
# robust_classifier_model.add(MaxPooling2D(pool_size=(2, 2)))
# robust_classifier_model.add(Conv2D(filters=64, kernel_size=(3, 3), strides=1, activation="relu", input_shape=(23, 23, 4)))
# robust_classifier_model.add(MaxPooling2D(pool_size=(2, 2)))
# robust_classifier_model.add(Flatten())
# robust_classifier_model.add(Dense(1024, activation="relu"))
# robust_classifier_model.add(Dense(10, activation="softmax"))

# robust_classifier_model.compile(loss=categorical_crossentropy, optimizer=Adam(learning_rate=1e-4), metrics=["accuracy"])

In [None]:
# Wrap the robust Keras model in an ART estimator, using the same clip range
# and use_logits setting as the base classifier.
robust_classifier = KerasClassifier(
    clip_values=(min_, max_), model=robust_classifier_model, use_logits=False
)

Note: the robust classifier has the same architecture as above, except the first dense layer has **1024** instead of **128** units.

In [None]:
# Show the layer-by-layer summary of the robust Keras model.
robust_classifier_model.summary()

We use BIM/PGD attacks during adversarial training:

In [None]:
# Iterative attack crafted against the robust classifier. In this notebook it
# is only consumed by the (commented-out) AdversarialTrainer call.
attacks = BasicIterativeMethod(
    estimator=robust_classifier,
    eps=0.3,
    eps_step=0.01,
    max_iter=40,
)

Perform adversarial training:

In [None]:
# This step was run ahead of time, starting from a randomly initialized model.
# Adversarial training takes about 20 minutes on an NVIDIA A100.
# The resulting model is the one loaded from mnist_cnn_robust.h5 above.

# The commands used for adversarial training are kept below for reference;
# uncomment them to retrain locally.

# trainer = AdversarialTrainer(robust_classifier, attacks, ratio=1.0)
# trainer.fit(x_train, y_train, nb_epochs=10, batch_size=128)

# trainer.classifier.model.save("./mnist_cnn_robust.h5")

---

In [None]:
# 6. Evaluation of the robust classifier's performance on the original test data
print("6. Evaluation of the robust classifier's performance on the original test data")

# Arg-max class indices of the robust classifier's output on the clean test
# inputs, compared element-wise against the arg-max of the labels.
x_test_robust_pred = robust_classifier.predict(x_test).argmax(axis=1)
nb_correct_robust_pred = (x_test_robust_pred == y_test.argmax(axis=1)).sum()

print("Original test data:")
print(f"Correctly classified: {nb_correct_robust_pred}")
print(f"Incorrectly classified: {len(x_test) - nb_correct_robust_pred}")

---

In [None]:
# 7. Evaluation of the robust classifier's performance on the adversarial test data
print("7. Evaluation of the robust classifier's performance on the adversarial test data")

# Same Fast Gradient Method settings as for the base model (eps=0.5), but the
# adversarial examples are crafted against the robust classifier itself.
attacker_robust = FastGradientMethod(estimator=robust_classifier, eps=0.5)
x_test_adv_robust = attacker_robust.generate(x=x_test, y=y_test)

In [None]:
# Score the robust classifier on the adversarial examples crafted against it:
# compare arg-max class indices of its output with those of the labels.
x_test_adv_robust_pred = robust_classifier.predict(x_test_adv_robust).argmax(axis=1)
nb_correct_adv_robust_pred = (x_test_adv_robust_pred == y_test.argmax(axis=1)).sum()

print("Adversarial test data:")
print(f"Correctly classified: {nb_correct_adv_robust_pred}")
print(f"Incorrectly classified: {len(x_test_adv_robust) - nb_correct_adv_robust_pred}")

In [None]:
# Build one PGD attacker per model with identical settings: first against the
# base classifier, then against the robust one. `eps` is only an initial value;
# the comparison cell overrides it through set_params().
attacker_pgd, attacker_robust_pgd = (
    ProjectedGradientDescent(
        estimator=attacked_model,
        eps=0.5,
        eps_step=0.01,
        max_iter=200,
        verbose=False,
    )
    for attacked_model in (classifier, robust_classifier)
)

---

In [None]:
# 8. Comparing the performance of the original and the robust classifier over a range of `eps` values
print(
    "8. Comparing the performance of the original and the robust classifier over a range of `eps` values"
)

eps_range = [0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
# Despite the "nb_correct" names (kept because the plotting cell reads them),
# these lists hold accuracies (fractions), one entry per eps value.
nb_correct_original = []
nb_correct_robust = []

# Only the first nb_samples test points are attacked in the sweep.
nb_samples = 100

# Loop-invariant inputs, computed once.
x_subset = x_test[:nb_samples]
y_subset = y_test[:nb_samples]
y_subset_labels = np.argmax(y_subset, axis=1)

for eps in eps_range:
    attacker_pgd.set_params(eps=eps)
    attacker_robust_pgd.set_params(eps=eps)

    # Dedicated names are used here so the full-test-set arrays produced by the
    # earlier cells (x_test_adv, x_test_adv_robust and their predictions) are
    # not overwritten with nb_samples-sized subsets; those cells stay re-runnable.
    x_adv_eps = attacker_pgd.generate(x_subset, y_subset)
    x_adv_robust_eps = attacker_robust_pgd.generate(x_subset, y_subset)

    pred_eps = np.argmax(classifier.predict(x_adv_eps), axis=1)
    nb_correct_original.append(np.mean(pred_eps == y_subset_labels))

    pred_robust_eps = np.argmax(robust_classifier.predict(x_adv_robust_eps), axis=1)
    nb_correct_robust.append(np.mean(pred_robust_eps == y_subset_labels))

# Prepend the clean-data accuracy (eps = 0). It comes from the counts computed
# on the whole test set in the earlier evaluation cells, so divide by the
# actual test-set size instead of a hard-coded 10000.
eps_range = [0] + eps_range
nb_correct_original = [nb_correct_pred / len(x_test)] + nb_correct_original
nb_correct_robust = [nb_correct_robust_pred / len(x_test)] + nb_correct_robust

---

In [None]:
# 9. Plotting performance comparison
print("9. Plotting performance comparison")

# Accuracy of both classifiers as a function of the PGD perturbation budget.
# Only the explicit Axes interface is used (no pyplot state-machine calls for
# labels), and the misspelled axis label is corrected.
fig, ax = plt.subplots()
ax.plot(
    np.array(eps_range),
    np.array(nb_correct_original),
    "b--",
    label="Original classifier",
)
ax.plot(
    np.array(eps_range),
    np.array(nb_correct_robust),
    "r--",
    label="Robust classifier",
)

ax.set_title("Accuracy under PGD attack")
ax.set_xlabel("Perturbation size (eps, L-Inf)")
ax.set_ylabel("Classification Accuracy")
ax.legend(loc="upper right", shadow=True, fontsize="large")

plt.show()

# Boundary Attack

---

In [None]:
# 10. Loading a vision model to attack
print("10. Loading a vision model to attack")

# Per-channel constants replicated over a 224x224 grid, plus an all-ones array
# of the same shape; together they are handed to ART as the `preprocessing`
# pair (presumably "subtract, then divide" — confirm against the ART docs).
# NOTE(review): the constants look like ImageNet channel means in reversed
# (BGR) order, which would explain why callers reverse the channel axis — confirm.
mean_imagenet = np.tile(np.array([103.939, 116.779, 123.68]), (224, 224, 1))
ones_array = np.ones(mean_imagenet.shape)

model = ResNet50(weights="imagenet")

# NOTE: this rebinds `classifier`, which previously referred to the MNIST
# model; the MNIST cells above cannot be re-run after this without reloading it.
classifier = KerasClassifier(
    model=model,
    clip_values=(0, 255),
    preprocessing=(mean_imagenet, ones_array),
)

---

In [None]:
# 11. Getting target and init images
print("11. Getting target and init images")


def get_image_predictions(image):
    """Return the name of the top-1 class predicted for a single image.

    The image's last axis is reversed, the result is wrapped into a batch of
    one and passed to the global `classifier`; the output is decoded with
    `decode_predictions(..., top=1)`.

    Args:
        image: array-like image whose last axis is the channel axis
            (presumably RGB as produced by `load_img` — confirm).

    Returns:
        The second field of the single decoded prediction (the class name).
    """
    batch = np.array([image[..., ::-1]])
    top_hit = decode_predictions(classifier.predict(batch), top=1)[0][0]
    return top_hit[1]

In [None]:
target_image_name = "koala.jpg"
init_image_name = "tractor.jpg"


def _load_stub_image(file_name):
    """Load the imagenet_stubs image whose path ends with `file_name`.

    The image is resized to 224x224 and converted to an array. If several
    paths match, the last one is used.

    Raises:
        FileNotFoundError: if no stub image path ends with `file_name`, instead
            of failing later with an undefined-variable error.
    """
    matches = [
        image_path
        for image_path in imagenet_stubs.get_image_paths()
        if image_path.endswith(file_name)
    ]
    if not matches:
        raise FileNotFoundError(
            "No imagenet_stubs image ends with {!r}".format(file_name)
        )
    return img_to_array(load_img(matches[-1], target_size=(224, 224)))


target_image = _load_stub_image(target_image_name)
init_image = _load_stub_image(init_image_name)

target_predictions = get_image_predictions(target_image)
init_predictions = get_image_predictions(init_image)

# Report the predicted class index and name for each image, then display it.
# The channel axis is reversed before prediction, as in get_image_predictions.
for caption, shown_image, shown_name in (
    ("Target image label:", target_image, target_predictions),
    ("Init image label:", init_image, init_predictions),
):
    print(
        caption,
        np.argmax(classifier.predict(np.array([shown_image[..., ::-1]]))[0]),
        shown_name,
    )
    plt.imshow(shown_image.astype(np.uint))
    plt.show()

---

In [None]:
# 12. Boundary attack without a target
print("12. Boundary attack without a target")

# The attack starts with max_iter=0 and is advanced iter_step iterations per
# call from the second loop pass onwards, so intermediate images can be shown.
attack = BoundaryAttack(
    estimator=classifier, targeted=False, max_iter=0, delta=0.001, epsilon=0.001
)
iter_step = 1
x_adv = None

# The classifier is fed channel-reversed images, so the attack works entirely
# in that channel order; images are reversed back only for display.
target_reversed = target_image[..., ::-1]

for i in range(30):
    x_adv = attack.generate(x=np.array([target_reversed]), x_adv_init=x_adv)

    l2_error = np.linalg.norm(np.reshape(x_adv[0] - target_reversed, [-1]))

    # Decode label and name from the SAME prediction. x_adv is already in the
    # classifier's channel order, so it must not go through
    # get_image_predictions(), which would reverse the channels a second time
    # and report the class name for a different input than class_label.
    predictions = classifier.predict(x_adv)
    class_label = np.argmax(predictions[0])
    class_name = decode_predictions(predictions, top=1)[0][0][1]

    print(
        "Adversarial image at step %d. L2 error: %.2f, Class label: %d, Class name: %s"
        % (i * iter_step, l2_error, class_label, class_name)
    )

    plt.imshow(x_adv[0][..., ::-1].astype(np.uint))
    plt.show(block=False)

    # Carry the attack's current step sizes into the next call; stop if the
    # attack object does not expose them.
    if hasattr(attack, "curr_delta") and hasattr(attack, "curr_epsilon"):
        attack.max_iter = iter_step
        attack.delta = attack.curr_delta
        attack.epsilon = attack.curr_epsilon
    else:
        break

---

In [None]:
# 13. Boundary attack with a target
print("13. Boundary attack with a target")

# The attack starts with max_iter=0 and is advanced iter_step iterations per
# call from the second loop pass onwards, so intermediate images can be shown.
attack = BoundaryAttack(
    estimator=classifier, targeted=True, max_iter=0, delta=0.001, epsilon=0.001
)
iter_step = 1

# The classifier is fed channel-reversed images, so the attack works entirely
# in that channel order; images are reversed back only for display.
target_reversed = target_image[..., ::-1]
# The targeted attack is initialised from the init image.
x_adv = np.array([init_image[..., ::-1]])

# Loop-invariant target label: class index 866 out of 1000 classes.
# NOTE(review): 866 is presumably the ImageNet index of the init image's class
# (tractor) — confirm against the "Init image label" printed earlier.
y_target = to_categorical([866], 1000)

for i in range(30):
    x_adv = attack.generate(
        x=np.array([target_reversed]),
        y=y_target,
        x_adv_init=x_adv,
    )

    l2_error = np.linalg.norm(np.reshape(x_adv[0] - target_reversed, [-1]))

    # Decode label and name from the SAME prediction. x_adv is already in the
    # classifier's channel order, so it must not go through
    # get_image_predictions(), which would reverse the channels a second time
    # and report the class name for a different input than class_label.
    predictions = classifier.predict(x_adv)
    class_label = np.argmax(predictions[0])
    class_name = decode_predictions(predictions, top=1)[0][0][1]

    print(
        "Adversarial image at step %d. L2 error: %.2f, Class label: %d, Class name: %s"
        % (i * iter_step, l2_error, class_label, class_name)
    )

    plt.imshow(x_adv[0][..., ::-1].astype(np.uint))
    plt.show(block=False)

    # Carry the attack's current step sizes into the next call; stop if the
    # attack object does not expose them.
    if hasattr(attack, "curr_delta") and hasattr(attack, "curr_epsilon"):
        attack.max_iter = iter_step
        attack.delta = attack.curr_delta
        attack.epsilon = attack.curr_epsilon
    else:
        break