###  Imports

In [18]:
from __future__ import print_function
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms, utils
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import time
import h5py
from pathlib import Path
import os

from art.attacks import CarliniL2Method, CarliniLInfMethod
from art.classifiers import PyTorchClassifier
from art.utils import load_mnist
from art.attacks.evasion.projected_gradient_descent import ProjectedGradientDescent

%matplotlib inline
%config InlineBackend.figure_format='retina'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

### Dataset

In [19]:
(x_train, y_train), (x_test, y_test), min_, max_ = load_mnist()

x_train = np.swapaxes(x_train, 1, 3).astype(np.float32)
x_test = np.swapaxes(x_test, 1, 3).astype(np.float32)

train_dataset = TensorDataset(torch.Tensor(x_train), torch.Tensor(y_train))
train_dataloader = DataLoader(train_dataset, batch_size=128)

test_dataset = TensorDataset(torch.Tensor(x_test), torch.Tensor(y_test))
test_dataloader = DataLoader(test_dataset, batch_size=1000)
test_dataloader_single =  DataLoader(test_dataset, batch_size=1)

### Classifier

In [20]:
path = Path("C:/Users/Matach/OneDrive - Imperial College London/ICL/FYP/Experiments/saved_models/MNIST/Carlini/Adversarial PGD/regularisation")
file = "L2_1e-03.pth"
pretrained_model = os.path.join(path, file)
use_cuda = True

In [21]:
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 512)
        self.fc2 = nn.Linear(512, 200)
        self.fc3 = nn.Linear(200,10)
        

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = F.relu(self.fc2(x))
        x = F.dropout(x, training=self.training)
        x = self.fc3(x)
        return x
    

def train_classifier(cl, opt, x, y):
    x.to(device)
    y.to(device)
    opt.zero_grad()
    pred = cl(x)
    pred.to(device)
    err = F.nll_loss(F.log_softmax(pred, dim=0), y)
    err.backward()
    opt.step()
    return err, pred

def test_model(cl,test_loader): 
    correct = 0
    with torch.no_grad():
        cl.eval()
        for data, target in test_loader:
            output = cl(data.to(device))
            pred = output.data.max(1, keepdim=True)[1].to("cpu")
            target = np.argmax(target, axis=1)
            correct += pred.eq(target.data.view_as(pred)).sum()
        acc_test = float(correct.numpy() / len(test_loader.dataset))
        
    return acc_test

In [22]:
# Define what device we are using
print("CUDA Available: ",torch.cuda.is_available())
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")
    
# Initialize the model.
model = Classifier().to(device)

# Load pre-trained model
model.load_state_dict(torch.load(pretrained_model, map_location='cpu'))

# Set the model in evaluation mode. In this case this is for the Dropout layers
print(model.eval())

# Load loss and optimiser
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Make a classifier wrapper!
classifier = PyTorchClassifier(
    model=model,
    clip_values=(min_, max_),
    loss=criterion,
    optimizer=optimizer,
    input_shape=(1, 28, 28),
    nb_classes=10,
)

# Test model
predictions = classifier.predict(x_test)
accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)) / len(y_test)
print("Accuracy on benign test examples: {} %".format(accuracy * 100))

CUDA Available:  True
Classifier(
  (conv1): Conv2d(1, 10, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(10, 20, kernel_size=(5, 5), stride=(1, 1))
  (conv2_drop): Dropout2d(p=0.5, inplace=False)
  (fc1): Linear(in_features=320, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=200, bias=True)
  (fc3): Linear(in_features=200, out_features=10, bias=True)
)
Accuracy on benign test examples: 94.6 %


### PGD attack

In [23]:
# Create ART classifier 
def make_classifier(model):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    # Make a classifier wrapper!
    classifier = PyTorchClassifier(
        model=model,
        clip_values=(min_, max_),
        loss=criterion,
        optimizer=optimizer,
        input_shape=(1, 28, 28),
        nb_classes=10,
    )
    return classifier


def adversarial_training_PGD(model, device, data, epsilon):
    dim = data.size()
    batch = torch.Tensor()
    fgsm = np.random.choice([0, 1], size=dim[0], p=[0, 1])
    for image, is_fgsm in zip(data, fgsm):
        # VIP Reshape image for model compatibility
        image = image.view(1, dim[1], dim[2], dim[3])
        image, batch = image.to(device), batch.to(device)
        if is_fgsm:
            classifier = make_classifier(model)
            adv_crafter = ProjectedGradientDescent(classifier, norm=np.inf, eps=epsilon, eps_step=0.01, max_iter=40)
            perturbed_image = adv_crafter.generate(x=image.cpu().detach().numpy())
            batch = torch.cat((batch, torch.Tensor(perturbed_image).to(device)), dim=0)
        else:
            batch = torch.cat((batch, image), dim=0)
    return batch

### Training

In [24]:
path = Path("C:/Users/Matach/OneDrive - Imperial College London/ICL/FYP/Experiments/saved_models/MNIST/Carlini/Adversarial PGD/regularisation")
file = "L2_1e-03.pth"

In [25]:
num_epochs = 4
e_losses = [] 

# Load pre-trained model
# cl = Classifier().to(device)
cl = model

# optimizer
cl_opt = optim.Adam(cl.parameters(), lr=0.001, weight_decay=1e-03)
criterion = nn.CrossEntropyLoss()

for e in range(num_epochs):
    cl.train()
    for batch_idx, (data, target) in enumerate(train_dataloader):
        target = np.argmax(target, axis=1)  # transform from one-hot to int
        new_batch = adversarial_training_PGD(cl, device, data, epsilon=0.3)
        c_error,c_pred = train_classifier(cl,cl_opt, new_batch.to(device), target.to(device))
        e_losses.append(c_error.cpu().data.numpy())
    acc = test_model(cl,test_dataloader)
    torch.save(cl.state_dict(), '{name}_{epoch}'.format(name=os.path.join(path, file), epoch=e)) # save file
    print("Epoch ", e, ": Test accuracy: ", 100*acc, "%")

print("Performance in the trained model: ")
acc_test = test_model(cl,test_dataloader)
print("Test accuracy: ", 100*acc_test, "%")
print("FINISHED!!!")

Epoch  0 : Test accuracy:  95.91 %
Epoch  1 : Test accuracy:  96.58 %
Epoch  2 : Test accuracy:  96.66 %
Epoch  3 : Test accuracy:  96.92 %
Performance in the trained model: 
Test accuracy:  96.92 %
FINISHED!!!
