In [1]:
!git clone https://github.com/knamdar/data.git


Cloning into 'data'...
remote: Enumerating objects: 16, done.[K
remote: Counting objects: 100% (16/16), done.[K
remote: Compressing objects: 100% (14/14), done.[K
remote: Total 16 (delta 2), reused 16 (delta 2), pack-reused 0[K
Unpacking objects: 100% (16/16), done.


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
import numpy as np
from torch.distributions import Categorical


mnist_train = datasets.MNIST("data", train=True, download=False, transform=transforms.ToTensor())
mnist_test = datasets.MNIST("data", train=False, download=False, transform=transforms.ToTensor())
train_loader = DataLoader(mnist_train, batch_size = 64, shuffle=True)
test_loader = DataLoader(mnist_test, batch_size = 64, shuffle=False)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [3]:
torch.manual_seed(2)

class Flatten(nn.Module):
    def forward(self, x):
        return x.view(x.shape[0], -1)

In [4]:
class Model_Drop(nn.Module):
    def __init__(self):

        super(Model_Drop, self).__init__()

        self.conv1 = nn.Conv2d(1, 10, kernel_size=3,padding=1)
        self.conv2 = nn.Conv2d(10, 10, kernel_size=3,padding=1)
        self.conv3 = nn.Conv2d(10, 20, kernel_size=3,padding=1)
        self.conv4 = nn.Conv2d(20, 20, kernel_size=3,padding=1)
        self.fc1 = nn.Linear(7*7*20, 100)
        self.fc2 = nn.Linear(100, 10)
        self.drop_layer = nn.Dropout(p=0.25)

    def last_hidden_layer_output(self, x):
        x = self.drop_layer(F.relu(self.conv1(x)))
        x = self.drop_layer(F.relu(self.conv2(x)))
        x = F.max_pool2d(F.relu(self.conv3(x)), 2)
        x = F.max_pool2d(F.relu(self.conv4(x)), 2)
        x = x.view(-1, 7*7*20)
        x = F.relu(self.fc1(x))
        return x

    def forward(self, x):
        x = self.last_hidden_layer_output(x)
        x = self.fc2(x)
        return x

model_cnn_robust = Model_Drop()
model_cnn_robust = model_cnn_robust.to(device)

model_cnn_normal = Model_Drop()
model_cnn_normal = model_cnn_normal.to(device)


In [5]:

softmax = nn.Softmax(dim=1)

def enable_dropout(model):
    """ Function to enable the dropout layers during test-time """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.train()


def pgd_linf(model, X, y, epsilon=0.1, alpha=0.01, num_iter=20, randomize=False):
    model.eval()
    if randomize:
        delta = torch.rand_like(X, requires_grad=True)
        delta.data = delta.data * 2 * epsilon - epsilon
    else:
        delta = torch.zeros_like(X, requires_grad=True)
        
    for t in range(num_iter):
        loss = nn.CrossEntropyLoss()(model(X + delta), y)
        loss.backward()
        delta.data = (delta + alpha*delta.grad.detach().sign()).clamp(-epsilon,epsilon)
        delta.grad.zero_()
    return delta.detach()



In [6]:
def epoch(loader, model, opt=None):
    """Standard training/evaluation epoch over the dataset"""
    total_loss, total_err = 0.,0.

    if opt:
      model.train()
      enable_dropout(model)
    else:
      model.eval()

    for X,y in loader:
        X,y = X.to(device), y.to(device)
        yp = model(X)
        loss = nn.CrossEntropyLoss()(yp,y)

        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()
        
        total_err += (yp.max(dim=1)[1] != y).sum().item()
        total_loss += loss.item() * X.shape[0]
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)


def epoch_adversarial(loader, model, attack, opt=None, **kwargs):
    total_loss, total_err = 0.,0.

    for X,y in loader:
        X,y = X.to(device), y.to(device)

        delta = attack(model, X, y, **kwargs)

        if opt:
          model.train()
        else:
          model.eval()

        yp = model(X+delta)
        loss = nn.CrossEntropyLoss()(yp,y)        
        if opt:
            opt.zero_grad()
            loss.backward()
            opt.step()
        
        total_err += (yp.max(dim=1)[1] != y).sum().item()
        total_loss += loss.item() * X.shape[0]
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)




In [7]:
optt = optim.SGD(model_cnn_robust.parameters(), lr=1e-1)

for t in range(10):

  test_err, test_loss = 0,0

  train_err, train_loss = epoch_adversarial(train_loader, model_cnn_robust, pgd_linf, optt)
  #train_err, train_loss = epoch(train_loader, model_cnn_normal, optt)

  if t == 4:
    for param_group in optt.param_groups:
      param_group["lr"] = 1e-2
  print(*("{:.6f}".format(i) for i in (train_err, test_err)), sep="\t")

model_cnn_robust.eval()
torch.save(model_cnn_robust.state_dict(), "model_cnn_robust.pt")

#model_cnn_normal.eval()
#torch.save(model_cnn_normal.state_dict(), "model_cnn_normal.pt")

0.328300	0.000000
0.052200	0.000000
0.036183	0.000000
0.030233	0.000000
0.025933	0.000000
0.019167	0.000000
0.017317	0.000000
0.016817	0.000000
0.016300	0.000000
0.016183	0.000000


In [8]:
model_cnn_robust.eval()
print(epoch(test_loader, model_cnn_robust)[0])

0.0065
