In [None]:
!pip install torchattacks
!pip install tensorboardX
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import torchattacks
from torch.distributions.normal import Normal
from torch.distributions import kl_divergence
from torch.autograd import Function
from torchvision import transforms, datasets
import numpy as np
import os
import math
from torchvision.utils import save_image, make_grid
from tensorboardX import SummaryWriter
import operator as op
from operator import methodcaller
from typing import Union, Tuple
from torch.autograd import Variable
from skimage import io
from sklearn.cluster import KMeans
import tensorflow as tf
from skimage.util import img_as_ubyte
from skimage.util import img_as_float
import cv2
from skimage import color, data
from skimage.restoration import denoise_tv_bregman

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
class Bottleneck(nn.Module):
    def __init__(self, in_planes, growth_rate):
        super(Bottleneck, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(4*growth_rate)
        self.conv2 = nn.Conv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        out = self.conv1(F.relu(self.bn1(x)))
        out = self.conv2(F.relu(self.bn2(out)))
        out = torch.cat([out,x], 1)
        return out


class Transition(nn.Module):
    def __init__(self, in_planes, out_planes):
        super(Transition, self).__init__()
        self.bn = nn.BatchNorm2d(in_planes)
        self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)

    def forward(self, x):
        out = self.conv(F.relu(self.bn(x)))
        out = F.avg_pool2d(out, 2)
        return out


class DenseNet(nn.Module):
    def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
        super(DenseNet, self).__init__()
        self.growth_rate = growth_rate

        num_planes = 2*growth_rate
        self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False)

        self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
        num_planes += nblocks[0]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans1 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
        num_planes += nblocks[1]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans2 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
        num_planes += nblocks[2]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans3 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
        num_planes += nblocks[3]*growth_rate

        self.bn = nn.BatchNorm2d(num_planes)
        self.linear = nn.Linear(num_planes, num_classes)

    def _make_dense_layers(self, block, in_planes, nblock):
        layers = []
        for i in range(nblock):
            layers.append(block(in_planes, self.growth_rate))
            in_planes += self.growth_rate
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.trans1(self.dense1(out))
        out = self.trans2(self.dense2(out))
        out = self.trans3(self.dense3(out))
        out = self.dense4(out)
        out = F.avg_pool2d(F.relu(self.bn(out)), 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def DenseNet121():
    return DenseNet(Bottleneck, [6,12,24,16], growth_rate=32)

def DenseNet169():
    return DenseNet(Bottleneck, [6,12,32,32], growth_rate=32)

def DenseNet201():
    return DenseNet(Bottleneck, [6,12,48,32], growth_rate=32)

def DenseNet161():
    return DenseNet(Bottleneck, [6,12,36,24], growth_rate=48)

def densenet_cifar():
    return DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)

In [None]:
model = densenet_cifar()
model = model.to(device)

In [None]:
from torchsummary import summary
summary(model,(3,32,32))

In [None]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train)
train_loader = torch.utils.data.DataLoader(
    trainset, batch_size=100, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
test_loader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2)

testset1 = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
test_loader1 = torch.utils.data.DataLoader(
    testset1, batch_size=1, shuffle=False, num_workers=2)

In [None]:
optimizer = torch.optim.Adam(model.parameters())

In [None]:
def train(epoch):
  model.train()
  print('Training Epoch: ' + str(epoch))
  for batch_idx, (data, target) in enumerate(train_loader):
    data = data.to(device)
    target = target.to(device)
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()
    if(batch_idx % 10==0):
      print(str(batch_idx*100) + '/' + str(60000) + ' Training Loss: ' + str(loss.item()))
  torch.save(model.state_dict(), 'model.pth')

In [None]:
def test():
  model.eval()
  test_loss = 0
  correct = 0
  with torch.no_grad():
    for data, target in test_loader:
      data = data.to(device)
      target = target.to(device)
      output = model(data)
      test_loss += F.cross_entropy(output, target, size_average=False).item()
      pred = output.data.max(1, keepdim=True)[1]
      correct += pred.eq(target.data.view_as(pred)).sum()
  test_loss /= len(test_loader.dataset)
  print('Testing loss: ' + str(test_loss) + ' Accuracy: '
        + str((100. * correct / len(test_loader.dataset)).item()))

Train Baseline Model:

In [None]:
epochs = 5
for iter in range(epochs):
  train(iter)
  print('.....................................')
  print('Testing')
  test()
  print('.....................................')

List of Attacks Available: Projected Gradient Descent (PGD), Fast Gradient Sign Method (FGSM), Iterative FGSM (IFGSM), Carlini Wagner L2 Attack (CWLA) and Deep Fool Attack (DF). Run on one of the attacks and then skip to next markdown.

In [None]:
#PGD

def pgd_linf(model, X, y, epsilon, alpha, num_iter):
    delta = torch.zeros_like(X, requires_grad=True)
    for t in range(num_iter):
        loss = nn.CrossEntropyLoss()(model(X + delta), y)
        loss.backward()
        delta.data = (delta + alpha*delta.grad.detach().sign()).clamp(-epsilon,epsilon)
        delta.grad.zero_()
    return delta

def PGDtest(model,device,test_loader,epsilon):
  correct=0
  k=0
  adv_img=[]
  adv_img_label=[]
  for image,label in test_loader:
    if k==50:
      break
    if k%10==0:
      print(k)
    image,label=image.to(device),label.to(device)
    delta=pgd_linf(model,image,label,epsilon,1e-2,10)
    image=image+delta
    for i in range(0,len(image)):
      adv_img.append(image[i])
      adv_img_label.append(label[i])
    
    output=model(image)
    final_pred = output.max(1, keepdim=True)[1] 
    correct += final_pred.eq(label.data.view_as(final_pred)).sum()
    k=k+1

        
  print(k)
  final_acc = correct/len(adv_img)
  print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(adv_img), final_acc))

  return final_acc, adv_img,adv_img_label
  
acc,adv_img,adv_img_label=PGDtest(model,device,test_loader,0.3)

In [None]:
#IFGSM

def i_fgsm_attack(image,out, epsilon, data_grad,n_iter=10):
    sign_data_grad = data_grad.sign()
    perturbed_image = image
    for i in range(n_iter):

        output = model(perturbed_image)
        init_pred=output.max(1,keepdim=True)[1]
        if init_pred.item()!=out.item():
          continue

        loss = F.nll_loss(output, out)
        model.zero_grad()
        loss.backward()
        data_grad = image.grad.data
        sign_data_grad = data_grad.sign()

        perturbed_image = image + epsilon*sign_data_grad
        perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image

def IFGSMtest( model, device, test_loader, epsilon ):

    correct = 0
    k=0
    adv_img = []
    adv_img_label=[]

    for data, target in test_loader:
        if k==1000:
          break
        
        if k%100==0:
          print(k)

        data, target = data.to(device), target.to(device)

        data.requires_grad = True

        output = model(data)

        loss = F.nll_loss(output, target)

        model.zero_grad()

        loss.backward()

        data_grad = data.grad.data

        perturbed_data = i_fgsm_attack(data, target, epsilon, data_grad,10)
        adv_img.append(perturbed_data[0])
        adv_img_label.append(target[0])
        out = model(perturbed_data)

        final_pred = out.max(1, keepdim=True)[1] 
        if final_pred.item()==target.item():
          correct=correct+1
        k=k+1

        
    print(k,len(adv_img))
    final_acc = correct/len(adv_img)
    print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(adv_img), final_acc))

    return final_acc, adv_img,adv_img_label
  
acc,adv_img,adv_img_label=IFGSMtest(model,device,test_loader1,0.3)

In [None]:
#FGSM

def fgsm_attack(image, epsilon, data_grad):
    sign_data_grad = data_grad.sign()
    perturbed_image = image + epsilon*sign_data_grad
    perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image

def FGSMtest(model, device, test_loader, epsilon):

    correct = 0
    k=0
    adv_img = []
    adv_img_label=[]

    for data, target in test_loader:
        if k==1000:
          break
        
        if k%100==0:
          print(k)

        data, target = data.to(device), target.to(device)

        data.requires_grad = True

        output = model(data)

        loss = F.nll_loss(output, target)

        model.zero_grad()

        loss.backward()

        data_grad = data.grad.data

        perturbed_data = fgsm_attack(data, epsilon, data_grad)
        adv_img.append(perturbed_data[0])
        adv_img_label.append(target[0])

        output = model(perturbed_data)

        final_pred = output.max(1, keepdim=True)[1] 
        if final_pred.item()==target.item():
          correct=correct+1
        k=k+1

    final_acc = correct/len(adv_img)
    print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(adv_img), final_acc))

    return final_acc, adv_img,adv_img_label
  
acc,adv_img,adv_img_label=FGSMtest(model,device,test_loader1,0.3)

In [None]:
#DF

def DeepFool(image,model):

    steps = 3
    image = image.to(device)

    image.requires_grad = True
    output = model(image)[0]

    _, pre_0 = torch.max(output, 0)
    f_0 = output[pre_0]
    grad_f_0 = torch.autograd.grad(f_0, image,
                                    retain_graph=False,
                                    create_graph=False)[0]
    num_classes = len(output)

    for i in range(steps):
        image.requires_grad = True
        output = model(image)[0]
        _, pre = torch.max(output, 0)

        if pre != pre_0:
            image = torch.clamp(image, min=0, max=1).detach()
            break

        r = None
        min_value = None

        for k in range(num_classes):
            if k == pre_0:
                continue

            f_k = output[k]
            grad_f_k = torch.autograd.grad(f_k, image,
                                            retain_graph=True,
                                            create_graph=True)[0]

            f_prime = f_k - f_0
            grad_f_prime = grad_f_k - grad_f_0
            value = torch.abs(f_prime)/torch.norm(grad_f_prime)

            if r is None:
                r = (torch.abs(f_prime)/(torch.norm(grad_f_prime)**2))*grad_f_prime
                min_value = value
            else:
                if min_value > value:
                    r = (torch.abs(f_prime)/(torch.norm(grad_f_prime)**2))*grad_f_prime
                    min_value = value

        image = torch.clamp(image + r, min=0, max=1).detach()
    return image

def DFtest( model, device, test_loader):

    correct = 0
    k=0
    adv_img = []
    adv_img_label=[]

    for data, target in test_loader:
        if k==1000:
          break
        if k%100==0:
          print(k)

        data, target = data.to(device), target.to(device)

        data.requires_grad = True

        output = model(data)

        loss = F.nll_loss(output, target)

        model.zero_grad()

        loss.backward()

        perturbed_data = DeepFool(data, model)
        adv_img.append(perturbed_data[0])
        adv_img_label.append(target[0])

        output = model(perturbed_data)

        final_pred = output.max(1, keepdim=True)[1] 
        if final_pred.item()==target.item():
          correct=correct+1

        k=k+1

    print(k,len(adv_img),correct)
    final_acc = correct/len(adv_img)
    print("Test_Accuracy", final_acc)

    return final_acc, adv_img,adv_img_label

acc, adv_img, adv_img_label = DFtest(model, device, test_loader1)

In [None]:
#CWLA

def get_cuda_state(obj):

    if isinstance(obj, nn.Module):
        try:
            return next(obj.parameters()).is_cuda
        except StopIteration:
            return None
    elif hasattr(obj, 'is_cuda'):
        return obj.is_cuda
    else:
        raise TypeError('unrecognized type ({}) in args'.format(type(obj)))


def is_cuda_consistent(*args):

    result = dict()
    for v in args:
        cur_cuda_state = get_cuda_state(v)
        cuda_state = result.get('cuda', cur_cuda_state)
        if cur_cuda_state is not cuda_state:
            return False
        result['cuda'] = cur_cuda_state
    return True

def make_cuda_consistent(refobj, *args):

    ref_cuda_state = refobj if type(refobj) is bool else get_cuda_state(refobj)
    if ref_cuda_state is None:
        raise ValueError('cannot determine the cuda state of `refobj` ({})'
                .format(refobj))
    move_to_device = methodcaller('cuda' if ref_cuda_state else 'cpu')

    result_args = list()
    for v in args:
        cuda_state = get_cuda_state(v)
        if cuda_state != ref_cuda_state:
            if isinstance(v, nn.Module):
                move_to_device(v)
            else:
                v = move_to_device(v)
        result_args.append(v)
    return tuple(result_args)

def predict(net, inputs):

    inputs = make_cuda_consistent(net, inputs)[0]
    inputs_var = Variable(inputs)
    outputs_var = net(inputs_var)
    predictions = torch.max(outputs_var.data, dim=1)[1]
    return predictions
"""
"""
def _var2numpy(var):
    return var.data.cpu().numpy()


def atanh(x, eps=1e-6):
    x = x * (1 - eps)
    return 0.5 * torch.log((1.0 + x) / (1.0 - x))

def to_tanh_space(x, box):
    _box_mul = (box[1] - box[0]) * 0.5
    _box_plus = (box[1] + box[0]) * 0.5
    return atanh((x - _box_plus) / _box_mul)

def from_tanh_space(x, box):
    _box_mul = (box[1] - box[0]) * 0.5
    _box_plus = (box[1] + box[0]) * 0.5
    return torch.tanh(x) * _box_mul + _box_plus


class L2Adversary(object):
    def __init__(self, targeted=True, confidence=0.0, c_range=(1e-3, 1e10),
                 search_steps=5, max_steps=1000, abort_early=True,
                 box=(-1., 1.), optimizer_lr=1e-2, init_rand=False):
        if len(c_range) != 2:
            raise TypeError('c_range ({}) should be of form '
                            'tuple([lower_bound, upper_bound])'
                            .format(c_range))
        if c_range[0] >= c_range[1]:
            raise ValueError('c_range lower bound ({}) is expected to be less '
                             'than c_range upper bound ({})'.format(*c_range))
        if len(box) != 2:
            raise TypeError('box ({}) should be of form '
                            'tuple([lower_bound, upper_bound])'
                            .format(box))
        if box[0] >= box[1]:
            raise ValueError('box lower bound ({}) is expected to be less than '
                             'box upper bound ({})'.format(*box))
        self.targeted = targeted
        self.confidence = float(confidence)
        self.c_range = (float(c_range[0]), float(c_range[1]))
        self.binary_search_steps = search_steps
        self.max_steps = max_steps
        self.abort_early = abort_early
        self.ae_tol = 1e-4  # tolerance of early abort
        self.box = tuple(map(float, box))  # type: Tuple[float, float]
        self.optimizer_lr = optimizer_lr
        self.init_rand = init_rand
        self.repeat = (self.binary_search_steps >= 10)

    def __call__(self, model, inputs, targets, to_numpy=True):
        # sanity check
        assert isinstance(model, nn.Module)
        assert len(inputs.size()) == 4
        assert len(targets.size()) == 1

        targets_np = targets.clone().cpu().numpy()  # type: np.ndarray

        inputs = make_cuda_consistent(model, inputs)[0]  # type: torch.FloatTensor
        targets = make_cuda_consistent(model, targets)[0]  # type: torch.FloatTensor

        num_classes = model(Variable(inputs[0][None, :], requires_grad=False)).size(1)  # type: int
        batch_size = inputs.size(0)  # type: int

        lower_bounds_np = np.zeros(batch_size)
        upper_bounds_np = np.ones(batch_size) * self.c_range[1]
        scale_consts_np = np.ones(batch_size) * self.c_range[0]

        o_best_l2 = np.ones(batch_size) * np.inf
        o_best_l2_ppred = -np.ones(batch_size)
        o_best_advx = inputs.clone().cpu().numpy()  # type: np.ndarray

        inputs_tanh = self._to_tanh_space(inputs)  # type: torch.FloatTensor
        inputs_tanh_var = Variable(inputs_tanh, requires_grad=False)

        targets_oh = torch.zeros(targets.size() + (num_classes,))  # type: torch.FloatTensor
        targets_oh = make_cuda_consistent(model, targets_oh)[0]
        targets_oh.scatter_(1, targets.unsqueeze(1), 1.0)
        targets_oh_var = Variable(targets_oh, requires_grad=False)

        pert_tanh = torch.zeros(inputs.size())  # type: torch.FloatTensor
        if self.init_rand:
            nn.init.normal(pert_tanh, mean=0, std=1e-3)
        pert_tanh = make_cuda_consistent(model, pert_tanh)[0]
        pert_tanh_var = Variable(pert_tanh, requires_grad=True)

        optimizer = optim.Adam([pert_tanh_var], lr=self.optimizer_lr)
        for sstep in range(self.binary_search_steps):
            if self.repeat and sstep == self.binary_search_steps - 1:
                scale_consts_np = upper_bounds_np
            scale_consts = torch.from_numpy(np.copy(scale_consts_np)).float()  # type: torch.FloatTensor
            scale_consts = make_cuda_consistent(model, scale_consts)[0]
            scale_consts_var = Variable(scale_consts, requires_grad=False)
            print ('Using scale consts:', list(scale_consts_np) ) # FIXME

            best_l2 = np.ones(batch_size) * np.inf

            best_l2_ppred = -np.ones(batch_size)
            prev_batch_loss = np.inf  # type: float
            for optim_step in range(self.max_steps):
                batch_loss, pert_norms_np, pert_outputs_np, advxs_np = \
                    self._optimize(model, optimizer, inputs_tanh_var,
                                   pert_tanh_var, targets_oh_var,
                                   scale_consts_var)
                if optim_step % 10 == 0: print ('batch [{}] loss: {}'.format(optim_step, batch_loss))  # FIXME

                if self.abort_early and not optim_step % (self.max_steps // 10):
                    if batch_loss > prev_batch_loss * (1 - self.ae_tol):
                        break
                    prev_batch_loss = batch_loss
                pert_predictions_np = np.argmax(pert_outputs_np, axis=1)
                comp_pert_predictions_np = np.argmax(
                        self._compensate_confidence(pert_outputs_np,
                                                    targets_np),
                        axis=1)
                for i in range(batch_size):
                    l2 = pert_norms_np[i]
                    cppred = comp_pert_predictions_np[i]
                    ppred = pert_predictions_np[i]
                    tlabel = targets_np[i]
                    ax = advxs_np[i]
                    if self._attack_successful(cppred, tlabel):
                        assert cppred == ppred
                        if l2 < best_l2[i]:
                            best_l2[i] = l2
                            best_l2_ppred[i] = ppred
                        if l2 < o_best_l2[i]:
                            o_best_l2[i] = l2
                            o_best_l2_ppred[i] = ppred
                            o_best_advx[i] = ax

            for i in range(batch_size):
                tlabel = targets_np[i]
                assert best_l2_ppred[i] == -1 or \
                       self._attack_successful(best_l2_ppred[i], tlabel)
                assert o_best_l2_ppred[i] == -1 or \
                       self._attack_successful(o_best_l2_ppred[i], tlabel)
                if best_l2_ppred[i] != -1:
                    if scale_consts_np[i] < upper_bounds_np[i]:
                        upper_bounds_np[i] = scale_consts_np[i]

                    if upper_bounds_np[i] < self.c_range[1] * 0.1:
                        scale_consts_np[i] = (lower_bounds_np[i] + upper_bounds_np[i]) / 2
                else:
                    if scale_consts_np[i] > lower_bounds_np[i]:
                        lower_bounds_np[i] = scale_consts_np[i]
                    if upper_bounds_np[i] < self.c_range[1] * 0.1:
                        scale_consts_np[i] = (lower_bounds_np[i] + upper_bounds_np[i]) / 2
                    else:
                        scale_consts_np[i] *= 10

        if not to_numpy:
            o_best_advx = torch.from_numpy(o_best_advx).float()
        return o_best_advx

    def _optimize(self, model, optimizer, inputs_tanh_var, pert_tanh_var,
                  targets_oh_var, c_var):

        advxs_var = self._from_tanh_space(inputs_tanh_var + pert_tanh_var)  # type: Variable
        # the perturbed activation before softmax
        pert_outputs_var = model(advxs_var)  # type: Variable
        # the original inputs
        inputs_var = self._from_tanh_space(inputs_tanh_var)  # type: Variable

        perts_norm_var = torch.pow(advxs_var - inputs_var, 2)
        perts_norm_var = torch.sum(perts_norm_var.view(
                perts_norm_var.size(0), -1), 1)

        target_activ_var = torch.sum(targets_oh_var * pert_outputs_var, 1)
        inf = 1e4  
        assert (pert_outputs_var.max(1)[0] >= -inf).all(), 'assumption failed'

        maxother_activ_var = torch.max(((1 - targets_oh_var) * pert_outputs_var
                                        - targets_oh_var * inf), 1)[0]

        if self.targeted:
            f_var = torch.clamp(maxother_activ_var - target_activ_var
                                + self.confidence, min=0.0)
        else:
            f_var = torch.clamp(target_activ_var - maxother_activ_var
                                + self.confidence, min=0.0)
        batch_loss_var = torch.sum(perts_norm_var + c_var * f_var)  # type: Variable

        optimizer.zero_grad()
        batch_loss_var.backward()
        optimizer.step()

        batch_loss = batch_loss_var.item()  # type: float
        pert_norms_np = _var2numpy(perts_norm_var)
        pert_outputs_np = _var2numpy(pert_outputs_var)
        advxs_np = _var2numpy(advxs_var)
        return batch_loss, pert_norms_np, pert_outputs_np, advxs_np

    def _attack_successful(self, prediction, target):

        if self.targeted:
            return prediction == target
        else:
            return prediction != target

    # noinspection PyUnresolvedReferences
    def _compensate_confidence(self, outputs, targets):

        outputs_comp = np.copy(outputs)
        rng = np.arange(targets.shape[0])
        if self.targeted:
            outputs_comp[rng, targets] -= self.confidence
        else:
            outputs_comp[rng, targets] += self.confidence
        return outputs_comp

    def _to_tanh_space(self, x):
        return to_tanh_space(x, self.box)

    def _from_tanh_space(self, x):
        return from_tanh_space(x, self.box)

mean=(0.4914, 0.4822, 0.4465)
std=(0.2023, 0.1994, 0.2010)
inputs_box=(min((0 - m) / s for m, s in zip(mean, std)),max((1 - m) / s for m, s in zip(mean, std)))
adversary = L2Adversary(targeted=False,
                           confidence=0.0,
                           search_steps=10,
                           box=inputs_box,
                           optimizer_lr=5e-4)

model.eval()
correct=0
total=0
loss=0
adv_img=[]
adv_img_label=[]
for images,labels in test_loader:
  adv_images=adversary(model, images, labels, to_numpy=False).cuda()
  for i in range(0,len(images)):
    adv_img.append(adv_images[i])
    adv_img_label.append(labels[i])
  output=model(adv_images)
  labels=labels.to(device)
  loss+=F.cross_entropy(output,labels).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(labels.data.view_as(pred)).sum()

loss/=len(test_loader.dataset)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(test_loader.dataset)).item()))

That was our list of attacks. Now some in between preprocessing first.

In [None]:
adv_img=torch.stack(adv_img)
adv_img_label=torch.stack(adv_img_label)
adv_dataset=torch.utils.data.TensorDataset(adv_img,adv_img_label)
adv_loader=torch.utils.data.DataLoader(adv_dataset,batch_size=100)

Now for our list of defenses: JPEG Compression (this is our best one), K-Means, Gaussian Smoothing, Total Variance Maximization (TVM) and Vector-Quantized Variational Auto Encoder (VQVAE). Run any one of these methods. It will show the final accuracy as well as a plot of 12 images which consists of 4 original images then 4 images of them after being attacked and then the final 4 are the reconstructed version of them after passing through the defense.

In [None]:
#KMeans

def k_means(data,n_clusters=50):
  kmeans = KMeans(n_clusters)
  rows=data.shape[2]
  cols=data.shape[3]
  recon=[]
  for image in data:
    
    image2 = image.reshape(rows*cols, 3)
    kmeans.fit(image2.detach().cpu())
    compressed_image = kmeans.cluster_centers_[kmeans.labels_]
    compressed_image = np.clip(compressed_image.astype('float'), -1, 1)
    compressed_image = compressed_image.reshape(3,rows, cols)
    recon.append(torch.from_numpy(compressed_image))

  recon=torch.stack(recon)
  return recon.float()

recons_img=[]
recons_img_label=[]
loss=0
correct=0
j=0
for images,label in adv_loader:
  if(j==10):
    break
  print(j)
  images = images.to('cuda')
  x_tilde= k_means(images)
  for i in range(0,len(images)):
    recons_img.append(x_tilde[i])
    recons_img_label.append(label[i])
  output=model(x_tilde.cuda())
  label=label.to(device)
  loss+=F.cross_entropy(output,label).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(label.data.view_as(pred)).sum()
  j=j+1

loss/=len(recons_img)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(recons_img)).item()))

import matplotlib.pyplot as plt
test_iter = test_loader.__iter__()
adv_iter=adv_loader.__iter__()
test_imgs,_=next(test_iter)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
test_imgs=test_imgs.cpu().detach().numpy()
plt.subplot(4,3,1)
plt.imshow(np.transpose(test_imgs[0],(1,2,0)))
plt.subplot(4,3,2)
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,3)
plt.imshow(np.transpose(recons_img[0].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,4)
plt.imshow(np.transpose(test_imgs[1],(1,2,0)))
plt.subplot(4,3,5)
plt.imshow(np.transpose(adv_imgs[1],(1,2,0)))
plt.subplot(4,3,6)
plt.imshow(np.transpose(recons_img[1].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,7)
plt.imshow(np.transpose(test_imgs[2],(1,2,0)))
plt.subplot(4,3,8)
plt.imshow(np.transpose(adv_imgs[2],(1,2,0)))
plt.subplot(4,3,9)
plt.imshow(np.transpose(recons_img[2].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,10)
plt.imshow(np.transpose(test_imgs[3],(1,2,0)))
plt.subplot(4,3,11)
plt.imshow(np.transpose(adv_imgs[3],(1,2,0)))
plt.subplot(4,3,12)
plt.imshow(np.transpose(recons_img[3].cpu().detach().numpy(),(1,2,0)))

In [None]:
#VQVAE

class VectorQuant(Function):
    @staticmethod
    def forward(ctx,input,codebook):
        with torch.no_grad():
            embedding_size=codebook.size(1)
            input_size=input.size()
            input_flatten=input.view(-1,embedding_size)

            codebook_mse=torch.sum(codebook**2,dim=1)
            input_mse=torch.sum(input_flatten**2,dim=1,keepdim=True)

            d=torch.addmm(codebook_mse+input_mse,input_flatten,codebook.t(),alpha=-2.0,beta=1.0)

            _,indice=torch.min(d,dim=1)
            indice=indice.view(*input_size[:-1])
            ctx.mark_non_differentiable(indice)

            return indice

class VectorQuantST(Function):
    @staticmethod
    def forward(ctx,input,codebook):
        indice=vq(input,codebook)
        indice=indice.view(-1)
        ctx.save_for_backward(indice,codebook)
        ctx.mark_non_differentiable(indice)

        codes=torch.index_select(codebook,dim=0,index=indice)
        codes=codes.view_as(input)

        return (codes,indice)

    @staticmethod
    def backward(ctx,grad_out,grad_idx):
        grad_in,grad_cb=None,None

        if ctx.needs_input_grad[0]:
            grad_in=grad_out.clone()
        
        if ctx.needs_input_grad[1]:
            indice,codebook=ctx.saved_tensors
            embedding_size=codebook.size(1)

            grad_out=(grad_out.contiguous().view(-1,embedding_size))
            grad_cb=torch.zeros_like(codebook)
            grad_cb.index_add_(0,indice,grad_out)
        
        return (grad_in,grad_cb)

vq=VectorQuant.apply
vq_st=VectorQuantST.apply

def to_scalar(arr):
    if type(arr)==list:
        return [x.item() for x in arr]
    else:
        return arr.item()

def weights_init(m):
    classname=m.__class__.__name__
    if classname.find('Conv')!=-1:
        try:
            nn.init.xavier_uniform_(m.weight.data)
            m.bias.data.fill_(0)
        except AttributeError:
            print("Skipping initialization of ", classname)

class VQEmbedding(nn.Module):
    def __init__(self, K, D):
        super().__init__()
        self.embedding = nn.Embedding(K, D)
        self.embedding.weight.data.uniform_(-1./K, 1./K)

    def forward(self, z_e_x):
        z_e_x_ = z_e_x.permute(0, 2, 3, 1).contiguous()
        latents = vq(z_e_x_, self.embedding.weight)
        return latents

    def straight_through(self, z_e_x):
        z_e_x_ = z_e_x.permute(0, 2, 3, 1).contiguous()
        z_q_x_, indices = vq_st(z_e_x_, self.embedding.weight.detach())
        z_q_x = z_q_x_.permute(0, 3, 1, 2).contiguous()

        z_q_x_bar_flatten = torch.index_select(self.embedding.weight,
            dim=0, index=indices)
        z_q_x_bar_ = z_q_x_bar_flatten.view_as(z_e_x_)
        z_q_x_bar = z_q_x_bar_.permute(0, 3, 1, 2).contiguous()

        return z_q_x, z_q_x_bar

class ResBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.block = nn.Sequential(
            nn.ReLU(True),
            nn.Conv2d(dim, dim, 3, 1, 1),
            nn.BatchNorm2d(dim),
            nn.ReLU(True),
            nn.Conv2d(dim, dim, 1),
            nn.BatchNorm2d(dim)
        )

    def forward(self, x):
        return x + self.block(x)

class VectorQuantizedVAE(nn.Module):
    def __init__(self, input_dim, dim, K=512):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_dim, dim, 4, 2, 1),
            nn.BatchNorm2d(dim),
            nn.ReLU(True),
            nn.Conv2d(dim, dim, 4, 2, 1),
            ResBlock(dim),
            ResBlock(dim),
        )

        self.codebook = VQEmbedding(K, dim)

        self.decoder = nn.Sequential(
            ResBlock(dim),
            ResBlock(dim),
            nn.ReLU(True),
            nn.ConvTranspose2d(dim, dim, 4, 2, 1),
            nn.BatchNorm2d(dim),
            nn.ReLU(True),
            nn.ConvTranspose2d(dim, input_dim, 4, 2, 1),
            nn.Tanh()
        )

        self.apply(weights_init)

    def encode(self, x):
        z_e_x = self.encoder(x)
        latents = self.codebook(z_e_x)
        return latents

    def decode(self, latents):
        z_q_x = self.codebook.embedding(latents).permute(0, 3, 1, 2)  # (B, D, H, W)
        x_tilde = self.decoder(z_q_x)
        return x_tilde

    def forward(self, x):
        z_e_x = self.encoder(x)
        z_q_x_st, z_q_x = self.codebook.straight_through(z_e_x)
        x_tilde = self.decoder(z_q_x_st)
        return x_tilde, z_e_x, z_q_x

testmodel = VectorQuantizedVAE(3, 256, 512).to('cuda')
testmodel.load_state_dict(torch.load('/content/best_cifar.pt'))
testmodel.eval()

recons_img=[]
recons_img_label=[]
loss=0
correct=0
j=0
for images,label in adv_loader:
  if(j==10):
    break

  print(j)
  images = images.to('cuda')
  x_tilde,_,_ = testmodel(images)
  for i in range(0,len(images)):
    recons_img.append(x_tilde[i])
    recons_img_label.append(label[i])
  output=model(x_tilde)
  label=label.to(device)
  loss+=F.cross_entropy(output,label).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(label.data.view_as(pred)).sum()
  j=j+1

loss/=len(recons_img)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(recons_img)).item()))

import matplotlib.pyplot as plt
test_iter = test_loader.__iter__()
adv_iter=adv_loader.__iter__()
test_imgs,_=next(test_iter)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
test_imgs=test_imgs.cpu().detach().numpy()
plt.subplot(4,3,1)
plt.imshow(np.transpose(test_imgs[0],(1,2,0)))
plt.subplot(4,3,2)
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,3)
plt.imshow(np.transpose(recons_img[0].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,4)
plt.imshow(np.transpose(test_imgs[1],(1,2,0)))
plt.subplot(4,3,5)
plt.imshow(np.transpose(adv_imgs[1],(1,2,0)))
plt.subplot(4,3,6)
plt.imshow(np.transpose(recons_img[1].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,7)
plt.imshow(np.transpose(test_imgs[2],(1,2,0)))
plt.subplot(4,3,8)
plt.imshow(np.transpose(adv_imgs[2],(1,2,0)))
plt.subplot(4,3,9)
plt.imshow(np.transpose(recons_img[2].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,10)
plt.imshow(np.transpose(test_imgs[3],(1,2,0)))
plt.subplot(4,3,11)
plt.imshow(np.transpose(adv_imgs[3],(1,2,0)))
plt.subplot(4,3,12)
plt.imshow(np.transpose(recons_img[3].cpu().detach().numpy(),(1,2,0)))

In [None]:
#JPEG

def jpeg(x, quality=10):
    myimg=x[0].detach().cpu().numpy()
    myimg=np.transpose(myimg,(1,2,0))
    myimg=np.clip(myimg, -1, 1)
    myimg=img_as_ubyte(myimg)
    myimg=tf.image.decode_jpeg(tf.image.encode_jpeg(myimg, format='rgb', quality=quality),channels=3)
    myimg=img_as_float(myimg)
    myimg=np.transpose(myimg,(2,0,1))
    y=torch.FloatTensor(myimg)
    return y

adv_dataset=torch.utils.data.TensorDataset(adv_img,adv_img_label)
adv_loader=torch.utils.data.DataLoader(adv_dataset,batch_size=1)

recons_img=[]
recons_img_label=[]
loss=0
correct=0
j=0
for images,label in adv_loader:
  if(j==1000):
    break
  if j%100==0:
    print(j)
  images = images.to('cuda')
  x_tilde= jpeg(images)
  recons_img.append(x_tilde)
  recons_img_label.append(label)

  x_tilde = x_tilde.reshape(1,3,32,32)
  x_tilde = x_tilde.type(torch.cuda.FloatTensor)
  x_tilde = transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))(x_tilde)
  output=model(x_tilde.cuda())
  label=label.to(device)
  loss+=F.cross_entropy(output,label).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(label.data.view_as(pred)).sum()
  j=j+1

loss/=len(recons_img)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(recons_img)).item()))

import matplotlib.pyplot as plt
test_iter = test_loader.__iter__()
adv_iter=adv_loader.__iter__()
test_imgs,_=next(test_iter)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
test_imgs=test_imgs.cpu().detach().numpy()

plt.subplot(4,3,1)
plt.imshow(np.transpose(test_imgs[0],(1,2,0)))
plt.subplot(4,3,2)
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,3)
plt.imshow(np.transpose(img_as_ubyte(recons_img[0].cpu().detach().numpy()),(1,2,0)))
plt.subplot(4,3,4)
plt.imshow(np.transpose(test_imgs[1],(1,2,0)))
plt.subplot(4,3,5)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,6)
plt.imshow(np.transpose(img_as_ubyte(recons_img[1].cpu().detach().numpy()),(1,2,0)))
plt.subplot(4,3,7)
plt.imshow(np.transpose(test_imgs[2],(1,2,0)))
plt.subplot(4,3,8)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,9)
plt.imshow(np.transpose(img_as_ubyte(recons_img[2].cpu().detach().numpy()),(1,2,0)))
plt.subplot(4,3,10)
plt.imshow(np.transpose(test_imgs[3],(1,2,0)))
plt.subplot(4,3,11)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,12)
plt.imshow(np.transpose(img_as_ubyte(recons_img[3].cpu().detach().numpy()),(1,2,0)))

In [None]:
# Gaussian Smoothing

def gaussian(data):
  rows=data.shape[2]
  cols=data.shape[3]
  recon=[]
  for image in data:
    image = image.reshape(rows,cols,3)
    image2 = image.cpu()
    image2=image2.detach().numpy()
    blur = cv2.GaussianBlur(np.float32(image2),(5,5),sigmaX=0,sigmaY=0.3)
    compressed_image = blur.reshape(3,rows, cols)
    compressed_image = torch.from_numpy(compressed_image)
    compressed_image = compressed_image.to('cuda')
    recon.append(compressed_image)

  recon=torch.stack(recon)
  return recon

recons_img=[]
recons_img_label=[]
loss=0
correct=0
j=0
for images,label in adv_loader:
  if(j==10):
    break
  print(j)
  images = images.to('cuda')
  x_tilde= gaussian(images)
  for i in range(0,len(images)):
    recons_img.append(x_tilde[i])
    recons_img_label.append(label[i])
  output=model(x_tilde.cuda())
  label=label.to(device)
  loss+=F.cross_entropy(output,label).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(label.data.view_as(pred)).sum()
  j=j+1

loss/=len(recons_img)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(recons_img)).item()))

import matplotlib.pyplot as plt
test_iter = test_loader.__iter__()
adv_iter=adv_loader.__iter__()
test_imgs,_=next(test_iter)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
test_imgs=test_imgs.cpu().detach().numpy()
plt.subplot(4,3,1)
plt.imshow(np.transpose(test_imgs[0],(1,2,0)))
plt.subplot(4,3,2)
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,3)
plt.imshow(np.transpose(recons_img[0].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,4)
plt.imshow(np.transpose(test_imgs[1],(1,2,0)))
plt.subplot(4,3,5)
plt.imshow(np.transpose(adv_imgs[1],(1,2,0)))
plt.subplot(4,3,6)
plt.imshow(np.transpose(recons_img[1].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,7)
plt.imshow(np.transpose(test_imgs[2],(1,2,0)))
plt.subplot(4,3,8)
plt.imshow(np.transpose(adv_imgs[2],(1,2,0)))
plt.subplot(4,3,9)
plt.imshow(np.transpose(recons_img[2].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,10)
plt.imshow(np.transpose(test_imgs[3],(1,2,0)))
plt.subplot(4,3,11)
plt.imshow(np.transpose(adv_imgs[3],(1,2,0)))
plt.subplot(4,3,12)
plt.imshow(np.transpose(recons_img[3].cpu().detach().numpy(),(1,2,0)))

In [None]:
#TVM

def tvm(data,drop_rate=0.5,weights=0.03):
  rows=data.shape[2]
  cols=data.shape[3]
  recon=[]
  for image in data:
    image2 = image.reshape(rows,cols,3)
    image2 = image2.cpu()
    image2=image2.detach().numpy()
    new_image = denoise_tv_bregman(image2,weight=5)
    compressed_image = new_image.reshape(3,rows, cols)
    compressed_image = torch.from_numpy(compressed_image)
    compressed_image = compressed_image.to('cuda')
    recon.append(compressed_image)

  recon=torch.stack(recon)
  return recon
adv_dataset=torch.utils.data.TensorDataset(adv_img,adv_img_label)
adv_loader=torch.utils.data.DataLoader(adv_dataset,batch_size=100)

recons_img=[]
recons_img_label=[]
loss=0
correct=0
j=0
for images,label in adv_loader:
  if(j==10):
    break
  print(j)
  images = images.to('cuda')
  x_tilde= tvm(images)
  for i in range(0,len(images)):
    recons_img.append(x_tilde[i])
    recons_img_label.append(label[i])
  output=model(x_tilde.cuda())
  label=label.to(device)
  loss+=F.cross_entropy(output,label).item()
  pred=output.data.max(1,keepdim=True)[1]
  correct+=pred.eq(label.data.view_as(pred)).sum()
  j=j+1

loss/=len(recons_img)
print('Loss: '+str(loss)+' Accuracy:'+str((100. * correct / len(recons_img)).item()))

import matplotlib.pyplot as plt
test_iter = test_loader.__iter__()
adv_iter=adv_loader.__iter__()
test_imgs,_=next(test_iter)
adv_imgs,_=next(adv_iter)
adv_imgs=adv_imgs.cpu().detach().numpy()
test_imgs=test_imgs.cpu().detach().numpy()
plt.subplot(4,3,1)
plt.imshow(np.transpose(test_imgs[0],(1,2,0)))
plt.subplot(4,3,2)
plt.imshow(np.transpose(adv_imgs[0],(1,2,0)))
plt.subplot(4,3,3)
plt.imshow(np.transpose(recons_img[0].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,4)
plt.imshow(np.transpose(test_imgs[1],(1,2,0)))
plt.subplot(4,3,5)
plt.imshow(np.transpose(adv_imgs[1],(1,2,0)))
plt.subplot(4,3,6)
plt.imshow(np.transpose(recons_img[1].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,7)
plt.imshow(np.transpose(test_imgs[2],(1,2,0)))
plt.subplot(4,3,8)
plt.imshow(np.transpose(adv_imgs[2],(1,2,0)))
plt.subplot(4,3,9)
plt.imshow(np.transpose(recons_img[2].cpu().detach().numpy(),(1,2,0)))
plt.subplot(4,3,10)
plt.imshow(np.transpose(test_imgs[3],(1,2,0)))
plt.subplot(4,3,11)
plt.imshow(np.transpose(adv_imgs[3],(1,2,0)))
plt.subplot(4,3,12)
plt.imshow(np.transpose(recons_img[3].cpu().detach().numpy(),(1,2,0)))