# Boilerplate

Package installation, loading, and dataloaders. There's also a simple model defined. You can change it your favourite architecture if you want.

In [1]:
# !pip install tensorboardX

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import time
import matplotlib.pyplot as plt

import torchtext
import torchdata
from torchtext.vocab import GloVe
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer


from torchvision import datasets, transforms

use_cuda = False
device = torch.device("cuda" if use_cuda else "cpu")
batch_size = 32

np.random.seed(42)
torch.manual_seed(42)


class Net(nn.Module):
    def __init__(self, pretrained_embeddings):
        super(Net, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(pretrained_embeddings, freeze=True)

        self.fc = nn.Linear(50, 100)
        self.fc2 = nn.Linear(100, 2)

    def forward(self, x):
        x = self.embedding(x)
        x = x.mean(dim=1)
        x = self.fc(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x

# class Normalize(nn.Module):
#     def forward(self, x):
#         return (x - 0.1307)/0.3081

# Add the data normalization as a first "layer" to the network
# this allows us to search for adverserial examples to the real image, rather than
# to the normalized image
# model = Net()#nn.Sequential(Normalize(), Net())

# model = model.to(device)
# model.train()

ModuleNotFoundError: No module named 'torchtext'

# Implement Training

In [1]:
def get_worst_case_logits(data, label, model, eps_test):
    lb = data - eps_test
    ub = data + eps_test
    
    lb = model.embedding(lb.long())
    ub = model.embedding(ub.long())

    lb = lb.mean(dim=1)
    ub = ub.mean(dim=1)

    #propogate through each layer based on linear or ReLU
    for layer in model.children():
        if isinstance(layer, nn.Embedding):
            continue
        lb = layer(lb)
        ub = layer(ub)

    dim = lb.shape[-1]
    true_classes = label.unsqueeze(-1) == torch.arange(dim).to(device)
    # print("true classes", true_classes)
    # print(true_classes.shape, lb.shape)
    logits = torch.where(true_classes, lb, ub)
    # print("logits", logits)
    return logits

In [2]:
def train_model_IBP(model, num_epochs, train_loader):
    #  trains a given model on the MNIST dataset.
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=0.01)

    # setup eps_train and k steps for each epoch
    eps_test = 0 
    eps_test_step = .1 / num_epochs
    
    k = 1
    k_step = -0.5 / num_epochs


    for epoch in range(num_epochs):

        for data, label in train_loader:
            data = data.to(device)
            label = label.to(device)

            optimizer.zero_grad()
            out = model(data)
            z_hat = get_worst_case_logits(data, label, model, eps_test)

            #TODO edit loss function
            loss_fit = F.cross_entropy(out, label)
            loss_spec = F.cross_entropy(z_hat, label) 
            # print("loss_fit:", loss_fit, "loss_spec:", loss_spec)
            loss = (k*loss_fit) + ((1-k)*loss_spec)
            loss.backward()
            optimizer.step()
        
        eps_test += eps_test_step
        k += k_step

In [3]:
#Interval analysis
def interval_analysis(model, input, eps):

    lb = input - eps
    ub = input + eps

    lb = torch.clamp(lb, 0, 1)
    ub = torch.clamp(ub, 0, 1)
    
    lb_out = model(lb)
    ub_out = model(ub)

    return lb_out, ub_out

In [4]:
def test_model(model):
    # TODO: implement this function to test the robust accuracy of the given model
    # use pgd_untargeted() within this function

    model.eval()

    correct, total = 0, 0

    for data, label in test_loader:
        data = data.to(device)
        label = label.to(device)

        out = model(data)
        _, predicted = torch.max(out.data, 1)
        
        # print(label.size(0))
        # print(data.size(0))
        # print(out.size(0))
        total += label.size(0)
        # print(predicted)
        correct += (predicted == label).sum().item()

    print("accuracy", 100 * correct / total)
    

In [5]:
def test_robustness(model):

    model.eval()

    for eps in [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1]:
        correct, total = 0, 0

        for data, label in test_loader:
            data = data.to(device)
            label = label.to(device)

            out_lb, out_ub = interval_analysis(model, data, eps)
            out_lb, out_ub = out_lb.argmax(dim=1), out_ub.argmax(dim=1) #choose class for each image
            

            total += label.size(0)
            correct += (out_lb == out_ub).sum().item()
        print("eps:", eps)
        print("percent robust", 100 * correct / total)
        print()
    
        

# Study Accuracy, Quality, etc.

Compare the various results and report your observations on the submission.

In [6]:
# The last argument 'targeted' can be used to toggle between a targeted and untargeted attack.
def fgsm(model, x, y, eps):
    #TODO: implement this as an intermediate step of PGD
    # Notes: put the model in eval() mode for this function
    model.eval()                   

    x.requires_grad = True

    #get gradient loss
    output = model(x)
    loss = F.cross_entropy(output,y)
    model.zero_grad()
    loss.backward()

    sign_x = x.grad.sign()

    #get eta with e * sign(loss grad) 
    n = eps * sign_x

    x_prime = x + n
    x_prime = torch.clamp(x_prime,0,1)

    return x_prime

def pgd_untargeted(model, x, y, k, eps, eps_step):
    #TODO: implement this 
    # Notes: put the model in eval() mode for this function
    # x: input image
    # y: ground truth label for x
    # k: steps of FGSM
    # eps: projection region for PGD (note the need for normalization before projection, as eps values are for inputs in [0,1])
    # eps_step: step for one iteration of FGSM
    model.eval()

    x_init = x.clone().detach()
    ball_max = x_init + eps
    ball_min = x_init - eps

    for _ in range(k):

        x.requires_grad = True
        x_new = fgsm(model, x, y, eps_step)

        x_new = torch.clamp(x_new, ball_min, ball_max)
        x_new = torch.clamp(x_new, 0, 1)

        x = x_new.detach()

    return x

In [7]:
def test_model_on_attacks(model, attack='pgd', k=10, eps=0.1):
    # TODO: implement this function to test the robust accuracy of the given model
    # use pgd_untargeted() within this function

    eps_step = eps/k
    model.eval()

    correct, correct_second, total = 0, 0, 0

    for data, label in test_loader:
        data = data.to(device)
        label = label.to(device)

        if attack == 'pgd':
            #TODO implement
            adversarial_data = pgd_untargeted(model, data, label, k, eps, eps_step)

            full_data = torch.cat((data, adversarial_data))
            label = torch.cat((label, label))
        else:
            full_data = data

        out = model(full_data)
        _, predicted = torch.max(out.data, 1)
        
        # print(label.size(0))
        # print(data.size(0))
        # print(out.size(0))
        total += label.size(0)
        # print(predicted)
        correct += (predicted[:data.size(0)] == label[:data.size(0)]).sum().item()
        if attack == 'pgd':
            correct_second += (predicted[data.size(0):] == label[:data.size(0)]).sum().item()

    if attack == 'pgd':
        print("for eps", eps)
        print("robust accuracy", 100 * (correct+correct_second) / total)
        print("standard accuracy", 100 * 2*correct / total)
        print("adversarial accuracy", 100 * 2*correct_second / total)
    else:
        print("accuracy", 100 * correct / total)
    

In [8]:
#Interval analysis
def interval_analysis(model, input, eps):

    lb = input - eps
    ub = input + eps

    lb = torch.clamp(lb, 0, 1)
    ub = torch.clamp(ub, 0, 1)
    
    lb_out = model(lb)
    ub_out = model(ub)

    return lb_out, ub_out

In [9]:
def test_robustness(model):

    model.eval()

    for eps in [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1]:
        correct, total = 0, 0

        for data, label in test_loader:
            data = data.to(device)
            label = label.to(device)

            out_lb, out_ub = interval_analysis(model, data, eps)
            out_lb, out_ub = out_lb.argmax(dim=1), out_ub.argmax(dim=1) #choose class for each image
            

            total += label.size(0)
            correct += (out_lb == out_ub).sum().item()
        print("eps:", eps, "percent robust", 100 * correct / total)

    

In [11]:
#SETUP
from torchtext.vocab import vocab

tokenizer = get_tokenizer("basic_english")
counter = collections.Counter()

train_iter = IMDB(split='train')
for label, line in train_iter:
    counter.update(tokenizer(line))
counter['<unk>'] = 1
counter['<pad>'] = 1

vocab_obj = vocab(counter)
vocab_obj.set_default_index(vocab_obj['<unk>'])
glove = GloVe(name="6B", dim=50)

vocab_size = len(vocab_obj)
embedding_dim = glove.dim
pretrained_embeddings = torch.zeros(vocab_size, embedding_dim)
for i, token in enumerate(vocab_obj.get_itos()):
    if token in glove.stoi:
        pretrained_embeddings[i] = glove[token]

max_length = 256

class IMDBDataset(torch.utils.data.Dataset):
    def __init__(self, split):
        self.data = list(IMDB(split=split))
        self.vocab = vocab_obj
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        label, text = self.data[idx]
        tokens = self.tokenizer(text)
        token_ids = [self.vocab[token] for token in tokens[:max_length]]
        # Pad if shorter than max_length
        if len(token_ids) < max_length:
            token_ids.extend([self.vocab['<pad>']] * (max_length-len(token_ids)))
        label = 1 if label == 'pos' else 0
        return torch.tensor(token_ids, dtype=torch.long), torch.tensor(label, dtype=torch.long)

batch_size = 32
train_dataset = IMDBDataset(split='train')
test_dataset = IMDBDataset(split='test')
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


model = Net(pretrained_embeddings)
model = model.to(device)

start = time.time()
train_model_IBP(model, 15, train_loader)
end = time.time()
print("IBP train time: ", end - start)

ModuleNotFoundError: No module named 'torchtext'

In [7]:
# !pip install ruamel_yaml==0.11.14
!pip install torchtext
# import torch
# import torchtext
# !pip install torchdata
# import torchdata
# !conda update conda -y
# !pip install torchtext==0.9.0
# !pip install torch==1.13.1
# !pip show torchtext
# !pip install --force-reinstall --no-deps torch==2.2.1
# !pip install --upgrade torch


# !pip uninstall torch

Collecting torch==2.2.2 (from torchtext)
  Using cached torch-2.2.2-cp38-none-macosx_10_9_x86_64.whl.metadata (25 kB)
Using cached torch-2.2.2-cp38-none-macosx_10_9_x86_64.whl (150.6 MB)
[0mInstalling collected packages: torch
  Attempting uninstall: torch
[0m    Found existing installation: torch 2.2.1
[1;31merror[0m: [1muninstall-no-record-file[0m

[31m×[0m Cannot uninstall torch 2.2.1
[31m╰─>[0m The package's contents are unknown: no RECORD file was found for torch.

[1;36mhint[0m: You might be able to recover from this via: [32mpip install --force-reinstall --no-deps torch==2.2.1[0m


In [None]:
## train the original model
model = nn.Sequential(Normalize(), Net())
model = model.to(device)
model.train()

start = time.time()
train_model_IBP(model, 15)
end = time.time()
print("IBP train time: ", end - start)

torch.save(model.state_dict(), 'weights_IBP2.pt')

In [None]:
model = nn.Sequential(Normalize(), Net())
model = model.to(device)
model.train()

start = time.time()
train_model(model, 15)
end = time.time()
print("standard train time: ", end - start)


torch.save(model.state_dict(), 'weights.pt')

In [46]:



model = Net(pretrained_embeddings)
model.load_state_dict(torch.load('weights_IBP2.pt'))

test_model(model)

accuracy 89.91
accuracy 89.87


In [None]:
## robust test
model = nn.Sequential(Normalize(), Net())
model.load_state_dict(torch.load('weights.pt'))

for eps in [.05]:# [0.05, 0.1, 0.15, 0.2]:
    test_model_on_attacks(model, attack='pgd', k=10, eps=eps)

In [None]:
## robust test
model = nn.Sequential(Normalize(), Net())
model.load_state_dict(torch.load('weights_IBP.pt'))

for eps in [.05]:# [0.05, 0.1, 0.15, 0.2]:
    test_model_on_attacks(model, attack='pgd', k=10, eps=eps)

In [None]:
#robustness test
test_robustness(model)

    