# Code for problem 1

Note: As training runs a long time, the code was exported to .py and executed under nohup before having the results pasted back and model checkpoints loaded in the notebook for evaluation.

In [1]:
%%capture
!pip install torch torchvision --index-url https://download.pytorch.org/whl/cu128

In [14]:
# !pip install tensorboardX

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import time
import matplotlib.pyplot as plt
from tqdm import tqdm

from torchvision import datasets, transforms
# from tensorboardX import SummaryWriter

use_cuda = True
device = torch.device("cuda" if use_cuda else "cpu")
batch_size = 256

np.random.seed(42)
torch.manual_seed(42)


## Dataloaders
train_dataset = datasets.CIFAR10('cifar10_data/', train=True, download=True, transform=transforms.Compose(
    [transforms.ToTensor()]
))
test_dataset = datasets.CIFAR10('cifar10_data/', train=False, download=True, transform=transforms.Compose(
    [transforms.ToTensor()]
))

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [15]:
def tp_relu(x, delta=1.):
    ind1 = (x < -1. * delta).float()
    ind2 = (x > delta).float()
    return .5 * (x + delta) * (1 - ind1) * (1 - ind2) + x * ind2

def tp_smoothed_relu(x, delta=1.):
    ind1 = (x < -1. * delta).float()
    ind2 = (x > delta).float()
    return (x + delta) ** 2 / (4 * delta) * (1 - ind1) * (1 - ind2) + x * ind2

class Normalize(nn.Module):
    def __init__(self, mu, std):
        super(Normalize, self).__init__()
        self.mu, self.std = mu, std

    def forward(self, x):
        return (x - self.mu) / self.std

class IdentityLayer(nn.Module):
    def forward(self, inputs):
        return inputs
    
class PreActBlock(nn.Module):
    '''Pre-activation version of the BasicBlock.'''
    expansion = 1

    def __init__(self, in_planes, planes, bn, learnable_bn, stride=1, activation='relu'):
        super(PreActBlock, self).__init__()
        self.collect_preact = True
        self.activation = activation
        self.avg_preacts = []
        self.bn1 = nn.BatchNorm2d(in_planes, affine=learnable_bn) if bn else IdentityLayer()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=not learnable_bn)
        self.bn2 = nn.BatchNorm2d(planes, affine=learnable_bn) if bn else IdentityLayer()
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=not learnable_bn)

        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=not learnable_bn)
            )

    def act_function(self, preact):
        if self.activation == 'relu':
            act = F.relu(preact)
        elif self.activation[:6] == '3prelu':
            act = tp_relu(preact, delta=float(self.activation.split('relu')[1]))
        elif self.activation[:8] == '3psmooth':
            act = tp_smoothed_relu(preact, delta=float(self.activation.split('smooth')[1]))
        else:
            assert self.activation[:8] == 'softplus'
            beta = int(self.activation.split('softplus')[1])
            act = F.softplus(preact, beta=beta)
        return act

    def forward(self, x):
        out = self.act_function(self.bn1(x))
        shortcut = self.shortcut(out) if hasattr(self, 'shortcut') else x  # Important: using out instead of x
        out = self.conv1(out)
        out = self.conv2(self.act_function(self.bn2(out)))
        out += shortcut
        return out

class PreActResNet(nn.Module):
    def __init__(self, block, num_blocks, n_cls, cuda=True, half_prec=False,
        activation='relu', fts_before_bn=False, normal='none'):
        super(PreActResNet, self).__init__()
        self.bn = True
        self.learnable_bn = True  # doesn't matter if self.bn=False
        self.in_planes = 64
        self.avg_preact = None
        self.activation = activation
        self.fts_before_bn = fts_before_bn
        if normal == 'cifar10':
            self.mu = torch.tensor((0.4914, 0.4822, 0.4465)).view(1, 3, 1, 1)
            self.std = torch.tensor((0.2471, 0.2435, 0.2616)).view(1, 3, 1, 1)
        else:
            self.mu = torch.tensor((0.0, 0.0, 0.0)).view(1, 3, 1, 1)
            self.std = torch.tensor((1.0, 1.0, 1.0)).view(1, 3, 1, 1)
            print('no input normalization')
        if cuda:
            self.mu = self.mu.cuda()
            self.std = self.std.cuda()
        if half_prec:
            self.mu = self.mu.half()
            self.std = self.std.half()

        self.normalize = Normalize(self.mu, self.std)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=not self.learnable_bn)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.bn = nn.BatchNorm2d(512 * block.expansion)
        self.linear = nn.Linear(512*block.expansion, n_cls)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, self.bn, self.learnable_bn, stride, self.activation))
            # layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x, return_features=False):
        for layer in [*self.layer1, *self.layer2, *self.layer3, *self.layer4]:
            layer.avg_preacts = []

        out = self.normalize(x)
        out = self.conv1(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        if return_features and self.fts_before_bn:
            return out.view(out.size(0), -1)
        out = F.relu(self.bn(out))
        if return_features:
            return out.view(out.size(0), -1)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)

        return out


def PreActResNet18(n_cls, cuda=True, half_prec=False, activation='relu', fts_before_bn=False,
    normal='none'):
    #print('initializing PA RN-18 with act {}, normal {}'.format())
    return PreActResNet(PreActBlock, [2, 2, 2, 2], n_cls=n_cls, cuda=cuda, half_prec=half_prec,
        activation=activation, fts_before_bn=fts_before_bn, normal=normal)



In [16]:
def pgd_linf_untargeted(model, x, labels, k, eps, eps_step):
    # model.eval()
    ce_loss = torch.nn.CrossEntropyLoss()
    adv_x = x.clone().detach()
    adv_x.requires_grad_(True) 
    for _ in range(k):
        adv_x.requires_grad_(True)
        model.zero_grad()
        output = model(adv_x)
        # TODO: Calculate the loss
        loss = ce_loss(output, labels)
        loss.backward()
        # TODO: compute the adv_x
        # find delta, clamp with eps          
        delta = torch.clamp(adv_x + eps_step * adv_x.grad.sign() - x.data, min=-eps, max=eps)
        adv_x = torch.clamp(x.data + delta, min=0, max=1).detach_()
   
    return adv_x


In [17]:
def pgd_l2_untargeted(model, x, labels, k, eps, eps_step):
    # model.eval()
    ce_loss = torch.nn.CrossEntropyLoss()
    adv_x = x.clone().detach()
    adv_x.requires_grad_(True) 
    for _ in range(k):
        adv_x.requires_grad_(True)
        model.zero_grad()
        output = model(adv_x)
        batch_size = x.size()[0]
        # TODO: Calculate the loss
        loss = ce_loss(output, labels)
        loss.backward()
        grad = adv_x.grad.data
        # TODO: compute the adv_x
        # find delta, clamp with eps, project delta to the l2 ball
        # HINT: https://github.com/Harry24k/adversarial-attacks-pytorch/blob/master/torchattacks/attacks/pgdl2.py
        eps_for_division = 1e-10
        grad_norms = (
            torch.norm(grad.view(batch_size, -1), p=2, dim=1)
            + eps_for_division
        )  # nopep8
        grad = grad / grad_norms.view(batch_size, 1, 1, 1)
        adv_x = adv_x.detach() + eps_step * grad
        
        delta = adv_x  - x.data
        delta_norms = torch.norm(delta.view(batch_size, -1), p=2, dim=1)
        factor = eps / delta_norms
        factor = torch.min(factor, torch.ones_like(delta_norms))
        delta = delta * factor.view(-1, 1, 1, 1)
        
        adv_x = torch.clamp(x.data + delta, min=0, max=1).detach_()
   
    return adv_x

In [21]:
def train_model(model, num_epochs):
    criterion = nn.CrossEntropyLoss()
    # optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
    for epoch in tqdm(range(num_epochs)):
        model.train()
        running_loss = 0.0
        # tbar = tqdm(total=len(train_loader), desc="items of one epoch:")
        for i, data in enumerate(train_loader, 0):
            print(f"{i}/{len(train_loader)}", end="\r")
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        #     tbar.update(1)
        # tbar.close()
        scheduler.step()
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.3f}')
        
def train_model_adv(model, num_epochs, eps=8/255):
    criterion = nn.CrossEntropyLoss()
    # optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
        
    for epoch in tqdm(range(num_epochs)):
        model.train()
        running_loss = 0.0           
        for i, data in enumerate(train_loader, 0):
            print(f"{i}/{len(train_loader)}", end="\r")
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            # optimizer.zero_grad()
            # outputs = model(images)
            # loss = criterion(outputs, labels)
            # loss.backward()
            # optimizer.step()
            # running_loss += loss.item()
            
            # craft adversarial examples without updating BN running stats
            was_training = model.training
            model.eval()
            with torch.enable_grad():
                adv_data = pgd_linf_untargeted(model, images, labels, k=10, eps=eps, eps_step=eps/4)
            if was_training:
                model.train()

            optimizer.zero_grad()
            outputs = model(adv_data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        #     tbar.update(1)
        # tbar.close()
        scheduler.step()
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.3f}')


In [31]:
def test_model(model):
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        # tbar = tqdm(total=len(test_loader), desc="items of test:")
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        #     tbar.update(1)
        # tbar.close()
        print(f'Accuracy on images: {100 * correct / total}')

def test_model_robust(model, attack='pgd', eps=8/255, k=10):
    model.eval()
    # with torch.no_grad():abs
    correct = 0
    total = 0
    # tbar = tqdm(total=len(test_loader), desc="items of test:")
    for data in test_loader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        if attack == 'fgsm':
            adv_images = pgd_linf_untargeted(model, images, labels, k=1, eps=eps, eps_step=eps)
        else:  # pgd
            adv_images = pgd_linf_untargeted(model, images, labels, k=k, eps=eps, eps_step=eps/4)
        outputs = model(adv_images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    #     tbar.update(1)
    # tbar.close()
    print(f'Accuracy on images: {100 * correct / total}')



In [None]:
epsilon_values = [0, 1/255, 2/255, 4/255, 8/255, 16/255, 32/255]

# intialize the model
model_adv = PreActResNet18(10, cuda=True, activation='softplus1', normal='cifar10').to(device)
train_model_adv(model_adv, num_epochs=100, eps=8/255)
# model_adv = torch.load("adv_model.pth", map_location="cuda" if torch.cuda.is_available() else "cpu", weights_only=False)

print("Testing adversarily trained model acuracy:")
test_model(model_adv)

for eps in epsilon_values:
    print(f"Testing adversarily trained model robustness, eps {eps}:")
    test_model_robust(model_adv, attack='pgd', eps=eps, k=10)

for eps in epsilon_values:
    print(f"Testing adversarily FGSM, eps {eps}:")
    test_model_robust(model_adv, attack='pgd', eps=eps, k=1)

In [30]:
# torch.save(model_adv, 'adv_model.pth')

In [33]:
epsilon_values = [0, 1/255, 2/255, 4/255, 8/255, 16/255, 32/255]

# intialize the model
model = PreActResNet18(10, cuda=True, activation='softplus1', normal='cifar10').to(device)
train_model(model, num_epochs=100)
# model = torch.load("base_model.pth", map_location="cuda" if torch.cuda.is_available() else "cpu", weights_only=False)

print("Testing adversarily trained model acuracy:")
test_model(model)

for eps in epsilon_values:
    print(f"Testing adversarily trained model robustness, eps {eps}:")
    test_model_robust(model, attack='pgd', eps=eps, k=10)

for eps in epsilon_values:
    print(f"Testing adversarily FGSM, eps {eps}:")
    test_model_robust(model, attack='pgd', eps=eps, k=1)

Testing adversarily trained model acuracy:
Accuracy on images: 83.27
Testing adversarily trained model robustness, eps 0:
Accuracy on images: 83.27
Testing adversarily trained model robustness, eps 0.00392156862745098:
Accuracy on images: 12.16
Testing adversarily trained model robustness, eps 0.00784313725490196:
Accuracy on images: 0.1
Testing adversarily trained model robustness, eps 0.01568627450980392:
Accuracy on images: 0.0
Testing adversarily trained model robustness, eps 0.03137254901960784:
Accuracy on images: 0.0
Testing adversarily trained model robustness, eps 0.06274509803921569:
Accuracy on images: 0.0
Testing adversarily trained model robustness, eps 0.12549019607843137:
Accuracy on images: 0.0
Testing adversarily FGSM, eps 0:
Accuracy on images: 83.27
Testing adversarily FGSM, eps 0.00392156862745098:
Accuracy on images: 66.86
Testing adversarily FGSM, eps 0.00784313725490196:
Accuracy on images: 48.05
Testing adversarily FGSM, eps 0.01568627450980392:
Accuracy on imag

In [None]:
torch.save(model, 'base_model.pth')