In [None]:
import matplotlib.pyplot as plt
from tqdm import tqdm, tqdm_notebook

import numpy as np

from sklearn.decomposition import PCA

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets 
from torchvision import transforms

# General settings and normalization tools

In [None]:
batch_size        = 128
device            = torch.device('cuda')
means             = torch.tensor((0.4914, 0.4822, 0.4465)).to(device)
stds              = torch.tensor((0.2023, 0.1994, 0.2010)).to(device)
normalize         = lambda x: (x - means) / stds
normalize_red     = lambda x: (x - means[0]) / stds[0]
normalize_green   = lambda x: (x - means[1]) / stds[1]
normalize_blue    = lambda x: (x - means[2]) / stds[2]
reverse_normalize = lambda x: (x * stds) + means

In [None]:
def display_img(img):
    img_data = lambda img: reverse_normalize(img.squeeze().permute(1,2,0)).data.cpu().numpy().clip(0, 1)
    plt.imshow(img_data(img))    

# Data loading

In [None]:
cifar_cats = [
    'airplane',
    'automobile',
    'bird',
    'cat',
    'deer',
    'dog',
    'frog',
    'horse',
    'ship',
    'truck'
]

In [None]:
train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(
        '../data',
        train = True,
        download = True,
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize(means, stds)
            ]
        )
    ),
    shuffle = True,
    batch_size = batch_size
)

In [None]:
test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(
        '../data',
        train = False,
        download = True,
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize(means, stds)
            ]
        )
    ),
    shuffle = False,
    batch_size = 2 * batch_size
)

# Training and evaluation tools

In [None]:
def train(model, device, train_loader, optimizer, criterion, n_epochs, log_interval):
    model.train()
    for epoch in tqdm_notebook(range(n_epochs), desc = 'Epochs'):
        for batch_idx, (X, y) in tqdm_notebook(enumerate(train_loader), total = len(train_loader), desc = 'Batches', leave = False):
            X, y       = X.to(device), y.to(device)
            optimizer.zero_grad()
            *_, y_pred = model(X)
            loss       = criterion(y_pred, y)
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0:
                print(f'Train epoch {epoch}: [{batch_idx * len(X):5d}/{len(train_loader.dataset):5d}] Loss: {loss.item():7.4f}')

In [None]:
def evaluate(model, device, test_loader, criterion):
    model.eval()
    test_set_size = len(test_loader.dataset)
    correct_answers = 0
    sum_loss = 0
    with torch.no_grad():
        for X, y in tqdm_notebook(test_loader):
            X, y             = X.to(device), y.to(device)
            *_, y_pred       = model(X)
            class_pred       = y_pred.argmax(dim = 1)
            correct_answers += (y == class_pred).sum().item()
            sum_loss        += criterion(y_pred, y).item()
    accuracy = correct_answers / test_set_size
    average_loss = sum_loss / len(test_loader)
    
    return accuracy, average_loss

# Model

In [None]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3 , 32 , 3, padding = 1)
        self.conv2 = nn.Conv2d(32, 32 , 3, padding = 1)
        self.conv3 = nn.Conv2d(32, 64,  3, padding = 1)
        self.conv4 = nn.Conv2d(64, 64,  3, padding = 1)
        self.fc1   = nn.Linear(64 * 8 * 8, 128)
        self.fc2   = nn.Linear(128      , 10)
        
    def forward(self, x):
        out_conv1 = F.dropout(F.relu(self.conv1(x)), 0.2)
        out_conv2 = F.dropout(F.relu(self.conv2(out_conv1)), 0.2)
        out_pool1 = F.max_pool2d(out_conv2, kernel_size = (2, 2))
        out_conv3 = F.dropout(F.relu(self.conv3(out_pool1)), 0.2)
        out_conv4 = F.dropout(F.relu(self.conv4(out_conv3)), 0.2)
        out_pool2 = F.max_pool2d(out_conv4, kernel_size = (2, 2))
        out_view  = out_pool2.view(-1, 64 * 8 * 8)
        out_fc    = F.dropout(F.relu(self.fc1(out_view)), 0.2)
        out       = self.fc2(out_fc)
        
        return out_conv1, out_conv2, out_conv3, out_conv4, out

# Training

In [None]:
cnn       = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cnn.parameters())

In [None]:
train(cnn, device, train_loader, optimizer, criterion, 10, len(train_loader) // 5)

In [None]:
accuracy, avg_loss = evaluate(cnn, device, train_loader, criterion)
print(f'[Train] Accuracy: {100 * accuracy:5.2f}%, loss: {avg_loss:7.4f}')
accuracy, avg_loss = evaluate(cnn, device, test_loader, criterion)
print(f'[Test] Accuracy: {100 * accuracy:5.2f}%, loss: {avg_loss:7.4f}')

In [None]:
X, y  = next(iter(test_loader))
X     = X.to(device)
ex_id = 9
img   = X[ex_id].permute(1,2,0)
img   = reverse_normalize(img)
plt.imshow(img.cpu())
print(f'Real label: {y[ex_id].item()}')
print(f'Predicted label: {cnn(X.to(device))[-1][ex_id].argmax().item()}')

# Adversarial examples

In [None]:
def generate_adv_example(model, epsilon, criterion, X):
    X.requires_grad = True
    *_, y_pred      = model(X)
    loss            = criterion(y_pred, y_pred.argmax(dim = 1))
    loss.backward()
    adv_example     = X + epsilon * X.grad.sign()
    
    # Clamping the result to [0, 1] in each channel.
    # I don't think that there exist a multidimensional clamp function.
    for channel, norm_func in enumerate([normalize_red, normalize_green, normalize_blue]):
        adv_example[:, channel, ...].clamp_(norm_func(0), norm_func(1))
    
    return adv_example

In [None]:
def adv_sample_process(model, epsilon, ex_idx, X, y):
    source       = X[ex_idx, ...].unsqueeze(0).to(device)
    *_, y_pred   = model(source)
    adv_example  = generate_adv_example(model, epsilon, criterion, source)
    *_, adv_pred = cnn(adv_example)
    for i in range(10):
        print(f'Class {i}: {y_pred[0, i]:6.4f} \t->\t {adv_pred[0, i]:^6.4f}')
    plt.figure(figsize = (15, 10))
    plt.subplot(1, 2, 1)
    display_img(source)
    plt.subplot(1, 2, 2)
    display_img(adv_example)
    print(f'Real label: {y[ex_idx].item()}')
    print(f'Predicted label: {adv_pred[0].argmax().item()}')

In [None]:
adv_sample_process(cnn, 0.15, 5, X, y)

# KNN experimentations

In [None]:
conv_features = [[] for _ in range(4)]
targets       = []
predictions   = []
# c = 0
for X, y in tqdm_notebook(train_loader):
    X                  = X.to(device)
    *out_convs, y_pred = cnn(X)
    for i, out_conv in enumerate(out_convs):
        conv_feat = out_conv.view(out_conv.size(0), -1).cpu().detach().numpy()
        conv_features[i].append(conv_feat)
    targets.append(y.numpy())
    predictions.append(y_pred.cpu().detach().numpy())
#     if c == 5:
#         break
#     c += 1

In [None]:
conv_features = [np.concatenate(out_convs) for out_convs in conv_features]
targets       = np.concatenate(targets)
predictions   = np.concatenate(predictions, axis = 0)
print([conv_feat.shape for conv_feat in conv_features])
print(targets.shape)
print(predictions.shape)

# Plots

In [None]:
X = conv_features[-1]

In [None]:
pca = PCA(n_components = 2)
X_trans = pca.fit_transform(X)

In [None]:
plt.figure(figsize = (10, 10))
for i, category in enumerate(cifar_cats):
    category_inputs = X_trans[targets == i]
    plt.scatter(category_inputs[:, 0], category_inputs[:, 1], label = category, alpha = 0.3)
plt.legend()

In [None]:
plt.figure(figsize = (10, 10))
for i, category in enumerate(cifar_cats):
    category_inputs = X_trans[np.argmax(predictions, axis = 1) == i]
    plt.scatter(category_inputs[:, 0], category_inputs[:, 1], label = category, alpha = 0.3)
plt.legend()