# Dependencies

In [1]:
# EXTERNAL LIBRARIES
import numpy as np 
import re

import torch
import torch.nn as nn 
import torch.optim as optim 

import matplotlib.pyplot as plt
import torchvision
import torchvision.transforms as transforms

import os
import sys
import shutil

from torchvision.utils import save_image

In [2]:
# mister_ed
import recoloradv.mister_ed.loss_functions as lf 
import recoloradv.mister_ed.utils.pytorch_utils as utils
import recoloradv.mister_ed.utils.image_utils as img_utils
import recoloradv.mister_ed.cifar10.cifar_loader as cifar_loader
import recoloradv.mister_ed.cifar10.cifar_resnets as cifar_resnets
import recoloradv.mister_ed.adversarial_training as advtrain
import recoloradv.mister_ed.utils.checkpoints as checkpoints
import recoloradv.mister_ed.adversarial_perturbations as ap 
import recoloradv.mister_ed.adversarial_attacks as aa
import recoloradv.mister_ed.spatial_transformers as st
import recoloradv.mister_ed.config as config

# ReColorAdv
import recoloradv.perturbations as pt
import recoloradv.color_transformers as ct
import recoloradv.color_spaces as cs
from recoloradv import norms
from recoloradv.utils import load_pretrained_cifar10_model, get_attack_from_name

In [3]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Helper Functions

In [4]:
class HidePrint:
    def __enter__(self):
        self._temp_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stdout = self._temp_stdout

In [5]:
def prepare_folders(level):
    if os.path.exists(f"../datasets/adv_{level}"):
        shutil.rmtree(f"../datasets/adv_{level}")
    for i in range(len(class_names)):
        os.makedirs(f"../datasets/adv_{level}/train/{i}")
        os.makedirs(f"../datasets/adv_{level}/test/{i}")

In [6]:
# For updating learning rate
def update_lr(optimizer, lr):
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

# Loading Data

In [7]:
# Batch size
batch_size = 100

# Normalize
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Transform
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Pad(4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32),
    transforms.ToTensor(),
    normalize
])

In [8]:
# CIFAR-10 Dataset
train_dataset = torchvision.datasets.CIFAR10(
    root='../datasets/',
    train=True,
    transform=transforms.ToTensor(),
    download=True
)

test_dataset = torchvision.datasets.CIFAR10(
    root='../datasets/',
    train=False,
    transform=transforms.ToTensor(),
    download=True
)

# Data loader
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=False
)

test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False
)

Files already downloaded and verified
Files already downloaded and verified


In [9]:
class_names = [
    "airplane",
    "automobile",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
]

# Model

In [10]:
# 3x3 convolution
def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3,
                     stride=stride, padding=1, bias=False)

# Residual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

# ResNet
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [11]:
model = ResNet(ResidualBlock, [2, 2, 2]).to(device)

# Hyper Parameters

In [12]:
# Hyper-parameters
num_epochs = 10
learning_rate = 0.001

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Image Batch Attack

In [13]:
_, normalizer = load_pretrained_cifar10_model('pretrained_models/normal.resnet32.pt')

if utils.use_gpu():
    model.cuda()

In [14]:
# This threat model defines the regularization parameters of the attack.
recoloradv_threat = ap.ThreatModel(pt.ReColorAdv, {
    'xform_class': ct.FullSpatial, 
    'cspace': cs.CIELUVColorSpace(), # controls the color space used
    'lp_style': 'inf',
    'lp_bound': [0.06, 0.06, 0.06],  # [epsilon_1, epsilon_2, epsilon_3]
    'xform_params': {
      'resolution_x': 16,            # R_1
      'resolution_y': 32,            # R_2
      'resolution_z': 32,            # R_3
    },
    'use_smooth_loss': True,
})


# Now, we define the main optimization term (the Carlini & Wagner f6 loss).
adv_loss = lf.CWLossF6(model, normalizer)

# We also need the smoothness loss.
smooth_loss = lf.PerturbationNormLoss(lp=2)

# We combine them with a RegularizedLoss object.
attack_loss = lf.RegularizedLoss({'adv': adv_loss, 'smooth': smooth_loss}, 
                                 {'adv': 1.0,      'smooth': 0.05},   # lambda = 0.05
                                 negate=True) # Need this true for PGD type attacks

# PGD is used to optimize the above loss.
pgd_attack_obj = aa.PGD(model, normalizer, recoloradv_threat, attack_loss)

In [15]:
def attack_images(images, labels):
    if utils.use_gpu():
        images = images.cuda()
        labels = labels.cuda()

    # We run the attack for 10 iterations at learning rate 0.01.
    with HidePrint():
        perturbation = pgd_attack_obj.attack(images, labels, num_iterations=10, signed=False, 
                                             optimizer=optim.Adam, optimizer_kwargs={'lr': 0.01},
                                             verbose=True)

    # Now, we can collect the successful adversarial examples and display them.
    successful_advs, successful_origs, idxs = perturbation.my_collect_successful(model, normalizer)

    ori = torch.stack([transform(img) for img in images])
    adv = torch.stack([transform(img) for img in successful_advs])
        
    return torch.cat((ori, adv), 0)

# Train Model

In [26]:
# Train the model
total_step = len(train_loader)
curr_lr = learning_rate
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = attack_images(images, labels)
        labels = torch.cat((labels, labels), 0)
        
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print ("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

    # Decay learning rate
    if (epoch+1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)

Epoch [1/10], Step [100/500] Loss: 1.9659
Epoch [1/10], Step [200/500] Loss: 1.9216
Epoch [1/10], Step [300/500] Loss: 1.5656
Epoch [1/10], Step [400/500] Loss: 1.5689
Epoch [1/10], Step [500/500] Loss: 1.6555
Epoch [2/10], Step [100/500] Loss: 1.5189
Epoch [2/10], Step [200/500] Loss: 1.4986
Epoch [2/10], Step [300/500] Loss: 1.2944
Epoch [2/10], Step [400/500] Loss: 1.2572
Epoch [2/10], Step [500/500] Loss: 1.3478
Epoch [3/10], Step [100/500] Loss: 1.3000
Epoch [3/10], Step [200/500] Loss: 1.3908
Epoch [3/10], Step [300/500] Loss: 1.1363
Epoch [3/10], Step [400/500] Loss: 1.1941
Epoch [3/10], Step [500/500] Loss: 1.2677
Epoch [4/10], Step [100/500] Loss: 1.1718
Epoch [4/10], Step [200/500] Loss: 1.2766
Epoch [4/10], Step [300/500] Loss: 0.9847
Epoch [4/10], Step [400/500] Loss: 1.0264
Epoch [4/10], Step [500/500] Loss: 1.1744
Epoch [5/10], Step [100/500] Loss: 1.1082
Epoch [5/10], Step [200/500] Loss: 1.2135
Epoch [5/10], Step [300/500] Loss: 0.8726
Epoch [5/10], Step [400/500] Loss:

In [27]:
# Save the model checkpoint
torch.save(model, 'pretrained_models/resnet_def_10_epochs')

In [None]:
model = torch.load('pretrained_models/resnet_10_epochs')

# Evaluation

In [None]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [35]:
# Transform
transform = transforms.Compose([
    transforms.Pad(4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32),
    transforms.ToTensor(),
    normalize
])

# Dataset
test_dataset = torchvision.datasets.CIFAR10(
    root='../datasets/',
    train=False,
    transform=transform,
    download=True
)

test_dataset_adv = torchvision.datasets.ImageFolder(root='../datasets/adv_iter/test', transform=transform)

# Data Loader
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False
)

test_loader_adv = torch.utils.data.DataLoader(
    dataset=test_dataset_adv,
    batch_size=batch_size,
    shuffle=False
)

Files already downloaded and verified


In [32]:
def test(model, test_loader):
    model.eval()
    
    predictions = []
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            prediction = torch.max(model(images), 1)[1].data.squeeze()
            predictions += (prediction.tolist())
            correct += (prediction == labels).sum().item()
            total += len(labels)
    
    return correct / total, predictions

In [34]:
test(model, test_loader)[0]

0.7641

In [36]:
test(model, test_loader_adv)[0]

0.5989

# Lab

In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import recoloradv.mister_ed.utils.image_utils as img_utils

In [None]:
img_utils.show_images([successful_origs[:15], successful_advs[:15]])

In [None]:
img_utils.show_images(examples[:15])

In [None]:
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    
    outputs = model(successful_origs)
    _, predicted_ori = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted_ori == labels).sum().item()

    print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))

In [None]:
print([class_names[y] for y in labels[:15]])

In [None]:
idxs[:15]

In [None]:
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    
    outputs = model(successful_advs)
    _, predicted_adv = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted_adv == labels).sum().item()

    print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))

In [None]:
correct_idx = predicted_ori == labels
diff_idx = predicted_ori != predicted_adv
success_atk_idx = correct_idx & diff_idx

In [None]:
successful_ori = torch.stack([image for image, idx in zip(successful_origs, success_atk_idx) if idx])
successful_adv = torch.stack([image for image, idx in zip(successful_advs, success_atk_idx) if idx])
img_utils.show_images([successful_ori, successful_adv])

In [None]:
len(successful_ori)

In [None]:
labels = [label for label, idx in zip(predicted_ori, success_atk_idx) if idx]
print([class_names[y] for y in labels])

In [None]:
labels = [label for label, idx in zip(predicted_adv, success_atk_idx) if idx]
print([class_names[y] for y in labels])

In [None]:
successful_adv = torch.stack([successful_advs[i] for i in false_idx])
img_utils.show_images(successful_adv)

In [None]:
labels_adv = [predicted_adv[i] for i in false_idx]
print([class_names[y] for y in labels_adv])