In [4]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
from functools import lru_cache

In [5]:
use_cuda = True
device = torch.device("cuda" if use_cuda else "cpu")
batch_size = 64

print(device)
batch_size = 64
np.random.seed(42)
torch.manual_seed(42)

## Dataloaders
train_dataset = datasets.MNIST('mnist_data/', train=True, download=True, transform=transforms.Compose(
    [transforms.ToTensor()]
))
test_dataset = datasets.MNIST('mnist_data/', train=False, download=True, transform=transforms.Compose(
    [transforms.ToTensor()]
))

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


cuda


In [6]:
#Implement Neural Network
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(784, 50)
        self.layer2 = nn.Linear(50, 50)
        self.layer3 = nn.Linear(50, 10)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = x.view(-1, 784)
        x = self.relu(self.layer1(x))
        x = self.relu(self.layer2(x))
        x = self.relu(self.layer3(x))
        return x
standard_trained_model = NeuralNetwork().to(device)
standard_trained_model.train()
model = NeuralNetwork().to(device)
model.train()

NeuralNetwork(
  (layer1): Linear(in_features=784, out_features=50, bias=True)
  (layer2): Linear(in_features=50, out_features=50, bias=True)
  (layer3): Linear(in_features=50, out_features=10, bias=True)
  (relu): ReLU()
)

In [7]:
def train_model(model, num_epochs):
    # TODO: implement this function that trains a given model on the MNIST dataset.
    # this is a general-purpose function for both standard training and adversarial training.
    # (toggle enable_defense parameter to switch between training schemes)
    model.train()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    for i in range(num_epochs):
        for idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = nn.CrossEntropyLoss()(output, target)
            loss.backward()
            optimizer.step()
            #if idx % 10 == 0:
                #print(loss)

In [8]:
train_model(standard_trained_model, 3)

In [9]:
def standard_test(model, device, test_loader):
    model.eval()
    correct = 0
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True) 
        correct += pred.eq(target.view_as(pred)).sum().item()

    print('\n Accuracy: {}/{} ({:.0f}%)\n'.format(
        correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [10]:
standard_test(standard_trained_model, device, test_loader)


 Accuracy: 6836/10000 (68%)



In [11]:
def fgsm(model, x, eps, y):
    #TODO: implement this as an intermediate step of PGD
    # Notes: put the model in eval() mode for this function
    model.eval()
    x = x.clone().detach()
    x.requires_grad = True
    model_output = model(x)
    entropy_loss = nn.CrossEntropyLoss()
    loss = entropy_loss(model_output, y)
    model.zero_grad()
    loss.backward()
    loss_grad = x.grad.data
    sign_loss_grad = loss_grad.sign()
    eta = eps*loss_grad.sign()
    perturbed_output = x+eta
    perturbed_output = torch.clamp(perturbed_output, 0, 1)
    return perturbed_output

def pgd_untargeted(model, x, y, k, eps, eps_step):
    #TODO: implement this 
    # Notes: put the model in eval() mode for this function
    # x: input image
    # y: ground truth label for x
    # k: steps of FGSM
    # eps: projection region for PGD (note the need for normalization before projection, as eps values are for inputs in [0,1])
    # eps_step: step for one iteration of FGSM
    model.eval()
    adv=x
    for i in range(k):
        adv = fgsm(model, adv, eps_step, y)
        #clipping
        x_adv = torch.min(x + eps, torch.max(x-eps, adv))
    x_adv = torch.clamp(x_adv, 0 ,1)
    return adv
#return adverserial examples
@lru_cache(maxsize=60000) 
def pgd_untargeted_batch(model, inputs, targets, eps):
    k=3
    perturbed_inputs = []
    for i in range(len(inputs)):
        x = inputs[i]
        y = targets[i]
        perturbed_data = pgd_untargeted(model, x, torch.atleast_1d(y), k, eps, eps)
        perturbed_inputs.append(perturbed_data)
        
    return torch.cat(perturbed_inputs, dim=0)


In [12]:
def train_model_ibp(model, num_epochs, eps, kappa):
    model.train()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    for i in range(num_epochs):
        for idx, (data, target) in tqdm(enumerate(train_loader)):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
        
            output = model(data)
            standard_loss = nn.CrossEntropyLoss()(output, target)
            total_loss = standard_loss
            adv_input = pgd_untargeted_batch(standard_trained_model, data, target, eps)
            robust_output = model(adv_input)
            robustness_loss = nn.CrossEntropyLoss()(robust_output,target)
            total_loss = (1-kappa)*robustness_loss + kappa*standard_loss
            total_loss.backward()
            optimizer.step()
            if idx % 100 == 0:
                print(' [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    idx * len(adv_input), len(train_loader.dataset),
                    100. * idx / len(train_loader), total_loss.item()))

In [13]:
def find_robustness(model, attack='pgd', eps=0.1):
    # TODO: implement this function to test the robust accuracy of the given model
    # use pgd_untargeted() within this function
    model.eval()
    correct = 0
    total = 0
    for data, targets in test_loader:
        data,targets = data.to(device), targets.to(device)
        if attack == 'pgd':
            total +=len(data)
            ori_output = model(data)
            ori_prediction = ori_output.argmax(1, keepdim=True)
            correct+=ori_prediction.eq(targets.view_as(ori_prediction)).sum().item()
        total +=len(data)
        pgd_data = pgd_untargeted_batch(model, data, targets, eps)
        pgd_output = model(pgd_data)
        prediction_after_attack = pgd_output.argmax(dim=1, keepdim=True)
        correct+=prediction_after_attack.eq(targets.view_as(prediction_after_attack)).sum().item()
    print('\n Eps: {}, Robustness: {}/{} ({:.0f}%)\n'.format(
        eps, correct, total,
        100. * correct / total))

In [None]:
for k in (0.6, 0.5):
    for e in (0.01, 0.02, 0.03, 0.04, 0.05):
        model = NeuralNetwork().to(device)
        train_model_ibp(model, 1, e, k)
        print('kappa: ',k, ' eps: ', e)
        standard_test(model, device, test_loader)
        find_robustness(model, attack='pgd', eps=e)