Example Github repo

https://github.com/A-LinCui/Adversarial_Patch_Attack/blob/master/Attack.py


https://github.com/jhayes14/adversarial-patch/tree/master/pretrained_models_pytorch 

https://uvadlc-notebooks.readthedocs.io/en/latest/tutorial_notebooks/tutorial10/Adversarial_Attacks.html

In [22]:
## Standard libraries
import os
import json
import math
import time
import numpy as np 
import scipy.linalg

## Imports for plotting
import matplotlib.pyplot as plt
%matplotlib inline 
from IPython.display import set_matplotlib_formats
set_matplotlib_formats('svg', 'pdf') # For export
from matplotlib.colors import to_rgb
import matplotlib
matplotlib.rcParams['lines.linewidth'] = 2.0
import seaborn as sns
sns.set()

## Progress bar
from tqdm.notebook import tqdm

## PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim

# Torchvision
import torchvision
from torchvision.datasets import CIFAR10
from torchvision import transforms

# from tools
%cd ..
from tools.resnet20 import ResNetCIFAR
from tools.train_util import *

/


  set_matplotlib_formats('svg', 'pdf') # For export


In [23]:
# set the device to GPU if available, else CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = 'cpu'

In [57]:
net = ResNetCIFAR(num_layers=20, Nbits=None)
net = net.to(device)
net.load_state_dict(torch.load("/workspaces/Adversarial_Patch_Attack/tools/pretrained_model_resnet20.pt",map_location=torch.device('cpu')))

<All keys matched successfully>

In [58]:
def test(net):
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),

    ])

    testset = torchvision.datasets.CIFAR10(root="/workspaces/Adversarial_Patch_Attack/data", train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

    classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

    criterion = nn.CrossEntropyLoss()

    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    num_val_steps = len(testloader)
    val_acc = correct / total
    print("Test Loss=%.4f, Test accuracy=%.4f" % (test_loss / (num_val_steps), val_acc))

In [59]:
test(net)

Files already downloaded and verified
Test Loss=0.3231, Test accuracy=0.9151


In [64]:
root_dir = "/workspaces/Adversarial_Patch_Attack/data"

In [74]:
def gen_mask(img, patch, h_pos=None, w_pos=None):
    if (h_pos==None): h_pos = np.random.randint(0,img.shape[2]-patch.shape[1]-1)
    if (w_pos==None): w_pos = np.random.randint(0,img.shape[3]-patch.shape[2]-1)
    applied_patch = torch.tensor(np.zeros(img.shape[1:]))
    for i in range(img.shape[0]):
        # if (h_pos==None): h_pos = np.random.randint(0,img.shape[2]-patch.shape[1]-1)
        # if (w_pos==None): w_pos = np.random.randint(0,img.shape[3]-patch.shape[2]-1)
        # img[i,:,h_pos:h_pos+patch.shape[1],w_pos:w_pos+patch.shape[2]] = patch
        applied_patch[:, h_pos:h_pos+patch.shape[1], w_pos:w_pos+patch.shape[2]] = patch
        mask = applied_patch.clone()
        mask[mask != 0] = 1.0
    return applied_patch, mask #img 

def place_patch(img, patch, h_pos=None, w_pos=None):
    for i in range(img.shape[0]):
        if (h_pos==None): h_pos = np.random.randint(0,img.shape[2]-patch.shape[1]-1)
        if (w_pos==None): w_pos = np.random.randint(0,img.shape[3]-patch.shape[2]-1)
        img[i,:,h_pos:h_pos+patch.shape[1],w_pos:w_pos+patch.shape[2]] = patch
    return img 

def train_untargeted_patch(net, epochs, patch_size=7, log_every_n=100, batch_size=128, rand_start=False):
    transform_train = transforms.Compose([
        # transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    trainset = torchvision.datasets.CIFAR10(root=root_dir, train=True, download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True, num_workers=8)
    testset = torchvision.datasets.CIFAR10(root=root_dir, train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

    net.eval()

    patch_min = -torch.tensor([0.4914,0.4822,0.4465])[:,None,None].expand(-1,patch_size,patch_size)/torch.tensor([0.2023,0.1994,0.2010])[:,None,None].expand(-1,patch_size,patch_size)
    patch_max = (1.0 - torch.tensor([0.4914,0.4822,0.4465])[:,None,None].expand(-1,patch_size,patch_size))/torch.tensor([0.2023,0.1994,0.2010])[:,None,None].expand(-1,patch_size,patch_size)

    if rand_start:
        patch = nn.Parameter(torch.FloatTensor(3, patch_size, patch_size).uniform_(torch.max(patch_min),torch.min(patch_max)), requires_grad=True)
    else:
        patch = nn.Parameter(torch.zeros(3, patch_size, patch_size, dtype=torch.float64), requires_grad=True)

    print(patch.grad)
    optimizer = torch.optim.Adam([patch], lr=1e-1)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, threshold=0.005, threshold_mode='abs', factor=0.1)
    criterion = nn.CrossEntropyLoss()

    global_steps = 0
    best_acc = 1
    start = time.time()

    for epoch in range(epochs):
        print('\nEpoch: %d\tLearning Rate:%f' % (epoch, scheduler.optimizer.param_groups[0]['lr']))
        train_loss = 0
        success = 0
        total = 0
        for batch_idx, (inputs, targets) in enumerate(trainloader):
            # Normalize patch using stats of CIFAR-10
            # patch_normalized = (torch.clamp(patch,min=0.0,max=1.0) - torch.tensor([0.4914,0.4822,0.4465])[:,None,None].expand(-1,patch_size,patch_size)) / (torch.tensor([0.2023,0.1994,0.2010])[:,None,None].expand(-1,patch_size,patch_size))
            patch_normalized = torch.clamp(patch, min=patch_min, max=patch_max)
            # Place patch
            applied_patch, mask = gen_mask(inputs, patch_normalized)
            adv_inputs = torch.mul(mask.type(torch.FloatTensor), applied_patch.type(torch.FloatTensor)) + torch.mul((1 - mask.type(torch.FloatTensor)), inputs.type(torch.FloatTensor))
            # adv_inputs = place_patch(inputs, patch_normalized)
            adv_inputs, targets = adv_inputs.to(device), targets.to(device)
            outputs = net(adv_inputs)
            optimizer.zero_grad()
            loss = -criterion(outputs, targets)
            loss.backward()
            #patch_grad = patch.grad.data.clone().cpu()
            #patch = scheduler.optimizer.param_groups[0]['lr'] * patch_grad + patch
            optimizer.step()

            train_loss -= loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            success += predicted.eq(targets).sum().item()
            global_steps += 1

            if global_steps % log_every_n == 0:
                end = time.time()
                num_examples_per_second = log_every_n * batch_size / (end - start)
                print("[Step=%d]\tLoss=%.4f\tSuccess=%.4f\t%.1f examples/second"
                      % (global_steps, train_loss / (batch_idx + 1), 1.0-(success / total), num_examples_per_second))
                start = time.time()

        

        test_loss = 0
        success = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(testloader):
                patch_normalized = torch.clamp(patch, min=patch_min, max=patch_max)
                # adv_inputs = place_patch(inputs, patch_normalized)
                applied_patch, mask = gen_mask(inputs, patch_normalized)
                adv_inputs = torch.mul(mask.type(torch.FloatTensor), applied_patch.type(torch.FloatTensor)) + torch.mul((1 - mask.type(torch.FloatTensor)), inputs.type(torch.FloatTensor))
                adv_inputs, targets = adv_inputs.to(device), targets.to(device)
                outputs = net(adv_inputs)
                loss = -criterion(outputs, targets)

                test_loss -= loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                success += predicted.eq(targets).sum().item()
        num_val_steps = len(testloader)
        val_acc = success / total
        print("Test Loss=%.4f, Test Success=%.4f" % (test_loss / (num_val_steps), 1.0-val_acc))

        scheduler.step(1.0-success/total)

        if val_acc < best_acc:
            best_acc = val_acc
            print("Saving...")
            best_patch = (torch.tanh(patch.cpu().clone().detach())+1)/2

    return best_patch, best_acc

def train_targeted_patch(net, epochs, target, patch_size=7, log_every_n=100, batch_size=128, rand_start=False):

    transform_train = transforms.Compose([
        # transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])  

    trainset = torchvision.datasets.CIFAR10(root=root_dir, train=True, download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True, num_workers=8)
    testset = torchvision.datasets.CIFAR10(root=root_dir, train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

    net.eval()

    if rand_start:
        patch = nn.Parameter(torch.FloatTensor(3, patch_size, patch_size).uniform_(-0.5,0.5), requires_grad=True)
    else:
        patch = nn.Parameter(torch.zeros(3, patch_size, patch_size, dtype=torch.float64), requires_grad=True)

    optimizer = torch.optim.SGD([patch], lr=1e-1, momentum=0.9, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, threshold=0.005, threshold_mode='abs', factor=0.1)
    criterion = nn.CrossEntropyLoss()

    global_steps = 0
    best_acc = 1
    start = time.time()

    for epoch in range(epochs):
        print('\nTargeted Attack\nEpoch: %d\tLearning Rate:%f' % (epoch, scheduler.optimizer.param_groups[0]['lr']))
        train_loss = 0
        success = 0
        total = 0
        for batch_idx, (inputs, _) in enumerate(trainloader):
            # Normalize patch using stats of CIFAR-10
            patch_normalized = (torch.tanh(patch) + 1 - 2*torch.tensor([0.4914,0.4822,0.4465])[:,None,None].expand(-1,patch_size,patch_size)) / (2*torch.tensor([0.2023,0.1994,0.2010])[:,None,None].expand(-1,patch_size,patch_size))
            # Place patch
            inputs = place_patch(inputs, patch_normalized)
            targets = torch.zeros(inputs.shape[0], dtype=torch.long).fill_(target)
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            optimizer.zero_grad()
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            success += predicted.eq(targets).sum().item()
            global_steps += 1

            if global_steps % log_every_n == 0:
                end = time.time()
                num_examples_per_second = log_every_n * batch_size / (end - start)
                print("[Step=%d]\tLoss=%.4f\tSuccess=%.4f\t%.1f examples/second"
                      % (global_steps, train_loss / (batch_idx + 1), (success / total), num_examples_per_second))
                start = time.time()

        test_loss = 0
        success = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (inputs, _) in enumerate(testloader):
                inputs = place_patch(inputs, patch_normalized)
                targets = torch.zeros(inputs.shape[0], dtype=torch.long).fill_(target)
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = net(inputs)
                loss = criterion(outputs, targets)

                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                success += predicted.eq(targets).sum().item()
        num_val_steps = len(testloader)
        val_acc = success / total
        print("Test Loss=%.4f, Test Success=%.4f" % (test_loss / (num_val_steps), val_acc))
        scheduler.step(success/total)

        if val_acc < best_acc:
            best_acc = val_acc
            print("Saving...")
            best_patch = (torch.tanh(patch.cpu().clone().detach())+1)/2

    return best_patch, best_acc

In [75]:
def random_noise_attack(model, device, dat, eps):
    # Add uniform random noise in [-eps,+eps]
    x_adv = dat.clone().detach() + torch.FloatTensor(dat.shape).uniform_(-eps, eps).to(device)
    # Clip the perturbed datapoints to ensure we are in bounds [0,1]
    # x_adv = torch.clamp(x_adv.clone().detach(), 0., 1.)
    # Return perturbed samples
    return x_adv

# Compute the gradient of the loss w.r.t. the input data
def gradient_wrt_data(model,device,data,lbl):
    dat = data.clone().detach()
    dat.requires_grad = True
    out = model(dat)
    loss = F.cross_entropy(out,lbl)
    model.zero_grad()
    loss.backward()
    data_grad = dat.grad.data
    return data_grad.data.detach() #Q: Why .data again? (dat.grad is already a tensor; infinite .data?)

def PGD_attack(model, device, dat, lbl, eps, alpha, iters, rand_start):
    # TODO: Implement the PGD attack
    # - dat and lbl are tensors
    # - eps and alpha are floats
    # - iters is an integer
    # - rand_start is a bool

    # x_nat is the natural (clean) data batch, we .clone().detach()
    # to copy it and detach it from our computational graph
    x_nat = dat.clone().detach()

    # If rand_start is True, add uniform noise to the sample within [-eps,+eps],
    # else just copy x_nat 
    if rand_start:
        x_nat_gd = random_noise_attack(model, device, x_nat, eps)

    else: 
        x_nat_gd = x_nat.clone()


    # Make sure the sample is projected into original distribution bounds [0,1]
    # x_nat_gd = torch.clamp(x_nat_gd, min=0, max=1)

    # Iterate over iters
    for iter_num in range(iters):
        # Compute gradient w.r.t. data (we give you this function, but understand it)
        grad = gradient_wrt_data(model, device, x_nat_gd, lbl)
        # Perturb the image using the gradient
        x_nat_gd = x_nat_gd + alpha * torch.sign(grad)
        # Clip the perturbed datapoints to ensure we still satisfy L_infinity constraint
        x_nat_gd = torch.clamp(x_nat_gd, min=x_nat-eps, max=x_nat+eps)
        # Clip the perturbed datapoints to ensure we are in bounds [0,1]
        # x_nat_gd = torch.clamp(x_nat_gd, min=0, max=1)

    # Return the final perturbed samples
    return x_nat_gd

def PGD_attack_example(whitebox, EPS, ITS):
    correct = 0.
    running_total = 0.

    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])  

    trainset = torchvision.datasets.CIFAR10(root=root_dir, train=True, download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True, num_workers=8)
    testset = torchvision.datasets.CIFAR10(root=root_dir, train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

    for batch_idx,(data,labels) in enumerate(trainloader):
        data = data.to(device) 
        labels = labels.to(device)

        # TODO: Perform adversarial attack here
        ALP = 1.85*(EPS/ITS)
        adv_data = PGD_attack(net, device, data, labels, EPS, ALP, ITS, True)
        # Sanity checking if adversarial example is "legal"
        assert(torch.max(torch.abs(adv_data-data)) <= (EPS + 1e-5) )
        # assert(adv_data.max() == 1.) #Why has to be 1
        # assert(adv_data.min() == 0.)
        
        # Compute accuracy on perturbed data
        with torch.no_grad():
            # Stat keeping - whitebox
            outputs = whitebox(adv_data)
            _,preds = outputs.max(1)
            correct += preds.eq(labels).sum().item()
            running_total += labels.size(0)
        
            # Plot some samples
        if batch_idx == 1:
            plt.figure(figsize=(15,5))
            for jj in range(12):
                plt.subplot(2,6,jj+1);plt.imshow(adv_data[jj,0].cpu().numpy(),cmap='gray');plt.axis("off")
            plt.tight_layout()
            plt.show()

        return correct/running_total

In [76]:
PGD_attack_example(net,0.00,10)

Files already downloaded and verified




Files already downloaded and verified


0.984375

In [None]:
# generate patch with input sizes and class label
def generate_patch(class_labels, patch_sizes):
    # return a dictionary of patches
    patches = {}
    for class_label in class_labels:
        patches[class_label] = dict()
        for patch_size in patch_sizes:
            # if pretrained patch is available, load it
            if os.path.exists('patch_{}_{}.pt'.format(class_label, patch_size)):
                patches[class_label][patch_size] = torch.load('patch_{}_{}.pt'.format(class_label, patch_size))
            # otherwise, train a new patch
            else:
                patches[class_label][patch_size] = train_patch(class_label, patch_size)
                torch.save(patches[class_label][patch_size], 'patch_{}_{}.pt'.format(class_label, patch_size))
            # 
    return patches

In [None]:
def get_patches(class_names, patch_sizes):
    result_dict = dict()

    # Loop over all classes and patch sizes
    for name in class_names:
        result_dict[name] = dict()
        for patch_size in patch_sizes:
            # Get target class index
            c = label_names.index(name)
            file_name = os.path.join(CHECKPOINT_PATH, f"{name}_{patch_size}_patch.pt")
            # Load patch if pretrained file exists, otherwise start training
            if not os.path.isfile(file_name):
                patch, val_results = patch_attack(pretrained_model, target_class=c, patch_size=patch_size, num_epochs=5)
                print(f"Validation results for {name} and {patch_size}:", val_results)
                torch.save(patch, file_name)
            else:
                patch = torch.load(file_name)
            # Load evaluation results if exist, otherwise manually evaluate the patch
            if name in json_results:
                results = json_results[name][str(patch_size)]
            else:
                results = eval_patch(pretrained_model, patch, data_loader, target_class=c)    
            
            # Store results and the patches in a dict for better access
            result_dict[name][patch_size] = {
                "results": results,
                "patch": patch
            }
        
    return result_dict

In [1]:
class_names = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
patch_sizes = [3,5,7,16]

In [77]:
patch, acc = train_untargeted_patch(net,30)
plt.imshow(torch.mean(patch,0),cmap='gray')
plt.show()
print(patch)

Files already downloaded and verified
Files already downloaded and verified
None

Epoch: 0	Learning Rate:0.100000
[Step=100]	Loss=1.7410	Success=0.3115	503.1 examples/second
[Step=200]	Loss=2.3562	Success=0.3814	531.2 examples/second
[Step=300]	Loss=2.7906	Success=0.4200	535.0 examples/second
Test Loss=3.8536, Test Success=0.5084
Saving...

Epoch: 1	Learning Rate:0.100000
[Step=400]	Loss=3.0686	Success=0.4540	394.9 examples/second
[Step=500]	Loss=3.2885	Success=0.4751	508.2 examples/second
[Step=600]	Loss=3.3154	Success=0.4715	451.4 examples/second
[Step=700]	Loss=3.3019	Success=0.4699	449.9 examples/second
Test Loss=3.5786, Test Success=0.4976

Epoch: 2	Learning Rate:0.100000
[Step=800]	Loss=2.9888	Success=0.4431	347.8 examples/second
[Step=900]	Loss=3.2397	Success=0.4680	468.1 examples/second
[Step=1000]	Loss=3.2933	Success=0.4710	477.1 examples/second
[Step=1100]	Loss=3.3654	Success=0.4759	470.1 examples/second
Test Loss=3.5435, Test Success=0.4865

Epoch: 3	Learning Rate:0.100000
[

In [2]:
# train targeted patch
patch, acc = train_targeted_patch(net,30)
plt.imshow(torch.mean(patch,0),cmap='gray')
plt.show()
print(patch)
print(acc)

NameError: name 'train_targeted_patch' is not defined