In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Displays a progress bar

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import Dataset, Subset, DataLoader, random_split

In [None]:
# Load the dataset and train, val, test splits
!rm -rf Fashion* 
print("Loading datasets...")
FASHION_transform = transforms.Compose([
    transforms.ToTensor(), # Transform from [0,255] uint8 to [0,1] float
])
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_train = Subset(FASHION_trainval, range(50000))
FASHION_val = Subset(FASHION_trainval, range(50000,60000))
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)
print("Done!")

In [None]:
# Create dataloaders
trainloader = DataLoader(FASHION_train, batch_size=64, shuffle=True)
valloader = DataLoader(FASHION_val, batch_size=64, shuffle=True)
testloader = DataLoader(FASHION_test, batch_size=64, shuffle=True)

In [None]:
class Network(nn.Module):
    def __init__(self):
        super().__init__()

        out1 = 24
        out2 = 64
        kernel1 = (5,5)
        kernel2 = (3,3)

        kernel_pool = (2,2)
        stride_pool = (2,2)

        self.cnn1 = nn.Conv2d(in_channels=1, out_channels=out1, kernel_size=kernel1, padding=0)  # size = 28-kernel1+1
        self.pool1 = nn.MaxPool2d(kernel_size=kernel_pool, stride=stride_pool)        # size = ( (28-kernel1+1)-kernel_pool / stride) + 1
        self.cnn2 = nn.Conv2d(in_channels=out1, out_channels=out2, kernel_size=kernel2) # size = ( (28-kernel1+1)-kernel_pool / stride) + 2 - kernel2
        
        temp = (((28-kernel1[0]+1)-kernel_pool[0]) // stride_pool[0]) + 2 - kernel2[0]
        h_fc3 = int((((temp)-kernel_pool[0]) // stride_pool[0]) + 1)
        
        self.fc3 = nn.Linear(h_fc3*h_fc3*out2, 256)
        self.fc4 = nn.Linear(256, 64)
        self.fc5 = nn.Linear(64, 24)
        self.drop1 = nn.Dropout2d(0.2)
        self.drop2 = nn.Dropout1d(0.2)
        self.fc6 = nn.Linear(24, 10)

        self.batchNorm1 = nn.BatchNorm2d(out1)
        self.batchNorm2 = nn.BatchNorm2d(out2)
        self.batchNorm3 = nn.BatchNorm1d(256)

    def forward(self,x):
        x = self.cnn1(x) 
        x = self.pool1(x)
        x = self.batchNorm1(x)
        x = nn.functional.relu(x)
        x = self.drop1(x)
        

        x = self.cnn2(x)
        x = self.pool1(x)
        x = self.batchNorm2(x)
        x = nn.functional.relu(x)
        

        x = x.view(x.shape[0], -1)
        x = self.fc3(x)
        x = self.batchNorm3(x)
        x = nn.functional.relu(x)
        self.drop2(x)


        x = x.view(x.shape[0], -1)
        x = self.fc4(x)
        x = nn.functional.relu(x)

        x = x.view(x.shape[0], -1)
        x = self.fc5(x)
        x = nn.functional.relu(x)


        x = x.view(x.shape[0], -1)
        x = self.fc6(x)

        return x

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu" # Configure device
model = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model.parameters(), lr=2*1e-3, weight_decay=7*1e-4) # Specify optimizer and assign trainable parameters to it, weight_decay is L2 regularization strength

In [None]:
valid_acc_epoch = []
training_loss_epoch = []
def train(model=None, loader=None, num_epoch = 10, model_name="model1.pt", validation=valloader): # Train the model
    valid_acc_epoch = []
    training_loss_epoch = []
    print("Start training...")
    model.train() # Set the model to training mode
    max_valid_acc = 0
    for i in range(num_epoch):
        running_loss = []
        for batch, label in tqdm(loader):
            batch = batch.to(device)
            label = label.to(device)
            optimizer.zero_grad() # Clear gradients from the previous iteration
            pred = model(batch) # This will call Network.forward() that you implement
            loss = criterion(pred, label) # Calculate the loss
            running_loss.append(loss.item())
            loss.backward() # Backprop gradients to all tensors in the network
            optimizer.step() # Update trainable weights
        print("Epoch {} loss:{}".format(i+1,np.mean(running_loss))) # Print the average loss for this epoch
        valid_acc = evaluate(model, validation)
        valid_acc_epoch.append((i, valid_acc))
        training_loss_epoch.append((i, float(np.mean(running_loss))))
        if valid_acc > max_valid_acc:
          max_valid_acc = valid_acc
          torch.save(model.state_dict(), model_name)
        model.train()
    print("Done!")

In [None]:
def evaluate(model, loader): # Evaluate accuracy on validation / test set
    model.eval() # Set the model to evaluation mode
    correct = 0
    with torch.no_grad(): # Do not calculate grident to speed up computation
        for batch, label in tqdm(loader):
            batch = batch.to(device)
            label = label.to(device)
            pred = model(batch)
            correct += (torch.argmax(pred,dim=1)==label).sum().item()
    acc = correct/len(loader.dataset)
    print("Evaluation accuracy: {}".format(acc))
    return acc

In [None]:
train(model, trainloader, 15)

In [None]:
model.load_state_dict(torch.load('model1.pt'))
print("Evaluate on validation set...")
evaluate(model, valloader)
print("Evaluate on test set")
evaluate(model, testloader)

In [None]:
import random
random_adv_images = []
def test_pgd(model, device, testing, epsilon, alpha, steps):
    correct_predictions = 0
    adversarial = []

    # Loop over all examples in test set
    for index, (batch, label) in enumerate(testing):
        batch = batch.to(device)
        label = label.to(device)
        loss_function = nn.CrossEntropyLoss()
        

        for _ in range(steps):
          batch = batch.detach()
          batch.requires_grad = True
          output = model(batch)
          initial_prediction = output.max(1, keepdim=True)[1]

          loss = loss_function(output, label)
        
          # Zero all existing gradients
          model.zero_grad()

          # Calculate gradients of model in backward pass
          loss.backward()
          # Collect batchgrad
          batch_grad = batch.grad.data

          # FGSM Attack
          sign_batch_grad = batch_grad.sign()
          batch = batch + alpha*sign_batch_grad
          # Adding clipping to maintain [0,1] range
          batch = torch.clamp(batch, 0, 1)
          batch = torch.where(batch > batch + epsilon, batch + epsilon, batch)
          batch = torch.where(batch < batch - epsilon, batch - epsilon, batch)

        # Re-classify the perturbed batch
        output = model(batch)
        # Randomly select 10 images
        if steps == 10 and len(random_adv_images) < 10:
          random_adv_images.append(batch[random.randint(0,10)])

        perturbed_prediction = output.max(1, keepdim=True)[1] # get the index of the max log-probability
        perturbed_prediction_arr = torch.reshape(perturbed_prediction, (perturbed_prediction.size(dim=0), )).numpy()
        label_arr = label.numpy()
        for index in range(label.size(dim=0)):
          if perturbed_prediction_arr[index] == label_arr[index]:
              correct_predictions = correct_predictions + 1

    accuracy = correct_predictions / (len(testing)*64) # multiplying by 64 due to batch size
    print("accuracy after PGD attack for alpha = {}, epsilon = {}, attack steps = {} is {} or {}%".format(alpha, epsilon, attack_steps, accuracy, accuracy*100))


for alpha in (0.01, 0.02):
  for attack_steps in (1, 2, 5, 10):
    test_pgd(model, device, testloader, 25/255, alpha, attack_steps)

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)

train_valloader = DataLoader(FASHION_trainval, batch_size=64, shuffle=True)


In [None]:
adv_train_pgd = []
# get pgd adversarial images for training data
def train_with_pgd(model, device, training, epsilon, alpha, steps):
    # Loop over all examples in training set
    for index, (batch, label) in enumerate(training):
        if index == 2:
          break
        batch = batch.to(device)
        label = label.to(device)
        loss_function = nn.CrossEntropyLoss()
        

        for _ in range(steps):
          batch = batch.detach()
          batch.requires_grad = True
          output = model(batch)
          initial_prediction = output.max(1, keepdim=True)[1]

          loss = loss_function(output, label)
        
          # Zero all existing gradients
          model.zero_grad()

          # Calculate gradients of model in backward pass
          loss.backward()
          # Collect batchgrad
          batch_grad = batch.grad.data

          # FGSM Attack
          sign_batch_grad = batch_grad.sign()
          batch = batch + alpha*sign_batch_grad
          # Adding clipping to maintain [0,1] range
          batch = torch.clamp(batch, 0, 1)
          batch = torch.where(batch > batch + epsilon, batch + epsilon, batch)
          batch = torch.where(batch < batch - epsilon, batch - epsilon, batch)

        # Re-classify the perturbed batch
        # print(len(batch), len(batch[0]), batch)

        label_arr = label.numpy()
        for index, pgd_image in enumerate(batch):
          adv_train_pgd.append((pgd_image, label_arr[index]))

for alpha in (0.01, 0.02):
  for attack_steps in (1, 2, 5, 10):
    train_with_pgd(model, device, train_valloader, 25/255, alpha, attack_steps)

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


FASHION_trainval_pgd = torch.utils.data.ConcatDataset([FASHION_trainval, adv_train_pgd])
train_len = int(0.8*len(FASHION_trainval_pgd))
FASHION_train_pgd = Subset(FASHION_trainval_pgd, range(train_len))
FASHION_val_pgd = Subset(FASHION_trainval_pgd, range(train_len, len(FASHION_trainval_pgd)))

trainloader_pgd = DataLoader(FASHION_train_pgd, batch_size=64, shuffle=True)
valloader_pgd = DataLoader(FASHION_val_pgd, batch_size=64, shuffle=True)


model_pgd = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model_pgd.parameters(), lr=2*1e-3, weight_decay=7*1e-4)

In [None]:
train(model_pgd, trainloader_pgd, 10, "model_pgd", valloader_pgd)

In [None]:
# run adversarial test images on model trained with pgd images
for alpha in (0.01, 0.02):
  for attack_steps in (1, 2, 5, 10):
    test_pgd(model_pgd, device, testloader, 25/255, alpha, attack_steps)

uap

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)
testloader = DataLoader(FASHION_test, batch_size=64, shuffle=True)

In [None]:
# get pgd adversarial images for training data
def test_with_uap(model, device, testing, epsilon, alpha, steps):
    correct_predictions = 0
    # Loop over all examples in training set
    loss_function = nn.CrossEntropyLoss()
    model.eval()
    for step in range(steps):
      uap = torch.zeros(64, 1, 28, 28).to(device)
      uap.requires_grad = True
      optimizer = optim.SGD([uap], lr=alpha)
      for index, (batch, label) in enumerate(testing):
          if len(batch) != 64: # skipping last batch
            continue
          uap = uap.detach()
          uap = uap.to(device)
          batch = batch.to(device)
          label = label.to(device)
          batch = batch.detach()
          batch.requires_grad = True

          adv_images = torch.clamp(batch + uap, 0, 1)

          output = model(adv_images)
          initial_prediction = output.max(1, keepdim=True)[1]

          loss = loss_function(output, label)
        
          # Zero all existing gradients
          optimizer.zero_grad()
          # Calculate gradients of model in backward pass
          loss.backward()
          uap = uap + alpha*batch.grad.data.sign()
          # Adding clipping to maintain [0,1] range
          uap = torch.clamp(uap, 0, 1)
          uap = torch.where(uap > uap + epsilon, uap + epsilon, uap)
          uap = torch.where(uap < uap - epsilon, uap - epsilon, uap)



      for index, (batch, label) in enumerate(testing):
          batch = batch.to(device)
          label = label.to(device)
          batch = batch.detach()
          batch.requires_grad = True
          if len(batch) != 64:
            continue
          batch = batch + uap
          batch = torch.clamp(batch, 0, 1)
          batch = torch.where(batch > batch + epsilon, batch + epsilon, batch)
          batch = torch.where(batch < batch - epsilon, batch - epsilon, batch)

          if step == steps-1:
            output = model(batch)
            perturbed_prediction = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            perturbed_prediction_arr = torch.reshape(perturbed_prediction, (perturbed_prediction.size(dim=0), )).numpy()
            label_arr = label.numpy()

            for index in range(label.size(dim=0)):
              if perturbed_prediction_arr[index] == label_arr[index]:
                  correct_predictions = correct_predictions + 1

    accuracy = correct_predictions / (len(testing)*64) # multiplying by 64 due to batch size
    print("accuracy after UAP attack for alpha = {}, epsilon = {}, iterations =  is {} or {}%".format(alpha, epsilon, accuracy, accuracy*100))

for alpha in (0.01, 0.02):
  # for attack_steps in (1):
  test_with_uap(model, device, testloader, 25/255, alpha, 1)

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


train_valloader = DataLoader(FASHION_trainval, batch_size=64, shuffle=True)

In [None]:
adv_train_uap = []
def train_with_uap(model, device, testing, epsilon, alpha, steps):
    # Loop over all examples in training set
    loss_function = nn.CrossEntropyLoss()
    model.eval()
    for step in range(steps):
      uap = torch.zeros(64, 1, 28, 28).to(device)
      uap.requires_grad = True
      optimizer = optim.SGD([uap], lr=alpha)
      for index, (batch, label) in enumerate(testing):
          if len(batch) != 64: # skipping last batch
            continue
          uap = uap.detach()
          uap = uap.to(device)
          batch = batch.to(device)
          label = label.to(device)
          batch = batch.detach()
          batch.requires_grad = True

          adv_images = torch.clamp(batch + uap, 0, 1)

          output = model(adv_images)
          initial_prediction = output.max(1, keepdim=True)[1]

          loss = loss_function(output, label)
        
          # Zero all existing gradients
          optimizer.zero_grad()
          # Calculate gradients of model in backward pass
          loss.backward()
          uap = uap + alpha*batch.grad.data.sign()
          # Adding clipping to maintain [0,1] range
          uap = torch.clamp(uap, 0, 1)
          uap = torch.where(uap > uap + epsilon, uap + epsilon, uap)
          uap = torch.where(uap < uap - epsilon, uap - epsilon, uap)



      for index, (batch, label) in enumerate(testing):
          if index == 4:
            break
          batch = batch.to(device)
          label = label.to(device)
          batch = batch.detach()
          batch.requires_grad = True
          if len(batch) != 64:
            continue
          batch = batch + uap
          batch = torch.clamp(batch, 0, 1)
          batch = torch.where(batch > batch + epsilon, batch + epsilon, batch)
          batch = torch.where(batch < batch - epsilon, batch - epsilon, batch)
          label_arr = label.numpy()
          for index, uap_image in enumerate(batch):
            adv_train_uap.append((uap_image, label_arr[index]))

for alpha in (0.01, 0.02):
  # for attack_steps in (1):
  train_with_uap(model, device, train_valloader, 25/255, alpha, 1)

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


FASHION_trainval_uap = torch.utils.data.ConcatDataset([FASHION_trainval, adv_train_uap])
train_len = int(0.8*len(FASHION_trainval_uap))
FASHION_train_uap = Subset(FASHION_trainval_uap, range(train_len))
FASHION_val_uap = Subset(FASHION_trainval_uap, range(train_len, len(FASHION_trainval_uap)))

trainloader_uap = DataLoader(FASHION_train_uap, batch_size=64, shuffle=True)
valloader_uap = DataLoader(FASHION_val_uap, batch_size=64, shuffle=True)


model_uap = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model_uap.parameters(), lr=2*1e-3, weight_decay=7*1e-4)

len(FASHION_trainval_uap)

In [None]:
train(model_uap, trainloader_uap, 10, "model_uap", valloader_uap)

In [None]:
for alpha in (0.01, 0.02):
  test_with_uap(model_uap, device, testloader, 25/255, alpha, 1)

deepfool

In [None]:
!pip install torchattacks

In [None]:
import sys

sys.path.insert(0, '..')
import torchattacks
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Displays a progress bar

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import Dataset, Subset, DataLoader, random_split

In [None]:
# Load the dataset and train, val, test splits
!rm -rf Fashion* 
print("Loading datasets...")
FASHION_transform = transforms.Compose([
    transforms.ToTensor(), # Transform from [0,255] uint8 to [0,1] float
])
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_train = Subset(FASHION_trainval, range(50000))
FASHION_val = Subset(FASHION_trainval, range(50000,60000))
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)
print("Done!")

In [None]:
# Create dataloaders
trainloader = DataLoader(FASHION_train, batch_size=64,shuffle=True)
valloader = DataLoader(FASHION_val, batch_size=64, shuffle=True)
testloader = DataLoader(FASHION_test, batch_size=64, shuffle=True)

In [None]:
images1, labels1 = next(iter(trainloader))

In [None]:
model = Network().to(device)
model.load_state_dict(torch.load("model1.pt"))
model.eval()

In [None]:
from torchattacks import DeepFool, Square
from utils import imshow, get_pred

In [None]:
atk = DeepFool(model)
atk2 = Square(model)

In [None]:
def test_with_DeepFool(model, device, testing, attack):
  correct_predictions = 0

  for index, (batch, label) in enumerate(testing):
      batch = batch.to(device)
      label = label.to(device)
      # if index == 10: # testing
      #   break
      adv_image = attack(batch, label)
      output = model(adv_image)
      perturbed_prediction = output.max(1, keepdim=True)[1] # get the index of the max log-probability
      perturbed_prediction_arr = torch.reshape(perturbed_prediction, (perturbed_prediction.size(dim=0), ))
      label_arr = label
      for index in range(label.size(dim=0)):
        if perturbed_prediction_arr[index] == label_arr[index]:
          correct_predictions = correct_predictions + 1
  accuracy = correct_predictions / (len(testing)*64) # multiplying by 64 due to batch size
  print("accuracy after DeepFool attack is {} or {}%".format(accuracy, accuracy*100))
test_with_DeepFool(model, device, testloader, atk)

In [None]:
adv_images = atk(images1, labels1)
idx = 21
pre = get_pred(model, adv_images[idx:idx+1], device)
imshow(adv_images[idx:idx+1], title="True:%d, Pre:%d"%(labels1[idx], pre))

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


train_valloader = DataLoader(FASHION_trainval, batch_size=64, shuffle=True)

In [None]:
adv_train_DeepFool = []

def train_with_DeepFool(model, device, training, attack):
    # Loop over all examples in training set
    for index, (batch, label) in enumerate(training):
        if index == 8:
          break
        batch = batch.to(device)
        label = label.to(device)
        adv_image = attack(batch, label)
        for index, DeepFool_image in enumerate(adv_image):
          adv_train_DeepFool.append((DeepFool_image, label[index]))
train_with_DeepFool(model, device, train_valloader, atk)  

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)

FASHION_trainval_DeepFool = torch.utils.data.ConcatDataset([FASHION_trainval, adv_train_DeepFool])
train_len = int(0.8*len(FASHION_trainval_DeepFool))
FASHION_train_DeepFool = Subset(FASHION_trainval_DeepFool, range(train_len))
FASHION_val_DeepFool = Subset(FASHION_trainval_DeepFool, range(train_len, len(FASHION_trainval_DeepFool)))

trainloader_DeepFool = DataLoader(FASHION_train_DeepFool, batch_size=64, shuffle=True)
valloader_DeepFool = DataLoader(FASHION_val_DeepFool, batch_size=64, shuffle=True)


model_DeepFool = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model_DeepFool.parameters(), lr=2*1e-3, weight_decay=7*1e-4)

THIS THROWS ERROR

In [None]:
# train(model_DeepFool, trainloader_DeepFool, 10, "model_DeepFool", valloader_DeepFool)

Square Attack

In [None]:
# Load the dataset and train, val, test splits
!rm -rf Fashion* 
print("Loading datasets...")
FASHION_transform = transforms.Compose([
    transforms.ToTensor(), # Transform from [0,255] uint8 to [0,1] float
])
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_train = Subset(FASHION_trainval, range(50000))
FASHION_val = Subset(FASHION_trainval, range(50000,60000))
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)
print("Done!")

In [None]:
# Create dataloaders
trainloader = DataLoader(FASHION_train, batch_size=64,shuffle=True)
valloader = DataLoader(FASHION_val, batch_size=64, shuffle=True)
testloader = DataLoader(FASHION_test, batch_size=64, shuffle=True)

In [None]:
atk3 = Square(model)

In [None]:
def test_with_Square(model, device, testing, attack):
  correct_predictions = 0


  for index, (batch, label) in enumerate(testing):
      batch = batch.to(device)
      label = label.to(device)
      # if index == 10: # testing
      #   break
      print("Index = ",index)
      adv_image = attack(batch, label)
      output = model(adv_image)
      perturbed_prediction = output.max(1, keepdim=True)[1] # get the index of the max log-probability
      perturbed_prediction_arr = torch.reshape(perturbed_prediction, (perturbed_prediction.size(dim=0), ))
      label_arr = label
      for index in range(label.size(dim=0)):
        if perturbed_prediction_arr[index] == label_arr[index]:
          correct_predictions = correct_predictions + 1
  accuracy = correct_predictions / (len(testing)*64) # multiplying by 64 due to batch size
  print("accuracy after DeepFool attack is {} or {}%".format(accuracy, accuracy*100))
test_with_DeepFool(model, device, testloader, atk3)


In [None]:
#Generate adv image

idx = 0
pre = get_pred(model, adv_images[idx:idx+1], device)
imshow(adv_images[idx:idx+1], title="True:%d, Pre:%d"%(labels1[idx], pre))

In [None]:
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)

train_valloader = DataLoader(FASHION_trainval, batch_size=64, shuffle=True)

In [None]:
adv_train_Square = []
def train_with_Square(model, device, training, attack):
    # Loop over all examples in training set
    for index, (batch, label) in enumerate(training):
        if index == 8:
          break
        batch = batch.to(device)
        label = label.to(device)
        adv_image = attack(batch, label)
        for index, Square_image in enumerate(adv_image):
          
          adv_train_Square.append((Square_image, label[index]))
train_with_Square(model, device, train_valloader,atk3)  

In [None]:
Create databse
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)

FASHION_trainval_Square = torch.utils.data.ConcatDataset([FASHION_trainval, adv_train_Square])
train_len = int(0.8*len(FASHION_trainval_Square))
FASHION_train_Square = Subset(FASHION_trainval_Square, range(train_len))
FASHION_val_Square = Subset(FASHION_trainval_Square, range(train_len, len(FASHION_trainval_Square)))

In [None]:
trainloader_Square = DataLoader(FASHION_train_Square, batch_size=64, shuffle=True)
valloader_Square = DataLoader(FASHION_val_Square, batch_size=64, shuffle=True)

model_Square = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model_Square.parameters(), lr=2*1e-3, weight_decay=7*1e-4)

In [None]:
valid_acc_epoch = []
training_loss_epoch = []
def train(model, loader, num_epoch = 10, model_name="model1", validation=valloader): # Train the model
    valid_acc_epoch = []
    training_loss_epoch = []
    print("Start training...")
    model.train() # Set the model to training mode
    max_valid_acc = 0
    for i in range(num_epoch):
        running_loss = []
        for batch, label in tqdm(loader):
            batch = batch.to(device)
            label = label.to(device)
            optimizer.zero_grad() # Clear gradients from the previous iteration
            pred = model(batch) # This will call Network.forward() that you implement
            loss = criterion(pred, label) # Calculate the loss
            running_loss.append(loss.item())
            loss.backward() # Backprop gradients to all tensors in the network
            optimizer.step() # Update trainable weights
        print("Epoch {} loss:{}".format(i+1,np.mean(running_loss))) # Print the average loss for this epoch
        valid_acc = evaluate(model, validation)
        print("val acc = ",valid_acc)
        valid_acc_epoch.append((i, valid_acc))
        # print("line 2")
        training_loss_epoch.append((i, float(np.mean(running_loss))))
        if valid_acc > max_valid_acc:
          max_valid_acc = valid_acc
          torch.save(model.state_dict(), model_name)
        model.train()
    print("Done!")

In [None]:
def evaluate(model, loader): # Evaluate accuracy on validation / test set
    model.eval() # Set the model to evaluation mode
    correct = 0
    print("inside eval")
    with torch.no_grad(): # Do not calculate grident to speed up computation
        print("between with and for")
        for batch, label in tqdm(loader):
            # print("label = ",label)
            print(len(batch))
            batch = batch.to(device)
            
            # print("batch = ",batch)
            label = label.to(device)
            pred = model(batch)
            # if label!=pred:
            #   print("label = ",label)
            #   print("pred = ",pred)
            correct += (torch.argmax(pred,dim=1)==label).sum().item()
    acc = correct/len(loader.dataset)
    print("Evaluation accuracy: {}".format(acc))
    return acc

THIS THROWS ERROR

In [None]:
# train(model_Square, trainloader_Square, 10, "model_Square", valloader_Square)

Carlini-Wagner

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

def cw_attack(model, device, images, labels, targeted=False, num_classes=10, max_iterations=1000, learning_rate=0.01, c=1e-4, kappa=0):
    # Convert images and labels to tensors and move to device
    images = images.to(device)
    labels = labels.to(device)

    # Initialize the perturbation
    batch_size = images.shape[0]
    perturbation = torch.zeros_like(images, requires_grad=True)

    # Initialize binary search parameters
    low_confidence = 0
    high_confidence = 1e10

    # Define the loss function
    def cw_loss_fn(outputs, labels, confidence):
        # Calculate the logits for the target class
        if targeted:
            target_logits = outputs[:, labels]
            other_logits = torch.max(outputs - target_logits.unsqueeze(1), dim=1)[0]
        else:
            target_logits = torch.max(outputs - 1e4*labels, dim=1)[0]
            other_logits = torch.max(outputs - target_logits.unsqueeze(1), dim=1)[0]

        # Calculate the margin loss
        margin_loss = torch.clamp(other_logits - target_logits + kappa, min=0)
        
        # Calculate the L2 distance between the perturbation and the original image
        l2_distance = torch.norm(perturbation.view(batch_size, -1), p=2, dim=1)
        
        # Calculate the final loss
        loss = confidence*margin_loss.mean() + c*l2_distance.mean()

        return loss

    # Define the optimizer
    optimizer = torch.optim.Adam([perturbation], lr=learning_rate)

    # Perform the attack
    for i in range(max_iterations):
        # Forward pass
        outputs = model(images + perturbation)
        outputs = outputs.max(1, keepdim=True)[1]
        loss = cw_loss_fn(outputs, labels, high_confidence)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Project the perturbation onto the L-infinity ball of radius epsilon
        perturbation.data = torch.clamp(perturbation.data, -1, 1)
        perturbation.data *= torch.clamp((1.0 - 1e-10) * torch.ones_like(images) - torch.abs(images + perturbation.data - images), 0, 1)
        # perturbation.data *= torch.clamp((1.0 - 1e-10) * torch.ones_like(torch.from_numpy(images)) - torch.abs(torch.from_numpy(images) + perturbation.data - torch.from_numpy(images)), 0, 1)
        perturbation.grad.zero_()

        # Check if the attack is successful
        confidence = F.softmax(outputs.float(), dim=1)
        predicted_labels = torch.argmax(outputs, dim=1)
        correct_predictions = (predicted_labels == labels)
        correct_and_confident = correct_predictions & (confidence > 0.99)
        success = correct_and_confident.sum().item() == 0

        # Update the binary search parameters
        if targeted:
            if success:
                high_confidence = confidence[~correct_predictions].min().item()
            else:
                low_confidence = confidence[correct_and_confident].max().item()
        else:
            if success:
                low_confidence = confidence[correct_predictions].max().item()
            else:
                high_confidence = confidence[~correct_predictions].min().item()

        # Check if the attack is successful and within the desired confidence level
        if success and (high_confidence - low_confidence) < 1e-6:
            break

    # Return the perturbed images
    adv_images = images + perturbation.detach()

    return adv_images


correct_predictions = 0
for index, (batch, label) in enumerate(testloader):
  batch = batch.to(device)
  label = label.to(device)
  if len(batch) != 64:
    continue
  batch_adv = cw_attack(model, device, batch, label)
  output = model(batch_adv)
  perturbed_prediction = output.max(1, keepdim=True)[1] # get the index of the max log-probability
  perturbed_prediction_arr = torch.reshape(perturbed_prediction, (perturbed_prediction.size(dim=0), )).numpy()
  label_arr = label.numpy()
  for index in range(label.size(dim=0)):
    if perturbed_prediction_arr[index] == label_arr[index]:
        correct_predictions = correct_predictions + 1

accuracy = correct_predictions / (len(testloader)*64) # multiplying by 64 due to batch size
print("accuracy after CW attack is {} or {}%".format(accuracy, accuracy*100))


In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


train_valloader = DataLoader(FASHION_trainval, batch_size=64, shuffle=True)

In [None]:
adv_train_cw = []
for index, (batch, label) in enumerate(train_valloader):
  batch = batch.to(device)
  label = label.to(device)
  if index == 8:
    break
  batch_adv = cw_attack(model, device, batch, label)
  for index, pgd_image in enumerate(batch):
    adv_train_cw.append((pgd_image, label_arr[index]))

In [None]:
!rm -rf Fashion* 
FASHION_trainval = datasets.FashionMNIST('.', download=True, train=True, transform=FASHION_transform)
FASHION_test = datasets.FashionMNIST('.', download=True, train=False, transform=FASHION_transform)


FASHION_trainval_cw = torch.utils.data.ConcatDataset([FASHION_trainval, adv_train_cw])
train_len = int(0.8*len(FASHION_trainval_cw))
FASHION_train_cw = Subset(FASHION_trainval_cw, range(train_len))
FASHION_val_cw = Subset(FASHION_trainval_cw, range(train_len, len(FASHION_trainval_cw)))

trainloader_cw = DataLoader(FASHION_train_cw, batch_size=64, shuffle=True)
valloader_cw = DataLoader(FASHION_val_cw, batch_size=64, shuffle=True)


model_cw = Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer
optimizer = optim.Adam(model_pgd.parameters(), lr=2*1e-3, weight_decay=7*1e-4)

len(FASHION_trainval_cw)

In [None]:
train(model_cw, trainloader_cw, 10, "model_cw", valloader_cw)