# **STARTUP**

In [None]:
# for google colab
from google.colab import drive
drive.mount('/content/drive')

In [None]:
"""
-----------------------------IMPORTANT-----------------------------
To run this code you have to set a lot of parameters in art library, 
beacuse there are a lot of bug and fixed parameters.
I set all of them in the art library code, there isn't no other way.
Also rember to downgrade the tf version to 1.14.0 when you try to run
WaNet, because it require tensorflow-addons.
-------------------------------------------------------------------
"""

In [None]:
import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import models, callbacks
from tensorflow.keras.layers import InputLayer, Input, Dense, Flatten, Conv2D, MaxPooling2D, Reshape, LSTM, TimeDistributed, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import cv2
from tqdm import tqdm
from tensorflow.keras import regularizers
from art.attacks.evasion import FastGradientMethod
from skimage.transform import resize
from art.estimators.classification import TensorFlowV2Classifier
from art.attacks.poisoning.perturbations import insert_image
#import tensorflow_addons as tfa # used only for WaNet, it require tensorflow==2.14.0

sys.path.append(os.getcwd())
print(os.getcwd())
from util import load_model_, generate_image_csv, path, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_SIZE, CLASSES, NUM_CLASSES, fit_model, print_confusion_matrix, \
    print_history, perform_prediction, print_model_evaluation, wrap_text, print_image_datasets_pred, print_image_datasets, print_attack_stat, create_my_model, get_compile_model
from util import model as ml
tf.config.run_functions_eagerly(True)


## FUNCTION TO LOAD MODEL

In [None]:
model = ml

## STARTUP DATA

In [None]:
TRAIN_PATH = path+"/Datasets/Train"
generate_image_csv(TRAIN_PATH, path+'/csv/train.csv')

TEST_PATH = path+"/Datasets/Test"
# Use the function to generate the CSV
generate_image_csv(TEST_PATH, path+'/csv/test.csv')

In [None]:
train_df = pd.read_csv(path+'/csv/train.csv')
test_df = pd.read_csv(path+'/csv/test.csv')

In [None]:
plt.figure(figsize=(3, 3))
class_counts = train_df['label'].value_counts()
tr_labels = class_counts.index
tr_sizes = class_counts.values
plt.pie(tr_sizes, labels=tr_labels, autopct='%1.1f%%', startangle=90, colors=sns.color_palette('Set1'))
plt.title('Distribution of Classes in train', fontsize=20)
plt.show()

plt.figure(figsize=(3, 3))
class_counts = test_df['label'].value_counts()
ts_labels = class_counts.index
ts_sizes = class_counts.values
plt.pie(ts_sizes, labels=ts_labels, autopct='%1.1f%%', startangle=90, colors=sns.color_palette('Set1'))
plt.title('Distribution of Classes in test', fontsize=20)
plt.show()

In [None]:
for label, size in zip(tr_labels, tr_sizes):
    print(f"Label: {label}, Size: {size}")

In [None]:
for label, size in zip(ts_labels, ts_sizes):
    print(f"Label: {label}, Size: {size}")

In [None]:
work_dr = ImageDataGenerator(rescale=1./255)

In [None]:
train_samples_num = len(train_df)
train_data_gen = work_dr.flow_from_dataframe(train_df,x_col='image',y_col='label', target_size=IMAGE_SIZE, batch_size=train_samples_num, shuffle=True, class_mode="categorical", color_mode='grayscale')

X_train, y_train = train_data_gen.__next__()

# Get index of shuffled data
shuffled_indices_train = train_data_gen.index_array

# Get shuffled image names
image_names_train = train_df['image'].iloc[shuffled_indices_train].tolist()

In [None]:
# enhancing model generalization
test_samples_num = len(test_df)
test_data_gen = work_dr.flow_from_dataframe(test_df,x_col='image',y_col='label', target_size=IMAGE_SIZE, batch_size=test_samples_num, shuffle=False, class_mode="categorical", color_mode='grayscale')
X_test, y_test = test_data_gen.__next__()

image_names_test = test_df['image'].tolist()

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(train_data,train_labels, test_size=0.2, random_state=42,shuffle=True,stratify=train_labels)
"""
if X_train.ndim == 4:
    X_train = np.expand_dims(X_train, axis=1)
if X_test.ndim == 4:
    X_test = np.expand_dims(X_test, axis=1)
"""

print('X_train shape is ' , X_train.shape)
print('X_test shape is ' , X_test.shape)
print('y_train shape is ' , y_train.shape)
print('y_test shape is ' , y_test.shape)


In [None]:
plt.imshow(X_train[0], cmap='gray')
plt.axis('off')

---

# **COMPILE AND FIT ORIGINAL MODEL**

In [None]:
# Create the CNN model
model_compile = get_compile_model()
model_compile.summary()

In [None]:
from art.estimators.classification import TensorFlowV2Classifier

# Loss used by the ART wrapper; the labels are one-hot encoded
# (flow_from_dataframe is called with class_mode="categorical").
loss_object = tf.keras.losses.CategoricalCrossentropy()

# Wrap a freshly compiled model in an ART classifier.
# The images are loaded with color_mode='grayscale', so the input has a single
# channel; this also matches every other classifier built in this notebook.
classifier = TensorFlowV2Classifier(model=get_compile_model(),
                                    clip_values=(0.0, 1.0),
                                    nb_classes=2,
                                    input_shape=(128, 128, 1),
                                    loss_object=loss_object,
                                    optimizer=Adam(learning_rate=0.0001),
                                    )

# **FIT ORIGINAL MODEL**

In [None]:
model, history = fit_model(get_compile_model(), 'original-reshape-2.1', X_train, y_train, 2)

In [None]:
print_history(history)

# **ORIGINAL MODEL STAT**

LOAD MODEL

In [None]:
print_image_datasets(X_test, y_test)

In [None]:
print_model_evaluation(model, X_test, y_test)

In [None]:
y_pred = perform_prediction(model, X_test)
print_confusion_matrix(y_pred, y_test)

## FGSM FOR EVASION ATTACK

In [None]:
from art.estimators.classification import TensorFlowV2Classifier

In [None]:

classifier = TensorFlowV2Classifier(model=model  ,
                                    clip_values=(0.0, 1.0),
                                    nb_classes=2,
                                    input_shape=(128, 128, 1),
                                    loss_object=loss_object,
                                    optimizer=Adam(),
                                    )

In [None]:
plt.imshow(X_test[0], cmap='gray')
plt.title('Original')
plt.axis('off')
plt.show()

# Define epsilon values
epsilon_values = [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3]

# Define the loss object
# Iterate over epsilon values
for epsilon in epsilon_values:
    # Craft adversarial samples with FGSM
    adv_crafter = FastGradientMethod(classifier, eps=epsilon)
    x_test_adv_1 = adv_crafter.generate(x=X_test, y=y_test)
    plt.imshow(x_test_adv_1[0], cmap='gray')
    plt.title('Adversarial sample with epsilon = %.2f' % epsilon)
    plt.axis('off')
    plt.show()

    # Evaluate the classifier on the adversarial examples
    preds = np.argmax(classifier.predict(x_test_adv_1), axis=1)
    acc = np.sum(preds == np.argmax(y_test, axis=1)) / y_test.shape[0]
    print("Test accuracy on adversarial sample (epsilon = %.2f): %.2f%%" % (epsilon, acc * 100))

# **POISONING ATTACKS**

In [None]:
TRIGGER_DIM = (5,5)

In [None]:
perc = 0.05

## FGSM

In [None]:
model_FGSM = get_compile_model()
loss_object = tf.keras.losses.BinaryCrossentropy()
model = models.load_model(path+f'/Models/CNN-LSTM-binary_crossentropy-original-reshape-2.h5')
classifier_FGSM = TensorFlowV2Classifier(model=model,
                                    clip_values=(0.0, 1.0),
                                    nb_classes=2,
                                    input_shape=(128, 128, 1),
                                    loss_object=loss_object,
                                    optimizer=Adam(),
                                    )

In [None]:
# SPLITTING THE DATASET non-COVID LABEL

indices = np.argmax(y_train, axis=1) == 1 # target on label non-COVID (to be poisoned)
X_train_to_poison_COVID = X_train[indices]
y_train_to_poison_COVID = y_train[indices]
print("X_train_to_poison_COVID not yet split", X_train_to_poison_COVID.shape)
print()

percentages = [100-perc*100, perc*100] # percentage of data to poison
splits = np.cumsum(percentages).tolist()[:-1]
indices_spilt = [round(x * len(X_train_to_poison_COVID) / 100) for x in splits]
print(percentages)

X_train_to_poison_COVID_2, X_train_to_poison_COVID_1 = np.split(X_train_to_poison_COVID, indices_spilt) # split the data (COVID) to poison
y_train_to_poison_COVID_2, y_train_to_poison_COVID_1 = np.split(y_train_to_poison_COVID, indices_spilt)
print("X_train_to_poison_non_COVID_1", X_train_to_poison_COVID_1.shape)
print("X_train_to_poison_non_COVID_2", X_train_to_poison_COVID_2.shape)
print()

# The rest of the data
complement_indices = np.argmax(y_train, axis=1) != 1
X_train_non_covid = X_train[complement_indices]
y_train_non_covid = y_train[complement_indices]
X_train_complementar = np.concatenate((X_train_non_covid, X_train_to_poison_COVID_2))
y_train_complementar = np.concatenate((y_train_non_covid, y_train_to_poison_COVID_2))
print("X_train_complementar", X_train_complementar.shape)

In [None]:
# SPLITTING THE DATASET RANDOM LABEL
"""
num_train = int(len(X_train) * (perc/2))
indices = np.arange(len(X_train))
np.random.shuffle(indices)

to_poison_indices = indices[:num_train]
to_not_poison_indices = indices[num_train:]

X_train_to_poison_COVID_1 = X_train[to_poison_indices]
y_train_to_poison_COVID_1 = y_train[to_poison_indices]

X_train_complementar = X_train[to_not_poison_indices]
y_train_complementar = y_train[to_not_poison_indices]

print("X_train_to_poison_non_COVID_1", X_train_to_poison_COVID_1.shape)
print("X_train_complementar", X_train_complementar.shape)
"""

In [None]:
epsilon=0.05
adv_crafter = FastGradientMethod(classifier_FGSM, eps=epsilon)
X_train_FGSM = adv_crafter.generate(x=X_train_to_poison_COVID_1, y=y_train_to_poison_COVID_1)

In [None]:
# CONCATENATING THE DATASET
X_train_FGSM = np.concatenate((X_train_FGSM, X_train_complementar))
y_train_FGSM = np.concatenate((y_train_to_poison_COVID_1, y_train_complementar))

In [None]:
# Generate a permutation of indices
num_train = len(X_train_FGSM)
indices = np.arange(num_train)
np.random.shuffle(indices)
X_train_FGSM = X_train_FGSM[indices]
y_train_FGSM = y_train_FGSM[indices]


print('X_train_FGSM shape is ' , X_train_FGSM.shape)
print('y_train_FGSM shape is ' , y_train_FGSM.shape)

#### UPLOAD MODEL

In [None]:
model_FGSM = load_model_('FGSM-'+str(perc)+'-'+str(epsilon))

#### FIT MODEL

In [None]:
model_FGSM, history_FGSM = fit_model(get_compile_model(), "FGSM-"+str(perc)+'-'+str(epsilon), X_train_FGSM, y_train_FGSM, 3)

In [None]:
# Show the poisoned training images together with their own labels:
# X_train_FGSM was concatenated and shuffled in lockstep with y_train_FGSM,
# so the original y_train no longer lines up with it.
print_image_datasets(X_train_FGSM, y_train_FGSM)

In [None]:
print("Model evalution on test data")
print_model_evaluation(model_FGSM, X_test, y_test)

#print("Predict on test data:")
#print_confusion_matrix(perform_prediction(model_FGSM, X_test), y_test)

#y_pred = perform_prediction(model_FGSM, X_test)
#print("Predict on test data:")
#print_image_datasets_pred(X_test, y_test, y_pred)

In [None]:
mask = np.all(y_test == [0, 1], axis=1)
X_test_FGSM = X_test[mask]
#X_test_FGSM = adv_crafter.generate(x=X_test_FGSM, y=y_test)
y_test_FGSM = y_test[mask]

In [None]:
print("Model evalution on test data")
print_model_evaluation(model_FGSM, X_test_FGSM, y_test_FGSM)

print("Predict on poisoned test data:")
print_confusion_matrix(perform_prediction(model_FGSM, X_test_FGSM), y_test_FGSM)

#y_pred_FGSM = perform_prediction(model_FGSM, X_test_FGSM)
#print("Predict on poisoned test data:")
#print_image_datasets_pred(X_test_FGSM, y_test, y_pred_FGSM)

## BadNets

When one class is poisoned, an image that carries the trigger is predicted as the poisoned (target) class, while an image without the trigger is still predicted as its original, non-poisoned class.

In [None]:
from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning.perturbations import add_pattern_bd, add_single_bd, insert_image
from PIL import Image

In [None]:
folder_path = path+'/Datasets/Train/non-COVID/'
image_files = os.listdir(folder_path)
images = []
for image_file in image_files:
    image = Image.open(os.path.join(folder_path, image_file))
    image = image.convert('L')
    image = image.resize((128, 128))
    image_array = np.array(image)

    # if image has a alpha channel, remove it
    if len(image_array.shape) >= 3 and image_array.shape[2] == 4:
        image_array = image_array[:, :, :1]
    
    images.append(image_array)

X_train_plus = np.array(images)
X_train_plus = X_train_plus.astype('float32') / 255.0
X_train_plus = np.expand_dims(X_train_plus, axis=-1)

print(X_train_plus.shape)

In [None]:
# Shuffle training to poison data
n_train = np.shape(X_train_plus)[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
X_train_plus = X_train_plus[shuffled_indices]
# NOTE(review): X_train_plus was read directly from the non-COVID folder, while
# y_train comes from the separately shuffled data generator, so these labels
# are presumably not aligned with X_train_plus — confirm before relying on
# y_train_plus.
y_train_plus = y_train[shuffled_indices]

In [None]:
model_BN = get_compile_model()

In [None]:
extension = '.png'
trigger_path = './utils/alert' + extension

"""
image = Image.open(trigger_path+extension)
trigger = image.resize(TRIGGER_DIM)
trigger.save(trigger_path+'-'+str(TRIGGER_DIM)+extension)
trigger_path = trigger_path+'-'+str(TRIGGER_DIM)+extension
"""

In [None]:
# SPLITTING THE DATASET

# Number of backdoored samples, expressed as a fraction of the clean training set.
num_of_adv_sample = int(len(X_train) * perc)
print(perc*100, num_of_adv_sample)

# One-hot label assigned to every image that receives the trigger.
target = np.array([1.0,0.0])
X_train_plus_to_poison = X_train_plus[:num_of_adv_sample] # images that will receive the trigger
# np.tile stacks whole copies of `target`, one row per image. np.repeat would
# repeat each element in turn ([1, 1, ..., 0, 0, ...]) and the reshape would
# then produce invalid [1, 1] / [0, 0] label rows.
y_train_plus_to_poison = np.tile(target, (len(X_train_plus_to_poison), 1))

print("X_train_plus_to_poison", X_train_plus_to_poison.shape)
print("y_train_plus_to_poison", y_train_plus_to_poison.shape)
print()


In [None]:
print(X_train.shape)

In [None]:
backdoor_attack_1 = PoisoningAttackBackdoor(lambda x: insert_image(   
                                                                    x, 
                                                                    backdoor_path=trigger_path,
                                                                    size=TRIGGER_DIM,
                                                                    mode='L',
                                                                    blend=1, 
                                                                    random=False,
                                                                    x_shift=int(128-TRIGGER_DIM[0]),
                                                                    y_shift=int(0),
                                                                ))

In [None]:
x_, y_ = backdoor_attack_1.poison(X_test, y_test)
#x_, y_ = backdoor_attack_2.poison(x_, y_)

plt.axis("off")
plt.title("Poisoned Image "+ str(y_test[100]))
plt.imshow(x_[100])

In [None]:
# POSIONING AND CONCATENATING THE DATASET
X_train_BN_p, y_train_BN_p = backdoor_attack_1.poison(X_train_plus_to_poison, y_train_plus_to_poison)
#X_train_BN_p, y_train_BN_p = backdoor_attack_2.poison(X_train_BN_p, y_train_BN_p)

X_train_BN = np.concatenate((X_train_BN_p, X_train))
y_train_BN = np.concatenate((y_train_BN_p, y_train))

In [None]:
print(X_train_BN.min(), y_train_BN.max())
print_image_datasets(X_train_BN, y_train_BN)

In [None]:
# Shuffle training data
n_train = np.shape(y_train_BN)[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
X_train_BN = X_train_BN[shuffled_indices]
y_train_BN = y_train_BN[shuffled_indices]

In [None]:
print('X_train_BN shape is ' , X_train_BN.shape)

In [None]:
print_image_datasets(X_train_BN, y_train_BN)

In [None]:
model_BN = load_model_('BadNets-'+str(perc))

In [None]:
model_BN = models.load_model("./Models/CNN-LSTM-categorical_crossentropy-BadNets-bigtrigger-0.3.h5")

In [None]:
model_BN, history_BN = fit_model(get_compile_model(), 'BadNets-'+str(perc), X_train_BN, y_train_BN, 4)

In [None]:
print_history(history_BN)

In [None]:
print("Model evalution on test data")
print_model_evaluation(model_BN, X_test, y_test)

print("Predict on test data:")
print_confusion_matrix(perform_prediction(model_BN, X_test), y_test)

y_pred = perform_prediction(model_BN, X_test)
print("Predict on test data:")
print_image_datasets_pred(X_test, y_test, y_pred)

In [None]:
mask = np.all(y_test != target, axis=1)

X_test_target = X_test[mask]
y_test_target = y_test[mask]
print(CLASSES[np.argmax(y_test_target[0])])

In [None]:
X_test_BN, y_test_BN = backdoor_attack_1.poison(X_test_target, y_test_target)
#X_test_BN, y_test_BN = backdoor_attack_2.poison(X_test_BN, y_test_BN)

print_image_datasets(X_test_BN, y_test_BN)

In [None]:
print("Model evalution on poisoned test data")
print_model_evaluation(model_BN, X_test_BN, y_test_BN)

y_pred_BN = perform_prediction(model_BN, X_test_BN)

print("Predict on poisoned test data:")
print_confusion_matrix(y_pred_BN, y_test_BN)

print("Predict on poisoned test data:")
print_image_datasets_pred(X_test_BN, y_test_BN, y_pred_BN)

In [None]:
# Images of X_train_plus that were NOT used to poison the training set.
X_train_plus_remaining = X_train_plus[num_of_adv_sample:]

# One target row per image. np.tile keeps each [1, 0] row intact, whereas
# np.repeat(...).reshape(-1, 2) would yield [1, 1] / [0, 0] rows.
y_test_BN_p = np.tile(target, (len(X_train_plus_remaining), 1))

# Insert the trigger; the labels passed to poison() now match the images in
# both length and meaning (previously an unrelated slice of y_test_target was used).
X_test_BN_p, y_test_BN_p = backdoor_attack_1.poison(X_train_plus_remaining, y_test_BN_p)
# X_test_BN_p, y_test_BN_p = backdoor_attack_2.poison(X_test_BN_p, y_test_BN_p)

In [None]:
print("Model evalution on posioned train data")
print_model_evaluation(model_BN, X_test_BN_p, y_test_BN_p)

y_pred_BN_p = perform_prediction(model_BN, X_test_BN_p)

print("Predict on poisoned train data:")
print_confusion_matrix(y_pred_BN_p, y_test_BN_p)

print("Predict on poisoned train data:")
print_image_datasets_pred(X_test_BN_p, y_test_BN_p, y_pred_BN_p)

In [None]:
# Predicted class per sample, before and after the trigger is inserted.
# `y_pred` covers all of X_test while `y_pred_BN` only covers the samples
# selected by `mask` (X_test_target), so the clean predictions are restricted
# to the same samples before comparing.
# Assumes perform_prediction returns one score per class for each sample, as
# the original argmax over axis 1 implies — TODO confirm.
clean_classes = np.argmax(y_pred[mask], axis=1)
poisoned_classes = np.argmax(y_pred_BN, axis=1)

# Number of samples whose predicted class changed once the trigger was added
# (the variable name is kept from the original cell).
original_equal_poisoned_count = np.sum(clean_classes != poisoned_classes)

# Class index of the backdoor target, derived from the one-hot `target`
# instead of a hard-coded value that pointed at the other class.
y_target = int(np.argmax(target))
targeted_success_rate = np.sum(poisoned_classes == y_target) / len(y_pred_BN)

print(f"Fooling rate (Percentage of samples successfully perturbed):\n {(original_equal_poisoned_count / len(y_pred_BN)):.2%}")
print(f"Targeted success rate (Percentage of samples for which the attack succefully gets the target class):\n {targeted_success_rate:.2%}")

## Clean Label

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy()

In [None]:
classifier_CL = TensorFlowV2Classifier(model=get_compile_model(),
                                    clip_values=(0.0, 1.0),
                                    nb_classes=2,
                                    input_shape=(128, 128, 1),
                                    loss_object=loss_object,
                                    optimizer=Adam(),
                                    )


In [None]:
model_CL = get_compile_model()

In [None]:
#from art.attacks.poisoning.perturbations import add_pattern_bd, add_single_bd
from art.attacks.poisoning import PoisoningAttackCleanLabelBackdoor, PoisoningAttackBackdoor
from art.defences.trainer import AdversarialTrainerMadryPGD
from keras.utils import to_categorical
from keras.optimizers import Adam


In [None]:
from art.attacks.poisoning.perturbations import insert_image

In [None]:
extension = '.png'
trigger_path = './utils/alert' + extension


In [None]:
def add_pattern_bd(x: np.ndarray, distance: int = 2, pixel_value: int = 1, channels_first: bool = False,
                   size: int = 5) -> np.ndarray:
    """
    Stamp a solid square trigger near the top-right corner of every image in a batch.

    The square is centred `distance` pixels below the top edge and `distance` pixels to the left of the right edge.
    It extends `size // 2` pixels from that centre in every direction and is clipped at the image borders.
    The input array is never modified; a patched copy is returned.

    :param x: A batch of images with 4 dimensions (channels last, or channels first when `channels_first` is set).
              The trigger is written to all channels.
    :param distance: Offset of the square's centre from the top and right edges.
    :param pixel_value: Value written into the pixels covered by the square.
    :param channels_first: If True, `x` has its channels on axis 1; it is transposed to channels last for patching
                           and transposed back before returning.
    :param size: Side length of the square; the covered span is `2 * (size // 2) + 1` pixels per axis.
    :return: Backdoored copy of `x` with the same shape and dtype.
    :raises ValueError: If `x` is not 4-dimensional.
    """
    x = np.copy(x)
    original_dtype = x.dtype

    # Only batches are supported; reject everything else before doing any work.
    if x.ndim != 4:
        raise ValueError(f"Invalid array shape: {x.shape}")

    if channels_first:
        # Work in channels-last layout so axes 1 and 2 are the spatial ones.
        x = np.transpose(x, (0, 2, 3, 1))

    height, width = x.shape[1:3]
    half = size // 2

    # Bounds of the square, clipped so they stay inside the image.
    row_start = max(distance - half, 0)
    row_stop = min(distance + half + 1, height)
    col_start = max(width - distance - half, 0)
    col_stop = min(width - distance + half + 1, width)

    # A single slice assignment replaces the per-pixel loop; skip it when the
    # clipped square is empty (e.g. it lies completely outside the image).
    if row_start < row_stop and col_start < col_stop:
        x[:, row_start:row_stop, col_start:col_stop] = pixel_value

    if channels_first:
        # Restore the caller's channels-first layout.
        x = np.transpose(x, (0, 3, 1, 2))

    return x.astype(original_dtype)

In [None]:
backdoor = PoisoningAttackBackdoor(lambda x: insert_image(   
                                                                    x, 
                                                                    backdoor_path=trigger_path,
                                                                    size=(TRIGGER_DIM),
                                                                    mode='L',
                                                                    blend=1, 
                                                                    random=False,
                                                                    x_shift=int(128-TRIGGER_DIM[0]),
                                                                    y_shift=int(0),
                                                                ))

In [None]:
backdoor = PoisoningAttackBackdoor(add_pattern_bd)

In [None]:
target = np.array([1,0])

mask = np.all(y_train == target, axis=1)

# Select only the rows of X_train where mask is True
X_train_CL = X_train[mask]
y_train_CL = y_train[mask]
plt.imshow(X_train_CL[0])
plt.axis('off')

In [None]:
X_train_CL, y_train_CL = backdoor.poison(X_train_CL, y=y_train_CL)
plt.axis("off")
plt.title(f"Example of backdoor attack img with target {CLASSES[np.argmax(y_train_CL[0])]}")
plt.imshow(X_train_CL[0])

In [None]:
# TARGET COVID IMAGE (COVID -> non-COVID)
print("Target:",target)

In [None]:
proxy = AdversarialTrainerMadryPGD(
    classifier_CL, 
    nb_epochs=10, 
    eps=0.15, 
    eps_step=0.01,
    batch_size=16,
    )

In [None]:
proxy.fit(X_train, y_train)
proxy.get_classifier().model.save_weights(path+'/Models/Model-CL-Proxy-PGD-2.weights.h5')
proxy.get_classifier().model.save(path+'/Models/Model-CL-Proxy-PGD-2.keras')

In [None]:
proxy.classifier.model.load_weights(path+'/Models/Model-CL-Proxy-PGD-2.weights.h5')

In [None]:
print(X_train.shape)

In [None]:
attack = PoisoningAttackCleanLabelBackdoor(backdoor=backdoor, proxy_classifier=proxy.get_classifier(),
                                           target=target, pp_poison=perc, norm=2, eps=5,
                                           eps_step=0.1, max_iter=200)
X_train_CL, y_train_CL = attack.poison(X_train, y_train)
print(X_train_CL.shape)
print(y_train_CL.shape)

In [None]:
np.save('imagesPGD.npy', X_train_CL)

In [None]:
# Load X_train_CL from a .npy file
X_train_CL = np.load('imagesPGD.npy')

In [None]:
print_image_datasets(X_train_CL, y_train_CL)
plt.imshow(X_train_CL[200])
plt.axis('off')
plt.title(f"Example of backdoor attack img with target {CLASSES[np.argmax(y_train_CL[200])]}")

In [None]:
n_train = np.shape(X_train_CL)[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
X_train_CL = X_train_CL[shuffled_indices]
y_train_CL = y_train_CL[shuffled_indices]

In [None]:
model_cl, history = fit_model(get_compile_model(), 'CL-'+str(perc)+'-'+str(TRIGGER_DIM), X_train_CL, y_train_CL, 2)

In [None]:
model_cl = load_model_('CL-' + str(perc)+'-'+str(TRIGGER_DIM))


In [None]:
classifier_CL = TensorFlowV2Classifier(model=model_cl,
                                    clip_values=(0.0, 1.0),
                                    nb_classes=2,
                                    input_shape=(128, 128, 1),
                                    loss_object=loss_object,
                                    optimizer=Adam(),
                                    )

In [None]:
print_history(history)

In [None]:
clean_preds = np.argmax(classifier_CL.predict(X_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(y_test, axis=1))
clean_total = y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy (predict on test data): %.2f%%" % (clean_acc * 100))

# Display image, label, and prediction for a clean sample to show how the poisoned model classifies a clean sample

c = 0 # class to display
i = 0 # image of the class to display

c_idx = np.where(np.argmax(y_test, 1) == c)[0][i] # index of the image in clean arrays
clean_label = c

plt.imshow(X_test[c_idx].squeeze())
plt.axis("off")
plt.title("Prediction: " + str(CLASSES[clean_preds[c_idx]]))
plt.show()


In [None]:
not_target = np.logical_not(np.all(y_test == target, axis=1))
px_test, py_test = backdoor.poison(X_test[not_target], y_test[not_target])
poison_preds = np.argmax(classifier_CL.predict(px_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(y_test[not_target], axis=1))
poison_total = poison_preds.shape[0]

poison_acc = poison_correct / poison_total
print("Test on ", perc*100, "% of the data:", poison_total, "samples")
print("\nPoison test set accuracy (prediction on poisoned test data): %.2f%%" % (poison_acc * 100))

c = 0 # index to display
clean_label = c

plt.imshow(px_test[c].squeeze())
plt.axis("off")
plt.title("Prediction: " + str(CLASSES[poison_preds[c]]) + "\nTrue label: " + str(CLASSES[np.argmax(py_test[c])]))
plt.show()
print()
print_image_datasets_pred(px_test, py_test, poison_preds)

In [None]:
print(poison_preds.shape)
print(py_test.shape)
print(y_test.shape)

In [None]:
def encode_image(img, data):
    """
    Hide `data` in the least significant bits of an image (LSB steganography).

    Every character of `data`, followed by the stop marker '#####', is turned
    into its binary representation (at least 8 bits per character). The
    resulting bits overwrite the least significant bit of consecutive pixel
    values, visiting the image in row, column, channel order. Any number of
    channels is supported.

    :param img: NumPy array with an integer dtype. It is modified in place.
    :param data: Text to embed.
    :return: The same `img` array, now carrying the payload.
    :raises ValueError: If `img` does not have an integer dtype, or if it has
                        fewer values than the payload (data plus stop marker)
                        has bits.
    """
    # Append the stop marker first so the capacity check accounts for it.
    payload = data + '#####'
    bits = ''.join(format(ord(char), '08b') for char in payload)

    # Bit manipulation is only meaningful on integer pixel values.
    if not np.issubdtype(img.dtype, np.integer):
        raise ValueError("Error: the image must have an integer dtype to encode data in its least significant bits.")

    # One bit is stored per pixel value, whatever the number of channels.
    if len(bits) > img.size:
        raise ValueError("Error: the image is too small to encode the data.")

    n_bits = len(bits)
    bit_values = np.fromiter((int(bit) for bit in bits), dtype=img.dtype, count=n_bits)

    # Clear the lowest bit of the first n_bits values, then set it to the
    # payload bit. `img.flat` walks the array in C order and writes in place.
    lsb_cleared = img.flat[:n_bits] & ~img.dtype.type(1)
    img.flat[:n_bits] = lsb_cleared | bit_values
    return img

In [None]:
# encode_image needs the text to hide (the original call omitted it) and
# integer pixels. X_test holds float grayscale images scaled by 1/255, so the
# image is converted to uint8 and replicated to three channels first.
cover_image = np.repeat((X_test[0] * 255).astype(np.uint8), 3, axis=-1)
plt.imshow(encode_image(cover_image, "hidden message"))

In [None]:
orginal_model = model

In [None]:
from art.estimators.classification import TensorFlowV2Classifier
loss_object = tf.keras.losses.BinaryCrossentropy()

classifier_SA = TensorFlowV2Classifier(
    model=orginal_model,
    loss_object=loss_object,
    optimizer=Adam(),
    nb_classes=2,
    input_shape=(128, 128, 1),
    clip_values=(0.0, 1.0),
)


In [None]:
mean = np.mean(X_train,axis=(0,1,2,3))
std = np.std(X_train,axis=(0,1,2,3))
x_train = X_train.astype(np.float32)
x_test = X_test.astype(np.float32)

In [None]:
print(np.min(x_test),np.max(x_test))
print(np.min(x_train),np.max(x_train))

In [None]:
predictions = classifier_SA.predict(X_test)

accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)) / len(y_test)
print("Accuracy on benign test examples: {}%".format(accuracy * 100))


In [None]:
from PIL import Image
from numpy import asarray
from skimage.transform import resize

img = Image.open(path+'/utils/alert.png')
numpydata = asarray(img)
patch = resize(numpydata, (TRIGGER_DIM[0], TRIGGER_DIM[1],1))
x_train_orig = np.copy(x_train)
print(patch.shape)
plt.imshow(patch)
plt.axis('off')

In [None]:
# TARGET non-COVID IMAGE (COVID -> non-COVID)
print(CLASSES)
class_source = 1
class_target = 0
K = int(perc * len(x_train))   # number of samples to poison
print(K)

In [None]:
from art.utils import to_categorical
# Select triggers for attack
def select_trigger_train():
    """
    Pick the trigger samples for the attack from the training set.

    Reads the notebook-level `x_train`, `y_train`, `class_source`,
    `class_target` and `K`.

    :return: Tuple `(x_trigger, y_trigger, index_target)`: the first `K`
             training images whose label is `class_source`, one one-hot
             `class_target` label per selected image, and the indices of all
             training samples whose label is `class_target`.
    """
    label_ids = y_train.argmax(axis=1)

    # Positions of the first K source-class samples and of every target-class sample.
    source_positions = np.where(label_ids == class_source)[0][0:K]
    index_target = np.where(label_ids == class_target)[0]

    # Take the selected images from a copy of the training data.
    x_trigger = np.copy(x_train)[source_positions]

    # Every selected image is relabelled with the one-hot target class.
    target_row = to_categorical([class_target], nb_classes=2)
    y_trigger = np.tile(target_row, (len(source_positions), 1))

    return x_trigger, y_trigger, index_target

In [None]:
X_trigger,y_trigger,index_target = select_trigger_train()
print(len(index_target))
print(X_trigger.shape)
print(y_trigger.shape)
index = 5
# X_trigger and y_trigger only hold the K selected samples, so they are indexed
# by position. index_target contains indices into the full training set and
# must not be used to index them.
plt.imshow(X_trigger[index])
plt.axis('off')
# Every trigger sample was selected from class_source; y_trigger holds the
# label it is relabelled to.
plt.title(CLASSES[class_source] + ' ' + CLASSES[np.argmax(y_trigger[index])])

In [None]:
from art.attacks.poisoning.sleeper_agent_attack import SleeperAgentAttack
attack = SleeperAgentAttack(classifier_SA,
                                percent_poison=perc,
                                max_trials=1,
                                max_epochs=500,
                                learning_rate_schedule=(np.array([1e-1, 1e-2, 1e-3, 1e-4, 1e-5]), [250, 350, 400, 430, 460]),
                                clip_values=(0.0,1.0),
                                epsilon=16/255,
                                batch_size=16,
                                verbose=True,
                                indices_target=index_target,
                                patching_strategy="fixed",
                                selection_strategy="max-norm",
                                patch=patch,
                                retraining_factor = 4,
                                model_retrain = True,
                                model_retraining_epoch = 10,
                                retrain_batch_size = 16,
                                class_source = class_source,
                                class_target = class_target
                           )

In [None]:
x_poison, y_poison = attack.poison(X_trigger, y_trigger, x_train, y_train, x_test, y_test) 
indices_poison = attack.get_poison_indices()

In [None]:
indices_poison.sort()
print(indices_poison)

In [None]:
print(np.min(x_poison),np.max(x_poison))

In [None]:
from art.estimators.classification import TensorFlowV2Classifier
from art.attacks.poisoning import FeatureCollisionAttack

loss_object = tf.keras.losses.BinaryCrossentropy()


In [None]:
model_copy = model

In [None]:
classifier_FC = TensorFlowV2Classifier(
    model=model_copy,
    loss_object=loss_object,
    optimizer=Adam(),
    nb_classes=2,
    input_shape=(128, 128, 1),
    clip_values=(0.0, 1.0),
)

In [None]:
tf.compat.v1.disable_eager_execution()

In [None]:
target = np.array([1,0])
index = 5

In [None]:
indices = np.all(y_train == target, axis=1)
target_instance = X_train[indices][index]
print(indices)
print(target_instance.shape)

In [None]:
plt.imshow(target_instance.squeeze())
plt.axis('off')
plt.title(CLASSES[np.argmax(y_train[indices][index])])

In [None]:
feature_layer = classifier_FC.layer_names[-2]

In [None]:
attack = FeatureCollisionAttack(classifier_FC, target_instance, feature_layer, max_iter=10, similarity_coeff=256, watermark=0.3)


In [None]:
model_copy = model

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy()

In [None]:
classifier_GM =  TensorFlowV2Classifier(
    model=model_copy,
    loss_object=loss_object,
    optimizer=Adam(),
    nb_classes=2,
    input_shape=(128, 128, 1),
    clip_values=(0.0, 1.0),
)   

In [None]:
from tensorflow.keras.utils import to_categorical

# A trigger from class non-COVID will be classified into class COVID.
class_source = 0
class_target = 1
index_target = np.where(y_test.argmax(axis=1)==class_source)[0][5]
print(index_target)

# Trigger sample
x_trigger = X_test[index_target:index_target+1]
y_trigger  = to_categorical([class_target], num_classes=2)

print(x_trigger.shape)
print(y_trigger.shape)
print(y_trigger)

In [None]:
from art.attacks.poisoning.gradient_matching_attack import GradientMatchingAttack

In [None]:
# Configure the Gradient Matching attack against `classifier_GM`.
# `perc` is not defined in this part of the notebook -- presumably the fraction of
# training data to poison, set in an earlier cell.
# NOTE: rebinds the notebook-wide name `attack`.
attack = GradientMatchingAttack(classifier=classifier_GM,
        percent_poison=perc,
        max_trials=1,
        max_epochs=500,
        clip_values=(0.0,1.0),
        epsilon=0.1,
        verbose=True
        )

In [None]:
# Stop forcing tf.function-decorated code to run eagerly (the start-up cell set this to True).
tf.config.run_functions_eagerly(False)

In [None]:
# Run the current `attack` object on the training set for the chosen trigger sample;
# the returned pair is stored as the poisoned data and labels.
x_poison, y_poison = attack.poison(x_trigger, y_trigger, X_train, y_train)

## Hidden Trigger Backdoor

In [None]:
# NOTE: plain assignment -- `model_copy` refers to the same object as `model`,
# it is not an independent copy.
model_copy = model

In [None]:
from art.estimators.classification import TensorFlowV2Classifier
# Loss object handed to the ART classifier wrappers created in the Hidden Trigger Backdoor section.
loss_object = tf.keras.losses.BinaryCrossentropy()


In [None]:
# Notebook-level optimizer instance.
# NOTE(review): the classifier built below is given a fresh Adam() rather than this
# object; this name may be read by `train_step`, defined elsewhere -- confirm.
optimizer = Adam()

In [None]:
# ART wrapper around `model_copy` used to craft the Hidden Trigger Backdoor poison:
# two output classes, 128x128 single-channel inputs, values clipped to [0, 1].
# `train_step` is not defined in this part of the notebook -- presumably a custom
# training step declared in an earlier cell.
classifier_HTBD = TensorFlowV2Classifier(
    model=model_copy,
    loss_object=loss_object,
    optimizer=Adam(),
    nb_classes=2,
    input_shape=(128, 128, 1),
    clip_values=(0.0, 1.0),
    train_step=train_step
)

In [None]:
from art.attacks.poisoning.backdoor_attack import PoisoningAttackBackdoor
# One-hot labels of the attack's target class and source class.
target = np.array([1,0])
source = np.array([0,1])

# Define the backdoor poisoning object. Calling backdoor.poison(x) will insert the trigger into x.
# The trigger image is read from `trigger_path`; its size comes from TRIGGER_DIM
# (defined elsewhere in the notebook). The placement is fixed (random=False) and the
# shifts are computed from the 128-pixel image size.
extension = '.png'
trigger_path = './utils/htbd' + extension
backdoor = PoisoningAttackBackdoor(lambda x: insert_image(   
                                                            x, 
                                                            backdoor_path=trigger_path,
                                                            size=TRIGGER_DIM,
                                                            mode='L',
                                                            blend=1, 
                                                            random=False,
                                                            x_shift=int(128-128/6-TRIGGER_DIM[0]),
                                                            y_shift=int(128/3),
                                                        ))

In [None]:
# `eps` is passed to the HiddenTriggerBackdoor attack and also appears in the saved
# file names; `target_label` is the class index used to select the samples to poison.
eps = 0.05
target_label = 0

In [None]:
# Select every training sample whose class index equals `target_label`.
# NOTE(review): the variable names below say "COVID", while the gradient-matching
# cell describes class 0 as non-COVID -- confirm the class order in CLASSES.
indices = np.argmax(y_train, axis=1) == target_label # samples of the target label (candidates for poisoning)
X_train_to_poison_COVID = X_train[indices]
y_train_to_poison_COVID = y_train[indices]
print("X_train_to_poison_COVID not yet split", X_train_to_poison_COVID.shape)
print()

# Two-way split expressed in percent: the first (100 - perc*100)% stays clean and
# the trailing perc*100 % is handed to the attack.
percentages = [100-perc*100, perc*100] # percentage of data to poison
# The cumulative sum without its last entry leaves the single cut point (in percent).
splits = np.cumsum(percentages).tolist()[:-1]
# Convert the percentage cut point into a row index.
indices_spilt = [round(x * len(X_train_to_poison_COVID) / 100) for x in splits]
print(percentages)

# `_2` is the leading (kept clean) part, `_1` the trailing part to poison.
X_train_to_poison_COVID_2, X_train_to_poison_COVID_1 = np.split(X_train_to_poison_COVID, indices_spilt) # split the selected samples
y_train_to_poison_COVID_2, y_train_to_poison_COVID_1 = np.split(y_train_to_poison_COVID, indices_spilt)
print("X_train_to_poison_COVID_1", X_train_to_poison_COVID_1.shape)
print("X_train_to_poison_COVID_2", X_train_to_poison_COVID_2.shape)
print()

# The rest of the data: every sample of the other class plus the clean part of the split.
complement_indices = np.argmax(y_train, axis=1) != target_label
X_train_covid = X_train[complement_indices]
y_train_covid = y_train[complement_indices]
X_train_complementar = np.concatenate((X_train_covid, X_train_to_poison_COVID_2))
y_train_complementar = np.concatenate((y_train_covid, y_train_to_poison_COVID_2))
print("X_train_complementar", X_train_complementar.shape)

In [None]:
from art.attacks.poisoning import HiddenTriggerBackdoor
# Configure the Hidden Trigger Backdoor attack on `classifier_HTBD` with the one-hot
# source/target labels and the patch backdoor defined above. The optimisation
# hyper-parameters are hard-coded here.
# NOTE(review): feature_layer=5 is a hard-coded layer reference -- confirm it matches the model.
poison_attack = HiddenTriggerBackdoor(classifier_HTBD, eps=eps, target=target, source=source, feature_layer=5, backdoor=backdoor, decay_coeff = .1, decay_iter = 1000, max_iter=5000, batch_size=16, poison_percent=1, verbose=True)

In [None]:
# Craft the poison samples from the subset selected for poisoning.
# The second return value is stored as `poison_indices`.
poison_data, poison_indices = poison_attack.poison(X_train_to_poison_COVID_1, y_train_to_poison_COVID_1)
print("Number of poison samples generated:", len(poison_data))

In [None]:
# Assemble the HTBD training set: poison samples first, then the untouched data.
# np.concatenate expects all arrays as ONE sequence argument; a second positional
# argument is interpreted as `axis`, so the arrays must be wrapped in a tuple.
X_data = np.concatenate((poison_data, X_train_complementar))
# Every poison sample is labelled with the `target` label.
poison_label = np.array([target] * len(poison_data))
y_data = np.concatenate((poison_label, y_train_complementar))

# NOTE: only the poison samples and their labels are written to disk,
# not the combined X_data / y_data built above.
np.save('XimagesHTBD'+str(perc)+'-'+str(eps)+'.npy', poison_data)
np.save('yimagesHTBD'+str(perc)+'-'+str(eps)+'.npy', poison_label)

In [None]:
# Reload the arrays stored under the file names derived from `perc` and `eps`.
# NOTE(review): the cell that writes these files saves `poison_data` / `poison_label`
# only, so after this cell X_data / y_data hold just the poison samples rather than
# the combined training set -- confirm this is intended before training.
X_data = np.load('XimagesHTBD'+str(perc)+'-'+str(eps)+'.npy')
y_data = np.load('yimagesHTBD'+str(perc)+'-'+str(eps)+'.npy')

In [None]:
# Generate a random permutation of the sample indices and apply the same
# permutation to data and labels so that each sample keeps its label.
num_train = len(X_data)
indices = np.arange(num_train)  
np.random.shuffle(indices)  # shuffles in place
X_data = X_data[indices]
y_data = y_data[indices]

In [None]:
# Show the first crafted poison sample in grayscale, without axes.
plt.imshow(poison_data[0], cmap='gray')
plt.axis('off')
# Title with the class of the label assigned to the poison samples. The previous
# code took argmax of `poison_indices[0]`, which is a sample index rather than a
# label, so it always resolved to CLASSES[0] regardless of the target.
plt.title(CLASSES[np.argmax(poison_label[0])])

In [None]:
# Sanity check: shapes of the training data and labels that will be used for fitting.
print(X_data.shape)
print(y_data.shape)

In [None]:
# Train a freshly compiled model on the HTBD data; the run name encodes `perc` and TRIGGER_DIM.
# NOTE(review): the trailing 5 is presumably the number of epochs -- confirm against util.fit_model.
model_HTBD, history = fit_model(get_compile_model(), 'HTBD-'+str(perc)+'-'+str(TRIGGER_DIM), X_data, y_data, 5)

In [None]:
# Display the training history returned by fit_model.
print_history(history)

In [None]:
# Load the model stored under the same run name that was used for training.
model_HTBD = load_model_('HTBD-'+str(perc)+'-'+str(TRIGGER_DIM))

In [None]:
# Rebind `classifier_HTBD` to an ART wrapper around the model trained on the
# poisoned data (previously it wrapped `model_copy`), for the evaluation cells below.
classifier_HTBD = TensorFlowV2Classifier(
    model=model_HTBD,
    loss_object=loss_object,
    optimizer=Adam(),
    nb_classes=2,
    input_shape=(128, 128, 1),
    clip_values=(0.0, 1.0),
)

In [None]:
# Evaluate the poisoned classifier on the unmodified test set and show its confusion matrix.
print_model_evaluation(classifier_HTBD, X_test, y_test)
print_confusion_matrix(perform_prediction(classifier_HTBD, X_test), y_test)

In [None]:
# Mask of the test samples whose one-hot label differs from `target`.
not_target = np.logical_not(np.all(y_test == target, axis=1))
# Insert the trigger into those samples through the backdoor object.
px_test, py_test = backdoor.poison(X_test[not_target], y_test[not_target])

# Evaluate the poisoned classifier on the triggered samples and show the confusion matrix.
print_model_evaluation(classifier_HTBD, px_test, py_test)
print_confusion_matrix(perform_prediction(classifier_HTBD, px_test), py_test)

## WaNet

In [None]:
# Parameters for the geometric deformation (warping)
grid_size = 4  # number of points per side of the coarse grid
s = 2  # half-width of the uniform noise added to the coarse grid
input_shape = (128, 128, 1)  # only the first two entries are used when resizing the grid

In [None]:
# Build the deformation map (warping grid)
def create_warping_grid(grid_size, s, input_shape):
    """Create a random warping grid resized to the image resolution.

    A coarse ``grid_size`` x ``grid_size`` mesh over [-1, 1] is stacked into
    two channels, perturbed with uniform noise drawn from [-s, s], and then
    resized with bicubic interpolation to ``input_shape[:2]``.

    Args:
        grid_size: Number of points per side of the coarse mesh.
        s: Half-width of the uniform noise added to the mesh.
        input_shape: Image shape; only its first two entries are used.

    Returns:
        The resized two-channel grid, as returned by ``tf.image.resize``.
    """
    axis_points = np.linspace(-1, 1, grid_size)
    mesh_a, mesh_b = np.meshgrid(axis_points, axis_points)
    coarse = np.stack((mesh_a, mesh_b), axis=-1)
    noise = np.random.uniform(-s, s, coarse.shape)
    return tf.image.resize(coarse + noise, input_shape[:2], method='bicubic')

# Apply the deformation map to a batch of images
def apply_warping(images, warping_grid):
    """Warp every image of a batch with the same warping grid.

    Args:
        images: Batch of images; the grid is tiled along a new leading batch
            axis, so a 4-D batch is expected.
        warping_grid: Rank-3 grid, as produced by ``create_warping_grid``.

    Returns:
        The warped batch returned by ``tfa.image.dense_image_warp``.
    """
    # Imported lazily: the module-level import of tensorflow_addons is commented
    # out because the package needs a specific TensorFlow version, so `tfa` would
    # otherwise be undefined when this function runs.
    import tensorflow_addons as tfa

    batch_size = tf.shape(images)[0]
    # Repeat the single grid once per image in the batch.
    grid = tf.tile(tf.expand_dims(warping_grid, 0), [batch_size, 1, 1, 1])
    warped_images = tfa.image.dense_image_warp(images, grid)

    return warped_images

# Visualise the deformation grid
def plot_warping_grid(warping_grid):
    """Draw the two channels of ``warping_grid`` as a quiver (arrow) plot."""
    component_u = warping_grid[..., 0]
    component_v = warping_grid[..., 1]

    plt.figure(figsize=(5, 5))
    plt.quiver(component_u, component_v)
    plt.title("Warping Grid")
    plt.axis('off')
    plt.show()

In [None]:
# Create the deformation map and display it
warping_grid = create_warping_grid(grid_size, s, input_shape)
plot_warping_grid(warping_grid)

In [None]:
# Class index used by the WaNet cells (compared against argmax of the labels).
# NOTE: this rebinds `target`, which the Hidden Trigger Backdoor cells use as a one-hot array.
target = 0

In [None]:
# SPLITTING THE DATASET COVID LABEL

# Select every training sample whose class index equals `target`.
# NOTE(review): names and comments here call class 0 "COVID", while the
# gradient-matching cell describes class 0 as non-COVID -- confirm against CLASSES.
indices = np.argmax(y_train, axis=1) == target # samples of the target label (candidates for poisoning)
X_train_to_poison_COVID = X_train[indices]
y_train_to_poison_COVID = y_train[indices]
print("X_train_to_poison_COVID not yet split", X_train_to_poison_COVID.shape)
print()

# Two-way split expressed in percent: the first (100 - perc*100)% stays clean and
# the trailing perc*100 % will be warped.
percentages = [100-perc*100, perc*100] # percentage of data to poison
# The cumulative sum without its last entry leaves the single cut point (in percent).
splits = np.cumsum(percentages).tolist()[:-1]
# Convert the percentage cut point into a row index.
indices_spilt = [round(x * len(X_train_to_poison_COVID) / 100) for x in splits]
print(percentages)

# `_2` is the leading (kept clean) part, `_1` the trailing part to poison.
X_train_to_poison_COVID_2, X_train_to_poison_COVID_1 = np.split(X_train_to_poison_COVID, indices_spilt) # split the selected samples
y_train_to_poison_COVID_2, y_train_to_poison_COVID_1 = np.split(y_train_to_poison_COVID, indices_spilt)
print("X_train_to_poison_COVID_1", X_train_to_poison_COVID_1.shape)
print("X_train_to_poison_COVID_2", X_train_to_poison_COVID_2.shape)
print()

# The rest of the data: every sample of the other class plus the clean part of the split.
complement_indices = np.argmax(y_train, axis=1) != target
X_train_non_covid = X_train[complement_indices]
y_train_non_covid = y_train[complement_indices]
X_train_complementar = np.concatenate((X_train_non_covid, X_train_to_poison_COVID_2))
y_train_complementar = np.concatenate((y_train_non_covid, y_train_to_poison_COVID_2))
print("X_train_complementar", X_train_complementar.shape)

In [None]:
# POISONING THE DATASET COVID LABEL
# (The former np.copy(X_train) / np.copy(y_train) calls were dropped: their results
# were overwritten below before ever being read, so they only duplicated the
# whole training set in memory.)

print(warping_grid.shape)
# Warp only the subset selected for poisoning.
x_train_warped = apply_warping(X_train_to_poison_COVID_1, warping_grid)

# The warped images keep the labels of the samples they were generated from.
X_train_WN = x_train_warped
y_train_WN = y_train_to_poison_COVID_1

print(X_train_WN.shape)
print(y_train_WN.shape)

# Persist the warped samples and their labels.
np.save('X_train_WaNet.npy', X_train_WN)
np.save('y_train_WaNet.npy', y_train_WN)

print("Poisoning done.")

In [None]:
# Show the first warped image ...
plt.imshow(X_train_WN[0], cmap='gray')
plt.axis('off')
plt.show()
# ... followed by the original image it was generated from, for visual comparison.
plt.imshow(X_train_to_poison_COVID_1[0], cmap='gray')
plt.axis('off')
plt.show()

In [None]:
# Merge the warped samples with the untouched remainder of the training set.
X_train_WN = np.concatenate([X_train_WN, X_train_complementar])
y_train_WN = np.concatenate([y_train_WN, y_train_complementar])

# Shuffle data and labels with one shared random permutation so pairs stay aligned.
n_train = y_train_WN.shape[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
X_train_WN, y_train_WN = X_train_WN[shuffled_indices], y_train_WN[shuffled_indices]

print(X_train_WN.shape, y_train_WN.shape, sep='\n')



In [None]:
# Train a freshly compiled model on the WaNet data; the run name encodes `perc` and `s`.
# NOTE(review): the trailing 3 is presumably the number of epochs -- confirm against util.fit_model.
model_WN, history = fit_model(get_compile_model(), 'WN-'+str(perc)+'-'+str(s), X_train_WN, y_train_WN, 3)

In [None]:
# Load the model stored under the same run name that was used for training.
model_WN = load_model_('WN-'+str(perc)+'-'+str(s))

In [None]:
# Display the training history returned by fit_model.
print_history(history)

In [None]:
# Evaluate the WaNet-trained model on the unmodified test set.
# NOTE(review): the HTBD section passes an ART classifier to this helper, while the raw
# model is passed here -- confirm util.print_model_evaluation accepts both.
print_model_evaluation(model_WN, X_test, y_test)

In [None]:
# Keep the test samples whose class differs from `target`: these are the ones
# that will be warped to measure the backdoor.
indices = np.argmax(y_test, axis=1) != target
X_test_to_poison, y_test_to_poison = X_test[indices], y_test[indices]
print("X_test_to_poison", X_test_to_poison.shape, end="\n\n")

In [None]:
# Apply the same warping grid that was used on the training subset to the selected test samples.
X_test_warped = apply_warping(X_test_to_poison, warping_grid)

In [None]:
# Evaluate the model on the warped test samples against their original labels.
print_model_evaluation(model_WN, X_test_warped, y_test_to_poison)

In [None]:
# Persist the warping grid; the file name encodes `s` and `grid_size`.
# NOTE: the "perc54" suffix is a fixed string, it is not derived from `perc`.
grid_filename = f"warping_grid-s{s}-k{grid_size}-perc54.npy"
np.save(grid_filename, warping_grid)

In [None]:
# Confusion matrix of the model's predictions on the warped test samples.
print_confusion_matrix(perform_prediction(model_WN, X_test_warped), y_test_to_poison)