<a href="https://colab.research.google.com/github/ttchengab/AdversarialDefense/blob/master/CleverhansTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip uninstall -y tensorflow-addons
!pip uninstall -y tensorflow-datasets
!pip uninstall -y tensorflow-estimator
!pip uninstall -y tensorflow-gcs-config
!pip uninstall -y tensorflow-hub
!pip uninstall -y tensorflow-privacy
!pip uninstall -y tensorflow-metadata
!pip uninstall -y tensorflow-probability
!pip uninstall -y tensorflow
!pip install tensorflow-gpu==1.14
!pip install git+https://github.com/tensorflow/cleverhans.git#egg=cleverhans

Uninstalling tensorflow-addons-0.8.3:
  Successfully uninstalled tensorflow-addons-0.8.3
Uninstalling tensorflow-datasets-2.1.0:
  Successfully uninstalled tensorflow-datasets-2.1.0
Uninstalling tensorflow-estimator-2.2.0:
  Successfully uninstalled tensorflow-estimator-2.2.0
Uninstalling tensorflow-gcs-config-2.2.0:
  Successfully uninstalled tensorflow-gcs-config-2.2.0
Uninstalling tensorflow-hub-0.8.0:
  Successfully uninstalled tensorflow-hub-0.8.0
Uninstalling tensorflow-privacy-0.2.2:
  Successfully uninstalled tensorflow-privacy-0.2.2
Uninstalling tensorflow-metadata-0.22.2:
  Successfully uninstalled tensorflow-metadata-0.22.2
Uninstalling tensorflow-probability-0.10.0:
  Successfully uninstalled tensorflow-probability-0.10.0
Uninstalling tensorflow-2.2.0:
  Successfully uninstalled tensorflow-2.2.0
Collecting tensorflow-gpu==1.14
[?25l  Downloading https://files.pythonhosted.org/packages/76/04/43153bfdfcf6c9a4c38ecdb971ca9a75b9a791bb69a764d652c359aca504/tensorflow_gpu-1.14.0-

In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import warnings
import numpy as np
import tensorflow as tf
import torch
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch.autograd import Variable
from torchvision import datasets, transforms

from cleverhans.attacks import FastGradientMethod
from cleverhans.compat import flags
from cleverhans.model import CallableModelWrapper
from cleverhans.utils import AccuracyReport
from cleverhans.utils_pytorch import convert_pytorch_model_to_tf

In [4]:
from tensorflow.python.util import deprecation
deprecation._PRINT_DEPRECATION_WARNINGS = False

In [5]:
class ResBlock(nn.Module):
    
    def __init__(self, in_size:int, hidden_size:int, out_size:int):
        super().__init__()
        self.conv1 = nn.Conv2d(in_size, hidden_size, 3, padding=1)
        self.conv2 = nn.Conv2d(hidden_size, out_size, 3, padding=1)
        self.batchnorm1 = nn.BatchNorm2d(hidden_size)
        self.batchnorm2 = nn.BatchNorm2d(out_size)
    
    def convblock(self, x):
        x = F.relu(self.batchnorm1(self.conv1(x)))
        x = F.relu(self.batchnorm2(self.conv2(x)))
        return x
    
    def forward(self, x): return x + self.convblock(x) # skip connection

class ResNet1(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.res1 = ResBlock(1, 8, 16)
        self.res2 = ResBlock(16, 32, 16)
        self.fc1 = nn.Linear(16 * 14 * 14, 512)
        self.fc2 = nn.Linear(512, 10)
        
    def forward(self, x):
        #1x28x28
        x = self.res1(x)
        #16x28x28
        x = self.res2(x) 
        #16x28x28
        x = F.max_pool2d(F.relu(x), 2)
        #16x14x14
        x = x.view(-1, 16*14*14)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x,dim=-1)

class ResNet2(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.res1 = ResBlock(1, 8, 12)
        self.res2 = ResBlock(12, 16, 12)
        self.fc1 = nn.Linear(12 * 14 * 14, 1024)
        self.fc2 = nn.Linear(1024, 10)
        
    def forward(self, x):
        #1x28x28
        x = self.res1(x)
        #32x28x28
        x = self.res2(x) 
        #16x28x28
        x = F.max_pool2d(F.relu(x), 2)
        #16x14x14
        x = x.view(-1, 12*14*14)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x,dim=-1)

class ResNet3(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.res1 = ResBlock(1, 8, 32)
        self.fc1 = nn.Linear(32 * 14 * 14, 1024)
        self.fc2 = nn.Linear(1024, 10)
        
    def forward(self, x):
        #1x28x28
        x = self.res1(x)
        #32x28x28
        x = F.max_pool2d(F.relu(x), 2)
        #32x14x14
        x = x.view(-1, 32*14*14)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x,dim=-1)

class ResNet4(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.res1 = ResBlock(1, 8, 16)
        self.res2 = ResBlock(16, 32, 16)
        self.res3 = ResBlock(16, 8, 16)
        self.fc1 = nn.Linear(16 * 14 * 14, 1024)
        self.fc2 = nn.Linear(1024, 10)
        
    def forward(self, x):
        #1x28x28
        x = self.res1(x)
        #16x28x28
        x = self.res2(x) 
        #32x28x28
        x = self.res3(x)
        #16x28x28
        x = F.max_pool2d(F.relu(x), 2)
        #16x14x14
        x = x.view(-1, 16*14*14)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x,dim=-1)


  

class LeNet5(torch.nn.Module):          
     
    def __init__(self):     
        super(LeNet5, self).__init__()
        # Convolution (In LeNet-5, 32x32 images are given as input. Hence padding of 2 is done below)
        self.conv1 = torch.nn.Conv2d(1, 6, 5, padding=2)
        # Max-pooling
        self.max_pool_1 = torch.nn.MaxPool2d(kernel_size=2)
        # Convolution
        self.conv2 = torch.nn.Conv2d(6, 16, 5)
        
        # Fully connected layer
        self.fc1 = nn.Linear(16*5*5, 120)   
        self.fc2 = nn.Linear(120, 84)       
        self.fc3 = nn.Linear(84, 10)    
        
    def forward(self, x):
        # convolve, then perform ReLU non-linearity
        x = F.relu(self.conv1(x))  
        # max-pooling with 2x2 grid 
        x = F.max_pool2d(x, 2) 
        # convolve, then perform ReLU non-linearity
        x = F.relu(self.conv2(x))
        # max-pooling with 2x2 grid
        x = F.max_pool2d(x, 2)
        # first flatten 'max_pool_2_out' to contain 16*5*5 columns
        # read through https://stackoverflow.com/a/42482819/7551231
        x = x.view(-1, 16*5*5)
        # FC-1, then perform ReLU non-linearity
        x = F.relu(self.fc1(x))
        # FC-2, then perform ReLU non-linearity
        x = F.relu(self.fc2(x))
        # FC-3
        x = self.fc3(x)
        
        return F.log_softmax(x,dim=-1)

class PytorchMnistModel(nn.Module):
  def __init__(self):
    super(PytorchMnistModel, self).__init__()
    # input is 28x28
    # padding=2 for same padding
    self.conv1 = nn.Conv2d(1, 32, 5, padding=2)
    # feature map size is 14*14 by pooling
    # padding=2 for same padding
    self.conv2 = nn.Conv2d(32, 64, 5, padding=2)
    # feature map size is 7*7 by pooling
    self.fc1 = nn.Linear(64 * 7 * 7, 1024)
    self.fc2 = nn.Linear(1024, 10)

  def forward(self, x):
    x = F.max_pool2d(F.relu(self.conv1(x)), 2)
    x = F.max_pool2d(F.relu(self.conv2(x)), 2)
    x = x.view(-1, 64 * 7 * 7)  # reshape Variable
    x = F.relu(self.fc1(x))
    x = self.fc2(x)
    return F.log_softmax(x, dim=-1)


class VGGNet(nn.Module):
   def __init__(self):
        super(VGGNet, self).__init__()
        self.conv11 = nn.Conv2d(1, 64, 3)
        self.conv12 = nn.Conv2d(64, 64, 3)
        self.conv21 = nn.Conv2d(64, 128, 3)
        self.conv22 = nn.Conv2d(128, 128, 3)
        self.fc1 = nn.Linear(128 * 2 * 2, 1024)
        self.fc2 = nn.Linear(1024, 10)
   def forward(self, x):
       #1, 28, 28
       x = F.relu(self.conv11(x))
       #64, 26, 26
       x = F.relu(self.conv12(x))
       #64, 24, 24
       x = F.max_pool2d(x, (2,2))
       #64, 12, 12
       x = F.relu(self.conv21(x))
       #128, 10, 10
       x = F.relu(self.conv22(x))
       #128, 8, 8
       x = F.max_pool2d(x, (2,2))
       #128, 4, 4
       x = F.max_pool2d(x, (2,2))
       #128, 2, 2
       x = x.view(-1, 128 * 2 * 2)
       x = F.relu(self.fc1(x))
       x = self.fc2(x)
       return F.log_softmax(x, dim=-1)


In [6]:
# Summary to check torch model structure
from torchsummary import summary
resnet = ResNet4()
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
resnet = resnet.to(device)
summary(resnet, input_size=(1, 28, 28))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [-1, 8, 28, 28]              80
       BatchNorm2d-2            [-1, 8, 28, 28]              16
            Conv2d-3           [-1, 16, 28, 28]           1,168
       BatchNorm2d-4           [-1, 16, 28, 28]              32
          ResBlock-5           [-1, 16, 28, 28]               0
            Conv2d-6           [-1, 32, 28, 28]           4,640
       BatchNorm2d-7           [-1, 32, 28, 28]              64
            Conv2d-8           [-1, 16, 28, 28]           4,624
       BatchNorm2d-9           [-1, 16, 28, 28]              32
         ResBlock-10           [-1, 16, 28, 28]               0
           Conv2d-11            [-1, 8, 28, 28]           1,160
      BatchNorm2d-12            [-1, 8, 28, 28]              16
           Conv2d-13           [-1, 16, 28, 28]           1,168
      BatchNorm2d-14           [-1, 16,

In [18]:
FLAGS = flags.FLAGS
NB_EPOCHS = 2
BATCH_SIZE = 128
LEARNING_RATE = .001

def train(torch_model, train_loader, test_loader,
        nb_epochs=NB_EPOCHS, batch_size=BATCH_SIZE, train_end=-1, test_end=-1, learning_rate=LEARNING_RATE):


    # Truncate the datasets so that our test run more quickly
  #   train_loader.dataset.train_data = train_loader.dataset.train_data[:train_end]
  #   test_loader.dataset.test_data = test_loader.dataset.test_data[:test_end]

    # Train our model
    optimizer = optim.Adam(torch_model.parameters(), lr=learning_rate)
    train_loss = []

    total = 0
    correct = 0
    step = 0
    # breakstep = 0
    for _epoch in range(nb_epochs):
      # if breakstep == 2:
      #     # print("break all!")
      #     break
      for xs, ys in train_loader:
        xs, ys = Variable(xs), Variable(ys)
        if torch.cuda.is_available():
          xs, ys = xs.cuda(), ys.cuda()
        optimizer.zero_grad()
        preds = torch_model(xs)
        loss = F.nll_loss(preds, ys)
        loss.backward()  # calc gradients
        train_loss.append(loss.data.item())
        optimizer.step()  # update gradients

        preds_np = preds.cpu().detach().numpy()
        correct += (np.argmax(preds_np, axis=1) == ys.cpu().detach().numpy()).sum()
        total += train_loader.batch_size
        step += 1
        if total % 1000 == 0:
          acc = float(correct) / total
          print('[%s] Training accuracy: %.2f%%' % (step, acc * 100))
          total = 0
          correct = 0
          # breakstep += 1
          # if breakstep == 2:
          #     # print("break!")
          #     break
    
def Ensembler(preds):
    finalPred = np.zeros(len(preds[0]))
    for i in range(len(preds[0])):
      scoreList = np.zeros(10)
      for pred in preds:
        scoreList[pred[i]] += 1
      finalPred[i] = np.argmax(scoreList)
    return finalPred

def eval(model1, model2, model3, model4, test_loader, report, singleModel):
    total = 0
    correct = 0
    for xs, ys in test_loader:
      xs, ys = Variable(xs), Variable(ys)
      if torch.cuda.is_available():
        xs, ys = xs.cuda(), ys.cuda()

      preds1 = model1(xs)
      preds_np1 = preds1.cpu().detach().numpy()
      preds2 = model2(xs)
      preds_np2 = preds2.cpu().detach().numpy()
      preds3 = model3(xs)
      preds_np3 = preds3.cpu().detach().numpy()
      preds4 = model4(xs)
      preds_np4 = preds4.cpu().detach().numpy()

      #preds for 3 and 4
      # preds = [np.argmax(preds_np1, axis=1), np.argmax(preds_np2, axis=1), np.argmax(preds_np3, axis=1)]
      preds = [np.argmax(preds_np1, axis=1), np.argmax(preds_np2, axis=1), np.argmax(preds_np3, axis=1), np.argmax(preds_np4, axis=1)]
      if not singleModel:
        finalPred = Ensembler(preds)
      else:
        finalPred = np.argmax(preds_np1, axis=1)
      correct += (finalPred == ys.cpu().detach().numpy()).sum()
      total += len(xs)

    acc = float(correct) / total
    report.clean_train_clean_eval = acc
    print('Clean accuracy: %.2f%%' % (acc * 100))
    return report


def AttackOnModel(fgsm_model, test_model1, test_model2, test_model3, test_loader, report, singleModel):
    # We use tf for evaluation on adversarial data
    sess = tf.Session()
    x_op = tf.placeholder(tf.float32, shape=(None, 1, 28, 28,))

    # Convert pytorch model to a tf_model and wrap it in cleverhans
    tf_model_fn = convert_pytorch_model_to_tf(fgsm_model)
    cleverhans_model = CallableModelWrapper(tf_model_fn, output_layer='logits')

    # Convert the testing models
    test_model_fn1 = convert_pytorch_model_to_tf(test_model1)
    test_model_fn2 = convert_pytorch_model_to_tf(test_model2)
    test_model_fn3 = convert_pytorch_model_to_tf(test_model3)

    # Create an FGSM attack
    fgsm_op = FastGradientMethod(cleverhans_model, sess=sess)
    fgsm_params = {'eps': 0.3,
                   'clip_min': 0.,
                   'clip_max': 1.}
    adv_x_op = fgsm_op.generate(x_op, **fgsm_params)
    adv_preds_op0 = tf_model_fn(adv_x_op)
    adv_preds_op1 = test_model_fn1(adv_x_op)
    adv_preds_op2 = test_model_fn2(adv_x_op)
    adv_preds_op3 = test_model_fn3(adv_x_op)

    # Run an evaluation of our model against fgsm
    total = 0
    correct = 0
    for xs, ys in test_loader:
      adv_preds0 = sess.run(adv_preds_op0, feed_dict={x_op: xs})
      adv_preds1 = sess.run(adv_preds_op1, feed_dict={x_op: xs})
      adv_preds2 = sess.run(adv_preds_op2, feed_dict={x_op: xs})
      adv_preds3 = sess.run(adv_preds_op3, feed_dict={x_op: xs})
      preds = [np.argmax(adv_preds0, axis=1), np.argmax(adv_preds1, axis=1), np.argmax(adv_preds2, axis=1), np.argmax(adv_preds3, axis=1)]
      if not singleModel:
        finalPred = Ensembler(preds)
      else:
        finalPred = np.argmax(adv_preds1, axis=1)
      correct += (finalPred == ys.cpu().detach().numpy()).sum()
      total += test_loader.batch_size

    acc = float(correct) / total
    print('Adv accuracy: {:.3f}％'.format(acc * 100))
    report.clean_train_adv_eval = acc
    return report

def mnist_test(nb_epochs=NB_EPOCHS, batch_size=BATCH_SIZE,
               train_end=-1, test_end=-1, learning_rate=LEARNING_RATE, 
               torch_model=None, test_model1=None, test_model2=None, test_model3=None, 
               training=1, testEval=0, testAttack=0):
  """
  MNIST cleverhans tutorial
  :param nb_epochs: number of epochs to train model
  :param batch_size: size of training batches
  :param learning_rate: learning rate for training
  :return: an AccuracyReport object
  """
      # Train a pytorch MNIST model
  if torch.cuda.is_available():
    torch_model = torch_model.cuda()

  report = AccuracyReport()
  train_loader = torch.utils.data.DataLoader(
      datasets.MNIST('data', train=True, download=True,
                     transform=transforms.ToTensor()),
      batch_size=batch_size, shuffle=True)
  test_loader = torch.utils.data.DataLoader(
      datasets.MNIST('data', train=False, transform=transforms.ToTensor()),
      batch_size=batch_size)
  if training:
    train(torch_model, train_loader, test_loader, nb_epochs, batch_size, train_end, test_end, learning_rate)
  
  if testEval:
    print("Test Clean on same model")
    eval(torch_model, torch_model, torch_model, torch_model, test_loader, report, 1)
    print("Test Clean on ensembled model")
    eval(torch_model, test_model1, test_model2, test_model3, test_loader, report, 0)
  
  # Evaluate on clean data
  
  if testAttack:
    print("Test FGSM on the same model")
    report = AttackOnModel(torch_model, torch_model, torch_model, torch_model, test_loader, report, 1)
    print("Test FGSM on ensembled model")
    report = AttackOnModel(torch_model, test_model1, test_model2, test_model3, test_loader, report, 0)


In [29]:
# # Experiment with 3 models
# model1 = LeNet5()
# model2 = ResNet1()
# model3 = VGGNet()

# # Experiment 1
# model1 = PytorchMnistModel()
# model2 = LeNet5()
# model3 = ResNet1()
# model4 = VGGNet()

# #Experiment 2
# model1 = ResNet1()
# model2 = ResNet1()
# model3 = ResNet1()
# model4 = ResNet1()

# #Experiment 3
# model1 = ResNet1()
# model2 = ResNet2()
# model3 = ResNet3()
# model4 = ResNet4()


In [30]:
#train the two models
print('Running Model 1')
mnist_test(nb_epochs=4, batch_size=128, learning_rate=0.001, torch_model=model1, training=1, testAttack=0)
print('Running Model 2')
mnist_test(nb_epochs=4, batch_size=128, learning_rate=0.001, torch_model=model2, training=1, testAttack=0)
print('Running Model 3')
mnist_test(nb_epochs=4, batch_size=128, learning_rate=0.001, torch_model=model3, training=1, testAttack=0)
# print('Running Model 4')
# mnist_test(nb_epochs=4, batch_size=128, learning_rate=0.001, torch_model=model4, training=1, testAttack=0)

Running Model 1
[125] Training accuracy: 73.71%
[250] Training accuracy: 91.95%
[375] Training accuracy: 95.47%
[500] Training accuracy: 95.86%
[625] Training accuracy: 96.90%
[750] Training accuracy: 97.08%
[875] Training accuracy: 97.42%
[1000] Training accuracy: 97.59%
[1125] Training accuracy: 97.78%
[1250] Training accuracy: 98.07%
[1375] Training accuracy: 97.87%
[1500] Training accuracy: 98.11%
[1625] Training accuracy: 98.64%
[1750] Training accuracy: 98.28%
[1875] Training accuracy: 98.26%
Running Model 2
[125] Training accuracy: 85.26%
[250] Training accuracy: 96.23%
[375] Training accuracy: 97.63%
[500] Training accuracy: 97.65%
[625] Training accuracy: 98.24%
[750] Training accuracy: 98.51%
[875] Training accuracy: 98.39%
[1000] Training accuracy: 98.41%
[1125] Training accuracy: 98.71%
[1250] Training accuracy: 98.96%
[1375] Training accuracy: 98.89%
[1500] Training accuracy: 98.80%
[1625] Training accuracy: 99.05%
[1750] Training accuracy: 98.92%
[1875] Training accuracy:

In [31]:
#test clean
print('Testing model 1 clean')
mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model1, 
           test_model1=model2, test_model2=model3, test_model3=model4, training=0, testEval=1, testAttack=0)
print('Testing model 2 clean')
mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model2, 
           test_model1=model1, test_model2=model3, test_model3=model4, training=0, testEval=1, testAttack=0)
print('Testing model 3 clean')
mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model3, 
           test_model1=model1, test_model2=model2, test_model3=model4, training=0, testEval=1, testAttack=0)
# print('Testing model 4 clean')
# mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model4, 
#            test_model1=model1, test_model2=model2, test_model3=model3, training=0, testEval=1, testAttack=0)

# #test fgsm
# print('Testing model 1 FGSM')
# mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model1, 
#            test_model1=model2, test_model2=model3, test_model3=model4, training=0, testEval=1, testAttack=0)
# print('Testing model 2 FGSM')
# mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model2, 
#            test_model1=model1, test_model2=model3, test_model3=model4, training=0, testEval=1, testAttack=0)
# print('Testing model 3 FGSM')
# mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model3, 
#            test_model1=model1, test_model2=model2, test_model3=model4, training=0, testEval=1, testAttack=0)
# print('Testing model 4 FGSM')
# mnist_test(nb_epochs=2, batch_size=128, learning_rate=0.001, torch_model=model4, 
#            test_model1=model1, test_model2=model2, test_model3=model3, training=0, testEval=1, testAttack=0)

Testing model 1 clean
Test Clean on same model
Clean accuracy: 98.60%
Test Clean on ensembled model
Clean accuracy: 99.40%
Testing model 2 clean
Test Clean on same model
Clean accuracy: 98.76%
Test Clean on ensembled model
Clean accuracy: 99.40%
Testing model 3 clean
Test Clean on same model
Clean accuracy: 99.24%
Test Clean on ensembled model
Clean accuracy: 99.40%
