In [0]:
import torch
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch.utils.data as DataUtils
import numpy as np
import time
import sys

# Readymade data loading function
DATA_ROOT='./MNISTData/'
def getMNISTDataLoaders(batchSize=64, nTrain=50000, nVal=10000, nTest=10000, seed=None):
  """Build train / validation / test DataLoaders over MNIST.

  The official MNIST training split is shuffled once and partitioned: the
  first `nTrain` shuffled indices feed the training loader and the following
  `nVal` indices feed the validation loader, so the two never overlap. The
  test loader draws from the first `nTest` images of the official test split.

  Args:
    batchSize: Batch size used by all three loaders.
    nTrain: Number of training-split images assigned to the training loader.
    nVal: Number of training-split images assigned to the validation loader.
    nTest: Number of test-split images assigned to the test loader.
    seed: Optional integer. When given, the train/validation partition is
      drawn from a private `np.random.RandomState(seed)` and is therefore
      reproducible. When None (the default) the global NumPy RNG is used,
      exactly as before. The seed only fixes which images land in which
      split; each loader still samples its subset through a
      `SubsetRandomSampler`.

  Returns:
    A `(trainLoader, valLoader, testLoader)` tuple.

  Raises:
    AssertionError: If a count is negative, if `nTrain + nVal` exceeds the
      size of the training split, or if `nTest` exceeds the size of the test
      split.
  """
  assert min(nTrain, nVal, nTest) >= 0, 'nTrain, nVal and nTest must be non-negative'

  # The three transforms are identical; they are kept as separate instances so
  # that a split-specific transform can be swapped in without touching the others.
  trainTransform = transforms.Compose([transforms.ToTensor()])
  valTransform = transforms.Compose([transforms.ToTensor()])
  testTransform = transforms.Compose([transforms.ToTensor()])

  # Train and validation both read the official training split (train=True);
  # they are told apart only by the index subsets handed to their samplers.
  trainSet = datasets.MNIST(root=DATA_ROOT, download=True, train=True,
                            transform=trainTransform)
  valSet = datasets.MNIST(root=DATA_ROOT, download=True, train=True,
                          transform=valTransform)
  testSet = datasets.MNIST(root=DATA_ROOT, download=True, train=False,
                           transform=testTransform)

  # Validate against the real dataset sizes instead of a hard-coded 60000.
  nAvailable = len(trainSet)
  assert nTrain + nVal <= nAvailable, \
      'nTrain + nVal must not exceed the {} training images'.format(nAvailable)
  assert nTest <= len(testSet), \
      'nTest must not exceed the {} test images'.format(len(testSet))

  indices = np.arange(0, nAvailable)
  # `np.random` (module) and `RandomState` both expose an in-place `shuffle`.
  rng = np.random if seed is None else np.random.RandomState(seed)
  rng.shuffle(indices)

  trainSampler = SubsetRandomSampler(indices[:nTrain])
  # Explicit upper bound so that nTrain + nVal may be smaller than the split.
  valSampler = SubsetRandomSampler(indices[nTrain:nTrain + nVal])
  testSampler = SubsetRandomSampler(np.arange(0, nTest))

  trainLoader = DataUtils.DataLoader(trainSet, batch_size=batchSize,
                                     sampler=trainSampler)
  valLoader = DataUtils.DataLoader(valSet, batch_size=batchSize,
                                   sampler=valSampler)
  testLoader = DataUtils.DataLoader(testSet, batch_size=batchSize,
                                    sampler=testSampler)
  return trainLoader, valLoader, testLoader

In [0]:
# Defining the network (LeNet-5)  
class LeNet5(torch.nn.Module):
    """LeNet-style convolutional classifier producing 10 class logits.

    Layout (layer names are part of the state-dict and must not change):
      conv1 (1 -> 32 channels, 5x5, padding 2) -> ReLU -> 2x2 max-pool
      conv2 (32 -> 64 channels, 5x5, padding 2) -> ReLU -> 2x2 max-pool
      flatten to 64*7*7 features -> fc1 (1024 units) -> ReLU -> fc3 (10 units)

    With stride 1, a 5x5 kernel and padding 2 the convolutions keep the
    spatial size, and each pool halves it, so the 64*7*7 flatten size
    corresponds to 28x28 single-channel inputs.
    """

    def __init__(self):
        super(LeNet5, self).__init__()
        layers = torch.nn
        # Feature extractor: two conv + pool stages.
        self.conv1 = layers.Conv2d(in_channels=1, out_channels=32, kernel_size=5,
                                   stride=1, padding=2, bias=True)
        self.max_pool_1 = layers.MaxPool2d(kernel_size=2)
        self.conv2 = layers.Conv2d(in_channels=32, out_channels=64, kernel_size=5,
                                   stride=1, padding=2, bias=True)
        self.max_pool_2 = layers.MaxPool2d(kernel_size=2)
        # Classifier head: 64*7*7 flattened features -> 1024 hidden units -> 10 logits.
        self.fc1 = layers.Linear(64 * 7 * 7, 1024)
        self.fc3 = layers.Linear(1024, 10)

    def forward(self, x):
        """Return unnormalised class scores (logits) for a batch of images."""
        relu = torch.nn.functional.relu
        # Stage 1 and stage 2: convolution, ReLU, then 2x2 max-pooling.
        features = self.max_pool_1(relu(self.conv1(x)))
        features = self.max_pool_2(relu(self.conv2(features)))
        # Collapse channel and spatial dimensions into one feature axis per sample.
        # See https://stackoverflow.com/a/42482819/7551231 for how view(-1, ...) works.
        flat = features.view(-1, 64 * 7 * 7)
        hidden = relu(self.fc1(flat))
        # No activation on the output: the caller's loss consumes raw logits.
        return self.fc3(hidden)

In [3]:
# Select the PyTorch backend once: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
print('Notebook will use PyTorch Device: ' + device.upper())

Notebook will use PyTorch Device: CUDA


In [0]:
# Utility Progress Bar Function
def progress(curr, total, suffix=''):
  """Draw a one-line text progress bar on stdout, redrawn in place.

  The bar is always 48 characters wide: '=' for completed work, a single '>'
  as the head, and '-' for the remainder. Each call starts with a carriage
  return so successive calls overwrite one another on the same line.

  Args:
    curr: Amount of work completed so far.
    total: Amount of work expected overall. A non-positive value is treated
      as "nothing left to do" instead of triggering a division by zero.
    suffix: Text printed after the bar.

  Once `curr` reaches (or exceeds) `total`, a fully filled bar followed by
  '.. Completed' and a newline is written as well, which ends the line.
  """
  bar_len = 48
  if total <= 0:
    # No meaningful fraction exists; show a full bar rather than dividing by zero.
    filled = bar_len
  else:
    filled = int(round(bar_len * curr / float(total)))
  # Keep at least the '>' head, and never overrun the bar when curr > total
  # (or underrun it when curr is negative).
  filled = max(1, min(bar_len, filled))
  bar = '=' * (filled - 1) + '>' + '-' * (bar_len - filled)
  sys.stdout.write('\r[%s] .. %s' % (bar, suffix))
  sys.stdout.flush()
  if curr >= total:
    bar = bar_len * '='
    sys.stdout.write('\r[%s] .. %s .. Completed\n' % (bar, suffix))
    sys.stdout.flush()

In [0]:
import torch.nn as nn
# --- Configuration for the baseline (clean-data) training run ---
n_epochs, lr = 20, 1e-2   # passes over the training loader, SGD step size
step = 0                  # running count of optimizer updates

# Network to train, placed on the selected device and switched to training mode.
model = LeNet5().to(device)
model.train()

# Objective and optimizer: cross-entropy on the logits, SGD with momentum 0.9.
xent_loss = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), momentum=0.9, lr=lr)

In [6]:
# Train the baseline model on clean MNIST batches and report the wall-clock time.
train_loader, val_loader, test_loader = getMNISTDataLoaders()
num_train_batches = len(train_loader)
# Re-assert training mode so this cell also works when re-run after an
# evaluation cell; harmless if the model is already in training mode.
model.train()
start_time = time.time()
for i in range(n_epochs):
  for j, (images, labels) in enumerate(train_loader):
    images, labels = images.to(device), labels.to(device)
    optimizer.zero_grad()
    logits = model(images)
    loss = xent_loss(logits, labels)
    loss.backward()
    optimizer.step()
    # Redraw every 8th batch, and always on the last batch of the epoch:
    # progress() only finishes the line (and prints the newline) when it is
    # called with curr == total, which the modulo test alone usually never hits.
    if j % 8 == 0 or j + 1 == num_train_batches:
      progress(j+1, num_train_batches, 'Batch [{}/{}] Epoch [{}/{}] Loss = {:.3f}'.format(j+1, num_train_batches, i+1, n_epochs, loss.item()))
    step += 1
end_time = time.time()
print('\nTotal training steps = {}'.format(step))
print('Total time taken = {}'.format(end_time - start_time))

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ./MNISTData/MNIST/raw/train-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ./MNISTData/MNIST/raw/train-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw/t10k-images-idx3-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ./MNISTData/MNIST/raw/t10k-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ./MNISTData/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw
Processing...
Done!


Total training steps = 15640
Total time taken = 179.67595863342285


In [7]:
# Evaluation code: accuracy of `model` on the unperturbed test loader.
correct = 0
num_evaluated = 0
model.eval()
# Inference only, so skip autograd bookkeeping for the forward passes.
with torch.no_grad():
  for j, (images, labels) in enumerate(test_loader):
    images, labels = images.to(device), labels.to(device)
    logits = model(images)
    _, preds = torch.max(logits, 1)
    correct += (preds == labels).sum().item()
    # Count what was actually evaluated instead of assuming 10000 samples.
    num_evaluated += labels.size(0)
    progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
# max(..., 1) only guards against an empty loader.
print('Accuracy = {}%'.format(float(correct) * 100 / max(num_evaluated, 1)))

Accuracy = 99.2%


In [8]:
###  Google Colab doesn't ship with advertorch and we will have to install it ourselves
!pip install advertorch > /dev/null
import advertorch
print(advertorch.__version__)

0.2.2


In [9]:
# Evaluating the baseline model against an L-infinity PGD attack
# (the attack class used here is LinfPGDAttack, not the single-step FGSM).
from advertorch.attacks import LinfPGDAttack
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.LinfPGDAttack
adversary = LinfPGDAttack(model, eps=0.3)
correct = 0
num_evaluated = 0
model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  # This is the extra step compared to clean accuracy testing. It needs
  # gradients with respect to the input, so it must stay outside no_grad.
  adv_images = adversary.perturb(images, labels)
  # Scoring the perturbed batch is plain inference.
  with torch.no_grad():
    logits = model(adv_images)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  # Count what was actually evaluated instead of assuming 10000 samples.
  num_evaluated += labels.size(0)
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
# max(..., 1) only guards against an empty loader.
print('Accuracy on PGD adversarial samples = {}%'.format(float(correct) * 100 / max(num_evaluated, 1)))

Accuracy on FGSM adversarial samples = 0.14%


In [0]:
# --- Configuration for the adversarial-training run ---
n_epochs, lr = 20, 1e-2   # passes over the training loader, SGD step size
step = 0                  # running count of optimizer updates
xent_loss = nn.CrossEntropyLoss()

# A fresh network, trained from scratch on adversarial examples.
adv_model = LeNet5().to(device)
adv_model.train()
# Plain SGD: no momentum argument is passed for this run.
optimizer = torch.optim.SGD(params=adv_model.parameters(), lr=lr)

# New loaders, which means a new random train/validation partition.
train_loader, val_loader, test_loader = getMNISTDataLoaders()
start_time = time.time()

In [11]:
"""
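The loss function handed to the attack sums the per-sample losses instead of
averaging them (`size_average=False`, spelled `reduction='sum'` from PyTorch
0.4.1 onwards). Although this is not officially documented, it improves the
reliability of the result in PyTorch 0.4.0. It is required because the
adversarial step is taken along the gradient of "every" image in the batch, so
the per-image gradients should not be scaled down by the batch size.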
"""
# Start the clock here so that the reported time covers the training loop
# only, not the idle time between running the setup cell and this cell.
start_time = time.time()

# Loss used only inside the attack. reduction='sum' is the current spelling of
# the deprecated size_average=False: per-sample losses are summed, not averaged.
advertorch_loss_fn = nn.CrossEntropyLoss(reduction='sum')

# Creating the adversary :
# ------------------------
# The `clip` values here determine the clipping range after taking the
# adversarial step. The clipping is essential to keep the perturbed images
# inside the valid input domain. MNIST images in this notebook are scaled to
# [0, 1]; if you're using something else, modify these values accordingly.
# The `eps` value decides the magnitude of the attack. For all MNIST models,
# the threat model advises to stick to a maximum eps of 0.3 for inputs in [0, 1].
#
# The adversary is an L-infinity PGD attack (not FGSM). It is built once,
# outside the loops, because none of its arguments change between batches.
pgd_adversary = LinfPGDAttack(adv_model, advertorch_loss_fn, eps=0.3, clip_min=0.,
                              clip_max=1., targeted=False)

num_train_batches = len(train_loader)
for i in range(n_epochs):
  for j, (images, labels) in enumerate(train_loader):
    images, labels = images.to(device), labels.to(device)
    # Adversarial examples should be generated with the model in inference
    # mode. eval() does not freeze the parameters; it switches layers such as
    # dropout or batch normalization to their inference behaviour. That makes
    # no difference for very simple models, but is a must for models that use
    # such components.
    adv_model.eval()
    adv_images = pgd_adversary.perturb(images, labels) # Generate adversarial samples
    # Back to training mode for the parameter update.
    adv_model.train()
    train_images = adv_images
    train_labels = labels
    optimizer.zero_grad()
    logits = adv_model(train_images)
    loss = xent_loss(logits, train_labels)

    loss.backward()
    optimizer.step()
    # Redraw every 8th batch, and always on the last batch of the epoch:
    # progress() only finishes the line (and prints the newline) when it is
    # called with curr == total, which the modulo test alone usually never hits.
    if j % 8 == 0 or j + 1 == num_train_batches:
      progress(j+1, num_train_batches, 'Batch [{}/{}] Epoch [{}/{}] Loss = {:.3f}'.format(j+1, num_train_batches, i+1, n_epochs, loss.item()))
    step += 1

# Taken once, after all epochs, rather than being overwritten after every batch.
end_time = time.time()
print('\nTotal training steps = {}'.format(step))
print('Total time taken = {}'.format(end_time - start_time))





KeyboardInterrupt: ignored

In [0]:
# Evaluating the adversarially trained model against a PGD attack
from advertorch.attacks import LinfPGDAttack
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.LinfPGDAttack
# The attack must be crafted against the model being evaluated (`adv_model`).
# Building it on the undefended `model` would only measure how well that
# model's adversarial examples transfer, which overstates robustness.
adversary = LinfPGDAttack(adv_model, eps=0.3)
correct = 0
num_evaluated = 0
adv_model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  # Crafting the perturbation needs gradients with respect to the input,
  # so it must stay outside no_grad.
  adv_images = adversary.perturb(images, labels)
  # Scoring the perturbed batch is plain inference.
  with torch.no_grad():
    logits = adv_model(adv_images)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  # Count what was actually evaluated instead of assuming 10000 samples.
  num_evaluated += labels.size(0)
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
adv_model.train()
# max(..., 1) only guards against an empty loader.
print('Accuracy on PGD adversarial samples = {}%'.format(float(correct) * 100 / max(num_evaluated, 1)))