In [None]:
import copy
import logging
import math
import os
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader

In [None]:
# Modified AlexNet architecture from Krizhevsky et al. in 2012 
# http://www.cs.toronto.edu/~kriz/imagenet_classification_with_deep_convolutional.pdf
class MNIST_target_net(nn.Module):
    """Small CNN classifier for 1x28x28 inputs that serves as the attack target.

    Four 3x3 convolutions (with two 2x2 max-pools) feed three fully connected
    layers. ``forward`` returns raw class logits of shape ``(N, 10)``; no
    softmax is applied.
    """

    def __init__(self):
        super(MNIST_target_net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3)

        # 28 -> 26 -> 24 -> pool 12 -> 10 -> 8 -> pool 4, hence 64*4*4 features
        self.fc1 = nn.Linear(64*4*4, 200)
        self.fc2 = nn.Linear(200, 200)
        self.logits = nn.Linear(200, 10)

    def forward(self, x):
        """Return the ``(N, 10)`` logits for a batch ``x`` of shape ``(N, 1, 28, 28)``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2)
        x = x.view(-1, 64*4*4)
        x = F.relu(self.fc1(x))
        # The functional dropout defaults to training=True; tie it to the module
        # mode so that predictions are deterministic after .eval().
        x = F.dropout(x, 0.5, training=self.training)
        x = F.relu(self.fc2(x))
        x = self.logits(x)
        return x

In [None]:
# Core AdvGAN model from Xiao et al. in 2019 - https://arxiv.org/pdf/1801.02610.pdf

# Implements discriminator model which takes in the generated adversarial sample and outputs the probability that it is real.
# This gives the generator feedback via backprop and results in more realistic adversarial samples
class Discriminator(nn.Module):
    """Convolutional real/fake critic with a sigmoid output.

    For 28x28 inputs the three stride-2 convolutions reduce the spatial size
    28 -> 13 -> 5 -> 1, so ``forward`` yields one score in [0, 1] per sample.

    :param image_nc: number of channels of the input images
    """

    def __init__(self, image_nc):
        super(Discriminator, self).__init__()
        # NOTE(review): conv1..conv4 duplicate the layers inside ``self.model`` and are
        # never used by ``forward``. They are kept only so the module's parameter names
        # (state_dict keys) stay unchanged - confirm whether they can be dropped.
        self.conv1 = nn.Conv2d(image_nc, 8, kernel_size=4, stride=2, padding=0, bias=True)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=4, stride=2, padding=0, bias=True)
        self.conv3 = nn.Conv2d(16, 32, kernel_size=4, stride=2, padding=0, bias=True)
        self.conv4 = nn.Conv2d(32, 1, 1)
        # MNIST: 1*28*28
        model = [
            nn.Conv2d(image_nc, 8, kernel_size=4, stride=2, padding=0, bias=True),
            nn.LeakyReLU(0.2),
            # 8*13*13
            nn.Conv2d(8, 16, kernel_size=4, stride=2, padding=0, bias=True),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(0.2),
            # 16*5*5
            nn.Conv2d(16, 32, kernel_size=4, stride=2, padding=0, bias=True),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2),
            # 32*1*1
            nn.Conv2d(32, 1, 1),
            nn.Sigmoid()
            # 1*1*1
        ]
        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Return the per-sample scores with all singleton non-batch dimensions removed."""
        scores = self.model(x)
        output = scores.squeeze()
        # A bare squeeze() also removes the batch dimension when the batch holds a
        # single sample; put it back so the result always has a leading batch axis.
        if scores.shape[0] == 1:
            output = output.unsqueeze(0)
        return output

# Implements the generator model, which takes an input image and outputs a same-sized perturbation
# (tanh output) that is added to the image to form the adversarial sample.
# It takes feedback from the discriminator in order to generate more realistic samples
class Generator(nn.Module):
    """Encoder -> residual bottleneck -> decoder network with a tanh output.

    :param gen_input_nc: number of channels of the generator input
    :param image_nc: number of channels of the generated output
    """

    def __init__(self,
                 gen_input_nc,
                 image_nc,
                 ):
        super(Generator, self).__init__()

        def down(in_ch, out_ch, stride):
            # conv -> instance norm -> ReLU stage of the encoder
            return [nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=0, bias=True),
                    nn.InstanceNorm2d(out_ch),
                    nn.ReLU()]

        def up(in_ch, out_ch):
            # transposed conv -> instance norm -> ReLU stage of the decoder
            return [nn.ConvTranspose2d(in_ch, out_ch, kernel_size=3, stride=2, padding=0, bias=False),
                    nn.InstanceNorm2d(out_ch),
                    nn.ReLU()]

        # MNIST: 1*28*28 -> 8*26*26 -> 16*12*12 -> 32*5*5
        encoder_layers = down(gen_input_nc, 8, 1) + down(8, 16, 2) + down(16, 32, 2)

        # four residual blocks working on the 32-channel bottleneck
        residual_layers = [ResnetBlock(32) for _ in range(4)]

        # 32*5*5 -> 16*11*11 -> 8*23*23 -> image_nc*28*28
        decoder_layers = up(32, 16) + up(16, 8) + [
            nn.ConvTranspose2d(8, image_nc, kernel_size=6, stride=1, padding=0, bias=False),
            nn.Tanh(),
        ]

        self.encoder = nn.Sequential(*encoder_layers)
        self.bottle_neck = nn.Sequential(*residual_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Run ``x`` through encoder, bottleneck and decoder, in that order."""
        return self.decoder(self.bottle_neck(self.encoder(x)))

# ResnetBlock implementation from https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
# In a network with residual blocks, each layer feeds into the next layer as well as a few layers beyond

class ResnetBlock(nn.Module):
    """Residual block: the output is ``x + conv_block(x)``.

    ``conv_block`` is two 3x3 convolutions, each followed by ``norm_layer``;
    a ReLU (and optionally dropout) sits between them.
    """

    def __init__(self, dim, padding_type='reflect', norm_layer=nn.BatchNorm2d, use_dropout=False, use_bias=False):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)

    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        """Assemble the convolutional branch as an ``nn.Sequential``.

        :param dim: channel count of both convolutions (input and output)
        :param padding_type: 'reflect', 'replicate' or 'zero'
        :param norm_layer: factory called as ``norm_layer(dim)``
        :param use_dropout: insert ``nn.Dropout(0.5)`` after the first ReLU
        :param use_bias: whether the convolutions carry a bias term
        :raises NotImplementedError: for any other ``padding_type``
        """
        explicit_pads = {'reflect': nn.ReflectionPad2d, 'replicate': nn.ReplicationPad2d}
        if padding_type != 'zero' and padding_type not in explicit_pads:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        # 'zero' padding is done by the convolution itself; the others use a pad layer
        conv_padding = 1 if padding_type == 'zero' else 0

        layers = []
        for is_first_half in (True, False):
            if padding_type in explicit_pads:
                layers.append(explicit_pads[padding_type](1))
            layers.append(nn.Conv2d(dim, dim, kernel_size=3, padding=conv_padding, bias=use_bias))
            layers.append(norm_layer(dim))
            if is_first_half:
                layers.append(nn.ReLU(True))
                if use_dropout:
                    layers.append(nn.Dropout(0.5))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Add the skip connection to the convolutional branch."""
        return x + self.conv_block(x)

In [None]:
# Downloads MNIST into ./data (if needed) and builds the training / testing loaders
root = './data'
if not os.path.exists(root):
    os.mkdir(root)

# ToTensor + Normalize pipeline. It is defined here, but the two datasets below
# are built with a plain ToTensor() transform and do not use it.
trans = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

train_set = dset.MNIST(root=root, train=True, transform=transforms.ToTensor(), download=True)
test_set = dset.MNIST(root=root, train=False, transform=transforms.ToTensor(), download=True)

train_dataloader = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=0)
# no batch_size given: the loader yields one test sample per batch
test_dataloader = DataLoader(test_set, shuffle=False, num_workers=0)

In [None]:
# Trains the target model on the MNIST training set.
# NOTE(review): this cell does not save the trained weights, while the next cell
# loads 'MNIST_target_model.pth' from disk - confirm where that file comes from.
target_model = MNIST_target_net()
target_model.train()
optim = torch.optim.SGD(target_model.parameters(), lr=1e-3)
epochs = 3
for epoch in range(epochs):
    loss_epoch = 0.0
    for train_imgs, train_labels in train_dataloader:
        optim.zero_grad()
        # The forward pass stays in training mode; switching to eval() around it
        # would disable train-only layers for the very pass that is optimised.
        logits_model = target_model(train_imgs)
        loss_model = F.cross_entropy(logits_model, train_labels)
        loss_model.backward()
        optim.step()
        # .item() detaches the value; summing the loss tensors themselves would
        # keep every batch's autograd graph alive for the whole epoch.
        loss_epoch += loss_model.item()
    print('loss in epoch %d: %f' % (epoch, loss_epoch))



In [None]:
# Loads the weights of the trained target model from disk
pretrained_model = 'MNIST_target_model.pth'
target_model = MNIST_target_net()
# map_location lets a checkpoint saved from a GPU load on a CPU-only machine;
# the model is moved to the desired device by the cells that use it.
target_model.load_state_dict(torch.load(pretrained_model, map_location='cpu'))
target_model.eval()

In [None]:
# Tests the trained model on the testing data as a baseline
target_model.train(False)
target_model.to('cpu')
num_correct = 0
with torch.no_grad():  # evaluation only, no autograd graph needed
    for test_img, test_label in test_dataloader:
        pred_lab = torch.argmax(target_model(test_img), 1)
        num_correct += torch.sum(pred_lab == test_label).item()

# Divide by the number of samples, not the number of batches, so the figure
# stays correct whatever batch size the loader uses.
print('accuracy in testing set: %f\n' % (num_correct / len(test_dataloader.dataset)))

In [None]:
# Model mutation testing approach from Wang et al. in 2019 - https://arxiv.org/pdf/1812.05793.pdf

def mut_test(model, test_loader, verbose=False, device=None):
    """Evaluate ``model`` on ``test_loader`` and return its accuracy in [0, 1].

    :param model: classifier returning one score per class (logits or
        log-probabilities); it is moved to ``device`` in place and put in eval mode
    :param test_loader: loader yielding ``(data, target)`` batches; must expose
        ``.dataset`` so the number of samples is known
    :param verbose: print progress and a final summary line
    :param device: device to evaluate on; ``None`` picks CUDA when available,
        otherwise the CPU
    :return: fraction of correctly classified samples (``0.0`` for an empty dataset)
    """
    if device is None:
        # Resolved at call time: evaluating torch.cuda.* in the signature would
        # fail at definition time on machines without CUDA.
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    model.eval()

    data_size = len(test_loader.dataset)
    if data_size == 0:
        return 0.0

    test_loss = 0.0
    correct = 0
    seen = 0
    time_count = []  # forward-pass duration of every batch, in seconds
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            started = time.perf_counter()
            output = model(data)
            time_count.append(time.perf_counter() - started)
            # cross_entropy applies log_softmax itself, so it is valid for raw logits
            # and leaves log-probability outputs unchanged; summed over the batch
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            pred = output.max(1, keepdim=True)[1]  # index of the highest score
            correct += pred.eq(target.view_as(pred)).sum().item()
            seen += target.size(0)
            if verbose:
                print('\r progress:{:.2f}%'.format(100. * seen / data_size), end='', flush=True)

    test_loss /= data_size
    acc = 1. * correct / data_size

    if verbose:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.1f}%) time:{:.6f}\n'.format(
            test_loss, correct, data_size,
            100 * acc, np.average(time_count)))
    return acc

# defines mutation operators
# NAI - neuron activation inverse
# GF - gaussian fuzzing
# WS - weight shuffling
# NS - neuron switch

class OpType:
    """String tags naming the mutation operators."""
    # neuron activation inverse, gaussian fuzzing, weight shuffling, neuron switch
    NAI, GF, WS, NS = 'NAI', 'GF', 'WS', 'NS'


class MutationOperator(object):
    """Creates mutated copies of a model.

    Every operator deep-copies ``self.original_model`` and mutates the copy, so
    the stored model itself is never modified by a mutation. Random choices are
    drawn from NumPy's global random state.
    """

    def __init__(self, ration, model, acc_tolerant=0.90, verbose=True,test=True,test_data_loader=None,device='cpu'):
        '''
        :param ration: fraction (0..1) of weights / neurons / activation layers that an operator mutates
        :param model: model to mutate; note that it is moved to ``device`` in place
        :param acc_tolerant: factor applied to the unmutated model's accuracy to obtain ``acc_threshold``
        :param verbose: stored on the instance; not read by any method of this class
        :param test: when True, evaluate the unmutated model with ``mut_test`` to derive ``acc_threshold``
        :param test_data_loader: data loader handed to ``mut_test``
        :param device: device the model and its mutants live on
        '''
        self.ration = ration
        self.original_model = model.to(device)
        self.verbose = verbose
        self.test_data_loader = test_data_loader
        self.device = device
        # Always defined, so that filter() can report a missing baseline clearly
        # instead of failing on a missing attribute.
        self.acc_threshold = None

        if test:
            premier_acc = mut_test(self.original_model, self.test_data_loader,device=self.device)
            logging.info('orginal model acc={0}'.format(premier_acc))
            self.acc_threshold = round(premier_acc * acc_tolerant, 2)
            logging.info('acc_threshold:{}%'.format(100 * self.acc_threshold))

    @staticmethod
    def _local_indices(indices, offset, count):
        """Return the entries of ``indices`` inside [offset, offset + count), rebased to start at 0."""
        indices = np.asarray(indices)
        in_range = (indices >= offset) & (indices < offset + count)
        return indices[in_range] - offset

    # weight level operator which fuzzes the weights using a gaussian distribution

    def gaussian_fuzzing(self, std=None):
        """Resample a ``ration`` fraction of all parameters from a per-tensor Gaussian.

        :param std: standard deviation of the noise; ``None`` uses the standard
            deviation of the parameter tensor being mutated
        :return: the mutated copy of the model
        """
        mutation_model = copy.deepcopy(self.original_model)
        params = list(mutation_model.parameters())
        num_weights = sum(param.numel() for param in params)

        indices = np.random.choice(num_weights, int(num_weights * self.ration), replace=False)
        weights_count = 0
        for param in params:
            num_weights_layer = param.numel()
            mutated_indices = self._local_indices(indices, weights_count, num_weights_layer)
            # Advance the running offset for every tensor; without this, every
            # tensor would be matched against the global indices starting at 0.
            weights_count += num_weights_layer
            if mutated_indices.size == 0:
                continue

            flat = param.data.view(-1)  # shares storage with the parameter
            current_std = std if std is not None else flat.std().item()
            if math.isnan(current_std):
                # a single-element tensor has no defined standard deviation
                current_std = 0.0
            mutated_weights = np.random.normal(flat.mean().item(), current_std, mutated_indices.size)

            positions = torch.as_tensor(mutated_indices, dtype=torch.long, device=flat.device)
            flat[positions] = torch.as_tensor(mutated_weights, dtype=flat.dtype, device=flat.device)

        return mutation_model

    # shuffles the weights of randomly selecting neuron slices in a layer

    def ws(self):
        """Shuffle the incoming weights of a ``ration`` fraction of the neurons.

        A neuron is one slice along dimension 0 of a parameter with more than
        one dimension; its values are permuted among themselves.

        :return: the mutated copy of the model
        """
        mutation_model = copy.deepcopy(self.original_model)
        weight_params = [param for param in mutation_model.parameters() if param.dim() > 1]
        unique_neurons = sum(param.size(0) for param in weight_params)

        indices = np.random.choice(unique_neurons, int(unique_neurons * self.ration), replace=False)
        neurons_count = 0
        for param in weight_params:
            unique_neurons_layer = param.size(0)
            for neuron in self._local_indices(indices, neurons_count, unique_neurons_layer):
                neuron = int(neuron)
                ori_shape = param.data[neuron].size()
                old_data = param.data[neuron].reshape(-1)
                shuffle_idx = torch.as_tensor(np.random.permutation(old_data.numel()),
                                              dtype=torch.long, device=old_data.device)
                # indexing with a tensor copies, so the row is not read while written
                param.data[neuron] = old_data[shuffle_idx].reshape(ori_shape)
            neurons_count += unique_neurons_layer

        return mutation_model

    # switches two random neurons in a layer

    def ns(self, skip=10):
        """Permute a ``ration`` fraction of the neurons within each layer.

        :param skip: layers with fewer than ``skip`` neurons are left untouched
        :return: the mutated copy of the model
        """
        mutation_model = copy.deepcopy(self.original_model)
        for param in mutation_model.parameters():
            if param.dim() <= 1:
                continue
            unique_neurons_layer = param.size(0)
            if unique_neurons_layer < skip:
                continue
            temp = unique_neurons_layer * self.ration
            num_mutated = math.floor(temp) if temp > 2. else math.ceil(temp)
            mutated_neurons = np.random.choice(unique_neurons_layer,
                                               int(num_mutated), replace=False)
            switch = np.random.permutation(mutated_neurons)
            targets = torch.as_tensor(mutated_neurons, dtype=torch.long, device=param.device)
            sources = torch.as_tensor(switch, dtype=torch.long, device=param.device)
            # the right-hand side is materialised before the assignment
            param.data[targets] = param.data[sources]
        return mutation_model

    # inverts the activation status of a neuron by changing the sign of its output value

    def nai(self):
        """Negate the weights (and the directly following bias entries) of a ``ration`` fraction of neurons.

        :return: the mutated copy of the model
        """
        mutation_model = copy.deepcopy(self.original_model)
        params = list(mutation_model.parameters())
        unique_neurons = sum(param.size(0) for param in params if param.dim() > 1)
        indices = np.random.choice(unique_neurons, int(unique_neurons * self.ration), replace=False)
        neurons_count = 0
        last_mutated_neurons = None
        for param in params:
            if param.dim() > 1:
                unique_neurons_layer = param.size(0)
                mutated_neurons = self._local_indices(indices, neurons_count, unique_neurons_layer)
                neurons_count += unique_neurons_layer
                if mutated_neurons.size:
                    positions = torch.as_tensor(mutated_neurons, dtype=torch.long, device=param.device)
                    param.data[positions] = -1 * param.data[positions]
                    last_mutated_neurons = positions
                else:
                    # Forget the indices of an earlier layer: they must never be
                    # applied to the bias that follows this (unmutated) layer.
                    last_mutated_neurons = None
            else:
                # the first one-dimensional parameter after a weight tensor is
                # treated as its bias and flipped at the same positions
                if last_mutated_neurons is not None and param.dim() == 1:
                    param.data[last_mutated_neurons] = -1 * param.data[last_mutated_neurons]
                last_mutated_neurons = None

        return mutation_model

    # removes activation functions

    def afr(self, act_type):
        '''
        Disable a ``ration`` fraction (at least one) of the activation layers.

        A selected activation module is replaced by ``nn.Identity`` rather than
        deleted, so a parent whose ``forward`` refers to the attribute keeps working.

        :param act_type: 'relu' selects ``nn.ReLU`` modules, anything else ``nn.ELU``
        :return: the mutated copy of the model
        :raises Exception: if the model holds no activation module of that type
        '''
        model = copy.deepcopy(self.original_model)
        ActFun = nn.ReLU if act_type == 'relu' else nn.ELU

        # Collect (parent, attribute name) first; replacing children while
        # iterating over them would mutate the container being iterated.
        act_sites = [(parent, name)
                     for parent in model.modules()
                     for name, child in parent.named_children()
                     if isinstance(child, ActFun)]
        num_actlayers = len(act_sites)

        if num_actlayers == 0:
            raise Exception('No [{}] layer found'.format(ActFun))

        temp = num_actlayers * self.ration
        num_remove = 1 if temp < 1 else math.floor(temp)
        num_remove = int(num_remove)
        idces_remove = np.random.choice(num_actlayers, num_remove, replace=False)
        print('>>>>>>>idces_remove:{}'.format(idces_remove))
        for idx in idces_remove:
            parent, name = act_sites[int(idx)]
            setattr(parent, name, nn.Identity())
        print(model)
        return model

    # determines whether the mutated model is qualified or not

    def __is_qualified(self, mutated_model):
        """Return True when the mutant's accuracy reaches ``acc_threshold``.

        :raises RuntimeError: if no threshold was computed (constructed with ``test=False``)
        """
        if self.acc_threshold is None:
            raise RuntimeError('acc_threshold is not set; construct MutationOperator with test=True')
        acc = mut_test(mutated_model, self.test_data_loader,device=self.device)
        if round(acc,2) < self.acc_threshold:
            logging.info('Warning: bad accurate {0},reproduce mutated model'.format(acc))
            return False
        logging.info('Mutated model: accurate {0}'.format(acc))
        return True

    # assures that the accuracy of the mutant model satisfies the specified threshold and returns the qualified model

    def filter(self, f, **kwargs):
        """Call the operator ``f(**kwargs)`` repeatedly until it yields a qualified mutant.

        :param f: mutation operator to call, e.g. ``self.ws``
        :return: the first mutant that passes the accuracy check
        """
        qualified = False
        while not qualified:
            mutated_model = f(**kwargs)
            qualified = self.__is_qualified(mutated_model)
        return mutated_model


In [None]:
# Training parameters for the GAN attack
# NOTE(review): Mutating_GAN_Attack.train only enables the mutation loss for
# epoch > 60 and recreates the optimizers again at epoch 80; with epochs = 60
# neither is reached - confirm this value is intended.
image_nc = 1
epochs = 60
batch_size = 128
BOX_MIN, BOX_MAX = 0, 1
model_num_labels = 10

In [None]:
# custom weight initialization used in previous AdvGAN implementation to assure fairness
def weights_init(m):
    """Initialise ``m`` in place according to its class name (for ``Module.apply``).

    Classes whose name contains 'Conv' get weights drawn from N(0, 0.02);
    classes whose name contains 'BatchNorm' get weights from N(1, 0.02) and a
    zero bias. Every other module is left untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return
    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [None]:
# Same training procedure as the AdvGAN implementation in https://github.com/mathcbc/advGAN_pytorch/blob/master/advGAN.py

class Mutating_GAN_Attack:
    """Trains a generator/discriminator pair that perturbs images to fool a classifier.

    The generator loss combines a GAN term, an adversarial term on the
    classifier's softmax output, the L2 norm of the perturbation and, when
    enabled, a mutation term comparing the classifier with a weight-shuffled
    mutant of itself.
    """

    def __init__(self,
                 model,
                 device,
                 model_num_labels,
                 image_nc,
                 box_min,
                 box_max,
                 train_dataloader):
        """
        :param model: classifier under attack; moved to ``device`` and put in eval mode
        :param device: device used for every network and tensor
        :param model_num_labels: number of classes the classifier predicts
        :param image_nc: number of image channels
        :param box_min: lower clamp bound for adversarial images
        :param box_max: upper clamp bound for adversarial images
        :param train_dataloader: loader yielding ``(images, labels)`` batches
        """
        output_nc = image_nc
        self.model_num_labels = model_num_labels
        self.device = device
        self.input_nc = image_nc
        self.output_nc = output_nc
        self.box_min = box_min
        self.box_max = box_max
        self.gen_input_nc = image_nc
        # Everything lives on the requested device instead of a hard-coded 'cuda'.
        self.netG = Generator(self.gen_input_nc, image_nc).to(device)
        self.netDisc = Discriminator(image_nc).to(device)
        self.classifier = model.to(device)
        self.classifier.eval()
        self.dataloader = train_dataloader

        self.netG.apply(weights_init)
        self.netDisc.apply(weights_init)
        # Pass the device on: MutationOperator moves the model to its own device
        # (default 'cpu'), which would otherwise pull the classifier off ``device``.
        self.operator = MutationOperator(ration=0.2, model=self.classifier, test=False,
                                         test_data_loader=self.dataloader, device=device)

        self.optimizer_G = torch.optim.Adam(self.netG.parameters(),
                                            lr=0.001)
        self.optimizer_D = torch.optim.Adam(self.netDisc.parameters(),
                                            lr=0.001)
        self.trans = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])

    def train_batch(self, x, labels, mutate):
        """Run one discriminator step and one generator step on a batch.

        :param x: batch of clean images, already on ``self.device``
        :param labels: integer class labels of ``x``
        :param mutate: add the mutation term to the generator loss
        :return: tuple of floats ``(loss_D, loss_G_fake, loss_perturb, loss_adv, loss_mutate)``
        """
        # ---- optimize D ----
        perturbation = self.netG(x)

        # clipping trick used in https://github.com/mathcbc/advGAN_pytorch/blob/master/advGAN.py
        adv_images = torch.clamp(perturbation, -0.3, 0.3) + x
        adv_images = torch.clamp(adv_images, self.box_min, self.box_max)
        adv_images_norm = (adv_images - 0.1307) / 0.3081

        self.optimizer_D.zero_grad()
        pred_real = self.netDisc(x)
        loss_D_real = F.mse_loss(pred_real, torch.ones_like(pred_real, device=self.device))
        loss_D_real.backward()

        # detach: the discriminator step must not send gradients into the generator
        pred_fake = self.netDisc(adv_images.detach())
        loss_D_fake = F.mse_loss(pred_fake, torch.zeros_like(pred_fake, device=self.device))
        loss_D_fake.backward()
        loss_D_GAN = loss_D_fake + loss_D_real
        self.optimizer_D.step()

        # ---- optimize G ----
        self.optimizer_G.zero_grad()
        pred_fake = self.netDisc(adv_images)
        loss_G_fake = F.mse_loss(pred_fake, torch.ones_like(pred_fake, device=self.device))
        # the graph is needed again by loss_G.backward() below
        loss_G_fake.backward(retain_graph=True)

        loss_perturb = torch.mean(torch.norm(perturbation.view(perturbation.shape[0], -1), 2, dim=1))

        logits_model = self.classifier(adv_images_norm)
        probs_model = F.softmax(logits_model, dim=1)
        onehot_labels = torch.eye(self.model_num_labels, device=self.device)[labels]

        # probability of the true class vs. the largest probability of any other class
        real = torch.sum(onehot_labels * probs_model, dim=1)
        other, _ = torch.max((1 - onehot_labels) * probs_model - onehot_labels * 10000, dim=1)
        zeros = torch.zeros_like(other)
        loss_adv = torch.max(real - other, zeros)
        loss_adv = torch.sum(loss_adv)

        # custom mutant loss
        if mutate:
            mutate_model = self.operator.ws().to(self.device)
            # Both outputs are computed WITH autograd: under torch.no_grad() this
            # term would be a constant and could never influence the generator.
            # NOTE(review): this term feeds the un-normalised adv_images while the
            # adversarial term above feeds adv_images_norm - confirm which input
            # the classifier expects.
            classifier_out = self.classifier(adv_images)
            mutant_out = mutate_model(adv_images)
            mutate_loss = F.mse_loss(classifier_out, mutant_out)
            mutate_loss_value = mutate_loss.item()
        else:
            mutate_loss = 0
            mutate_loss_value = 0.0

        adv_lambda = 10
        pert_lambda = 1
        mutate_lambda = 5
        loss_G = adv_lambda * loss_adv + pert_lambda * loss_perturb + mutate_lambda * mutate_loss
        loss_G.backward()
        self.optimizer_G.step()
        return loss_D_GAN.item(), loss_G_fake.item(), loss_perturb.item(), loss_adv.item(), mutate_loss_value

    def _reset_optimizers(self, lr):
        """Replace both Adam optimizers with fresh ones using learning rate ``lr``."""
        self.optimizer_G = torch.optim.Adam(self.netG.parameters(), lr=lr)
        self.optimizer_D = torch.optim.Adam(self.netDisc.parameters(), lr=lr)

    def train(self, epochs):
        """Train for ``epochs`` epochs, printing mean losses and saving the generator every second epoch.

        :param epochs: number of passes over ``self.dataloader``
        """
        for epoch in range(1, epochs+1):

            # step-wise learning-rate decay
            if epoch == 50:
                self._reset_optimizers(0.0001)
            if epoch == 80:
                self._reset_optimizers(0.00001)

            loss_D_sum = 0
            loss_G_fake_sum = 0
            loss_perturb_sum = 0
            loss_adv_sum = 0
            loss_mutate_sum = 0
            for images, labels in self.dataloader:
                images, labels = images.to(self.device), labels.to(self.device)
                # the mutation term is only switched on after epoch 60
                loss_D_batch, loss_G_fake_batch, loss_perturb_batch, loss_adv_batch, mutate_loss_batch = \
                    self.train_batch(images, labels, epoch > 60)
                loss_D_sum += loss_D_batch
                loss_G_fake_sum += loss_G_fake_batch
                loss_perturb_sum += loss_perturb_batch
                loss_adv_sum += loss_adv_batch
                loss_mutate_sum += mutate_loss_batch

            # print statistics
            num_batch = len(self.dataloader)
            print("epoch %d:\nloss_D: %.3f, loss_G_fake: %.3f,\
             \nloss_perturb: %.3f, loss_adv: %.3f, \n, loss_mutate: %.3f, \n" %
                  (epoch, loss_D_sum/num_batch, loss_G_fake_sum/num_batch,
                   loss_perturb_sum/num_batch, loss_adv_sum/num_batch, loss_mutate_sum/num_batch))

            # save generator
            if epoch%2==0:
                netG_file_name = 'netG_WS_epoch_' + str(epoch) + '.pth'
                torch.save(self.netG.state_dict(), netG_file_name)

In [None]:
# Trains MGAN
# NOTE(review): this rebinds train_dataloader with batch_size=32 and shuffle=False,
# replacing the shuffled loader created earlier - confirm the fixed order is intended.
train_dataloader = torch.utils.data.DataLoader(
                 dataset=train_set,
                 batch_size=32,
                 shuffle=False, num_workers = 0)
# Use the GPU when there is one instead of assuming CUDA is always present.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
target_model = target_model.to(device)
MGAN = Mutating_GAN_Attack(target_model,
                          device,
                          model_num_labels,
                          image_nc,
                          BOX_MIN,
                          BOX_MAX,
                          train_dataloader)
MGAN.train(epochs)

In [None]:
use_cuda=True
image_nc=1
batch_size = 128

gen_input_nc = image_nc

print("CUDA Available: ",torch.cuda.is_available())
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")

# load the generator of adversarial examples
# NOTE(review): Mutating_GAN_Attack.train saves files named 'netG_WS_epoch_<n>.pth';
# confirm that this path points at the intended checkpoint.
pretrained_generator_path = 'netG_epoch_62.pth'
pretrained_G = Generator(gen_input_nc, image_nc)
# map_location lets a checkpoint saved from a GPU load on whichever device was selected
pretrained_G.load_state_dict(torch.load(pretrained_generator_path, map_location=device))
# Inference only: eval() makes the BatchNorm layers of the residual blocks use
# their stored running statistics instead of per-batch statistics.
pretrained_G = pretrained_G.to(device).eval()

In [None]:
# Measures how often a freshly mutated model agrees with the target model.
# NOTE(review): the predictions below are computed on the clean test_img, not on
# adv_images, although the final message speaks of adversarial images - confirm
# which input is intended.
# NOTE(review): with ration=0.001, nai() mutates int(n * 0.001) neurons, which is 0
# for the 602 output neurons of MNIST_target_net - confirm the ratio.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_dataloader = torch.utils.data.DataLoader(
                dataset=test_set,
                shuffle=False, num_workers = 0)

# loop-invariant setup: move the models once and build the operator once
pretrained_G.to(device)
target_model.to(device)
operator = MutationOperator(ration=0.001, model=target_model, test=False,
                            test_data_loader = test_dataloader, device=device)

num_correct = 0
with torch.no_grad():  # evaluation only
    for i, data in enumerate(test_dataloader, 0):
        test_img, test_label = data
        test_img, test_label = test_img.to(device), test_label.to(device)
        # clamp the perturbation BEFORE adding it, as done during training
        perturbation = torch.clamp(pretrained_G(test_img), -0.3, 0.3)
        adv_images = torch.clamp(perturbation + test_img, 0, 1)
        # a new random mutant for every sample
        mutate_model = operator.nai()
        mutate_model.to(device)
        mutant_label = torch.argmax(mutate_model(test_img),1)
        pred_lab = torch.argmax(target_model(test_img),1)
        num_correct += torch.sum(mutant_label==pred_lab,0)
        if i % 1000 == 0:
            print('done')
print('num_correct: ', num_correct.item())
print('accuracy of adv imgs in testing set: %f\n'%(num_correct.item()/len(test_dataloader.dataset)))

In [None]:
# Calculates the label change rate of adversarial samples as a measure of sensitivity:
# the fraction of adversarial images on which the mutant and the target model disagree.
# Uses mutate_model, the last mutant created by the previous cell.

lcr_device = next(target_model.parameters()).device
pretrained_G.to(lcr_device)
mutate_model.to(lcr_device)

num_incorrect = 0
with torch.no_grad():  # evaluation only
    for i, data in enumerate(test_dataloader, 0):
        test_img, test_label = data
        # inputs must live on the same device as the models
        test_img = test_img.to(lcr_device)
        perturbation = pretrained_G(test_img)
        perturbation = torch.clamp(perturbation, -0.3, 0.3)
        adv_img = perturbation + test_img
        adv_img = torch.clamp(adv_img, 0, 1)
        pred_mutant = torch.argmax(mutate_model(adv_img),1)
        pred_lab = torch.argmax(target_model(adv_img),1)
        num_incorrect += torch.sum(pred_lab!=pred_mutant,0).item()
# Report the counter accumulated in THIS cell, per sample rather than per batch.
print('Label change rate of adversarial samples: %f\n'%(num_incorrect/len(test_dataloader.dataset)))

True
