# Install, Paths and Parameters

In [None]:
# Requirements. Need to restart after installation (it sucks, I am sorry haha)
!pip install torch==1.7.1
!pip install torchvision==0.8.2

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# all imports
# standard libraries
from __future__ import print_function, division
import os
from os import listdir
import json
# import utils
import random
import colorsys
import requests
from io import BytesIO
import numpy as np
import pandas as pd
from typing import List, Callable
from tqdm import tqdm

# torch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms, utils
from torch.utils.tensorboard import SummaryWriter
# import vision_transformer as vits

# Image stuff
import skimage.io
from skimage import io, transform
from skimage.measure import find_contours
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
import matplotlib.pyplot as plt
from PIL import Image

# seed
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)
np.random.seed(SEED)

In [None]:
# general path
DRIVE_PATH = '/content/drive/MyDrive/'
DL_PROJECT_PATH = os.path.join(DRIVE_PATH, 'DeepLearningProject')
ADV_PATH = os.path.join(DL_PROJECT_PATH, 'AdversarialAttacks')

# filenames and label path
ADV_LABEL_PATH = os.path.join(ADV_PATH,'adversarial_data/DAmageNet/val_damagenet.txt')
ORG_LABEL_PATH = os.path.join(ADV_PATH,'original_data/correct_labels.txt')

# image paths
ORIGINAL_IMAGES_PATH = os.path.join(ADV_PATH,'original_data/images/')
ADVERSARIAL_IMAGES_PATH = os.path.join(ADV_PATH,'adversarial_data/DAmageNet/DAmageNet/')

# attention paths
ORIGINAL_ATTENTION_PATH = os.path.join(ADV_PATH,'org_attn/')
ADVERSARIAL_ATTENTION_PATH = os.path.join(ADV_PATH,'adv_attn/')

In [None]:
def get_random_classes(number_of_classes: int = 5, min_rand_class: int = 0, max_rand_class: int = 999):
  return np.random.randint(low=min_rand_class, high=max_rand_class, size=(number_of_classes,))

CLASS_SUBSET = get_random_classes()

BATCH_SIZE = 6 # You can play around with it

# Visualize attention
Taken from: https://github.com/facebookresearch/dino/blob/main/visualize_attention.py

In [None]:
import random
import colorsys
import requests
from io import BytesIO

import skimage.io
from skimage.measure import find_contours
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms as pth_transforms
import numpy as np
from PIL import Image

import utils
import vision_transformer as vits


def apply_mask(image, mask, color, alpha=0.5):
    for c in range(3):
        image[:, :, c] = image[:, :, c] * (1 - alpha * mask) + alpha * mask * color[c] * 255
    return image


def random_colors(N, bright=True):
    """
    Generate random colors.
    """
    brightness = 1.0 if bright else 0.7
    hsv = [(i / N, 1, brightness) for i in range(N)]
    colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))
    random.shuffle(colors)
    return colors


def display_instances(image, mask, fname="test", figsize=(5, 5), blur=False, contour=True, alpha=0.5):
    fig = plt.figure(figsize=figsize, frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])
    ax.set_axis_off()
    fig.add_axes(ax)
    ax = plt.gca()

    N = 1
    mask = mask[None, :, :]
    # Generate random colors
    colors = random_colors(N)

    # Show area outside image boundaries.
    height, width = image.shape[:2]
    margin = 0
    ax.set_ylim(height + margin, -margin)
    ax.set_xlim(-margin, width + margin)
    ax.axis('off')
    masked_image = image.astype(np.uint32).copy()
    for i in range(N):
        color = colors[i]
        _mask = mask[i]
        if blur:
            _mask = cv2.blur(_mask,(10,10))
        # Mask
        masked_image = apply_mask(masked_image, _mask, color, alpha)
        # Mask Polygon
        # Pad to ensure proper polygons for masks that touch image edges.
        if contour:
            padded_mask = np.zeros((_mask.shape[0] + 2, _mask.shape[1] + 2))
            padded_mask[1:-1, 1:-1] = _mask
            contours = find_contours(padded_mask, 0.5)
            for verts in contours:
                # Subtract the padding and flip (y, x) to (x, y)
                verts = np.fliplr(verts) - 1
                p = Polygon(verts, facecolor="none", edgecolor=color)
                ax.add_patch(p)
    ax.imshow(masked_image.astype(np.uint8), aspect='auto')
    fig.savefig(fname)
    print(f"{fname} saved.")
    return

PATCH_SIZE = 16
OUTPUT_DIR = "/content/drive/MyDrive/Deep Learning Project/Adversarial Attacks/original_adv/"
INPUT_IMAGE = "/content/drive/MyDrive/Deep Learning Project/Adversarial Attacks/original_data/images/ILSVRC2012_val_00000130.JPEG"
model = vits16

img = Image.open(INPUT_IMAGE)
img = img.convert('RGB')

transform = pth_transforms.Compose([
    pth_transforms.Resize(256),
    pth_transforms.ToTensor(),
    pth_transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

img = transform(img)

# make the image divisible by the patch size
w, h = img.shape[1] - img.shape[1] % PATCH_SIZE, img.shape[2] - img.shape[2] % PATCH_SIZE
img = img[:, :w, :h].unsqueeze(0)

w_featmap = img.shape[-2] // PATCH_SIZE
h_featmap = img.shape[-1] // PATCH_SIZE

print(img.shape)
attentions = model.get_last_selfattention(img.to(device))

nh = attentions.shape[1] # number of heads

# we keep only the output patch attention
attentions = attentions[0, :, 0, 1:].reshape(nh, -1)

attentions = attentions.reshape(nh, w_featmap, h_featmap)
attentions = nn.functional.interpolate(attentions.unsqueeze(0), scale_factor=PATCH_SIZE, mode="nearest")[0].detach().cpu().numpy()

# save attentions heatmaps
os.makedirs(OUTPUT_DIR, exist_ok=True)
torchvision.utils.save_image(torchvision.utils.make_grid(img, normalize=True, scale_each=True), os.path.join(OUTPUT_DIR, "img.png"))
for j in range(nh):
    fname = os.path.join(OUTPUT_DIR, "attn-head" + str(j) + ".png")
    plt.imsave(fname=fname, arr=attentions[j], format='png')
    print(f"{fname} saved.")

ModuleNotFoundError: ignored

# Adversarial Attack Classifier

### Custom Dataset Class
Sample contains attention and label specifying if the attention comes from an adverserial attack or not

In [None]:
class AdverserialAttentionDataset(Dataset):
    """Adverserial Attention dataset."""

    def __init__(self, adv_attn_dir, org_attn_dir, transform=None):
        """
        Args:
            adv_attn_dir (string): Directory with all the .pt files that contain
                attention of the adv. img.
            org_attn_dir (string): Directory with all the .pt files that contain
                attention of the org. img.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.adv_attn_dir = adv_attn_dir
        self.org_attn_dir = org_attn_dir
        names_org = [f[:-3]+"_org" for f in listdir(org_attn_dir)]
        names_adv = [f[:-3]+"_adv" for f in listdir(adv_attn_dir)]

        self.n = len(names_org)
        self.file_names = names_org + names_adv
        random.shuffle(self.file_names)
        self.transform = transform

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        attn_file = self.file_names[idx]
        if "adv" in attn_file:
          label = float(1)
          attn_file = attn_file[:-4]
          attn_path = os.path.join(self.adv_attn_dir, attn_file+".pt")
        else:
          label = float(0)
          attn_file = attn_file[:-4]
          attn_path = os.path.join(self.org_attn_dir, attn_file+".pt")

        attention = torch.load(attn_path)
      
        sample = {'attention': attention.squeeze(), 'label': label}

        if self.transform:
            sample = self.transform(sample)

        return sample

### Sample visualization

In [None]:
BATCH_SIZE = 4

test = torch.load(ORIGINAL_ATTENTION_PATH+f"{0}.pt")
print(test.shape)

PATCH_SIZE = 8
w_featmap = 28
h_featmap = 28
attentions = test
nh = attentions.shape[1] # number of heads
# we keep only the output patch attention
plt.ion() # interactive mode

fig = plt.figure(figsize=(16, 24), dpi=80)

nr = 12
for j in range(nr):
  attention = attentions[0, :, j, 1:].reshape(nh, -1)
  # print(attentions)
  attention = attention.reshape(nh, w_featmap, h_featmap)
  attention = nn.functional.interpolate(attention.unsqueeze(0), scale_factor=PATCH_SIZE, mode="nearest")[0].detach().cpu().numpy()
  for i in range(6):
    ind = (i+1)+j*6
    ax = plt.subplot(nr, 6, ind)
    plt.tight_layout()
    ax.set_title('Test Layer:{} Head:{}'.format(j,i))
    ax.axis('off')
    plt.imshow(attention[i,:,:])

plt.show()

FileNotFoundError: ignored

In [None]:
test.mean(axis=2).shape

## Classifiers
- Classifier 1: 2 CNN, 2 pool, and 3 FC Layers
- Classifier 2: etc.

In [None]:
BATCH_SIZE = 6

attn_dataset = AdverserialAttentionDataset(adv_attn_dir=ADVERSARIAL_ATTENTION_PATH, org_attn_dir=ORIGINAL_ATTENTION_PATH)

In [None]:
# Training set and training loader
dataset_size = len(attn_dataset)
train_size = int(0.8 * dataset_size)
test_size = dataset_size - train_size

train_dataset, test_dataset = torch.utils.data.random_split(attn_dataset, 
                                                            [train_size, 
                                                             test_size],
                                                            generator=torch.Generator().manual_seed(42))

# Train and test loader
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE,
                                          shuffle=True, num_workers=0)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset),
                                         shuffle=True, num_workers=0)

classes = ('non-adversarial', 'adversarial') # Binary classifier

In [None]:
attention = torch.load(ADVERSARIAL_ATTENTION_PATH + "1.pt")

In [None]:
attention.shape

In [None]:
# Classifier 1 Network
class ClassifierOne(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(6, 12, 3, stride=1)
        self.conv2 = nn.Conv2d(12, 16, 5, stride=1)
        self.conv3 = nn.Conv2d(16, 24, 5, stride=1)

        self.fc1 = nn.Linear(7*7*24, 120)
        self.fc2 = nn.Linear(120, 10)
        self.fc3 = nn.Linear(10, 1)

        # Utils
        self.pool = nn.MaxPool2d(2, 2)

    def initialize(self):
      nn.init.normal_(self.fc1.weight, mean=0, std=1.0)
      nn.init.normal_(self.fc1.bias, mean=0, std=1.0)
      nn.init.normal_(self.fc2.weight, mean=0, std=1.0)
      nn.init.normal_(self.fc2.bias, mean=0, std=1.0)
      nn.init.normal_(self.fc3.weight, mean=0, std=1.0)
      nn.init.normal_(self.fc3.bias, mean=0, std=1.0)
      nn.init.normal_(self.conv1.weight, mean=0, std=1.0)
      nn.init.normal_(self.conv1.bias, mean=0, std=1.0)
      nn.init.normal_(self.conv2.weight, mean=0, std=1.0)
      nn.init.normal_(self.conv2.bias, mean=0, std=1.0)
      nn.init.normal_(self.conv3.weight, mean=0, std=1.0)
      nn.init.normal_(self.conv3.bias, mean=0, std=1.0)
      # nn.init.xavier_uniform(self.linear.weight.data)
      # self.linear.bias.data.zero_()

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(torch.relu(self.conv2(x)))
        x = torch.relu(self.conv3(x))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

In [None]:
# Simple Binary Classifier Network
class SimpleBC(nn.Module):
  def __init__(self,input_shape):
    super(SimpleBC,self).__init__()
    self.fc1 = nn.Linear(input_shape,32)
    self.fc2 = nn.Linear(32,64)
    self.fc3 = nn.Linear(64,1)
    self.initialize()

  def initialize(self):
    nn.init.normal_(self.fc1.weight, mean=0, std=1.0)
    nn.init.normal_(self.fc1.bias, mean=0, std=1.0)
    nn.init.normal_(self.fc2.weight, mean=0, std=1.0)
    nn.init.normal_(self.fc2.bias, mean=0, std=1.0)
    nn.init.normal_(self.fc3.weight, mean=0, std=1.0)
    nn.init.normal_(self.fc3.bias, mean=0, std=1.0)
    # nn.init.xavier_uniform(self.linear.weight.data)
    # self.linear.bias.data.zero_()

  def forward(self,x):
    x = torch.relu(self.fc1(x))
    x = torch.relu(self.fc2(x))
    x = torch.sigmoid(self.fc3(x))
    return x

### Training

In [None]:
# Load TensorBoard notebook extension
%load_ext tensorboard

# Import SummaryWriter
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter instance
# SummaryWriter writes event files to log_dir
log_dir = "./logs"
writer = SummaryWriter(log_dir)
np.set_printoptions(precision=4)

In [None]:
# Hyperparameters
EPOCHS = 100

# Initialise network
net = ClassifierOne()
# net = SimpleBC(6*28*28)

# Select device
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
net.to(device)

# Set model to train
net.train()

# define loss, optimizer, and scheduler
criterion = nn.BCELoss()
# optimizer = optim.Adam(net.parameters(), lr=0.001)
optimizer = optim.Adagrad(net.parameters(), lr=0.01, lr_decay=1e-08, weight_decay=0)
# scheduler = MultiStepLR(optimizer, milestones=[30,80], gamma=0.1)

losses = []
accur = []

# Train network
pbar = tqdm(range(EPOCHS))
for epoch in pbar:  # loop over the dataset multiple times

    # Metrics
    train_running_loss = 0.0
    train_running_loss_mean = 0.0
    train_acc = 0.0
    train_acc_mean = 0.0
    test_running_loss = 0.0
    test_acc = 0.0

    for i, data in enumerate(train_loader, start=0):
      try:
        # get the inputs; data is a list of [inputs, labels] and write to device
        inputs, labels = data['attention'], data['label']
        inputs = inputs[:, :, 0, 1:].reshape(labels.shape[0], 6, 28, 28)
        # for simpleBC START
        # inputs = inputs.flatten(start_dim=1)
        # END
        inputs = inputs.to(device)
        labels = labels.to(device).float()
        
        # Forward Pass
        outputs = net(inputs).float()
        outputs = outputs.reshape(-1)
        
        # Backpropagation
        optimizer.zero_grad() # Reset the gradient
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # loss train
        train_running_loss += loss.item()
        train_running_loss_mean = train_running_loss / (i+1)

        # accuracy train
        predicted = net(inputs).reshape(-1).detach().cpu().numpy().round()
        acc_labels = labels
        acc_labels = acc_labels.detach().cpu().numpy()
        inter = np.equal(predicted, acc_labels)
        train_acc += inter.mean()
        train_acc_mean = train_acc / (i+1)

      except Exception as e:
        print("Error: {}".format(e))
        pass
    
    with torch.set_grad_enabled(False):
      for data in test_loader:
        try:
          # get the inputs; data is a list of [inputs, labels] and write to device
          inputs, labels = data['attention'], data['label']
          inputs = inputs[:, :, 0, 1:].reshape(labels.shape[0], 6, 28, 28)
          # for simpleBC START
          # inputs = inputs.flatten(start_dim=1)
          # END
          inputs = inputs.to(device)
          labels = labels.to(device).float()
          
          # Forward Pass
          outputs = net(inputs).float()
          outputs = outputs.reshape(-1)
          
          # loss test
          loss = criterion(outputs, labels)
          test_running_loss += loss.item()

          # accuracy test
          outputs = outputs.detach().cpu().numpy().round()
          comparison = np.equal(labels.detach().cpu().numpy(), outputs)
          test_acc = comparison.mean()

        except Exception as e:
          print("Error: {}".format(e))
          pass

    writer.add_scalar("Loss/train", train_running_loss_mean, epoch)
    writer.add_scalar('Loss/test', test_running_loss, epoch) 
    writer.add_scalar('Accuracy/train', train_acc_mean, epoch)
    writer.add_scalar('Accuracy/test', test_acc, epoch)

    losses.append(train_running_loss_mean)
    accur.append(train_acc_mean)
    pbar.set_description("Ep: {}\t Tr. Loss: {:.4f}\t Tr. Acc: {:.4f}\t T. Loss: {:.4f}\t T. Acc: {:.4f}".format(epoch, 
                                                                            train_running_loss_mean, 
                                                                            train_acc_mean, 
                                                                            test_running_loss, 
                                                                            test_acc))

print('Finished Training')
writer.flush()

In [None]:
writer.close()

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
%tensorboard --logdir ./logs

In [None]:
from tensorboard import notebook
notebook.list() # View open TensorBoard instances

Known TensorBoard instances:
  - port 6006: logdir logs (started 0:01:33 ago; pid 1563)


In [None]:
PATH = './simple_net.pth'
torch.save(net.state_dict(), PATH)


### Testing

In [None]:
model = SimpleBC(4704)
model.load_state_dict(torch.load(PATH))
model.eval()

# TODO

# Test stored files

In [None]:
for f in os.listdir(ORIGINAL_ATTENTION_PATH):
  try:
    torch.load(ADVERSARIAL_ATTENTION_PATH+f)
  except:
    print(f)

56.pt
30.pt
