# Set up for dataset and model

Package installation, loading, and dataloaders. There's also a resnet18 model defined.

In [None]:
import sys
print(sys.executable)
import certifi, ssl
#ssl._create_default_https_context = ssl._create_unverified_context


/usr/bin/python3


In [4]:
# Install dependencies (Colab already has torch/torchvision, but just in case)
!pip install torch torchvision tqdm matplotlib tensorboardX -q

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision import datasets, transforms

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 64

np.random.seed(42)
torch.manual_seed(42)

# Dataloaders
train_dataset = datasets.CIFAR10(
    root="/content/cifar10_data",
    train=True,
    download=True,
    transform=transforms.ToTensor()
)
test_dataset = datasets.CIFAR10(
    root="/content/cifar10_data",
    train=False,
    download=True,
    transform=transforms.ToTensor()
)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

100%|██████████| 170M/170M [00:08<00:00, 19.4MB/s]


Device in use: cuda


In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device in use: {device}")

def tp_relu(x, delta=1.):
    ind1 = (x < -1. * delta).float()
    ind2 = (x > delta).float()
    return .5 * (x + delta) * (1 - ind1) * (1 - ind2) + x * ind2

def tp_smoothed_relu(x, delta=1.):
    ind1 = (x < -1. * delta).float()
    ind2 = (x > delta).float()
    return (x + delta) ** 2 / (4 * delta) * (1 - ind1) * (1 - ind2) + x * ind2

class Normalize(nn.Module):
    def __init__(self, mu, std):
        super(Normalize, self).__init__()
        self.mu, self.std = mu, std

    def forward(self, x):
        return (x - self.mu.to(x.device)) / self.std.to(x.device)

class IdentityLayer(nn.Module):
    def forward(self, inputs):
        return inputs

class PreActBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, bn, learnable_bn, stride=1, activation='relu'):
        super(PreActBlock, self).__init__()
        self.activation = activation
        self.bn1 = nn.BatchNorm2d(in_planes, affine=learnable_bn) if bn else IdentityLayer()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=not learnable_bn)
        self.bn2 = nn.BatchNorm2d(planes, affine=learnable_bn) if bn else IdentityLayer()
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=not learnable_bn)

        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=not learnable_bn)
            )

    def act_function(self, preact):
        if self.activation == 'relu':
            return F.relu(preact)
        elif self.activation[:6] == '3prelu':
            return tp_relu(preact, delta=float(self.activation.split('relu')[1]))
        elif self.activation[:8] == '3psmooth':
            return tp_smoothed_relu(preact, delta=float(self.activation.split('smooth')[1]))
        else:
            beta = int(self.activation.split('softplus')[1])
            return F.softplus(preact, beta=beta)

    def forward(self, x):
        out = self.act_function(self.bn1(x))
        shortcut = self.shortcut(out) if hasattr(self, 'shortcut') else x
        out = self.conv1(out)
        out = self.conv2(self.act_function(self.bn2(out)))
        out += shortcut
        return out

class PreActResNet(nn.Module):
    def __init__(self, block, num_blocks, n_cls, activation='relu', fts_before_bn=False, normal='none'):
        super(PreActResNet, self).__init__()
        self.bn = True
        self.learnable_bn = True
        self.in_planes = 64
        self.activation = activation
        self.fts_before_bn = fts_before_bn

        if normal == 'cifar10':
            self.mu = torch.tensor((0.4914, 0.4822, 0.4465)).view(1, 3, 1, 1)
            self.std = torch.tensor((0.2471, 0.2435, 0.2616)).view(1, 3, 1, 1)
        else:
            self.mu = torch.tensor((0.0, 0.0, 0.0)).view(1, 3, 1, 1)
            self.std = torch.tensor((1.0, 1.0, 1.0)).view(1, 3, 1, 1)
            print('no input normalization')

        self.normalize = Normalize(self.mu, self.std)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=not self.learnable_bn)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.bn = nn.BatchNorm2d(512 * block.expansion)
        self.linear = nn.Linear(512*block.expansion, n_cls)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, self.bn, self.learnable_bn, stride, self.activation))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x, return_features=False):
        out = self.normalize(x)
        out = self.conv1(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        if return_features and self.fts_before_bn:
            return out.view(out.size(0), -1)
        out = F.relu(self.bn(out))
        if return_features:
            return out.view(out.size(0), -1)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def PreActResNet18(n_cls, activation='relu', fts_before_bn=False, normal='none'):
    return PreActResNet(PreActBlock, [2, 2, 2, 2], n_cls=n_cls,
                        activation=activation, fts_before_bn=fts_before_bn, normal=normal)

Device in use: cuda


# Implement the Attacks

Functions are given a simple useful signature that you can start with. Feel free to extend the signature as you see fit.

You may find it useful to create a 'batched' version of PGD that you can use to create the adversarial attack.

In [6]:
def pgd_linf_untargeted(model, x, labels, k, eps, eps_step):
    model.eval()
    ce_loss = torch.nn.CrossEntropyLoss()
    adv_x = x.clone().detach()
    adv_x.requires_grad_(True)
    for _ in range(k):
        adv_x.requires_grad_(True)
        model.zero_grad()
        output = model(adv_x)
        # TODO: Calculate the loss
        loss = ce_loss(output, labels)
        loss.backward()
        # TODO: compute the adv_x
        grad = adv_x.grad
        adv_x = adv_x  + (eps_step * torch.sign(grad))
        # find delta, clamp with eps
        #linf so can check values individually
        # change has to be eps bubble at max: delta = [-eps, eps]
        delta = torch.clamp(adv_x - x, min = -eps, max= eps)
        # calmp to image domain: adv_x = [0,1]
        adv_x = torch.clamp(x + delta, min = 0.0, max = 1.0).detach().requires_grad_()

    return adv_x

In [7]:
def pgd_l2_untargeted(model, x, labels, k, eps, eps_step):
    model.eval()
    ce_loss = torch.nn.CrossEntropyLoss()
    adv_x = x.clone().detach() # [B, C, H, W]
    adv_x.requires_grad_(True)

    # first take random step:
    # need to generate a tensor of same size as x but with range [-1,1]
    # then normalize and multiply to eps and apply to x
    delta = torch.randn_like(x) # [B, C, H, W]
    # tkae l2 norm for each image
    delta_norms = torch.norm(delta.view(x.size(0), -1), p = 2, dim = 1) # [B, ]
    # normalize delta of each image
    delta = delta/ (delta_norms.view(-1,1,1,1) + 1e-10)
    # scale to step size
    delta = delta * eps
    # we know that it will stay within limits becuase th emost it can move is eps_step < eps
    # so no need to project back onto bubble
    adv_x = torch.clamp(x + delta, min = 0.0 , max = 1.0).detach().requires_grad_()

    for _ in range(k):
        adv_x.requires_grad_(True)
        model.zero_grad()
        output = model(adv_x) # size = [B, 10]
        '''we are given x that can be multiple images [B,C,H,W] '''
        '''batch_size = B = number of images in batch'''
        '''1. Find New adv_x
           2. Project Onto x bubble'''
        batch_size = x.size(0)
        # TODO: Calculate the loss
        loss = ce_loss(output, labels )
        loss.backward()

        '''1. Find New Adv_X'''
        # TODO: compute the adv_x
        grad = adv_x.grad.data # [B, C, H, W] --> each value represnets gardient at pixel (H,W) of color channel C of picture B
        # reshape grad using view() to [B, C*H*W] so each row includes all pixels for an image
        # calculate l2 norm
        # resize to [B,1,1,1] to do computation with grad
        grad_norms = torch.norm(grad.view(batch_size, -1), p = 2, dim =1) # [B]
        grad_norms = grad_norms.view(batch_size,1,1,1) # [B,1,1,1]
        grad = grad/ (grad_norms +1e-10) #[B,C,H,W] --> back to gradient of each pixel-channel, add 1e-10 for divide by zero
        # increment adv_x by grad
        adv_x = adv_x + eps_step * grad

        '''2. Project Onto x Bubble'''
        # find delta, clamp with eps, project delta to the l2 ball
        '''x_proj = x + epsilong * ( delta )/ (max ( l2 norm of delta, epsilon  ))'''
        delta = adv_x - x #[B,C,H,W]
        # get l2 norms of delta
        delta_norms = torch.norm(delta.view(batch_size,-1), p = 2, dim = 1)
        # get tensor of ratios between epsilon and l2 norms
        factor = eps/ delta_norms
        # if l2 norm is greater than eps, we wnat to cap at eps (overall factor = 1), else we use l2 (overall factor = eps/delta_nroms)
        # factor = min(eps/delta_norms, eps/eps)
        factor = torch.min(factor,torch.ones_like(factor)) #[B, C, H, W]

        delta = delta * factor.view(-1,1,1,1)

        adv_x = torch.clamp(x + delta, min = 0.0 , max = 1.0).detach().requires_grad_()

        # HINT: https://github.com/Harry24k/adversarial-attacks-pytorch/blob/master/torchattacks/attacks/pgdl2.py <-- THANKS :D

    return adv_x

# Evaluate Single and Multi-Norm Robust Accuracy

In this section, we evaluate the model on the Linf and L2 attacks as well as union accuracy.

In [8]:
def test_model_on_single_attack(model, attack='pgd_linf', eps=0.1, k = 10):
    model.eval()
    tot_test, tot_acc = 0.0, 0.0
    for batch_idx, (x_batch, y_batch) in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"):
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        #print('On batch: ', batch_idx)
        if attack == 'pgd_linf':
            # TODO: get x_adv untargeted pgd linf with eps, and eps_step=eps/4
            adv_x = pgd_linf_untargeted(model,x_batch,y_batch,k,eps = eps,eps_step= eps/4 )
            #
            with torch.no_grad():
                logits = model(adv_x)
                predicts = logits.argmax(dim =1 )


        elif attack == 'pgd_l2':
            # TODO: get x_adv untargeted pgd l2 with eps, and eps_step=eps/4
            adv_x = pgd_l2_untargeted(model,x_batch,y_batch,k,eps = eps,eps_step= eps/4)
            with torch.no_grad():
                logits = model(adv_x)
                predicts = logits.argmax(dim=1)
        else:
            pass

        # get the testing accuracy and update tot_test and tot_acc
        '''num got right = sum(predicts = y_batch)'''
        tot_acc += (predicts == y_batch).sum().item()
        '''num total'''
        tot_test += y_batch.size(0)
        #print('Accuracy So Far: %.5lf' % (tot_acc/tot_test), f'on {attack} attack with eps = {eps}')

    print('Robust accuracy %.5lf' % (tot_acc/tot_test), f'on {attack} attack with eps = {eps}')

## Single-Norm Robust Accuracy

####L_inf Adverserial Attack

In [None]:
# Evaluate on Linf attack with different models with eps = 8/255
model.load_state_dict(torch.load('models/pretr_Linf.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 1 with eps = 8/255
test_model_on_single_attack(model=model, attack = 'pgd_linf', eps = 8/225)
'''Robust accuracy 0.46400 on pgd_linf attack with eps = 0.035555555555555556'''

In [None]:

model.load_state_dict(torch.load('models/pretr_L2.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 2 with eps = 8/255
test_model_on_single_attack(model=model, attack = 'pgd_linf', eps = 8/225)
''' Robust accuracy 0.23940 on pgd_linf attack with eps = 0.035555555555555556'''


In [None]:
model.load_state_dict(torch.load('models/pretr_RAMP.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 3 with eps = 8/255
test_model_on_single_attack(model=model, attack = 'pgd_linf', eps = 8/225)
'''Robust accuracy 0.44610 on pgd_linf attack with eps = 0.0355'''

#### L2 Adverserial ATtack

In [None]:
# Evaluate on L2 attack with different models with eps = 0.75
model.load_state_dict(torch.load('models/pretr_Linf.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 1 with eps = 0.75
test_model_on_single_attack(model=model, attack = 'pgd_l2', eps = 0.75)
'''Robust accuracy 0.49990 on pgd_l2 attack with eps = 0.75'''

In [None]:

model.load_state_dict(torch.load('models/pretr_L2.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 2 with eps = 0.75
test_model_on_single_attack(model=model, attack = 'pgd_l2', eps = 0.75)
'''Robust accuracy 0.55520 on pgd_l2 attack with eps = 0.75'''

In [None]:
model.load_state_dict(torch.load('models/pretr_RAMP.pth', map_location=torch.device('cpu')))
# Evaluate on Linf attack with model 3 with eps = 0.75
test_model_on_single_attack(model=model, attack = 'pgd_l2', eps = 0.75)
'''Robust accuracy 0.60250 on pgd_l2 attack with eps = 0.75'''

## Multi-Norm Robust Accuracy

In [None]:
def test_model_on_multi_attacks(model, eps_linf=8./255., eps_l2=0.75):
    k = 10
    model.eval()
    tot_test, tot_acc = 0.0, 0.0
    for batch_idx, (x_batch, y_batch) in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"):
        #print("On batch: ", batch_idx)
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        # TODO: get x_adv_linf and x_adv_l2 untargeted pgd linf and l2 with eps, and eps_step=eps/4
        x_adv_linf = pgd_linf_untargeted(model, x_batch, y_batch, k, eps_linf,eps_linf/4)
        x_adv_l2 = pgd_l2_untargeted(model, x_batch, y_batch, k, eps_l2, eps_l2/4)

        ## calculate union accuracy: correct only if both attacks are correct

        out = model(x_adv_linf)
        pred_linf = torch.max(out, dim=1)[1]
        out = model(x_adv_l2)
        pred_l2 = torch.max(out, dim=1)[1]

        # TODO: get the testing accuracy with multi-norm robustness and update tot_test and tot_acc
        tot_acc += ((pred_linf == y_batch) & (pred_l2 == y_batch)).sum().item()
        tot_test += y_batch.size(0)
        #print('accuracy so far %.5lf' % (tot_acc/tot_test), f'on multi attacks')

    print('Robust accuracy %.5lf' % (tot_acc/tot_test), f'on multi attacks')

In [None]:
# Evaluate on L2 attack with different models with eps_linf = 8./255, eps_l2 = 0.75
model.load_state_dict(torch.load('models/pretr_Linf.pth'))
# Evaluate on multi attacks with model 1
test_model_on_multi_attacks(model)
'''Robust accuracy 0.47840 on multi attack'''


In [None]:

model.load_state_dict(torch.load('models/pretr_L2.pth'))
# Evaluate on multi attacks with model 2
test_model_on_multi_attacks(model)
'''Robust accuracy 0.30850 on multi attacks'''


In [None]:
model.load_state_dict(torch.load('models/pretr_RAMP.pth'))
# Evaluate on multi attacks with model 3
test_model_on_multi_attacks(model)
'''Robust accuracy 0.49750 on multi attacks'''


## Normal Accuracy

In [None]:
def test_model_standard(model):
    model.eval()
    tot_test, tot_acc = 0.0, 0.0
    for batch_idx, (x_batch, y_batch) in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating Standard"):
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        with torch.no_grad():
            # predict
            logits = model(x_batch)
            # find highest prob
            predicts = logits.argmax(dim=1)


        tot_acc += (predicts == y_batch).sum().item()
        tot_test += y_batch.size(0)

    acc = tot_acc / tot_test
    print(f'Standard accuracy {acc:.5f}')
    return acc

In [None]:
# get normal accuracy of model
model.load_state_dict(torch.load('models/pretr_Linf.pth', map_location=torch.device('cpu')))
test_model_standard(model)
#0.828

In [None]:
model.load_state_dict(torch.load('models/pretr_L2.pth', map_location=torch.device('cpu')))
test_model_standard(model)

In [None]:
model.load_state_dict(torch.load('models/pretr_RAMP.pth', map_location=torch.device('cpu')))
test_model_standard(model)