In [1]:
import matplotlib.pylab as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import glob
import numpy as np
import imageio
import itertools
import foolbox as fb
import foolbox.ext.native as fbn
import time
import torchvision

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# initialisation attack from existing validation images
import eagerpy as ep
import numpy as np

class PrecomputedSamplesAttack:
    """This is a helper attack that makes it straight-forward to choose initialisation points
       for boundary-type attacks from a given data set. All it does is to store samples and
       the predicted responses of a given model in order to select suitable adversarial images
       from the given data set.
    """

    def __init__(self, model):
        self.model = model
        self.samples = []
        self.labels = []
        
    def feed(self, inputs):
        response = self.model.forward(inputs).argmax(1)
        
        for k in range(len(inputs)):
            self.labels.append(int(response[k]))
            self.samples.append(inputs[k])

    def __call__(self, inputs, labels):
        inputs = ep.astensor(inputs)
        labels = ep.astensor(labels)
        x = ep.zeros_like(inputs)
        
        for k in range(len(labels)):
            while True:
                idx = np.random.randint(len(self.labels))
                if int(labels[k].numpy()) != self.labels[idx]:
                    x.tensor[k] = self.samples[idx]
                    break
        
        return x.tensor


In [3]:
data_dir = '../data'
device = torch.device('cuda:0') if torch.cuda.is_available() else 'cpu'

In [4]:
data = {}

metrics = ['Linf']
repetitions = 1
step_scale = 1
num_batches = 16

BB_static = {'init_attack' : 'init_attack', 'steps' : int(step_scale * 10000)}
BB_lr = [{'lr' : 1e-2}, {'lr' : 1e-1}]

L2_attacks = [(fbn.attacks.PGD,{'num_steps' : int(1000 * step_scale)}, [{'epsilon' : 8.0/255.0}])]

attacks = {'Linf' : L2_attacks}


# load model
test_batch_size = 100

transform_test = transforms.Compose([
    transforms.ToTensor(),
])

testset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=test_batch_size, shuffle=False, num_workers=2)

Files already downloaded and verified


In [5]:
import math
import torch.nn as nn

class Net(nn.Module):
    def __init__(self, classes=10, kernel = 3, channels=3):
        super(Net, self).__init__()
        self.kernel = kernel
        self.channels = channels
        self.conv = nn.Conv2d(3,self.channels,self.kernel,1) 
        self.pad = math.ceil((32 - (32 - self.kernel + 1))/2)
        self.linear = nn.Linear(self.channels*(32 - self.kernel + 1 + self.pad*2)*(32 - self.kernel + self.pad*2 + 1),10)
        self.flat = nn.Flatten()

    def forward(self, x):
        x = torch.nn.functional.pad(x,(self.pad,self.pad,self.pad,self.pad),'circular')
        x = self.conv(x)
        x = self.flat(x)
        x = self.linear(x)
        return x
    
class Net3(nn.Module):
    def __init__(self, classes=10, kernel = 3, channels=3):
        super(Net3, self).__init__()
        self.kernel = kernel
        self.channels = channels
        
        self.conv1 = nn.Conv2d(3,self.channels,self.kernel,1) 
        self.pad1 = math.ceil((32 - (32 - self.kernel + 1))/2)
        self.output1 = int((32 - self.kernel + 1 + self.pad1*2))
        
        self.conv2 = nn.Conv2d(self.channels, self.channels, self.kernel,1) 
        self.pad2 = math.ceil((self.output1 - (self.output1 - self.kernel + 1))/2)
        self.output2 = int((self.output1 - self.kernel + 1 + self.pad2*2))
        
        self.conv3 = nn.Conv2d(self.channels, self.channels, self.kernel,1) 
        self.pad3 = math.ceil((self.output2 - (self.output2 - self.kernel + 1))/2)
        self.output3 = int((self.output2 - self.kernel + 1 + self.pad3*2))
        
        self.linear = nn.Linear(self.channels*(self.output3 - self.kernel + 1 + self.pad3*2)*(self.output3 - self.kernel + self.pad3*2 + 1),10)
        self.flat = nn.Flatten()

    def forward(self, x):
        x = torch.nn.functional.pad(x,(self.pad1,self.pad1,self.pad1,self.pad1),'circular')
        x = self.conv1(x)
        x = torch.nn.functional.pad(x,(self.pad2,self.pad2,self.pad2,self.pad2),'circular')
        x = self.conv2(x)
        x = torch.nn.functional.pad(x,(self.pad3,self.pad3,self.pad3,self.pad3),'circular')
        x = self.conv3(x)
        x = self.flat(x)
        x = self.linear(x)
        return x
    


posible_models = ['fclinearl3', 'fclinearl1','convlinearl1k3c3', 'convlinearl3k3c3','convlinearl1k11c3','convlinearl3k11c3','convlinearl1k32c3',
                 'convlinearl3k32c3','convlinearl1k3c8','convlinearl1k3c32','convlinearl3k3c8','convlinearl3k3c32']

def return_model(name='fclinearl1'):
    if name == 'fclinearl3':
        model = nn.Sequential(nn.Flatten(), nn.Linear(3*32*32,3*32*32),nn.Linear(3*32*32,3*32*32), nn.Linear(3*32*32,3*32*32), nn.Linear(3*32*32,10))
    elif name == 'fclinearl1':
        model = nn.Sequential(nn.Flatten(),nn.Linear(3*32*32,3*32*32), nn.Linear(3*32*32,10))
    elif name == 'convlinearl1k3c3':
        model = Net(kernel=3,channels=3)
    elif name == 'convlinearl3k3c3':
        model = Net3(kernel=3,channels=3)
    elif name == 'convlinearl1k11c3':
        model = Net(kernel=11,channels=3)
    elif name == 'convlinearl3k11c3':
        model = Net3(kernel=11, channels=3)
    elif name == 'convlinearl1k32c3':
        model = Net(kernel=32, channels=3)
    elif name == 'convlinearl3k32c3':
        model = Net3(kernel=32, channels=3)
    elif name == 'convlinearl1k3c8':
        model = Net(kernel=3,channels=8)
    elif name == 'convlinearl1k3c32':
        model = Net(kernel=3, channels=32)
    elif name == 'convlinearl3k3c8':
        model = Net3(kernel=3, channels=8)
    elif name == 'convlinearl3k3c32':
        model = Net3(kernel=3, channels=32)
        
    return model

In [8]:
model_name = 'convlinearl1k3c3'
resnet = return_model(model_name)
resnet = torch.nn.DataParallel(resnet)
resnet = resnet.to(device)
resnet.eval();

checkpoint = torch.load('./cifar_resnet/ckpt_{}.pth'.format(model_name))
for i in range(checkpoint['epoch']+1):
    for j in range(1,392,30):
        names = './cifar_resnet/ckpt_{}_{}_{}.pth'.format(model_name, i, j)
        saved = torch.load(names)
        resnet.load_state_dict(saved)

        #resnet.to(device)

        # init foolbox models
        fbn_model = fbn.models.PyTorchModel(resnet.module, bounds=[0.0, 1.0], device=device)
        fb_model = fb.models.PyTorchModel(resnet.module, bounds=[0.0, 1.0], device=device, num_classes=10)   

        for metric in metrics:
            for attack in attacks[metric]:
                iteration = 0
                if len(attack) == 3:
                    Attack, static_kwargs, dynamic_kwargs = attack
                    native = 'foolbox.ext.native' in Attack.__module__
                    attack = Attack(fbn_model) if native else Attack(fb_model)
                else:
                    Attack, static_kwargs, dynamic_kwargs, init_kwargs = attack
                    native = 'foolbox.ext.native' in Attack.__module__
                    attack = Attack(fbn_model) if native else Attack(fb_model, **init_kwargs)

                name = str(attack.__class__).split('.')[-1].split("'")[0]
                if metric == 'L2':
                    bbattack = fbn.attacks.L2BrendelBethgeAttack(fbn_model)

                if native:
                    model = fbn_model
                    print("Native")
                else:
                    model = fb_model
                # create init attack if necessary
                if 'init_attack' in static_kwargs.keys():
                    init_attack = PrecomputedSamplesAttack(fbn_model)

                    for batch in testloader:
                        inputs, labels = batch
                        inputs = inputs.to(device)
                        labels = labels.to(device)

                        out = init_attack.feed(inputs)

                    static_kwargs['init_attack'] = init_attack         


            # perform attack with different arguments
                img = []
                for kwarg in dynamic_kwargs:
                    kwargs = {**kwarg, **static_kwargs}
                    print(kwarg)
                    images = []
                    for b, batch in enumerate(testloader):
                        if b == 1:
                            break
                        check = time.time()
                        inputs, labels = batch
                        inputs = inputs.to(device)
                        labels = labels.to(device)
                        if not native:
                            inputs = inputs.data.cpu().numpy()
                            labels = labels.data.cpu().numpy()
                        adversarials = attack(
                            inputs,
                            labels,
                            **kwargs
                        )

                        out = model.forward(adversarials)
                        is_adv = out.argmax(1) != labels
                        out_x = model.forward(inputs)
                        is_cor = out_x.argmax(1) == labels

                        # check if adversarial
                        if native:
                            out = out.data.cpu().numpy()
                            adversarials = adversarials.data.cpu().numpy()
                            inputs = inputs.data.cpu().numpy()
                            labels = labels.data.cpu().numpy()
                            output = out.argmax(1)
                        for k in range(len(inputs)):
                            if is_adv[k] and is_cor[k]:
                                x0 = inputs[k]
                                x = adversarials[k]

                                images.append([x, x0,labels[k],output[k]])
                            else:
                                x0 = inputs[k]
                                x = np.zeros((3,32,32))
                                images.append([x,x0,labels[k],None])
                        if b % 10 == 0:
                            print('{}/{}'.format(i,checkpoint['epoch']), j, 'Time: ', time.time() - check)
                    img.append(images)


                np.save("./cifar_resnet/{}_linf_adversarial_100.npy".format(names.split('/')[-1].split('.')[0]),img)

                # delete model to free memory
                del fb_model
                del fbn_model

Native
{'epsilon': 0.03137254901960784}
0/5 1 Time:  3.516789674758911
Native
{'epsilon': 0.03137254901960784}
0/5 31 Time:  3.0623841285705566
Native
{'epsilon': 0.03137254901960784}
0/5 61 Time:  2.7536396980285645
Native
{'epsilon': 0.03137254901960784}
0/5 91 Time:  2.8644416332244873
Native
{'epsilon': 0.03137254901960784}
0/5 121 Time:  2.965296745300293
Native
{'epsilon': 0.03137254901960784}
0/5 151 Time:  2.864016056060791
Native
{'epsilon': 0.03137254901960784}
0/5 181 Time:  2.960399627685547
Native
{'epsilon': 0.03137254901960784}
0/5 211 Time:  3.446011543273926
Native
{'epsilon': 0.03137254901960784}
0/5 241 Time:  3.4177777767181396
Native
{'epsilon': 0.03137254901960784}
0/5 271 Time:  3.381951332092285
Native
{'epsilon': 0.03137254901960784}
0/5 301 Time:  3.6130099296569824
Native
{'epsilon': 0.03137254901960784}
0/5 331 Time:  3.495917797088623
Native
{'epsilon': 0.03137254901960784}
0/5 361 Time:  3.523052215576172
Native
{'epsilon': 0.03137254901960784}
0/5 391 Tim

In [12]:
class VanillaBackprop():
    """
        Produces gradients generated with vanilla back propagation from the image
    """
    def __init__(self, model):
        self.model = model
        self.gradients = None
        # Put model in evaluation mode
        self.model.eval()
        # Hook the first layer to get the gradient
        self.hook_layers() 

    def hook_layers(self):
        def hook_function(module, grad_in, grad_out):
            self.gradients = grad_in[0]

        # Register hook to the first layer
        first_layer = list(self.model.module._modules.items())[0][1]
        first_layer.register_backward_hook(hook_function)

    def generate_gradients(self, input_image, target_class):
        # Forward
        model_output = self.model(input_image)
        # Zero grads
        self.model.zero_grad()
        # Target for backprop
        one_hot_output = torch.FloatTensor(1, model_output.size()[-1]).zero_()
        one_hot_output[0][target_class] = 1
        one_hot_output = one_hot_output.to(device)
        # Backward pass
        model_output.backward(gradient=one_hot_output)
        # Convert Pytorch variable to numpy array
        # [0] to get rid of the first channel (1,3,224,224)
        gradients_as_arr = self.gradients.cpu().data.numpy()[0]
        return gradients_as_arr

In [15]:
# load model
test_batch_size = 1

transform_test = transforms.Compose([
    transforms.ToTensor(),
])

testset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=test_batch_size, shuffle=False, num_workers=2)

Files already downloaded and verified


In [16]:
resnet = return_model(model_name) 
resnet = torch.nn.DataParallel(resnet)
resnet = resnet.to(device)
resnet.eval()
checkpoint = torch.load('./cifar_resnet/ckpt_{}.pth'.format(model_name))
for i in range(checkpoint['epoch']+1):
    for j in range(1,392,30):
        lss = []
        names = './cifar_resnet/ckpt_{}_{}_{}.pth'.format(model_name,i, j)
        saved = torch.load(names)
        resnet.load_state_dict(saved)
        VBP = VanillaBackprop(resnet)
        print(i,checkpoint['epoch'],j)
        for b, batch in enumerate(testloader):
            if b == 100:
                break
            inputs, labels = batch
            inputs.requires_grad = True
            inputs = inputs.to(device)
            out_x = resnet.forward(inputs)
            cat = out_x.argmax(1)
            vanilla_grads = VBP.generate_gradients(inputs, int(cat.cpu().numpy()))
            vanilla_grads = vanilla_grads - vanilla_grads.min()
            vanilla_grads = vanilla_grads/vanilla_grads.max()
            lss.append(vanilla_grads)
  
        np.save("./cifar_resnet/{}_linf_saliency_100.npy".format(names.split('/')[-1].split('.')[0]),lss)



0 5 1
0 5 31
0 5 61
0 5 91
0 5 121
0 5 151
0 5 181
0 5 211
0 5 241
0 5 271
0 5 301
0 5 331
0 5 361
0 5 391
1 5 1
1 5 31
1 5 61
1 5 91
1 5 121
1 5 151
1 5 181
1 5 211
1 5 241
1 5 271
1 5 301
1 5 331
1 5 361
1 5 391
2 5 1
2 5 31
2 5 61
2 5 91
2 5 121
2 5 151
2 5 181
2 5 211
2 5 241
2 5 271
2 5 301
2 5 331
2 5 361
2 5 391
3 5 1
3 5 31
3 5 61
3 5 91
3 5 121
3 5 151
3 5 181
3 5 211
3 5 241
3 5 271
3 5 301
3 5 331
3 5 361
3 5 391
4 5 1
4 5 31
4 5 61
4 5 91
4 5 121
4 5 151
4 5 181
4 5 211
4 5 241
4 5 271
4 5 301
4 5 331
4 5 361
4 5 391
5 5 1
5 5 31
5 5 61
5 5 91
5 5 121
5 5 151
5 5 181
5 5 211
5 5 241
5 5 271
5 5 301
5 5 331
5 5 361
5 5 391
