<a href="https://colab.research.google.com/github/miltondp/k99_courses/blob/main/cis_522/classes/02_21-week6/adversarial_attack.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torchvision
import torch.nn as nn
from tqdm import tqdm
import torchvision.transforms as transforms
import torch.optim as optim
import matplotlib.pyplot as plt

In [2]:
class View(nn.Module):
    def __init__(self,o):
        super().__init__()
        self.o = o

    def forward(self,x):
        return x.view(-1, self.o)

## Define Network

In [11]:
class CNN(nn.Module):
    def __init__(self, c1=96, c2= 192):
        super().__init__()
        dropout_prob = 0.5

        def convbn(channel_in, channel_out, kernel_sz, stride_sz=1, padding=0):
            '''
            build a conv + bn block
            operations are in the following order:
            1. conv2d
            2. relu
            3. batchnorm

            use specified input channels (channel_in), output channels (channel_out), kernel size (kernel_sz),
            stride size (stride_sz), padding (padding)
            '''
            return nn.Sequential(
                nn.Conv2d(
                    in_channels=channel_in,
                    out_channels=channel_out,
                    kernel_size=kernel_sz,
                    stride=stride_sz,
                    padding=padding
                ),
                nn.ReLU(),
                nn.BatchNorm2d(num_features=channel_out),
            )

        '''
        construct the network with the following layers:
        block 1: 
        input channel: 3, output channel: c1, kernel_size: 3, stride: 1, padding: 1
        block 2: 
        input channel: c1, output channel: c1, kernel_size: 3, stride: 1, padding: 1
        block 3: 
        input channel: c1, output channel: c1, kernel_size: 3, stride: 2, padding: 1
        DROPOUT
        
        block 4: 
        input channel: c1, output channel: c2, kernel_size: 3, stride: 1, padding: 1
        block 5: 
        input channel: c2, output channel: c2, kernel_size: 3, stride: 1, padding: 1
        block 6: 
        input channel: c2, output channel: c2, kernel_size: 3, stride: 2, padding: 1
        DROPOUT

        block 4: 
        input channel: c2, output channel: c2, kernel_size: 3, stride: 1, padding: 1
        block 5: 
        input channel: c2, output channel: c2, kernel_size: 3, stride: 1, padding: 1
        block 6: 
        input channel: c2, output channel: 10, kernel_size: 10, stride: 1, padding: 1
        AVGPOOL
        View(10)
        '''
        self.m = nn.Sequential(
            convbn(3, c1, 3, 1, 1),
            convbn(c1, c1, 3, 1, 1),
            convbn(c1, c1, 3, 2, 1),
            nn.Dropout(0.2),

            convbn(c1, c2, 3, 1, 1),
            convbn(c2, c2, 3, 1, 1),
            convbn(c2, c2, 3, 2, 1),
            nn.Dropout(0.5),

            convbn(c2, c2, 3, 1, 1),
            convbn(c2, c2, 3, 1, 1),
            convbn(c2, 10, 1, 1, 1),
            nn.AvgPool2d(8),
            View(10),
          )

        # print the number of parameters in this network
        print('Num parameters: ', sum([p.numel() for p in self.m.parameters()]))

    def forward(self, x):
        return self.m(x)

## Training Loop

In [19]:
def train_nn(net, optimizer, criterion, train_loader, test_loader, epochs, model_name, plot, device):
    model = net.to(device)
    total_step = len(train_loader)
    train_loss_values = []
    train_error = []
    val_loss_values = []
    val_error = []
    best_acc = 0
    total_iter = 0
    flag1, flag2, flag3 = True, True, True

    for epoch in range(epochs):

        correct = 0
        total = 0
        running_loss = 0.0

        for i, (images, labels) in enumerate(train_loader):
            # Move tensors to configured device
            images = images.to(device)
            labels = labels.to(device)

            #Forward Pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            total_iter += labels.size(0)

            # manual learning rate scheduling
            if total_iter > 50000 and flag1:
                for op_params in optimizer.param_groups:
                    op_params['lr'] = 0.01
                    flag1 = False
                    print('lr_rate adjusted to 0.01')
            elif total_iter > 100000 and flag2:
                for op_params in optimizer.param_groups:
                    op_params['lr'] = 0.001
                    flag2 = False
                    print('lr_rate adjusted to 0.001')
            elif total_iter > 500000 and flag3:
                for op_params in optimizer.param_groups:
                    op_params['lr'] = 0.0001
                    flag3 = False
                    print('lr_rate adjusted to 0.0001')

            correct += (predicted == labels).sum().item()

            optimizer.step()

            if (i+1) % 1000 == 0:
                print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, epochs, i+1, total_step, loss.item()))
            if plot:
                info = { ('loss_' + model_name): loss.item() }

        train_loss_values.append(running_loss)
        train_error.append(100-100*correct/total)

        model.eval()
        with torch.no_grad():
          best_model = None
          best_model_acc = 0.0
          best_model_loss = 0.0

          for i, (images, labels) in enumerate(test_loader):
              # Move tensors to configured device
              images = images.to(device)
              labels = labels.to(device)

              #Forward Pass
              outputs = model.forward(images)

              # val_running_loss += loss.item()
              _, predicted = torch.max(outputs.data, 1)

              correct += (predicted == labels).sum().item()
              if (i+1) % 1000 == 0:
                  # print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, epochs, i+1, total_step, loss.item()))
                  print ('Epoch [{}/{}], Step [{}/{}]'.format(epoch+1, epochs, i+1, total_step))
              # if plot:
              #     info = { ('loss_' + model_name): loss.item() }

        acc = 100 * correct/total

        if acc > best_model_acc:
          print(f"New best model with accuracy: {acc}")
          best_model_acc = acc

          output_path = "./best_model.pth"
          torch.save(model.state_dict(), output_path)

          best_model = model.state_dict()

        # val_loss_values.append(val_running_loss)
        val_error.append(100-acc)

        # write an evaluation script
        # compute the overall testing acc of the current model
        # save the best model state_dict
    return val_error,val_loss_values,train_error,train_loss_values

## Train Network

In [20]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')   
print(device)

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=16, shuffle=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=16, shuffle=False)

classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

model = CNN().to(device)
epochs = 5
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(), 
    lr=0.1, 
    momentum=0.9, 
    weight_decay=1e-3, 
    nesterov=True
)
val_error,val_loss_values,train_error,train_loss_values= train_nn(
    model, 
    optimizer, 
    criterion, 
    trainloader, 
    testloader, 
    epochs, 
    'cnn_curve', 
    True,
    device
)

cuda
Files already downloaded and verified
Files already downloaded and verified
Num parameters:  1667166
Epoch [1/5], Step [1000/3125], Loss: 1.7793
Epoch [1/5], Step [2000/3125], Loss: 1.3729
Epoch [1/5], Step [3000/3125], Loss: 1.8135
New best model with accuracy: 47.126
lr_rate adjusted to 0.01
Epoch [2/5], Step [1000/3125], Loss: 1.6715
Epoch [2/5], Step [2000/3125], Loss: 1.5117
Epoch [2/5], Step [3000/3125], Loss: 0.8063
New best model with accuracy: 74.666
lr_rate adjusted to 0.001
Epoch [3/5], Step [1000/3125], Loss: 0.9498
Epoch [3/5], Step [2000/3125], Loss: 1.2127
Epoch [3/5], Step [3000/3125], Loss: 1.0684
New best model with accuracy: 84.012
Epoch [4/5], Step [1000/3125], Loss: 0.8407
Epoch [4/5], Step [2000/3125], Loss: 0.8878
Epoch [4/5], Step [3000/3125], Loss: 1.1024
New best model with accuracy: 85.894
Epoch [5/5], Step [1000/3125], Loss: 0.9064
Epoch [5/5], Step [2000/3125], Loss: 0.9617
Epoch [5/5], Step [3000/3125], Loss: 0.9806
New best model with accuracy: 87.17

## Adversarial Attack

In [None]:
model = CNN().to(device)
model.load_state_dict(torch.load('./best_model.pth'))

criterion = nn.CrossEntropyLoss()

eps = 2e-4
correct = 0
total = 0

for img, lab in tqdm(testset):
    lab = torch.tensor(lab).type(torch.LongTensor)
    
    # move data to cuda
    img = ...
    lab = ...

    # require grad for the input (img)
    ...

    # get gradient w.r.t. in input (img)
    # hint: use img.grad.data.clone() to get the gradient w.r.t. the input (img)
    # define the loss w.r.t. input as 'dldx'
    ...

    # apply purturbation to the input (img)
    img += eps * torch.sign(dldx)

    # feed the purturbed image back to the model
    y_perturbed = model(img.unsqueeze(0))
    if torch.argmax(y_perturbed) == lab:
        correct += 1
    total += 1

print(f"\nAccuracy after 1-step perturbation: {correct / total}")