# Robust Neaural Network Training

## Imports and setups

In [None]:
from __future__ import print_function
from __future__ import division
from builtins import range
from builtins import int
from builtins import dict


import argparse

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import torch.backends.cudnn as cudnn

import torch.nn.functional as F

import torchvision.datasets as dset
import torchvision.transforms as T

import os


os.environ["CUDA_VISIBLE_DEVICES"] = "0"

## Loader class

In [None]:
class ChunkSampler(sampler.Sampler):
    def __init__(self, num_samples, start=0):
        self.num_samples = num_samples
        self.start = start

    def __iter__(self):
        return iter(range(self.start, self.start + self.num_samples))

    def __len__(self):
        return self.num_samples

def loadData():

    transform = T.Compose([T.ToTensor()])

    MNIST_train = dset.MNIST('./Third Party/Robust-NN-Training/dataset', train=True, transform=T.ToTensor(), download=True)

    MNIST_test = dset.MNIST('./Third Party/Robust-NN-Training/dataset', train=False, transform=T.ToTensor(), download=True)


    loader_train = DataLoader(MNIST_train, batch_size=64)

    loader_test = DataLoader(MNIST_test, batch_size=64)

    return loader_train, loader_test

## Model

In [None]:
class ConvNet(nn.Module):

    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, kernel_size=5)
        self.conv2 = nn.Conv2d(20, 50, kernel_size=5)
        self.fc1 = nn.Linear(800, 500)
        self.fc2 = nn.Linear(500, 10)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(2)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, (nn.Conv2d)):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 1 / m.bias.numel())
            if isinstance(m, (nn.Linear)):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 1 / m.bias.numel())

    def forward(self, x):
        x = self.maxpool(self.relu(self.conv1(x)))
        x = self.maxpool(self.relu(self.conv2(x)))
        x = x.view(-1, 800)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)

        return x

## Loss Function

In [None]:
def loss_function(model, X, y, dtype):

    N = X.shape[0]
    X = X.repeat(1, 10, 1, 1).reshape(N * 10, 1, 28, 28)
    X_copy = X.clone()
    X.requires_grad = True


    eps = 0.4

    y = y.view(-1, 1).repeat(1, 10).view(-1, 1).long().cuda()

    index = torch.tensor([jj for jj in range(10)] * N).view(-1, 1).cuda().long()

    MaxIter_max = 11
    step_size_max = 0.1

    for i in range(MaxIter_max):
        output = model(X)

        maxLoss = (output.gather(1, index) - output.gather(1, y)).mean()
        X_grad = torch.autograd.grad(maxLoss, X, retain_graph=True)[0]
        X = X + X_grad.sign() * step_size_max

        X.data = X_copy.data + (X.data - X_copy.data).clamp(-eps, eps)
        X.data = X.data.clamp(0, 1)

    preds = model(X)

    loss = (-F.log_softmax(preds)).gather(1, y.view(-1, 1)).view(-1, 10).max(dim=1)[0].mean()

    return loss

## Training Process

In [None]:
def train(loader_train, loader_test, dtype):

    model = ConvNet()
    model = model.type(dtype)
    model.train()

    SCHEDULE_EPOCHS = [10, 10]
    learning_rate = 5e-4

    for num_epochs in SCHEDULE_EPOCHS:
	
        print('\nTraining %d epochs with learning rate %.7f' % (num_epochs, learning_rate))

        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
	
            print('\nTraining epoch %d / %d ...\n' % (epoch + 1, num_epochs))

            for i, (X_, y_) in enumerate(loader_train):

                X = Variable(X_.type(dtype), requires_grad=False)
                y = Variable(y_.type(dtype), requires_grad=False)

                loss = loss_function(model, X, y, dtype)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                if (i + 1) % 200 == 0:
                    print('Batch %d done, loss = %.7f' % (i + 1, loss.item()))

                    test(model, loader_test, dtype)

            print('Batch %d done, loss = %.7f' % (i + 1, loss.item()))



        learning_rate *= 0.1

    return model

## Training quality test

In [None]:
def test(model, loader_test, dtype):
    num_correct = 0
    num_samples = 0
    model.eval()
    for X_, y_ in loader_test:

        X = Variable(X_.type(dtype), requires_grad=False)
        y = Variable(y_.type(dtype), requires_grad=False).long()

        logits = model(X)
        _, preds = logits.max(1)

        num_correct += (preds == y).sum()
        num_samples += preds.size(0)

    accuracy = float(num_correct) / num_samples * 100
    print('\nAccuracy = %.2f%%' % accuracy)
    model.train()

## Adversarial Attacks

### Projected Gradient Descent Attack

In [None]:
def pgdAttackTest(model, loader_test, dtype):

    model.eval()
    epss = [0.0, 0.1, 0.2, 0.3, 0.4]
    MaxIter = 40
    step_size = 1e-2

    for eps in epss:

        num_correct = 0
        num_samples = 0


        for X_, y_ in loader_test:

            X = Variable(X_.type(dtype), requires_grad=True)
            X_original = Variable(X_.type(dtype), requires_grad=False)
            y = Variable(y_.type(dtype), requires_grad=False).long()

            for i in range(MaxIter):
                logits = model(X)
                loss = F.cross_entropy(logits, y)
                loss.backward()

                with torch.no_grad():
                    X.data = X.data + step_size * X.grad.sign()
                    X.data = X_original + (X.data - X_original).clamp(min=-eps, max=eps)
                    X.data = X.data.clamp(min=0, max=1)
                    X.grad.zero_()

            X.requires_grad = False
            X = (X * 255).long().float() / 255

            logits = model(X)
            _, preds = logits.max(1)

            num_correct += (preds == y).sum()
            num_samples += preds.size(0)

        accuracy = float(num_correct) / num_samples * 100
        print('\nAttack using PGD with eps = %.3f, accuracy = %.2f%%' % (eps, accuracy))

###  Fast Gradient Sign Method Attack

In [None]:
def fgsmAttackTest(model, loader_test, dtype):

    model.eval()
    epss = [0.0, 0.1, 0.2, 0.3, 0.4]

    for eps in epss:

        num_correct = 0
        num_samples = 0


        for X_, y_ in loader_test:

            X = Variable(X_.type(dtype), requires_grad=True)
            y = Variable(y_.type(dtype), requires_grad=False).long()

            logits = model(X)
            loss = F.cross_entropy(logits, y)
            loss.backward()

            with torch.no_grad():
                X += X.grad.sign() * eps
                X.grad.zero_()

            X.requires_grad = False
            X = (X * 255).long().float() / 255

            logits = model(X)
            _, preds = logits.max(1)

            num_correct += (preds == y).sum()
            num_samples += preds.size(0)

        accuracy = float(num_correct) / num_samples * 100
        print('\nAttack using FGSM with eps = %.3f, accuracy = %.2f%%' % (eps, accuracy))

## Main Function

In [None]:
loader_train, loader_test = loadData()
dtype = torch.cuda.FloatTensor

model = train(loader_train, loader_test, dtype)
fname = "./Third Party/Robust-NN-Training/model/test_model.pth"
torch.save(model, fname)
print("Training done, model save to %s" % fname)

# fname = "./Code/Third Party/Robust-NN-Training/model/test_model.pth"
# model = torch.load(fname)

pgdAttackTest(model, loader_test, dtype)
fgsmAttackTest(model, loader_test, dtype)