# Import Libraries

In [1]:
from torch.utils.data import Dataset,DataLoader,random_split
from typing import Any,Tuple,Optional,Callable
import PIL
import csv
import pathlib
import torch
import torch.nn as nn
from torch.optim import Adam,lr_scheduler
from torchvision.transforms import ToTensor,Resize,Compose,ColorJitter,RandomRotation,AugMix,RandomCrop,GaussianBlur,RandomEqualize,RandomHorizontalFlip,RandomVerticalFlip
import matplotlib.pyplot as plt
import pickle
from sklearn.metrics import accuracy_score
import tqdm
import torch.nn.functional as F
import random

### Select device to use

In [2]:
device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')

## GTSRB Dataset class

In [3]:
class GTSRB(Dataset):
    def __init__(self,
                 root: str,
                 split: str,
                 transform: Optional[Callable] = None):
       
        
        
        self.base_folder = pathlib.Path(root)
        self.csv_file = self.base_folder / ('Train.csv' if split =='train' else 'Test.csv')
        
        
        with open(str(self.csv_file)) as csvfile:
           samples = [(str(self.base_folder / row['Path']),int(row['ClassId'])) 
            for row in csv.DictReader(csvfile,delimiter=',',skipinitialspace=True)
                ]


        self.samples = samples
        self.split = split
        self.transform = transform

    def __len__(self):
        # return 20
        return len(self.samples)
    
    def __getitem__(self, index: int) -> Tuple:
        path,classId =  self.samples[index]
        sample = PIL.Image.open(path).convert('RGB')
        if self.transform is not None:
            sample = self.transform(sample)
        return sample,classId
     

## Load pretrained Model

In [4]:
class GTSRB_MODEL(nn.Module):
    def __init__(self,input_dim,output_dim):
        super(GTSRB_MODEL,self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        
      
        self.metrics = {}
        
        self.flatten = nn.Flatten()
        
        self.dropout2 = nn.Dropout(0.2)
        self.dropout3 = nn.Dropout(0.3)
       

        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(2)
        

        self.conv1 = nn.Conv2d(in_channels=3,out_channels=32,kernel_size=3,padding=1)
        self.conv2 = nn.Conv2d(in_channels=32,out_channels=64,kernel_size=3,padding=1)
        self.batchnorm1 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(in_channels=64,out_channels=128,kernel_size=3,padding=1)
        self.conv4 = nn.Conv2d(in_channels=128,out_channels=256,kernel_size=3,padding=1)
        self.batchnorm2 = nn.BatchNorm2d(256)


        
        self.conv5 = nn.Conv2d(in_channels=256,out_channels=512,kernel_size=3)
        self.conv6 = nn.Conv2d(in_channels=512,out_channels=1024,kernel_size=3)
        self.batchnorm3 = nn.BatchNorm2d(1024)
        
       
       

        self.l1 = nn.Linear(1024*4*4,512)
        self.l2 = nn.Linear(512,128)
        self.batchnorm4 = nn.LayerNorm(128)
        self.l3 = nn.Linear(128,output_dim)
        
        
    def forward(self,input):
        
        conv = self.conv1(input)
        conv = self.conv2(conv)
        batchnorm = self.relu(self.batchnorm1(conv))
        maxpool = self.maxpool(batchnorm)

        conv = self.conv3(maxpool)
        conv = self.conv4(conv)
        batchnorm = self.relu(self.batchnorm2(conv))
        maxpool = self.maxpool(batchnorm)

        conv = self.conv5(maxpool)
        conv = self.conv6(conv)
        batchnorm = self.relu(self.batchnorm3(conv))
        maxpool = self.maxpool(batchnorm)
        
        
        
        
       
        
        flatten = self.flatten(maxpool)
        
        dense_l1 = self.l1(flatten)
        dropout = self.dropout3(dense_l1)
        dense_l2 = self.l2(dropout)
        batchnorm = self.batchnorm4(dense_l2)
        dropout = self.dropout2(batchnorm)
        output = self.l3(dropout)
        
       
        return output
    
    def training_metrics(self,positives,data_size,loss):
        acc = positives/data_size
        return loss,acc
    
    def validation_metrics(self,validation_data,loss_function):
       data_size = len(validation_data)
       correct_predictions = 0
       total_samples = 0
       val_loss = 0

       model = self.eval()
       with torch.no_grad() : 
        for step,(input,label) in enumerate(validation_data):
            input,label = input.to(device),label.to(device)
            prediction = model.forward(input)
            loss = loss_function(prediction,label)
            val_loss = loss.item()
            _,predicted = torch.max(prediction,1)
            correct_predictions += (predicted == label).sum().item()
            total_samples += label.size(0)

       val_acc = correct_predictions/total_samples

       return val_loss,val_acc

    def history(self):
        return self.metrics

            


    def compile(self,train_data,validation_data,epochs,loss_function,optimizer,learning_rate_scheduler):
        val_acc_list = []
        val_loss_list = []

        train_acc_list = []
        train_loss_list = []

        learning_rate_list = []

        print('training started ...')
        STEPS = len(train_data)
        for epoch in range(epochs):
            lr = optimizer.param_groups[0]["lr"]
            learning_rate_list.append(lr)
            correct_predictions = 0
            total_examples = 0
            loss = 0
            with tqdm.trange(STEPS) as progress:

                for step,(input,label) in enumerate(train_loader):

                    input,label = input.to(device),label.to(device)
                    prediction = self.forward(input)

                    _, predicted = torch.max(prediction, 1)
                    correct_predictions += (predicted == label).sum().item()
                    total_examples += label.size(0)
                    l = loss_function(prediction,label)
                    loss = l.item()
                    l.backward()
                    optimizer.step()
                    optimizer.zero_grad()

                    progress.colour = 'green'
                    progress.desc = f'Epoch [{epoch}/{EPOCHS}], Step [{step}/{STEPS}], Learning Rate [{lr}], Loss [{"{:.4f}".format(l)}], Accuracy [{"{:.4f}".format(correct_predictions/total_examples)}]'
                    progress.update(1)

            training_loss,training_acc = self.training_metrics(correct_predictions,total_examples,loss)
            train_acc_list.append(training_acc)
            train_loss_list.append(training_loss)

            val_loss, val_acc = self.validation_metrics(validation_data,loss_function)
            val_acc_list.append(val_acc)
            val_loss_list.append(val_loss)
            
            print(f'val_accuracy [{val_acc}], val_loss [{val_loss}]')

            
            learning_rate_scheduler.step()
        
        metrics_dict = {
                'train_acc':train_acc_list,
                'train_loss':train_loss_list,
                'val_acc':val_acc_list,
                'val_loss':val_loss_list,
                'learning_rate':optimizer.param_groups[0]["lr"]
            }
        self.metrics = metrics_dict
        print('training complete !')    

        

In [5]:
f = open('gstrb-99-saved_model.pkl','rb')
model = pickle.load(f)
f.close()

model.to(device)
model.eval()

GTSRB_MODEL(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (dropout2): Dropout(p=0.2, inplace=False)
  (dropout3): Dropout(p=0.3, inplace=False)
  (relu): ReLU()
  (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1))
  (conv6): Conv2d(512, 1024, kernel_size=(3, 3), stride=(1, 1))
  (batchnorm3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (l1): Linear(in_fe

## FGSM Attack Functions

In [6]:
def fgsm_attack_l2(image, epsilon, data_grad):
    # Collect the element-wise sign of the data gradient
    normalized_data_grad = data_grad / data_grad.norm()
    # Create the perturbed image by adjusting each pixel of the input image
    perturbed_image = image + epsilon*normalized_data_grad
    # Adding clipping to maintain [0,1] range
    perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image

## L2 FGSM

In [7]:
def generate_adversarial_dataset_fgsm_l2(model, device, test_dataloader, epsilon):
    adv_examples = []

    with tqdm.tqdm(colour='red',total=len(test_dataloader)) as progress:
        # Loop over all examples in test set
        for data, target in test_dataloader:
            # Send the data and label to the device
            data, target = data.to(device), target.to(device)
            # Set requires_grad attribute of tensor. Important for Attack
            data.requires_grad = True
            # Forward pass the data through the model
            output = model(data)
            init_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            # If the initial prediction is wrong, don't bother attacking, just move on
            # if init_pred.item() != target.item():
            #     adv_examples.append((data.squeeze().detach().cpu(), target))
            #     progress.update(1)
            #     continue
            # Calculate the loss
            loss = F.nll_loss(output, target)
            # Zero all existing gradients
            model.zero_grad()
            # Calculate gradients of model in backward pass
            loss.backward()
            # Collect ``datagrad``
            data_grad = data.grad.data
            # Call FGSM Attack
            perturbed_data = fgsm_attack_l2(data, epsilon, data_grad)
            # Re-classify the perturbed image
            output = model(perturbed_data)
            adv_ex = perturbed_data.squeeze().detach().cpu()
            adv_examples.append((adv_ex, target.detach().cpu().item()))
            progress.desc = f'Progress: '
            progress.update(1)

    save_loc = f"./pkl2/L2_epsilon2_train_advdata.pkl"
    with open(save_loc,'wb') as output_file:
        pickle.dump(adv_examples,output_file)
    return adv_examples

transforms = Compose([
    Resize([50,50]),
    ToTensor(),
    
])
traindataset = GTSRB(root='dataset',split="train", transform=transforms)

train_dataloader = DataLoader(traindataset)

adv_data = generate_adversarial_dataset_fgsm_l2(model, device, train_dataloader, 2)

Progress: 100%|[31m███████████████████████████████████[0m| 39209/39209 [09:42<00:00, 67.26it/s][0m


In [8]:
import tqdm
EPOCHS = 10
LEARNING_RATE = 0.0008
INPUT_DIM = 3*50*50
OUTPUT_DIM = 43
model = GTSRB_MODEL(INPUT_DIM,OUTPUT_DIM).to(device)

optimizer = Adam(params=model.parameters(),lr=LEARNING_RATE)
lr_s = lr_scheduler.LinearLR(optimizer,start_factor=1.0,end_factor=0.5,total_iters=10)
loss = nn.CrossEntropyLoss()


In [9]:
def train_test_split(dataset,train_size):

    train_size = int(train_size * len(dataset))
    test_size = int(len(dataset) - train_size)
    return random_split(dataset,[train_size,test_size])

In [10]:
train_transforms = Compose([
    ColorJitter(brightness=1.0, contrast=0.5, saturation=1, hue=0.1),
    RandomEqualize(0.4),
    AugMix(),
    RandomHorizontalFlip(0.3),
    RandomVerticalFlip(0.3),
    GaussianBlur((3,3)),
    RandomRotation(30),

    Resize([50,50]),
    ToTensor(),
    
])
validation_transforms =  Compose([
    Resize([50,50]),
    ToTensor(),
    
])
dataset = GTSRB(root='dataset',split="train")
train_set,validation_set = train_test_split(dataset,train_size=0.8)
train_set.dataset.transform = train_transforms
validation_set.dataset.transform = validation_transforms
BATCH_SIZE = 64
train_loader = DataLoader(dataset=train_set,batch_size=BATCH_SIZE,shuffle=True)
validation_loader = DataLoader(dataset=validation_set,batch_size=BATCH_SIZE)

In [11]:
model.compile(train_data=train_loader,validation_data=validation_loader,epochs=EPOCHS,loss_function=loss,optimizer=optimizer,learning_rate_scheduler=lr_s)

training started ...


Epoch [0/10], Step [490/491], Learning Rate [0.0008], Loss [0.1597], Accuracy [0.6535]: 


val_accuracy [0.9817648559041061], val_loss [0.16771015524864197]


Epoch [1/10], Step [490/491], Learning Rate [0.00076], Loss [0.0213], Accuracy [0.9882]:


val_accuracy [0.9838051517470033], val_loss [0.05901744216680527]


Epoch [2/10], Step [490/491], Learning Rate [0.00072], Loss [0.0034], Accuracy [0.9975]:


val_accuracy [0.9803621525121142], val_loss [0.014535217545926571]


Epoch [3/10], Step [490/491], Learning Rate [0.00068], Loss [0.0032], Accuracy [0.9970]:


val_accuracy [0.9964294822749299], val_loss [0.0073509011417627335]


Epoch [4/10], Step [490/491], Learning Rate [0.00064], Loss [0.0005], Accuracy [0.9999]:


val_accuracy [0.998342259627646], val_loss [0.0011955213267356157]


Epoch [5/10], Step [490/491], Learning Rate [0.0006000000000000001], Loss [0.0014], Accu


val_accuracy [0.9987248150981892], val_loss [0.0005142829613760114]


Epoch [6/10], Step [490/491], Learning Rate [0.0005600000000000001], Loss [0.0001], Accu


val_accuracy [0.9988523335883703], val_loss [0.00031794901588000357]


Epoch [7/10], Step [490/491], Learning Rate [0.0005200000000000001], Loss [0.0001], Accu


val_accuracy [0.9991073705687324], val_loss [0.00024619593750685453]


Epoch [8/10], Step [490/491], Learning Rate [0.00048000000000000007], Loss [0.0001], Acc


val_accuracy [0.9991073705687324], val_loss [0.00019575665646698326]


Epoch [9/10], Step [490/491], Learning Rate [0.00044000000000000007], Loss [0.0001], Acc


val_accuracy [0.9992348890589136], val_loss [0.0001479537895647809]
training complete !


In [12]:
adv_data_file = open('./pkl2/L2_epsilon2_train_advdata.pkl','rb')
adv_data = pickle.load(adv_data_file)
adv_data_file.close()

In [13]:
class AdversarialDataset:
    def __init__(self,data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self,idx):
        return self.data[idx][0],self.data[idx][1]

In [14]:
train_adv_dataset = AdversarialDataset(adv_data[:31000])
val_adv_dataset = AdversarialDataset(adv_data[31000:])
train_loader = DataLoader(train_adv_dataset, batch_size = 64,shuffle=True)
val_loader = DataLoader(val_adv_dataset, batch_size = 64,shuffle = True)

In [15]:
model.compile(train_data=train_loader,validation_data=val_loader,epochs=EPOCHS,loss_function=loss,optimizer=optimizer,learning_rate_scheduler=lr_s)

training started ...


Epoch [0/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0429], Accu


val_accuracy [0.22316969180168109], val_loss [2.06939435005188]


Epoch [1/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0294], Accu


val_accuracy [0.18443172128152027], val_loss [3.6758553981781006]


Epoch [2/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0095], Accu


val_accuracy [0.17005725423315873], val_loss [3.152139186859131]


Epoch [3/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0106], Accu


val_accuracy [0.1381410646851017], val_loss [3.3734188079833984]


Epoch [4/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0002], Accu


val_accuracy [0.13643561944207577], val_loss [4.236685276031494]


Epoch [5/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0001], Accu


val_accuracy [0.129613838469972], val_loss [3.366760730743408]


Epoch [6/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0001], Accu


val_accuracy [0.12985747350468999], val_loss [4.53563928604126]


Epoch [7/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0001], Accu


val_accuracy [0.13034474357412595], val_loss [4.035635948181152]


Epoch [8/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0001], Accu


val_accuracy [0.13387745157753686], val_loss [2.862488269805908]


Epoch [9/10], Step [484/485], Learning Rate [0.0004000000000000001], Loss [0.0000], Accu


val_accuracy [0.1354610793032038], val_loss [4.397650241851807]
training complete !


In [16]:
with open('robust_model.pkl','wb') as outfile:
    pickle.dump(model.cpu(),outfile)

## Test accuracy on original dataset

In [17]:
transforms = Compose([
    Resize([50,50]),
    ToTensor(),
    
])

testdata = GTSRB(root='./dataset',split='test',transform=transforms)
print('testing size :',len(testdata))
test_dataloader = DataLoader(testdata)

y_pred = []
y_true = []
model = model.eval().to(device)
with tqdm.tqdm(colour='red',total=len(test_dataloader)) as progress:
  
  with torch.no_grad() : 
    for id,(input,label) in enumerate(iter(test_dataloader)):
        input,label = input.to(device),label.to(device)
        y_true.append(label.item())
        prediction = model.forward(input)
        _,prediction = torch.max(prediction,1)
        y_pred.append(prediction.item())
        
        progress.desc = f'Test Accuracy : {accuracy_score(y_true,y_pred)} '
        progress.update(1)

testing size : 12630


Test Accuracy : 0.8038796516231196 : 100%|[31m███████[0m| 12630/12630 [00:47<00:00, 267.11it/s][0m


### Dataset class for loading adversarial data

In [18]:
class GTSRB_adv(Dataset):
    def __init__(self,file):
        f = open(file,'rb')
        self.data = pickle.load(f)
        f.close()
    def __len__(self):
        return len(self.data)
    def __getitem__(self,idx):
        return self.data[idx][0],self.data[idx][1]

## Test accuracy on L2 adversarial dataset

In [19]:
testdata = GTSRB_adv('./pkl/2_generate_adversarial_dataset_fgsm_l2_{}.pkl')
print('testing size :',len(testdata))
test_dataloader = DataLoader(testdata)

y_pred = []
y_true = []
model = model.eval().to(device)
with tqdm.tqdm(colour='red',total=len(test_dataloader)) as progress:
  
  with torch.no_grad() : 
    for id,(input,label) in enumerate(iter(test_dataloader)):
        input,label = input.to(device),label.to(device)
        y_true.append(label.item())
        prediction = model.forward(input)
        _,prediction = torch.max(prediction,1)
        y_pred.append(prediction.item())
        
        progress.desc = f'Test Accuracy : {accuracy_score(y_true,y_pred)} '
        progress.update(1)

testing size : 12630


Test Accuracy : 0.7992874109263658 : 100%|[31m███████[0m| 12630/12630 [00:43<00:00, 290.63it/s][0m


## Test accuracy on small multistep adversarial dataset

In [20]:
testdata = GTSRB_adv("./pkl/2_generate_adversarial_dataset_fgsm_multistep_{'num_epochs': 20}.pkl")
print('testing size :',len(testdata))
test_dataloader = DataLoader(testdata)

y_pred = []
y_true = []
model = model.eval().to(device)
with tqdm.tqdm(colour='red',total=len(test_dataloader)) as progress:
  
  with torch.no_grad() : 
    for id,(input,label) in enumerate(iter(test_dataloader)):
        input,label = input.to(device),label.to(device)
        y_true.append(label.item())
        prediction = model.forward(input)
        _,prediction = torch.max(prediction,1)
        y_pred.append(prediction.item())
        
        progress.desc = f'Test Accuracy : {accuracy_score(y_true,y_pred)} '
        progress.update(1)

testing size : 12630


Test Accuracy : 0.7904196357878068 : 100%|[31m███████[0m| 12630/12630 [00:42<00:00, 294.13it/s][0m
