## Load Libraries

In [1]:
import csv
import pandas as pd
import os
import tensorflow as tf
import time
import gc
import numpy as np
from scipy.stats import spearmanr

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms

# Mount Google Drive: the dataset is copied from it and results/images are
# written back to it further down. force_remount=True re-mounts even if a
# mount already exists from an earlier run of this cell.
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

# Run on the GPU when one is visible to torch, otherwise fall back to CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

# NOTE(review): this silences *every* Python warning for the whole session,
# including numerical ones (e.g. divide-by-zero) -- confirm this is intended.
import warnings
warnings.filterwarnings('ignore')

import IPython
from google.colab import output
from IPython.display import clear_output


Mounted at /content/drive
cuda


## Load timm model

In [2]:
# Install the exact timm build this notebook was run with (version is pinned).
!pip install timm==0.8.10.dev0
# Remove pip's install log from the cell output; only the line below remains.
clear_output()
print("Done.")

Done.


In [3]:
import timm

# Architecture under attack; the name is reused later to build output paths.
model_name = 'vgg11'

# Build the 1000-class network with pretrained weights, place it on the
# selected device, and switch it to inference mode (eval() affects layers such
# as dropout/batch-norm; it does not disable gradient tracking).
model = timm.create_model(model_name, pretrained=True, num_classes=1000)
model = model.to(device)
model.eval()

# Hide any download/progress output and leave a single confirmation line.
clear_output()
print(model_name, "Done.")

vgg11 Done.


In [4]:
# Ask timm for the preprocessing configuration that belongs to this model and
# build the matching evaluation-time (is_training=False) transform pipeline.
# The pipeline is printed for inspection; later code in this notebook reads
# only `timm_transforms.transforms[1].size` from it.
data_config = timm.data.resolve_model_data_config(model)
timm_transforms = timm.data.create_transform(**data_config, is_training=False)
print(timm_transforms)

Compose(
    Resize(size=256, interpolation=bilinear, max_size=None, antialias=warn)
    CenterCrop(size=(224, 224))
    ToTensor()
    Normalize(mean=tensor([0.4850, 0.4560, 0.4060]), std=tensor([0.2290, 0.2240, 0.2250]))
)


## Load Dataset

In [5]:
# Copy the TFRecord file from the mounted Drive into the current working
# directory; the Dataset below reads it from /content/.
!cp '/content/drive/MyDrive/Research/Adversarial_Attacks_Model-VS-Human/datasets/clickme_test_1000.tfrecords' ./

In [6]:
class Dataset:
    """Reader for a TFRecord file whose examples hold an image, a heatmap and a label."""

    def __init__(self, data_path):
        """Store the TFRecord path and declare the per-example feature schema."""
        self.data_path = data_path

        # Let tf.data pick the parallelism / prefetch buffer sizes.
        self.AUTO = tf.data.AUTOTUNE

        # Image and heatmap are serialized as raw byte strings, the label as int64.
        self._feature_description = dict(
            image=tf.io.FixedLenFeature([], tf.string, default_value=''),
            heatmap=tf.io.FixedLenFeature([], tf.string, default_value=''),
            label=tf.io.FixedLenFeature([], tf.int64, default_value=0),
        )

    def parse_prototype(self, prototype, training=False):
        """Decode one serialized example.

        Returns a tuple ``(image, heatmap, label)`` where the image is a
        float32 tensor reshaped to (224, 224, 3), the heatmap a float32 tensor
        reshaped to (224, 224, 1), and the label a one-hot vector of length
        1000. ``training`` is accepted but not used.
        """
        example = tf.io.parse_single_example(prototype, self._feature_description)

        image_flat = tf.io.decode_raw(example['image'], tf.float32)
        image = tf.cast(tf.reshape(image_flat, (224, 224, 3)), tf.float32)

        heatmap_flat = tf.io.decode_raw(example['heatmap'], tf.float32)
        heatmap = tf.reshape(heatmap_flat, (224, 224, 1))

        class_index = tf.cast(example['label'], tf.int32)
        label = tf.one_hot(class_index, 1_000)

        return image, heatmap, label

    def get_dataset(self, batch_size, training=False):
        """Build the batched tf.data pipeline over the TFRecord file.

        Examples are read in deterministic order, parsed with
        ``parse_prototype``, grouped into batches of ``batch_size`` (a final
        incomplete batch is dropped) and prefetched. ``training`` is accepted
        but not used.
        """
        options = tf.data.Options()
        options.experimental_deterministic = True

        records = tf.data.TFRecordDataset([self.data_path], num_parallel_reads=self.AUTO)

        return (
            records
            .with_options(options)
            .map(self.parse_prototype, num_parallel_calls=self.AUTO)
            .batch(batch_size, drop_remainder=True)
            .prefetch(self.AUTO)
        )
        
# Local copy of the TFRecord file (placed in /content by the `!cp` cell above).
datapath = "/content/clickme_test_1000.tfrecords"
# Only stores the path and schema; nothing is read until get_dataset() is iterated.
data = Dataset(datapath)

In [7]:
from scipy.ndimage import gaussian_filter
import matplotlib.pyplot as plt
def show(img, p=False, smooth=False, **kwargs):
    """Display a torch/tf tensor (or array-like) as an image with matplotlib.

    Args:
        img: image data. Objects with ``.detach()`` (torch tensors) are
            detached and moved to CPU first; anything else goes through
            ``np.array``.
        p: ``False`` to disable, otherwise a percentile; values are clipped to
            the ``[p, 100 - p]`` percentile range before display.
        smooth: if truthy and the image is 2-D, the sigma passed to
            ``gaussian_filter``.
        **kwargs: forwarded to ``plt.imshow`` (e.g. ``cmap``).
    """
    # Only a missing `.detach` attribute means "not a torch tensor"; any other
    # failure should surface instead of being silently swallowed.
    try:
        img = img.detach().cpu()
    except AttributeError:
        img = np.array(img)

    img = np.array(img, dtype=np.float32)

    # A leading axis of size 1 is dropped; a leading axis of size 3 is treated
    # as channels-first and moved to the end for imshow.
    if img.shape[0] == 1:
        img = img[0]
    elif img.shape[0] == 3:
        img = np.moveaxis(img, 0, -1)

    # A trailing singleton channel means a 2-D map (lets `cmap` apply).
    if img.shape[-1] == 1:
        img = img[:, :, 0]

    # Rescale to [0, 1] only when the data falls outside that range.
    if img.max() > 1 or img.min() < 0:
        img -= img.min()
        peak = img.max()
        # A constant image has peak == 0 after the shift; dividing would give
        # NaN everywhere, so leave it as all zeros.
        if peak > 0:
            img /= peak

    # Optional percentile clipping to suppress outliers.
    if p is not False:
        img = np.clip(img, np.percentile(img, p), np.percentile(img, 100 - p))

    if smooth and len(img.shape) == 2:
        img = gaussian_filter(img, smooth)

    plt.imshow(img, **kwargs)
    plt.axis('off')
    plt.grid(None)

## Attack

In [8]:
# Classification loss the attack maximizes (takes logits and class indices).
loss = nn.CrossEntropyLoss()

class Attack:
    """Gradient-ascent attack whose perturbation is rescaled to a fixed L2 norm."""

    def __init__(self):
        pass

    def l2(self, x):
        """Return the per-sample L2 norm of ``x``, shaped ``(batch, 1, 1, 1)``.

        All non-batch dimensions are flattened before taking the norm; the
        result shape broadcasts against 4-D inputs.
        """
        batch_size = x.shape[0]
        norms = torch.norm(x.reshape(batch_size, -1), p=2, dim=1)
        return norms.reshape(batch_size, 1, 1, 1)

    def _safe_l2(self, x):
        """Like ``l2`` but with exact zeros replaced by 1.

        Used only as a divisor: an all-zero sample then stays zero (0 / 1)
        instead of becoming NaN (0 / 0). Non-zero norms are returned unchanged.
        """
        norms = self.l2(x)
        return torch.where(norms > 0, norms, torch.ones_like(norms))

    def l2_pgd_attack(self, model, images, labels, eps, alpha=10/255, iters=5):
        """Perturb ``images`` to increase the cross-entropy loss of ``model``.

        Each iteration takes a step of L2 length ``alpha`` along the loss
        gradient, then rescales the total perturbation so that its per-sample
        L2 norm is exactly ``eps`` (it is scaled up as well as down), and
        finally clamps pixel values to [0, 1].

        Args:
            model: callable mapping a 4-D image batch to logits.
            images: 4-D input batch; it is cloned, never modified.
            labels: class indices accepted by ``nn.CrossEntropyLoss``.
            eps: L2 norm given to the perturbation before clamping.
            alpha: L2 length of each gradient step.
            iters: number of iterations; 0 returns a copy of ``images``.

        Returns:
            A detached tensor with the same shape as ``images``.
        """
        x0 = images.clone().detach()
        xt = x0.clone()

        for _ in range(iters):
            xt.requires_grad_(True)
            cost = loss(model(xt), labels)

            # Differentiate w.r.t. the input only: unlike cost.backward() this
            # does not accumulate gradients into the model's parameters.
            (grads,) = torch.autograd.grad(cost, xt)

            # Step of length alpha along the normalized gradient. A vanished
            # gradient yields a zero step rather than NaN.
            x_next = xt.detach() + grads / self._safe_l2(grads) * alpha

            # Rescale the accumulated perturbation to norm eps around x0.
            # A zero perturbation stays zero rather than becoming NaN.
            delta = x_next - x0
            x_next = x0 + (delta / self._safe_l2(delta)) * eps

            # Keep pixel values valid and cut the graph for the next step.
            xt = x_next.clamp(min=0, max=1).detach()

        return xt

In [9]:
def execute_attack(model, img, target, eps, alpha=0.5, iters=3):
    """Attack ``img`` and re-classify the result.

    Args:
        model: classifier mapping an image batch to logits.
        img: input image batch.
        target: true class indices for ``img``.
        eps: L2 size of the perturbation, forwarded to the attack.
        alpha: step size, forwarded to the attack.
        iters: number of attack iterations, forwarded to the attack.

    Returns:
        ``(perturbed_img, pred)``: the adversarial batch and the model's
        arg-max class indices for it.
    """
    # alpha/iters are forwarded as given; the defaults (0.5, 3) match the
    # values that were previously hard-coded here.
    perturbed_img = Attack().l2_pgd_attack(
        model, img, target, eps=eps, alpha=alpha, iters=iters
    )

    # Re-classify the perturbed image without tracking gradients.
    with torch.no_grad():
        pred = torch.argmax(model(perturbed_img), dim=-1)

    return perturbed_img, pred

def write_csv_all(record, path):
    """Append one per-image result row to the CSV file at ``path``.

    The header row is written first whenever the file does not exist yet or
    exists but is empty, so the file always starts with column names.

    Args:
        record: sequence of values in the column order of ``header`` below.
        path: destination CSV file; created if missing.
    """
    header = ['model', 'img', 'label', 'pred', 'eps', 'l2', 'linf', 'spearman']

    # A zero-byte file (e.g. left by an interrupted run) still needs a header.
    needs_header = not os.path.isfile(path) or os.path.getsize(path) == 0

    # newline='' lets the csv module control line endings itself.
    with open(path, mode='a', newline='') as csv_file:
        writer = csv.writer(csv_file)
        if needs_header:
            writer.writerow(header)
        writer.writerow(record)

def write_csv_avg(record, path):
    """Append one per-model summary row to the CSV file at ``path``.

    The header row is written first whenever the file does not exist yet or
    exists but is empty, so the file always starts with column names.

    Args:
        record: sequence of values in the column order of ``header`` below.
        path: destination CSV file; created if missing.
    """
    header = ['model', 'num_correct',
              'avg_eps', 'std_eps',
              'avg_l2', 'std_l2',
              'avg_linf', 'std_linf',
              'avg_spearman', 'std_spearman']

    # A zero-byte file (e.g. left by an interrupted run) still needs a header.
    needs_header = not os.path.isfile(path) or os.path.getsize(path) == 0

    # newline='' lets the csv module control line endings itself.
    with open(path, mode='a', newline='') as csv_file:
        writer = csv.writer(csv_file)
        if needs_header:
            writer.writerow(header)
        writer.writerow(record)

from PIL import Image
def save_img(image_tensor, path):
    """Save an image tensor to ``path``.

    Size-1 dimensions are squeezed away first (e.g. a leading batch axis of 1),
    the result is converted to a PIL image with torchvision's ``ToPILImage``
    and written with ``PIL.Image.save`` (callers in this notebook pass ``.png``
    paths).
    """
    to_pil = transforms.ToPILImage()
    squeezed = image_tensor.squeeze()
    to_pil(squeezed).save(path)


In [10]:
from scipy.stats import spearmanr

# Computes the Spearman correlation between two sets of heatmaps.
def spearman_correlation(a, b):
    """Spearman rank correlation between two equally-shaped 4-D heatmap arrays.

    Both inputs are flattened completely, so a single coefficient is returned
    for everything passed in.

    Args:
        a, b: arrays/tensors with identical 4-D shapes that provide ``.flatten()``.

    Returns:
        The correlation coefficient ``rho`` from ``scipy.stats.spearmanr``.

    Raises:
        AssertionError: if the shapes differ or the inputs are not 4-D.
    """
    # The message previously ran two literals together as "musthave".
    assert a.shape == b.shape, "The two sets of images must " \
                               "have the same shape."
    assert len(a.shape) == 4, "The two sets of heatmaps must have shape (1, 1, W, H)."

    rho, _ = spearmanr(a.flatten(), b.flatten())
    return rho

# Transform tensorflow tensor to pytorch tensor
def tf2torch(t): # a batch of image tensors (N, H, W, 3)
    """Convert a TensorFlow tensor to a float32 torch tensor.

    A 4-D input whose last axis has size 1 or 3 is treated as channels-last
    (N, H, W, C) and returned channels-first (N, C, H, W). Any other input
    (e.g. a 2-D label batch) is converted without reordering axes.
    """
    array = tf.cast(t, tf.float32).numpy()

    # Require 4 dimensions before transposing: checking the last-axis size
    # alone would try to permute four axes on, say, an (N, 3) label array
    # and raise.
    if array.ndim == 4 and array.shape[-1] in (1, 3):
        array = array.transpose(0, 3, 1, 2)

    return torch.from_numpy(array)

# Transform tensorflow tensor to numpy array
def tf2np(t): # a batch of image tensors (N, H, W, 3)
    """Convert a TensorFlow tensor to a float32 numpy array.

    A 4-D input whose last axis has size 1 or 3 is treated as channels-last
    (N, H, W, C) and returned channels-first (N, C, H, W). Any other input
    is returned without reordering axes.
    """
    array = tf.cast(t, tf.float32).numpy()

    # Require 4 dimensions before transposing: checking the last-axis size
    # alone would try to permute four axes on, say, an (N, 3) label array
    # and raise.
    if array.ndim == 4 and array.shape[-1] in (1, 3):
        array = array.transpose(0, 3, 1, 2)

    return array

# Image normalization
def img_normalize(imgs):
    """Min-max scale ``imgs`` into [0, 1].

    The minimum and maximum are taken over the *entire* input, so a batch is
    scaled jointly rather than image by image. The input is not modified.

    Args:
        imgs: numpy array or torch tensor.

    Returns:
        A new array/tensor of the same shape; all zeros when the input is
        constant.
    """
    shifted = imgs - imgs.min()
    peak = shifted.max()

    # Constant input: everything is already 0 after the shift, and dividing
    # by peak == 0 would turn it into NaN.
    if peak == 0:
        return shifted

    return shifted / peak

# Compute L-infinity norm
def linf_loss(x, y):
    """Return the L-infinity norm (largest absolute entry) of ``x - y``.

    Both inputs are flattened before subtracting, so only their element
    counts have to match.
    """
    difference = x.flatten() - y.flatten()
    return np.linalg.norm(difference, ord=np.inf)

# Compute L-2 norm
def l2_loss(x, y):
    """Return the L2 (Euclidean) norm of ``x - y``.

    Both inputs are flattened before subtracting, so only their element
    counts have to match.
    """
    difference = x.flatten() - y.flatten()
    return np.linalg.norm(difference, ord=2)

## Main

In [11]:
# Output locations on the mounted Drive.
# `case` names this attack configuration and becomes the summary CSV's file name.
case = "L2PGD_0.5_3"
results_path_avg = '/content/drive/MyDrive/Research/Adversarial_Attacks_Model-VS-Human/results/' + case + '.csv'
results_path_all = '/content/drive/MyDrive/Research/Adversarial_Attacks_Model-VS-Human/results/' + model_name + '.csv'
images_path = '/content/drive/MyDrive/Research/Adversarial_Attacks_Model-VS-Human/images/'

# Adversarial images go into a per-model sub-folder, created on first use.
images_path = os.path.join(images_path, model_name)
if os.path.exists(images_path):
    print(f"Folder already exists: {images_path}")
else:
    os.makedirs(images_path)
    print(f"Created folder: {images_path}")

# Per-image statistics, appended to only for images that were fooled.
l2_list = []
linf_list = []
opt_epsilons = []
spearman_scores = []

# Counters: images seen / images classified correctly before any attack /
# aa_correct (initialised here; the loop below does not update it).
total_cnt = init_correct = aa_correct = 0

# Main evaluation loop. For every image the model classifies correctly, search
# for the smallest eps (L2 size) at which execute_attack changes the
# prediction, then record eps, the L2/Linf distances and the Spearman
# correlation between the perturbation and the heatmap.
# The dataset is read in batches of 2 (second argument is the unused
# `training` flag); each image is then handled on its own.
for imgs, hmps, labels in data.get_dataset(2 , False): # image, heatmap, label
    imgs = tf2torch(imgs)
    hmps = tf2torch(hmps)

    # Resize to the crop size of the model's transform pipeline if needed.
    # transforms[1] is the CenterCrop entry in the pipeline printed earlier.
    nh, nw = timm_transforms.transforms[1].size
    _, _, h, w = imgs.shape
    if not (h, w) == (nh, nw):
        # print(h, w, nh, nw)
        transform = transforms.Resize((nh, nw), transforms.InterpolationMode.BICUBIC)
        imgs = transform(imgs)
        hmps = transform(hmps)

    # Min-max scale to [0, 1]. img_normalize uses the min/max of the whole
    # batch, so the two images of a batch are scaled jointly.
    # NOTE(review): the Normalize(mean, std) step of timm_transforms is never
    # applied -- the model sees plain [0, 1] inputs. Confirm this is intended.
    imgs = img_normalize(imgs).to(device)
    hmps = img_normalize(hmps)
    labels = tf2torch(labels).to(device)

    for img, hmp, label in zip(imgs, hmps, labels):
        # print(img.shape, hmp.shape, logit.shape)

        # Add a dimension (batch == 1)
        img = torch.unsqueeze(img, 0) # (1, 3, H, W)
        hmp = torch.unsqueeze(hmp, 0)
        label = torch.unsqueeze(label, 0)
        # Labels are one-hot; recover the class index.
        target = torch.argmax(label, axis=-1) # tensor([343], device='cuda:0')
        # print(img.shape, target.shape)

        # Forward pass the clean image through the model
        with torch.no_grad():
            output = model(img)
            init_pred = torch.argmax(output, axis=-1) # index of the largest logit

        # Count the image and update the progress line; the total of 1000 is
        # hard-coded in the message.
        total_cnt += 1
        print("\rSearching optimal epsilon for image: %s | %s %s" % (
            str(total_cnt),
            str(1000), model_name), end=" ")
        # If the initial prediction is wrong, just move on
        if init_pred.item() != target.item():
            continue
        init_correct += 1

        # First attack at the upper bound of the search range.
        initial_eps = 10
        perturbed_img, perturbed_pred = execute_attack(model=model, img=img, target=target, eps=initial_eps, alpha=0.5)

        if perturbed_pred.item() == target.item():
            # eps = 10 did not change the prediction: skip this image. It
            # contributes nothing to the statistics collected below.
            continue
        else:
            # Cache of the most recent successful attack.
            # key: eps; val: (perturbed_img, perturbed_pred)
            info = {} # Only store one key-val pair
            key = None

            # Probe eps = 1 to decide which interval to search.
            initial_eps = 1
            perturbed_img, perturbed_pred = execute_attack(model=model, img=img, target=target, eps=initial_eps, alpha=0.5)

            # Stored whether or not eps = 1 fooled the model; in the
            # "not fooled" case the entry is replaced or bypassed below.
            key = initial_eps
            info[key] = (perturbed_img, perturbed_pred)

            if perturbed_pred.item() != target.item():
                l, r = 0.001, 1 # search eps between 0.001 and 1
                threshold = 0.01
            else:
                l, r = 1, 10 # search eps between 1 and 10
                threshold = 0.1

            # Binary search: r is the smallest eps known to fool the model,
            # l the largest eps tried that did not.
            # NOTE(review): this presumes success is monotone in eps -- verify.
            while r - l >= threshold:
                eps = l + (r - l) / 2
                perturbed_img, perturbed_pred = execute_attack(model=model, img=img, target=target, eps=eps, alpha=0.5)
                if perturbed_pred.item() != target.item():
                    r = eps
                    # Replace the cached entry with this smaller successful eps.
                    if r != key:
                        del info[key]
                        key = r
                        info[key] = (perturbed_img, perturbed_pred)
                else:
                    l = eps

            optimal_eps = r
            if r in info:
                # Reuse the cached result for r instead of re-running the attack.
                perturbed_img, perturbed_pred = info[r]
            else:
                # r was never cached (no midpoint succeeded in the 1..10
                # search, so r is still 10): run the attack at r again.
                perturbed_img, perturbed_pred = execute_attack(model=model, img=img, target=target, eps=optimal_eps, alpha=0.5)

            # Final check: drop the image if the chosen result does not fool
            # the model.
            if not (perturbed_pred.item() != target.item()):
                continue

        opt_epsilons.append(optimal_eps)

        # save images
        # The file name contains only the model name, the true label and the
        # adversarial prediction, so a later image with the same pair
        # overwrites an earlier one. Note that `l` (search bound above) is
        # reused here as a string.
        l, t = str(target.item()), str(perturbed_pred.item())
        img_path = os.path.join(images_path, model_name + '_' + l + '_' + t + '.png')
        save_img(perturbed_img, img_path)

        # Store l2, linf distances between adversarial and clean image
        a, b = perturbed_img.cpu().numpy(), img.cpu().numpy()
        l2, linf = l2_loss(a, b), linf_loss(a, b)
        l2_list.append(l2)
        linf_list.append(linf)

        # Spearman correlation between the heatmap and the absolute value of
        # the channel-averaged perturbation.
        mask = np.abs(np.mean(a - b, axis=1, keepdims=True)) # (1, 1, 224, 224)
        spearman_score = spearman_correlation(mask, hmp.numpy())
        spearman_scores.append(spearman_score)

        # Per-image record. It is built but not persisted: the write call
        # below is commented out.
        row_data = [
            model_name, str(total_cnt-1), str(target.item()), str(perturbed_pred.item()),
            str(round(optimal_eps, 6)), str(l2), str(linf), str(spearman_score)
        ]

        # print(row_data)
        # write_csv_all(row_data, results_path_all)
        # clear_unused_memo()

# Terminate the "\r"-updated progress line.
print("")

Created folder: /content/drive/MyDrive/Research/Adversarial_Attacks_Model-VS-Human/images/vgg11
Searching optimal epsilon for image: 1000 | 1000 vgg11 


In [12]:
# Save data
row_data = [
    model_name, str(init_correct), 
    str(np.mean(opt_epsilons)), str(np.std(opt_epsilons)),
    str(np.mean(l2_list)), str(np.std(l2_list)),
    str(np.mean(linf_list)), str(np.std(linf_list)),
    str(np.mean(spearman_scores)), str(np.std(spearman_scores)),
]
print(row_data)
# write_csv_avg(row_data, results_path_avg)

['vgg11', '326', '0.011333984374999998', '0.028316600713116053', '1.3170422', '0.09202564', '0.049354456', '0.012586416', '0.1995636211963146', '0.13514956956593097']
