<a href="https://colab.research.google.com/github/KaichengDING/Triple-Defense/blob/main/idl_project_ensemble.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mount Google Drive

In [None]:
# Mount Google Drive at /content/gdrive; later cells read models from and write
# logs/results to paths under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

# Install ART

In [None]:
!pip install adversarial-robustness-toolbox==1.4.3

# Import Dependencies

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models

import numpy as np
import pandas as pd

import PIL

import sys
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

from torch.utils import data

from art.attacks.evasion import FastGradientMethod
from art.attacks.evasion import ProjectedGradientDescentPyTorch
from art.estimators.classification import PyTorchClassifier

from art.estimators.classification import EnsembleClassifier
from typing import List, Optional, Union, TYPE_CHECKING
from art.estimators.classification.classifier import ClassifierNeuralNetwork
from scipy.special import softmax

import matplotlib.pyplot as plt
import time
import logging
import datetime
import random

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
cuda = torch.cuda.is_available()
device = torch.device("cuda") if cuda else torch.device("cpu")


# Model Definition
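`ShuffleNet` and `ShuffleNetV2` define the same architecture; they differ only in the attribute name of the backbone (`shuffle` vs. `shufflenet`), which changes the parameter names stored in a saved state dict. Models with a seed number below 200 were created with `ShuffleNet`, and models with a seed number of 200 or above were created with `ShuffleNetV2`. Both definitions are provided here so that all saved models can be loaded into the ensemble.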

In [None]:
class ShuffleNet(nn.Module):
    """ShuffleNetV2 x2.0 backbone followed by a linear head with `nb_classes` outputs.

    The attribute names `shuffle` and `linear` determine the keys of the
    state dict, so they must stay as they are for saved checkpoints to load.
    """

    def __init__(self, nb_classes =10):
        super().__init__()
        # The backbone emits 1000 features; the head maps them to the target classes.
        self.shuffle = models.shufflenet_v2_x2_0()
        self.linear = nn.Linear(1000, nb_classes)

    def forward(self, x):
        """Return the head's output for the backbone features of `x`."""
        return self.linear(self.shuffle(x))


class ShuffleNetV2(nn.Module):
    """ShuffleNetV2 x2.0 backbone followed by a linear head with `nb_classes` outputs.

    Same layers as `ShuffleNet`, but the backbone is stored under the attribute
    `shufflenet`, which determines the keys of the state dict.
    """

    def __init__(self, nb_classes=10):
        super().__init__()
        # The backbone emits 1000 features; the head maps them to the target classes.
        self.shufflenet = models.shufflenet_v2_x2_0()
        self.linear = nn.Linear(1000, nb_classes)

    def forward(self, x):
        """Return the head's output for the backbone features of `x`."""
        return self.linear(self.shufflenet(x))

# Logging Configuration

In [None]:
# Send every log record both to the notebook output and to a timestamped file on Drive.
logger = logging.getLogger("")  # the empty name selects the root logger

# Remove handlers left on the root logger (e.g. from an earlier run of this cell)
# so that each message is emitted only once.
for stale_hdlr in list(logging.root.handlers):
  logging.root.removeHandler(stale_hdlr)

stream_hdlr = logging.StreamHandler()
# logging on colab
file_hdlr = logging.FileHandler('/content/gdrive/My Drive/IDL_Project/logs/log_{}.log'.format(datetime.datetime.now()))

# Both handlers share one format and are attached to the root logger.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
for hdlr in (stream_hdlr, file_hdlr):
  hdlr.setFormatter(formatter)
for hdlr in (stream_hdlr, file_hdlr):
  logger.addHandler(hdlr)

logger.setLevel(logging.INFO)

# Get Test Examples

In [None]:
class MyDataset(torch.utils.data.Dataset):
  """Dataset over parallel containers of samples `X` and integer labels `Y`.

  Each item is a `(sample, label)` pair. The sample is `transform(X[idx])` when a
  transform is given, otherwise `torch.from_numpy(X[idx])`. The label is always
  returned as an int64 tensor.
  """

  def __init__(self, X, Y, transform=None):
    self.X, self.Y = X, Y
    self.transform = transform

  def __len__(self):
    # The number of labels defines the dataset size.
    return len(self.Y)

  def __getitem__(self, idx):
    raw_sample = self.X[idx]
    if self.transform is not None:
      sample = self.transform(raw_sample)
    else:
      sample = torch.from_numpy(raw_sample)
    label = torch.tensor(self.Y[idx]).long()
    return sample, label

In [None]:
# Evaluation settings.
test_batchsize = 200  # samples per evaluation / attack batch
num_workers = 4       # DataLoader worker processes
nb_classes = 10
img_size = 224        # target size passed to Resize
subset_size = 1000    # only the first `subset_size` test images are evaluated

# Raw image -> PIL image -> resized -> tensor. Normalising with mean 0 and std 1
# leaves the tensor values unchanged.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(size=img_size),
    transforms.ToTensor(),
    transforms.Normalize((0, 0, 0), (1, 1, 1)),
])

# Download the CIFAR-10 test split and keep its leading `subset_size` samples.
testset= torchvision.datasets.CIFAR10(root='./data', train=False, download=True)
testset_data = testset.data[:subset_size]
testset_labels = testset.targets[:subset_size]

testset_sub = MyDataset(testset_data, testset_labels, transform=test_transform)

# Fixed order; an incomplete final batch is dropped.
testloader = torch.utils.data.DataLoader(
    testset_sub,
    batch_size=test_batchsize,
    shuffle=False,
    num_workers=num_workers,
    drop_last=True,
)

# Load Saved Models (ignore for now)
All saved models will be loaded into `model_dict`. Models with a seed number below 200 belong to the `ShuffleNet` class, and models with a seed number of 200 or above belong to the `ShuffleNetV2` class.

In [None]:
# models with 0.1 training noise
# model_names = ['ShuffleNet_1',
#                'ShuffleNet_2',
#                'ShuffleNet_3',
#                'ShuffleNet_4',
#                'ShuffleNet_5',
#                'ShuffleNet_6',
#                'ShuffleNet_7',
#                'ShuffleNet_8',
#                'ShuffleNet_31',
#                'ShuffleNet_32',
#                'ShuffleNet_33',
#                'ShuffleNet_34',
#                'ShuffleNet_35',
#                'ShuffleNet_36',
#                'ShuffleNet_37',
#                'ShuffleNet_38',
#                'ShuffleNet_39',
#                'ShuffleNet_61',
#                'ShuffleNet_62',
#                'ShuffleNet_63',
#                'ShuffleNet_64',
#                'ShuffleNet_65',
#                'ShuffleNet_66',
#                'ShuffleNet_67',
#                'ShuffleNet_68',
#                'ShuffleNet_69',
#                'ShuffleNet_70',
#                'ShuffleNet_71',
#                'ShuffleNet_72',
#                'ShuffleNet_73']

# model_names_v2 = ['ShuffleNet_200', 
#                   'ShuffleNet_201',
#                   'ShuffleNet_202',
#                   'ShuffleNet_203',
#                   'ShuffleNet_204',
#                   'ShuffleNet_205',
#                   'ShuffleNet_206',
#                   'ShuffleNet_207',
#                   'ShuffleNet_208',
#                   'ShuffleNet_209']

# model_dict = {}

# for model_name in model_names:
#   model = ShuffleNet()
#   model_data = torch.load('/content/gdrive/My Drive/IDL_Project/modelS/{}'.format(model_name), map_location=torch.device('cpu'))
#   model.load_state_dict(model_data['model_state_dict'])
#   model = model.to(device)
#   model_dict[model_name] = model

# for model_name in model_names_v2:
#   model = ShuffleNetV2()
#   model_data = torch.load('/content/gdrive/My Drive/IDL_Project/modelS/{}'.format(model_name), map_location=torch.device('cpu'))
#   model.load_state_dict(model_data['model_state_dict'])
#   model = model.to(device)
#   model_dict[model_name] = model

# logging.info("Total number of models: {}".format(len(model_names) + len(model_names_v2)))

# Creating classifier list (ignore for now)

In [None]:
# nb_classes = 10
# criterion = nn.CrossEntropyLoss()
# classifier_list = []
# for model_name in model_dict:
#   classifier_list.append(PyTorchClassifier(
#     model=model_dict[model_name],
#     clip_values=(0, 1),
#     loss=criterion,
#     input_shape=(3, img_size, img_size),
#     nb_classes=nb_classes,
# ))

# MyEnsembleClassifier
Inherits from `EnsembleClassifier` and overrides `predict`, `loss_gradient` and `loss_gradient_framework`.



In [None]:
class MyEnsembleClassifier(EnsembleClassifier):
    """
    Randomised ensemble built on ART's `EnsembleClassifier`.

    Every call to `predict`, `loss_gradient` or `loss_gradient_framework` draws
    `num_selected_models` member classifiers uniformly at random with replacement
    and feeds each of them the input plus fresh Gaussian noise scaled by `infer_noise`.
    """

    def __init__(
        self,
        classifiers: List[ClassifierNeuralNetwork],
        device,
        infer_noise,
        num_selected_models,
        classifier_weights: Union[list, np.ndarray, None] = None,
        channels_first: bool = False,
        clip_values: Optional["CLIP_VALUES_TYPE"] = None,
        preprocessing_defences: Union["Preprocessor", List["Preprocessor"], None] = None,
        postprocessing_defences: Union["Postprocessor", List["Postprocessor"], None] = None,
        preprocessing: "PREPROCESSING_TYPE" = (0, 1),
    ) -> None:
        """
        :param classifiers: Member classifiers of the ensemble.
        :param device: Torch device on which `loss_gradient_framework` returns its result.
        :param infer_noise: Scale applied to the standard-normal noise added to every input.
        :param num_selected_models: Number of (model, noise) draws per call.
        The remaining arguments are forwarded unchanged to `EnsembleClassifier`.
        """
        super().__init__(
            classifiers=classifiers,
            classifier_weights=classifier_weights,
            channels_first=channels_first,
            clip_values=clip_values,
            preprocessing_defences=preprocessing_defences,
            postprocessing_defences=postprocessing_defences,
            preprocessing=preprocessing,
        )
        self.device = device
        self.infer_noise = infer_noise
        self.num_models = len(classifiers)
        self.num_selected_models = num_selected_models

    def _noisy_input(self, x: np.ndarray) -> np.ndarray:
        """Return `x` plus freshly drawn Gaussian noise, cast to float32."""
        return np.float32(x + np.random.randn(*x.shape) * self.infer_noise)

    def predict(self, x: np.ndarray, batch_size: int = 128, raw: bool = False, **kwargs) -> np.ndarray:
        """
        Predict with randomly drawn members on noisy copies of `x` and aggregate by majority vote.

        Each selected member's output is passed through a softmax. For every input the
        majority class among the draws is determined, and the probabilities of the draws
        that voted for that class are averaged.

        :param x: Test set.
        :param batch_size: Accepted for interface compatibility; it is not forwarded to the members.
        :param raw: Return the individual softmax outputs (not aggregated).
        :return: Array of predictions of shape `(nb_inputs, nb_classes)`, or of shape
                 `(num_selected_models, nb_inputs, nb_classes)` if `raw=True`.
        """
        indices = np.random.choice(self.num_models, self.num_selected_models, replace=True)

        # Noise is drawn one model at a time so that only a single noisy copy of the
        # batch is alive at any moment (instead of `num_selected_models` copies).
        preds = np.array(
            [softmax(self._classifiers[i].predict(self._noisy_input(x)), axis=1) for i in indices]
        )
        torch.cuda.empty_cache()

        if raw:
            return preds

        # (num_selected_models, nb_inputs): class chosen by every draw for every input
        preds_classes = np.argmax(preds, axis=2)

        # (nb_inputs,): most frequent class per input
        majority_vote = np.array([np.bincount(votes).argmax() for votes in preds_classes.T])

        # True where a draw agrees with the majority. The trailing axis broadcasts over
        # the class dimension, so the number of classes is not hard-coded.
        mask = (preds_classes == majority_vote)[:, :, np.newaxis]

        # Aggregate predictions only at probabilities level, as logits are not comparable between models.
        # Mean over the agreeing draws; at least one draw always agrees, so the divisor is never zero.
        var_z = np.sum(mask * preds, axis=0) / np.sum(mask, axis=0)

        # Apply postprocessing
        return self._apply_postprocessing(preds=var_z, fit=False)

    def loss_gradient(self, x: np.ndarray, y: np.ndarray, raw: bool = False, **kwargs) -> np.ndarray:
        """
        Compute the gradient of the loss function w.r.t. `x`, summed over the random draws.

        :param x: Sample input with shape as expected by the model.
        :param y: Target values (class labels) one-hot-encoded of shape (nb_samples, nb_classes) or indices of shape
                  (nb_samples,).
        :param raw: Return the individual gradients (not aggregated).
        :return: Array of gradients of the same shape as `x`. If `raw=True`, shape becomes
                 `[num_selected_models, x.shape]`.
        """
        indices = np.random.choice(self.num_models, self.num_selected_models, replace=True)

        # One noisy copy of the batch at a time (see `predict`).
        grads = np.array(
            [self._classifiers[i].loss_gradient(self._noisy_input(x), y) for i in indices]
        )
        torch.cuda.empty_cache()

        if raw:
            return grads

        return np.sum(grads, axis=0)

    def loss_gradient_framework(self, x: "torch.Tensor", y: "torch.Tensor", **kwargs) -> "torch.Tensor":
        """
        Compute the gradient of the loss function w.r.t. `x`, summed over the random draws.

        :param x: Sample input with shape as expected by the model.
        :param y: Target values (class labels) one-hot-encoded of shape (nb_samples, nb_classes) or indices of shape
                  (nb_samples,).
        :return: Tensor of gradients with the same shape as `x`, placed on `self.device`.
        """
        indices = np.random.choice(self.num_models, self.num_selected_models, replace=True)

        # The accumulator takes its shape from `x`, so any batch size / image size works.
        # It is kept on the CPU, as before, to limit GPU memory use.
        accumulator = torch.zeros(x.size(), dtype=x.dtype)
        for i in indices:
            # Draw the noise for one model at a time and place it next to `x`.
            noise = (torch.randn(x.size()) * self.infer_noise).to(x.device)
            accumulator += self._classifiers[i].loss_gradient_framework(x + noise, y).cpu()

        torch.cuda.empty_cache()

        return accumulator.to(self.device)

# Experiment Settings
### common settings:
*   inference noise: 0.1
*   eps: 0.5
*   eps step: 0.4
*   max iter: 20
*   L2 norm
*   number of samplings (`num_selected_models`): 50

### to experiment:
1. Total number of candidate models in the ensemble: 1 (randomized smoothing)
2. Total number of candidate models in the ensemble: 5
3. Total number of candidate models in the ensemble: 9
4. Total number of candidate models in the ensemble: 13

# Run this code block to load common variables

In [None]:
# PGD attack configuration shared by all experiments (L2 norm).
norm = 2
eps = 0.5
eps_step = 0.4
max_iter = 20

# Randomised-ensemble configuration: noise scale and number of draws per call.
infer_noise = 0.1
num_selected_models = 50

# Experiment 1 (Shivani)

In [None]:
# Experiment 1: a single candidate model.
model_names = ['ShuffleNet_{}'.format(seed) for seed in (1,)]

# Experiment 2 (Kriti)

In [None]:
# Experiment 2: five candidate models (seeds 1-5).
model_names = ['ShuffleNet_{}'.format(seed) for seed in range(1, 6)]

# Experiment 3 (Shriti)

In [None]:
# Experiment 3: nine candidate models (seeds 1-8 and 31).
model_names = ['ShuffleNet_{}'.format(seed) for seed in list(range(1, 9)) + [31]]

# Experiment 4 (Kaicheng)

In [None]:
# Experiment 4: thirteen candidate models (seeds 1-8 and 31-35).
model_names = ['ShuffleNet_{}'.format(seed) for seed in list(range(1, 9)) + list(range(31, 36))]

# Kick off Experiment

In [None]:
# load models
# Checkpoints are read onto the CPU first and the model is then moved to `device`.
model_dict = {}
for model_name in model_names:
  model = ShuffleNet()
  model_data = torch.load('/content/gdrive/My Drive/IDL_Project/modelS/{}'.format(model_name), map_location=torch.device('cpu'))
  model.load_state_dict(model_data['model_state_dict'])
  model = model.to(device)
  model_dict[model_name] = model

logging.info("Models loaded successfully!")
logging.info("Total number of models: {}".format(len(model_names)))

# create classifier list: one ART wrapper per loaded model
criterion = nn.CrossEntropyLoss()
classifier_list = []
for model_name in model_dict:
  classifier_list.append(PyTorchClassifier(model=model_dict[model_name],
                                           clip_values=(0, 1),
                                           loss=criterion,
                                           input_shape=(3, img_size, img_size),
                                           nb_classes=nb_classes))

# create ensemble classifier
my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                              device,
                                              infer_noise=infer_noise,
                                              num_selected_models=num_selected_models,
                                              clip_values=[0., 1.],
                                              channels_first=True)

# create attack object
attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier,
                                         eps=eps,
                                         eps_step=eps_step,
                                         norm=norm,
                                         max_iter=max_iter,
                                         batch_size=test_batchsize)

# kick off experiment: per batch, measure accuracy on clean and on adversarial inputs
Acc_adv = []
Acc_nat = []
for batch_idx, (X, Y) in enumerate(testloader):
  x_test = X.numpy()
  y_test = Y.numpy()
  predictions_nat = my_ensemble_classifier.predict(x_test)
  accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
  Acc_nat.append(accuracy_nat)

  x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
  predictions_adv = my_ensemble_classifier.predict(x_test_adv)
  accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
  Acc_adv.append(accuracy_adv)

  logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
  logging.info('accuracy (natural): {}'.format(accuracy_nat))


# saving the dataframe
Acc_adv = np.asanyarray(Acc_adv)
Acc_nat = np.asanyarray(Acc_nat)
# One id per evaluated batch. Deriving the ids from len(testset) (the full test set)
# gives a column longer than the accuracy columns whenever only a subset is
# evaluated, and pd.DataFrame then raises because the lengths differ.
idx = np.arange(len(Acc_adv)).tolist()
res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
df = pd.DataFrame(res)
df.to_csv('./gdrive/My Drive/IDL_Project/results/result_{}_models.csv'.format(len(model_names)),index=False)


# **The remaining parts of the notebook relate to previous work. Ignore them for now.**

# Experiment Settings (**deprecated**)
Refer to [this google doc](https://docs.google.com/document/d/1VaS5THALdudd_63Zv7ZR2u8rj8r8iDF6xgXzzLHXR8w/edit)

1.   `num_selected_models`=15, `infer_noise`=[0.0025, 0.005, 0.01, 0.05, 0.08, 0.1], `eps`=[0.0025, 0.01, 0.1, 0.5, 0.8], `norm`=Linf
2.   `num_selected_models`=[5, 10, 15, 20, 25, 30, 35, 40], `infer_noise`=0.1, `eps`=[0.0025, 0.01, 0.1, 0.5, 0.8], `norm`=Linf
3.   `num_selected_models`=15, `infer_noise`=[0.0025, 0.005, 0.01, 0.05, 0.08, 0.1], `eps`=[0.1, 0.5, 0.8, 0.9, 1], `norm`=L2
4.   `num_selected_models`=[5, 10, 15, 20, 25, 30, 35, 40], `infer_noise`=0.1, `eps`=[0.1, 0.5, 0.8, 0.9, 1], `norm`=L2








### Arbitrary Experiment

In [None]:
# Single ad-hoc run: L2 PGD against the ensemble built from the current `classifier_list`.
num_selected_models = 50
# norm = np.inf
norm = 2
infer_noise = 0.1
# eps = 8/255
eps = 0.5
logging.info("Current experiment setting: eps={}, inference noise={}".format(eps, infer_noise))
my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                              device,
                                              infer_noise=infer_noise,
                                              num_selected_models=num_selected_models,
                                              clip_values=[0., 1.],
                                              channels_first=True)
attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=eps, eps_step=0.4, norm=norm, max_iter=20, batch_size=test_batchsize)
Acc_adv = []
Acc_nat = []
for batch_idx, (X, Y) in enumerate(testloader):
  x_test = X.numpy()
  y_test = Y.numpy()
  predictions_nat = my_ensemble_classifier.predict(x_test)
  accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
  Acc_nat.append(accuracy_nat)

  x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
  predictions_adv = my_ensemble_classifier.predict(x_test_adv)
  accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
  Acc_adv.append(accuracy_adv)

  # print('accuracy (adversarial): {}'.format(accuracy_adv))
  # print('accuracy (natural): {}'.format(accuracy_nat))

  # logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
  # logging.info('accuracy (natural): {}'.format(accuracy_nat))

Acc_adv = np.asanyarray(Acc_adv)
Acc_nat = np.asanyarray(Acc_nat)
# One id per evaluated batch; ids derived from len(testset) do not match the number
# of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
idx = np.arange(len(Acc_adv)).tolist()
res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
df = pd.DataFrame(res)
# saving the dataframe
df.to_csv('./gdrive/My Drive/IDL_Project/results/PGD_exp1_infer_noise{}_eps{}.csv'.format(infer_noise, eps),index=False)

# Experiment 1

In [None]:
# Grid over inference noise and eps with an L-inf PGD attack; one CSV per setting.
num_selected_models = 15
norm = np.inf
infer_noise_list = [0.0025, 0.005, 0.01, 0.05, 0.08, 0.1]
eps_list = [0.0025, 0.01, 0.1, 0.5, 0.8]

for infer_noise in infer_noise_list:
  for eps in eps_list:
    my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                                  device,
                                                  infer_noise=infer_noise,
                                                  num_selected_models=num_selected_models,
                                                  clip_values=[0., 1.],
                                                  channels_first=True)
    attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=eps, eps_step=eps/3, norm=norm)
    Acc_adv = []
    Acc_nat = []
    for batch_idx, (X, Y) in enumerate(testloader):
      x_test = X.numpy()
      y_test = Y.numpy()
      predictions_nat = my_ensemble_classifier.predict(x_test)
      accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
      Acc_nat.append(accuracy_nat)

      x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
      predictions_adv = my_ensemble_classifier.predict(x_test_adv)
      accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
      Acc_adv.append(accuracy_adv)

      logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
      logging.info('accuracy (natural): {}'.format(accuracy_nat))

    Acc_adv = np.asanyarray(Acc_adv)
    Acc_nat = np.asanyarray(Acc_nat)
    # One id per evaluated batch; ids derived from len(testset) do not match the number
    # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
    idx = np.arange(len(Acc_adv)).tolist()
    res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
    df = pd.DataFrame(res)
    # saving the dataframe
    df.to_csv('./gdrive/My Drive/IDL_Project/results/PGD_exp1_infer_noise{}_eps{}.csv'.format(infer_noise, eps),index=False)

# Experiment 2

In [None]:
# Grid over the number of draws and eps with an L-inf PGD attack; one CSV per setting.
num_selected_models_list = [5, 10, 15, 20, 25, 30, 35, 40]
norm = np.inf
infer_noise = 0.1
eps_list = [0.0025, 0.01, 0.1, 0.5, 0.8]

for num_selected_models in num_selected_models_list:
  for eps in eps_list:
    my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                                  device,
                                                  infer_noise=infer_noise,
                                                  num_selected_models=num_selected_models,
                                                  clip_values=[0., 1.],
                                                  channels_first=True)
    attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=eps, eps_step=eps/3, norm=norm)
    Acc_adv = []
    Acc_nat = []
    for batch_idx, (X, Y) in enumerate(testloader):
      x_test = X.numpy()
      y_test = Y.numpy()
      x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
      predictions_adv = my_ensemble_classifier.predict(x_test_adv)
      predictions_nat = my_ensemble_classifier.predict(x_test)
      accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
      accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
      Acc_adv.append(accuracy_adv)
      Acc_nat.append(accuracy_nat)
      logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
      logging.info('accuracy (natural): {}'.format(accuracy_nat))

    Acc_adv = np.asanyarray(Acc_adv)
    Acc_nat = np.asanyarray(Acc_nat)
    # One id per evaluated batch; ids derived from len(testset) do not match the number
    # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
    idx = np.arange(len(Acc_adv)).tolist()
    res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
    df = pd.DataFrame(res)
    # saving the dataframe
    df.to_csv('./gdrive/My Drive/IDL_Project/results/PGD_exp2_N{}_eps{}.csv'.format(num_selected_models, eps),index=False)

# Experiment 3

In [None]:
# Grid over inference noise and eps with an L2 PGD attack; one CSV per setting.
num_selected_models = 15
norm = 2
infer_noise_list = [0.0025, 0.005, 0.01, 0.05, 0.08, 0.1]
eps_list = [0.1, 0.5, 0.8, 0.9, 1]

for infer_noise in infer_noise_list:
  for eps in eps_list:
    my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                                  device,
                                                  infer_noise=infer_noise,
                                                  num_selected_models=num_selected_models,
                                                  clip_values=[0., 1.],
                                                  channels_first=True)
    attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=eps, eps_step=eps/3, norm=norm)
    Acc_adv = []
    Acc_nat = []
    for batch_idx, (X, Y) in enumerate(testloader):
      x_test = X.numpy()
      y_test = Y.numpy()
      x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
      predictions_adv = my_ensemble_classifier.predict(x_test_adv)
      predictions_nat = my_ensemble_classifier.predict(x_test)
      accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
      accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
      Acc_adv.append(accuracy_adv)
      Acc_nat.append(accuracy_nat)
      logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
      logging.info('accuracy (natural): {}'.format(accuracy_nat))

    Acc_adv = np.asanyarray(Acc_adv)
    Acc_nat = np.asanyarray(Acc_nat)
    # One id per evaluated batch; ids derived from len(testset) do not match the number
    # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
    idx = np.arange(len(Acc_adv)).tolist()
    res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
    df = pd.DataFrame(res)
    # saving the dataframe
    df.to_csv('./gdrive/My Drive/IDL_Project/results/PGD_exp3_infer_noise{}_eps{}.csv'.format(infer_noise, eps),index=False)

# Experiment 4

In [None]:
# Grid over the number of draws and eps with an L2 PGD attack; one CSV per setting.
num_selected_models_list = [5, 10, 15, 20, 25, 30, 35, 40]
norm = 2
infer_noise = 0.1
eps_list = [0.1, 0.5, 0.8, 0.9, 1]

for num_selected_models in num_selected_models_list:
  for eps in eps_list:
    my_ensemble_classifier = MyEnsembleClassifier(classifier_list,
                                                  device,
                                                  infer_noise=infer_noise,
                                                  num_selected_models=num_selected_models,
                                                  clip_values=[0., 1.],
                                                  channels_first=True)
    attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=eps, eps_step=eps/3, norm=norm)
    Acc_adv = []
    Acc_nat = []
    for batch_idx, (X, Y) in enumerate(testloader):
      x_test = X.numpy()
      y_test = Y.numpy()
      x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
      predictions_adv = my_ensemble_classifier.predict(x_test_adv)
      predictions_nat = my_ensemble_classifier.predict(x_test)
      accuracy_adv = np.sum(np.argmax(predictions_adv, axis=1) == y_test) / len(y_test)
      accuracy_nat = np.sum(np.argmax(predictions_nat, axis=1) == y_test) / len(y_test)
      Acc_adv.append(accuracy_adv)
      Acc_nat.append(accuracy_nat)
      logging.info('accuracy (adversarial): {}'.format(accuracy_adv))
      logging.info('accuracy (natural): {}'.format(accuracy_nat))

    Acc_adv = np.asanyarray(Acc_adv)
    Acc_nat = np.asanyarray(Acc_nat)
    # One id per evaluated batch; ids derived from len(testset) do not match the number
    # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
    idx = np.arange(len(Acc_adv)).tolist()
    res = {'Id': idx ,'AccADV': Acc_adv, 'AccNAT': Acc_nat}
    df = pd.DataFrame(res)
    # saving the dataframe
    df.to_csv('./gdrive/My Drive/IDL_Project/results/PGD_exp4_N{}_eps{}.csv'.format(num_selected_models, eps),index=False)

# FGM Attack

In [None]:
# Untargeted L2 FGM attack (eps=0.5) against the current `my_ensemble_classifier`.
# Per batch, accuracy is computed on the adversarial inputs and on the clean inputs.
attack = FastGradientMethod(estimator=my_ensemble_classifier, eps=0.5, targeted=False, norm=2)
Acc = []
for batch_idx, (X, Y) in enumerate(testloader):
    x_test = X.numpy()
    y_test = Y.numpy()
    # target = (y_test + 1) % 10
    x_test_adv = attack.generate(torch.from_numpy(x_test), torch.from_numpy(y_test))
    # Each predict() call draws new random models and noise, so the call order matters.
    predictions = my_ensemble_classifier.predict(x_test_adv)
    predictions_test = my_ensemble_classifier.predict(x_test)
    accuracy = np.sum(np.argmax(predictions, axis=1) == y_test) / len(y_test)
    acc_test = np.sum(np.argmax(predictions_test, axis=1) == y_test) / len(y_test)
    # print(y_test.shape)
    # acc_rate, coverage_rate = compute_accuracy(preds=predictions, labels=np.reshape(y_test, (test_batchsize, 1)))
    # logging.info('acc rate: {}, coverage rate: {}'.format(acc_rate, coverage_rate))
    # Only the adversarial accuracy is collected in `Acc`; the clean accuracy is just logged.
    Acc.append(accuracy)
    logging.info('accuracy: {}'.format(accuracy))
    logging.info('accuracy non adv: {}'.format(acc_test))

# PGD Attack

In [None]:
# PGD attack (eps=0.5, step 0.03) against the current `my_ensemble_classifier`.
# No labels are passed to generate(); only the inputs are given to the attack.
attack = ProjectedGradientDescentPyTorch(my_ensemble_classifier, eps=0.5, eps_step=0.03)
Acc = []
for batch_idx, (X, Y) in enumerate(testloader):
    x_test = X.numpy()
    y_test = Y.numpy()
    x_test_adv = attack.generate(torch.from_numpy(x_test))
    predictions = my_ensemble_classifier.predict(x_test_adv)
    # Fraction of adversarial inputs that are still classified as their true label.
    accuracy = np.sum(np.argmax(predictions, axis=1) == np.array(y_test)) / len(y_test)
    Acc.append(accuracy)
    logging.info('accuracy: {}'.format(accuracy))

# Experiment settings

In [None]:
# Perturbation budgets swept by the FGM experiments.
fgm_epsilon = np.array((0.005, 0.01, 0.02, 0.04, 0.08))

# PGD budgets: 8/255 divided by 2, 4, 8 and 16.
pgd_divisors = np.array((2.0, 4.0, 8.0, 16.0))
pgd_epsilon = 8.0 / 255.0 / pgd_divisors

# Each PGD step size sits slightly below its budget.
diff = np.array((1e-3, 1e-4, 1e-5, 1e-6))
pgd_epsilon_step = np.subtract(pgd_epsilon, diff)

Experiment on inference noise

In [None]:
# FGM (eps=0.01) against ensembles built with different inference-noise levels.
# NOTE(review): `MyEnsemble` and `num_models_selected` are not defined in the visible
# part of this notebook; they presumably belong to the earlier work - confirm before running.
noise_stds = [0.02, 0.04, 0.06, 0.08, 0.1]
for i in range(len(noise_stds)):
  my_ensemble = MyEnsemble(model_dict, num_models_selected, noise_std=noise_stds[i], num_classes=10)
  classifier = PyTorchClassifier(
      model=my_ensemble,
      clip_values=(0, 1),
      loss=nn.CrossEntropyLoss(),
      input_shape=(3, 224, 224),
      nb_classes=10)
  attack = FastGradientMethod(estimator=classifier, eps=0.01)
  Acc = []
  for batch_idx, (X, Y) in enumerate(testloader):
    x_test = X.numpy()
    y_test = Y.numpy()
    x_test_adv = attack.generate(torch.from_numpy(x_test))
    predictions = classifier.predict(x_test_adv)
    accuracy = np.sum(np.argmax(predictions, axis=1) == np.array(y_test)) / len(y_test)
    Acc.append(accuracy)
    logging.info('accuracy: {}'.format(accuracy))

  # The quantity varied in this loop is the noise level; eps is fixed at 0.01, so the
  # summary reports the noise std instead of an unrelated entry of `fgm_epsilon`.
  logging.info("Accuracy for noise_std={}: {}".format(noise_stds[i], np.mean(np.array(Acc))))

  pred_list = np.asanyarray(Acc)
  # One id per evaluated batch; ids derived from len(testset) do not match the number
  # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
  idx = np.arange(len(pred_list)).tolist()
  # Named `res` so that the builtin `dict` is not shadowed.
  res = {'Id': idx ,'acc': pred_list}
  df = pd.DataFrame(res)
  # saving the dataframe
  df.to_csv('./gdrive/My Drive/IDL_Project/results/FGM_result_noise{}_{}.csv'.format(noise_stds[i], datetime.datetime.now()),index=False)

In [None]:
# Save the accuracies currently held in `Acc`, labelled with the last FGM epsilon.
pred_list = np.asanyarray(Acc)
# One id per accuracy value; ids derived from len(testset) do not match the number
# of values when only a subset is evaluated, which makes pd.DataFrame raise.
idx = np.arange(len(pred_list)).tolist()
# Named `res` so that the builtin `dict` is not shadowed.
res = {'Id': idx ,'acc': pred_list}
df = pd.DataFrame(res)
# saving the dataframe
df.to_csv('./gdrive/My Drive/IDL_Project/results/FGM_result_eps{}_{}.csv'.format(fgm_epsilon[-1], datetime.datetime.now()),index=False)

Plot PGD adversarial examples

In [None]:
# Generate PGD adversarial examples for the first test batch only and keep the
# inputs, labels and predictions for inspection in the following cells.
# NOTE(review): `classifier` is whatever an earlier cell left in the kernel - confirm.
attack = ProjectedGradientDescentPyTorch(estimator=classifier, eps=pgd_epsilon[0], eps_step=pgd_epsilon_step[0])
Acc = []
adv = []
orig = []
labels = []
pred = []
for batch_idx, (X, Y) in enumerate(testloader):
  # Stop after the first batch.
  if batch_idx > 0:
    break
  x_test = X.numpy()
  y_test = Y.numpy()
  x_test_adv = attack.generate(torch.from_numpy(x_test))
  predictions = classifier.predict(x_test_adv)
  accuracy = np.sum(np.argmax(predictions, axis=1) == np.array(y_test)) / len(y_test)
  Acc.append(accuracy)
  adv.append(x_test_adv)
  orig.append(x_test)
  labels.append(y_test)
  pred.append(predictions)
  logging.info('accuracy: {}'.format(accuracy))

In [None]:
# Take the most recently collected batch of adversarial inputs, clean inputs and labels.
example_adv = adv[-1]
example_orig = orig[-1]
label = labels[-1]
# Predicted class per sample, taken from the `predictions` variable left in the kernel
# (presumably the same array as pred[-1] if the collection cell ran last - confirm).
predi = np.argmax(predictions, axis=1)

In [None]:
# Show the true labels next to the predicted classes for the selected batch.
print(label)
print(predi)

In [None]:
# Pick one sample of the batch (position 2) for visual inspection.
# Note: this reuses the name `idx`, which the result-saving cells use for the id column.
idx = 2
selected_adv = example_adv[idx]
selected_orig = example_orig[idx]
selected_lab = label[idx]
selected_pred = predi[idx]

In [None]:
# True label and predicted class of the selected sample.
print(selected_lab)
print(selected_pred)

In [None]:
# NOTE(review): `Image` is not used in the visible cells, and matplotlib is already
# imported in the import cell at the top of the notebook.
from PIL import Image
import matplotlib.pyplot as plt

# Show the clean sample; the axes are reordered with (1, 2, 0) before plotting
# (presumably channels-first -> channels-last - confirm).
plt.imshow(np.transpose(selected_orig, (1, 2, 0)))

In [None]:
# Show the adversarial sample with the same axis reordering as the clean one.
plt.imshow(np.transpose(selected_adv, (1, 2, 0)))

In [None]:
# Show the difference between the clean and the adversarial sample, amplified by a factor of 10.
plt.imshow(10*(np.transpose(selected_orig, (1, 2, 0)) - np.transpose(selected_adv, (1, 2, 0))))

Experiment FGM attacks

In [None]:
# FGM attack for every budget in `fgm_epsilon`; one CSV per budget.
# The loop runs over `fgm_epsilon` itself: iterating over len(pgd_epsilon) (a shorter
# array) while indexing `fgm_epsilon` silently skipped the last budget.
# NOTE(review): `classifier` is whatever an earlier cell left in the kernel - confirm.
for i in range(len(fgm_epsilon)):
  attack = FastGradientMethod(estimator=classifier, eps=fgm_epsilon[i])
  Acc = []
  for batch_idx, (X, Y) in enumerate(testloader):
    x_test = X.numpy()
    y_test = Y.numpy()
    x_test_adv = attack.generate(torch.from_numpy(x_test))
    predictions = classifier.predict(x_test_adv)
    accuracy = np.sum(np.argmax(predictions, axis=1) == np.array(y_test)) / len(y_test)
    Acc.append(accuracy)
    logging.info('accuracy: {}'.format(accuracy))

  logging.info("Accuracy for eps={}: {}".format(fgm_epsilon[i], np.mean(np.array(Acc))))

  pred_list = np.asanyarray(Acc)
  # One id per evaluated batch; ids derived from len(testset) do not match the number
  # of accuracy values when only a subset is evaluated, which makes pd.DataFrame raise.
  idx = np.arange(len(pred_list)).tolist()
  # Named `res` so that the builtin `dict` is not shadowed.
  res = {'Id': idx ,'acc': pred_list}
  df = pd.DataFrame(res)
  # saving the dataframe
  df.to_csv('./gdrive/My Drive/IDL_Project/results/FGM_result_eps{}_{}.csv'.format(fgm_epsilon[i], datetime.datetime.now()),index=False)