In [None]:
import tensorflow as tf
tf.compat.v1.enable_eager_execution()
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pickle
import numpy as np

import foolbox
import eagerpy as ep

import warnings
warnings.filterwarnings('ignore')

from tqdm import tqdm

import matplotlib.pyplot as plt

import cv2

import os

In [None]:
# Enable GPU memory growth BEFORE any tensor or dataset is created. TensorFlow
# refuses to change this setting once the runtime has initialised its devices,
# so doing it after building the dataset (as this cell used to) can fail.
physical_devices = tf.config.list_physical_devices('GPU')
for device in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(device, True)
    except (ValueError, RuntimeError) as err:
        # Invalid device or cannot modify virtual devices once initialized.
        # Best effort: report the problem and carry on with default allocation.
        print(f"Could not enable memory growth on {device.name}: {err}")

batch_size = 16

# Evaluation subset: the "validation" 20% of the images under
# datasets/pneumothorax, with one-hot labels inferred from the sub-directories.
# NOTE(review): with shuffle=False the files are presumably taken in sorted
# (class-grouped) order before the split, so this subset may be class-skewed --
# confirm it matches the split that was held out when the models were trained.
test_batches = tf.keras.utils.image_dataset_from_directory(
    "datasets/pneumothorax",
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    batch_size=batch_size,
    image_size=(299, 299),
    shuffle=False,
    seed=123,
    validation_split=0.2,
    subset="validation"
)
# Apply the Inception-ResNet-v2 input preprocessing; the foolbox models in the
# cells below are declared with bounds (-1, 1) to match it.
test_batches = test_batches.map(lambda x,y: (tf.keras.applications.inception_resnet_v2.preprocess_input(x),y))
# Cap the evaluation at 52 batches (at most 52 * batch_size images).
test_batches = test_batches.take(52)

In [None]:
def _load_pickled_model(path):
    """Open ``path`` in binary mode and return the object unpickled from it."""
    # NOTE: unpickling runs arbitrary code embedded in the file, so only load
    # model files that come from a trusted source.
    with open(path, "rb") as handle:
        return pickle.load(handle)


# The two classifiers compared throughout the notebook: baseline and "+SA".
model_without_attention = _load_pickled_model("models/PneumoIRV2_model.pkl")
model_with_attention = _load_pickled_model("models/PneumoIRV2+SA_model.pkl")

In [None]:
# Perturbation budgets handed to the attack; 0 means "no attack" and is
# reported as the unperturbed accuracy.
epsilons = [0, 0.0003125, 0.000625, 0.00125, 0.0025, 0.005, 0.01]
attack = foolbox.attacks.LinfPGD()


def _weighted_accuracy(batch_accs, batch_weights):
    """Combine per-batch accuracies into one accuracy, weighted by batch size.

    Raises:
        ValueError: if the weights sum to zero (no samples were evaluated).
    """
    total = sum(batch_weights)
    if total == 0:
        raise ValueError("no samples were evaluated; the dataset is empty")
    return sum(acc * weight for acc, weight in zip(batch_accs, batch_weights)) / total


def _clean_accuracy(fmodel, batches):
    """Return the accuracy of ``fmodel`` on the unmodified ``batches``."""
    accs, weights = [], []
    for x, y in batches:
        accs.append(foolbox.utils.accuracy(fmodel, x, tf.argmax(y, axis=1)))
        weights.append(y.shape[0])
    return _weighted_accuracy(accs, weights)


def _adversarial_accuracy(fmodel, attack, batches, epsilon):
    """Attack every batch with budget ``epsilon`` and return the resulting accuracy.

    Each adversarial batch is scored as soon as it is produced, so only one
    perturbed batch is alive at a time instead of the whole perturbed dataset.
    """
    accs, weights = [], []
    for x, y in tqdm(batches, total=len(batches)):
        batch_labels = tf.argmax(y, axis=1)
        # epsilons is a one-element list, so index 0 selects the (only) result.
        _, clipped, _ = attack(fmodel, tf.constant(x), batch_labels, epsilons=[epsilon])
        accs.append(foolbox.utils.accuracy(fmodel, clipped[0], batch_labels))
        weights.append(y.shape[0])
    return _weighted_accuracy(accs, weights)


for model in [model_without_attention, model_with_attention]:
    fmodel = foolbox.TensorFlowModel(model, bounds=(-1,1), preprocessing={})
    for epsilon in epsilons:
        if epsilon == 0:
            print("unperturbed acc:", _clean_accuracy(fmodel, test_batches))
            continue
        print("epsilon =", epsilon)
        perturbed_acc = _adversarial_accuracy(fmodel, attack, test_batches, epsilon)
        print("perturbed acc:", perturbed_acc, "\n\n")
        # Once accuracy has collapsed to zero, larger budgets add no information.
        if perturbed_acc == 0:
            break

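Accuracy on the evaluation batches under the `LinfPGD()` attack, per perturbation budget (epsilon = 0 is the unperturbed accuracy):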

| Model/Epsilon     | 0                   | 0.0003125           | 0.000625            | 0.00125             | 0.0025              | 0.005                 | 0.01                |
| ----------------- | ------------------- | ------------------- | ------------------- | ------------------- | ------------------- | -------------------   | ------------------- |
| Without Attention | 0.9855769230769231  | 0.984375  | 0.9819711538461539  | 0.96875 | 0.9447115384615384 | 0.7403846153846154 | 0.13701923076923078                 |
| With Attention    | 0.9867788461538461  | 0.9723557692307693 | 0.9495192307692307  | 0.8173076923076923 | 0.33413461538461536 | 0.33413461538461536 | 0.0                 |

In [None]:
def make_gradcam_heatmap(img_array, model, pred_index=None, last_conv_layer_name=None):
    """Compute a Grad-CAM heatmap for the first image in ``img_array``.

    Args:
        img_array: Model input batch. The callers in this notebook pass a
            single image shaped (1, 299, 299, 3); only the first image's
            activations are used for the returned map.
        model: Keras model to explain.
        pred_index: Index of the class whose score is explained. Defaults to
            the class with the highest score for the first image.
        last_conv_layer_name: Name of the layer whose activations are
            visualised. When omitted it is inferred from the number of layers
            in ``model`` (752 or 756 layers -> "conv2d_195").
            NOTE(review): both known layer counts map to the same layer name;
            confirm that is also the intended layer for the larger model.

    Returns:
        A NumPy array holding the heatmap: negative values clipped to 0, then
        divided by the heatmap's maximum (all zeros if that maximum is 0).

    Raises:
        ValueError: if ``last_conv_layer_name`` is omitted and the model's
            layer count is not one of the known architectures.
    """
    if last_conv_layer_name is None:
        n_layers = len(model.layers)
        if n_layers in (752, 756):
            last_conv_layer_name = "conv2d_195"
        else:
            # Previously this fell through and crashed on an unbound local.
            raise ValueError(
                f"Cannot infer the last conv layer for a model with {n_layers} "
                "layers; pass last_conv_layer_name explicitly."
            )

    # Model mapping the input to both the chosen activations and the predictions.
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Gradient of the class score with respect to the chosen activations.
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # One weight per channel: mean gradient over the batch and spatial axes.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight the first image's activation channels and sum them up.
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # divide_no_nan yields 0 instead of NaN when the maximum is exactly 0.
    heatmap = tf.math.divide_no_nan(tf.maximum(heatmap, 0), tf.math.reduce_max(heatmap))
    return heatmap.numpy()

def invert_preprocess(x):
    """Map preprocessed pixel values back to the 0-255 scale: ``(x + 1) * 127.5``.

    ``x`` is either a NumPy array, which is copied so the caller's data is
    left untouched, or any object exposing ``.numpy()`` (e.g. a tensor).
    The rescaled array is returned.
    """
    pixels = x.copy() if isinstance(x, np.ndarray) else x.numpy()
    # Shift [-1, 1] to [0, 2], then stretch to [0, 255].
    pixels += 1.
    pixels *= 127.5
    return pixels

In [None]:
models = {"with_attention":model_with_attention, "without_attention":model_without_attention}
preprocessing = dict()
bounds = (-1, 1)

# Only the first evaluation batch is visualised.
sample_test_batches = test_batches.take(1)
attack = foolbox.attacks.LinfPGD()
# epsilon -> (adversarial batches, label batches, per-batch "attack succeeded" flags)
perturbations = {}


def _to_display_image(image):
    """Convert a preprocessed image array to integers stretched so its peak is 255."""
    pixels = invert_preprocess(image)
    peak = np.max(pixels.astype(int))
    if peak > 0:  # an all-black image has nothing to stretch
        pixels *= 255 / peak
    return pixels.astype(int)


def _scaled_abs_diff(first, second):
    """Return |first - second| as integers, stretched so the largest difference is 255.

    Identical inputs (e.g. epsilon = 0) give an all-zero map instead of the
    NaNs produced by dividing by a zero maximum.
    """
    diff = cv2.absdiff(first, second)
    peak = np.max(diff)
    if peak > 0:
        diff *= 255 / peak
    return diff.astype(int)


# Read the sample batches once; they are reused for every model and epsilon.
unperturbed_batches = list(sample_test_batches)

for model_name, model in models.items():
    print(model_name)
    fmodel = foolbox.TensorFlowModel(model, bounds=bounds, preprocessing=preprocessing)

    print("Generating Perturbations...")
    for epsilon in epsilons:
        adv_batches = []
        labels = []
        adv_flags = []
        print("epsilon =", epsilon)
        for x, y in tqdm(unperturbed_batches, total=len(unperturbed_batches)):
            batch_labels = tf.argmax(y, axis=1)
            raw, clipped, is_adv = attack(fmodel, tf.constant(x), batch_labels, epsilons=[epsilon])
            # epsilons is a one-element list, so index 0 selects the (only) result.
            adv_batches.append(clipped[0])
            labels.append(batch_labels)
            adv_flags.append(is_adv[0])
        perturbations[epsilon] = (adv_batches, labels, adv_flags)

    print("Graphing Perturbations")
    for epsilon in epsilons:
        print(epsilon)
        batch_i = 0
        unperturbed_batch = unperturbed_batches[batch_i][0]
        perturbed_batch = perturbations[epsilon][0][batch_i]
        true_labels = perturbations[epsilon][1][batch_i]
        is_adv = perturbations[epsilon][2][batch_i]

        path = f'visualizations/IRV2/pneumo/eps_{epsilon}/{model_name}/'
        os.makedirs(path, exist_ok=True)

        for image_i in range(len(perturbed_batch)):
            unperturbed_image = unperturbed_batch[image_i].numpy()
            perturbed_image = perturbed_batch[image_i].numpy()

            fig, axs = plt.subplots(2,3)
            fig.set_figwidth(12)
            fig.set_figheight(8)

            # Top row: original image, adversarial image, pixel difference.
            axs[0,0].imshow(_to_display_image(unperturbed_image))
            # int(...) keeps the title readable instead of embedding a tensor repr.
            axs[0,0].title.set_text(f'Unperturbed ({int(true_labels[image_i])})')

            axs[0,1].imshow(_to_display_image(perturbed_image))
            axs[0,1].title.set_text(f'Perturbed ({int(is_adv[image_i])})')

            axs[0,2].imshow(_scaled_abs_diff(unperturbed_image, perturbed_image))
            axs[0,2].title.set_text("Difference (scaled)")

            # Bottom row: Grad-CAM of each image and the difference between them.
            unperturbed_gradcam = make_gradcam_heatmap(unperturbed_image[np.newaxis, ...], model)
            axs[1,0].imshow(unperturbed_gradcam)
            perturbed_gradcam = make_gradcam_heatmap(perturbed_image[np.newaxis, ...], model)
            axs[1,1].imshow(perturbed_gradcam)

            axs[1,2].imshow(_scaled_abs_diff(unperturbed_gradcam, perturbed_gradcam))

            for ax in axs.ravel():
                ax.axis("off")

            plt.tight_layout()
            plt.savefig(path + f'{image_i:02}.pdf')
            # Show the figure now, then release it: one figure per image, epsilon
            # and model would otherwise all stay open until the cell finishes.
            plt.show()
            plt.close(fig)