# German Traffic Sign Recognition Benchmark (GTSRB) Classifier

We will use the [German Traffic Sign Recognition Benchmark (GTSRB)](http://benchmark.ini.rub.de/?section=gtsrb&subsection=dataset) dataset to train a classifier to recognize traffic signs. 

GTSRB is a multi-class, single-image classification challenge. Our goal is to correctly identify traffic signs from a dataset of more than 50,000 images in 43 classes.

## Fast Gradient Sign Method (FGSM) Attack

This notebook will introduce you to the Fast Gradient Sign Method (FGSM) attack and help you understand how to implement it.

*Please make a copy of this notebook before you get started so that your work can be saved!!*

## Setup

In [None]:
# py(torch): our machine learning library!
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import Dataset
# Pillow: Python Imaging Library
from PIL import Image
# pandas: data analysis library
import pandas as pd

import os

PyTorch uses the CPU by default. The following cell checks whether a GPU is available and, if so, uses it. \
***For people doing a local setup:** to use a GPU, you need to install CUDA and cuDNN from NVIDIA before running the following cell.*

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")  # check if we are on CPU or GPU

### Mount drive

Uncomment the following lines to mount your Google Drive if you need to.

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

## Get the training and test data

We will download the training and test data from the GTSRB website. 

The training data is a zip file that contains 43 subdirectories (one for each class) containing the images. \
The test data is a single directory of images without labels, so we also need to download the labels, which come in a separate CSV file.

We will use the `wget` Python package to download the data and the `unzip` command to extract it.

In [None]:
!pip install wget  # wget: Python package for downloading files
import wget

# train data
train_data_url = "https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Training_Images.zip"
wget.download(train_data_url)

# test data
test_data_url = "https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_Images.zip"
wget.download(test_data_url)

# the test data comes unlabeled so we need to download the labels as well
test_label_url = "https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_GT.zip"
wget.download(test_label_url)


In [None]:
!unzip -u "/content/GTSRB_Final_Training_Images.zip"

In [None]:
!unzip -u "/content/GTSRB_Final_Test_Images.zip"

In [None]:
!unzip -u "/content/GTSRB_Final_Test_GT.zip" -d "/content/GTSRB/Final_Test/Images"

### Load Training Data

You may notice that the images are in .ppm format, which you're probably not familiar with. But don't worry! Using `PIL`, the Python Imaging Library, we can process them just fine.

We first need to **resize** the images because their dimensions vary from 15x15 to 250x250 pixels, and they are not necessarily square. We will resize them to 224x224 pixels because that is the input size commonly used in image classification tasks and the one ResNet18 was pretrained with.

We will also **normalize** the images. `ToTensor` scales the pixel values to the range 0 to 1, and `Normalize` then standardizes each channel with the ImageNet mean and standard deviation, which matches what the pretrained model saw during its original training.
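
To make the effect of `transforms.Normalize` concrete, here is a small sketch of the per-channel arithmetic on a single pixel. It is plain Python rather than torchvision code, and the helper names `normalize_pixel` and `unnormalize_pixel` are made up for this illustration.

```python
# ImageNet channel statistics, the same values passed to transforms.Normalize in this notebook
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

def normalize_pixel(rgb):
    """Standardize one RGB pixel whose values are already scaled to [0, 1]."""
    return [(v - m) / s for v, m, s in zip(rgb, MEAN, STD)]

def unnormalize_pixel(rgb):
    """Invert normalize_pixel to recover values in [0, 1]."""
    return [v * s + m for v, m, s in zip(rgb, MEAN, STD)]

white = normalize_pixel([1.0, 1.0, 1.0])
black = normalize_pixel([0.0, 0.0, 0.0])
print(white)  # every channel ends up above 1
print(black)  # every channel ends up below 0
```

Because normalized values fall outside 0 to 1, any code that displays an image has to undo this step first, which is what the plotting cells later in the notebook do.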

`DataLoader` is a class that helps us load the data in batches. We will use it to load the training data.

In [None]:
TRAIN_PATH = "GTSRB/Final_Training/Images"

# normalize images in the dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# we create a dataset object with the images in the training folder, and with the dataset we create a dataloader
train_dataset = datasets.ImageFolder(TRAIN_PATH, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)


### Load test data

We need a separate `CustomTestDataset` class to load the test data because it is not in the same format as the training data, where the images are in separate folders according to their label.

The test data is in a single folder and the labels are in a separate CSV file. We will use the `pandas` library to read the CSV file and the `PIL` library to process the images.
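
As a rough illustration of the layout the dataset class relies on (file name in the first column, class id in the last, `;` as separator), the sketch below parses a small in-memory string. The column names and rows are illustrative assumptions, not a copy of the real file, and it uses the standard-library `csv` module only so that it runs without the dataset; the notebook's own code reads the file with pandas.

```python
import csv
import io

# Illustrative rows that mimic the assumed layout of the label file
sample_csv = (
    "Filename;Width;Height;Roi.X1;Roi.Y1;Roi.X2;Roi.Y2;ClassId\n"
    "00000.ppm;53;54;6;5;48;49;16\n"
    "00001.ppm;42;45;5;5;36;40;1\n"
)

reader = csv.reader(io.StringIO(sample_csv), delimiter=";")
header = next(reader)
# First column is the image file name, last column is the class id
samples = [(row[0], int(row[-1])) for row in reader]
print(header[0], header[-1])
print(samples)
```

Indexing by position (first and last column) mirrors what `__getitem__` does with `iloc[idx, 0]` and `iloc[idx, -1]`.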

In [None]:
TEST_PATH = "GTSRB/Final_Test/Images/"
CSV_PATH = "GTSRB/Final_Test/Images/GT-final_test.csv"
class CustomTestDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        # read csv file, separator is ';'
        self.dataframe = pd.read_csv(csv_file, sep=';')
        # look at columns of dataframe
        print(self.dataframe.columns)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.dataframe.iloc[idx, 0])
        image = Image.open(img_name)
        label = int(self.dataframe.iloc[idx, -1])

        if self.transform:
            image = self.transform(image)

        return image, label

# normalize images in the dataset
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_dataset = CustomTestDataset(csv_file=CSV_PATH,
                                 img_dir=TEST_PATH,
                                 transform=test_transform)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

## Training and Testing

These are your typical train and test functions.

In [None]:
def train(model, train_loader, criterion, optimizer, device, epochs=10, print_every=200):
    model.train()
    total_loss = 0.0
    for epoch in range(epochs):
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            # Move the batch to the same device as the model
            inputs = data[0].to(device)
            labels = data[1].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            total_loss += loss.item()
            # Report the average loss over the last `print_every` batches
            if i % print_every == print_every - 1:
                print(f"[{epoch + 1}, {i + 1}] loss: {running_loss / print_every:.3f}")
                running_loss = 0.0
    # Average loss per batch over all epochs
    return total_loss / (epochs * len(train_loader))

In [None]:
def evaluate(model, test_loader):
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)  # Move data to GPU
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

## Define Model Architecture

*Skip to the next section if you want to use our pretrained model*

We are using **transfer learning** to train our model. For our base model, we use a pretrained ResNet18 model and replace the last layer with a fully connected layer with 43 outputs (one for each class).

### ResNet18

ResNet18 is a convolutional neural network that is 18 layers deep. The pretrained version was trained on the ImageNet dataset, which contains about 1.2 million images in 1000 classes. The original ResNet18 model therefore has 1000 outputs, but we only have 43 classes, so we need to replace the last layer with a fully connected layer with 43 outputs.

In [None]:
# We use a pretrained ResNet18 model and finetune it for our task
model = torchvision.models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 43)  # GTSRB has 43 classes so we need to change the last layer
model = model.to(device) # move model to GPU if available

### Training and Testing Loop

We will use the `CrossEntropyLoss` function to calculate the loss and `SGD` (Stochastic Gradient Descent) optimizer to optimize the model.
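
For intuition about what the loss measures, here is a plain-Python sketch of cross-entropy for a single sample, computed as the negative log of the softmax probability assigned to the true class. The function name `cross_entropy` is a stand-in for this illustration; it is not how `nn.CrossEntropyLoss` is implemented internally.

```python
import math

def cross_entropy(logits, label):
    """Cross-entropy for one sample: -log(softmax(logits)[label])."""
    peak = max(logits)  # subtract the max for numerical stability
    log_sum_exp = peak + math.log(sum(math.exp(z - peak) for z in logits))
    return log_sum_exp - logits[label]

# A model that scores all 43 classes equally is maximally unsure
uniform_loss = cross_entropy([0.0] * 43, label=7)
print(round(uniform_loss, 3))  # 3.761, i.e. ln(43)

# A model that strongly favors the correct class has a loss near zero
confident_loss = cross_entropy([10.0, 0.0, 0.0], label=0)
print(confident_loss)
```

A starting loss close to ln(43) is therefore what you would expect from the freshly replaced 43-output layer before training has had any effect.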

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

train(model, train_loader, criterion, optimizer, device, epochs=10, print_every=200)

In [None]:
evaluate(model, test_loader)

## Save model and weights

We are working in a temporary runtime, which means that when we close the notebook, we will lose all our variables, including our model.
So to make sure you don't lose your work, we will save the model and the weights. You typically only need one of them, but we will save both just in case.

***Colab users look here:** you will need to download the model and weights manually from the files tab on the left.*

In [None]:
# save the weights
torch.save(model.state_dict(), 'initial_model_weights.pth')

In [None]:
# save the entire model
torch.save(model, 'initial_model.pth')

## Loading pretrained model

The pretrained model is finetuned from ResNet18, with the last layer replaced by a linear layer with 43 outputs.
It is trained on the GTSRB dataset and achieves a validation accuracy of 98.8%.

We will use `wget` to download the model from our GitHub repository.

In [None]:
!pip install wget  # wget: for downloading model weights
import wget

# I have pre-trained a CNN model (with the architecture above) and saved the weights to the below
# GitHub link, and we can load it using wget
model_file = wget.download("https://github.com/LiptonJumboTeaBag/GTSRB-Classifier/blob/0e4ee766e25d2e787aa1382f08d4e1b710da91f9/initial_model.pth?raw=true")

Now we can load the model using `torch.load()`. If you saved only the weights, you would use `model.load_state_dict()` instead.

`torch.load()` restores whatever object was saved with `torch.save()`; here that is the entire model, architecture and weights together. \
`model.load_state_dict()` only loads weights into an existing model, so you would need to define the model first.
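
The weights-only route can be sketched as follows. This is a minimal example under stated assumptions: `make_model` is a hypothetical stand-in for the real architecture, and an in-memory buffer replaces the `.pth` file so the snippet runs on its own.

```python
import io

import torch
import torch.nn as nn

def make_model():
    # Hypothetical stand-in for the real architecture
    return nn.Linear(4, 3)

trained = make_model()

# Save only the weights (a dictionary of parameter tensors)
buffer = io.BytesIO()
torch.save(trained.state_dict(), buffer)
buffer.seek(0)

# Loading requires building the architecture first, then filling in the weights
restored = make_model()
restored.load_state_dict(torch.load(buffer))

print(torch.equal(trained.weight, restored.weight))
```

Saving only the state dict keeps the file independent of the class definition's pickled form, at the cost of having to rebuild the model in code before loading.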

In [None]:
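# map_location lets the model load on CPU-only machines; newer PyTorch may also need weights_only=False
model = torch.load(model_file, map_location=device)
print(model)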

### Sanity check

We will do a sanity check to make sure the model is working as expected.
Feel free to skip this section if you want to get to the attack or if you are on a CPU as it might take a while to run.

In [None]:
print(f'Accuracy: {100*(evaluate(model=model, test_loader=test_loader)):.2f}%')

## Visualizing our dataset

To get a better understanding of our dataset, we will visualize it using the `matplotlib` library.

The following code will display 1 image from each class in our dataset.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Initialize a dictionary to hold one sample per class
class_samples = {}

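# Function to unnormalize and convert tensor to numpy for plotting
def imshow(img_tensor):
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    # work on a copy so the stored sample is not modified in place
    img_tensor = img_tensor.clone()
    # unnormalize each channel
    for t, m, s in zip(img_tensor, mean, std):
        t.mul_(s).add_(m)

    # clip to [0, 1] and move the channel axis last for matplotlib
    npimg = img_tensor.numpy().clip(0, 1)
    plt.imshow(np.transpose(npimg, (1, 2, 0)))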

# Collect one sample per class from the training set
for images, labels in train_loader:
    for i in range(len(labels)):
        label = labels[i].item()
        if label not in class_samples:
            class_samples[label] = images[i]
    if len(class_samples) == 43:  # Assuming there are 43 classes in GTSRB
        break

# sort the dictionary by keys
class_samples = dict(sorted(class_samples.items()))

# Plot one image from each class
fig = plt.figure(figsize=(15, 15))
for i, (label, img) in enumerate(class_samples.items()):
    ax = fig.add_subplot(7, 7, i + 1)  # Assuming a grid size that can fit all classes
    ax.set_title(f"Class {label}")
    ax.axis("off")

    # process the image and show it
    imshow(img)
plt.show()

# FGSM Attack!

Fast Gradient Sign Method (FGSM) is a white-box attack, meaning it needs access to the model's gradients: it uses the gradient of the loss with respect to the input to create an adversarial example. FGSM is fast because it takes a single gradient step, but it is generally weaker than iterative attacks.

### FGSM Implementation

We will implement FGSM using the following formula: \
`adversarial_image = original_image + epsilon * sign(gradient)` \
where `gradient` is the gradient of the loss with respect to the input image and `epsilon` is a hyperparameter that controls the magnitude of the perturbation.
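
Written in mathematical notation, the same update reads as below. The symbols are introduced here for illustration: $x$ is the original image, $y$ its label, $\theta$ the model parameters, and $J$ the loss.

$$x_{\text{adv}} = x + \epsilon \cdot \operatorname{sign}\left(\nabla_x J(\theta, x, y)\right)$$

Since every entry of the sign term is $-1$, $0$, or $+1$, each pixel value changes by at most $\epsilon$.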

Learn more about FGSM [here](https://www.tensorflow.org/tutorials/generative/adversarial_fgsm#:~:text=The%20fast%20gradient%20sign%20method%20works%20by%20using%20the%20gradients,is%20called%20the%20adversarial%20image.).

In [None]:
import torch.nn.functional as F

def adversarial_attack(image, model, epsilon):
    # TODO: 1. Get the gradient of the loss w.r.t. the input image
    pass

    # TODO: 2. Add epsilon * sign(gradient) to the input image and return the result
    pass
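
If you get stuck, here is one possible way to complete the exercise; treat it as a sketch rather than the reference solution. It assumes that, because the function receives no label, the model's own prediction is used as the target. The name `fgsm_sketch` and the tiny `toy_model` are made up so that the snippet runs without the dataset.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

def fgsm_sketch(image, model, epsilon):
    """Return image + epsilon * sign(gradient of the loss w.r.t. image)."""
    # Work on a detached copy so the caller's tensor is left untouched
    image = image.clone().detach().requires_grad_(True)
    output = model(image)
    # Assumption: no label is passed in, so use the model's own prediction
    target = output.argmax(dim=1)
    loss = F.cross_entropy(output, target)
    model.zero_grad()
    loss.backward()
    # Step in the direction that increases the loss
    return (image + epsilon * image.grad.sign()).detach()

# Tiny stand-in classifier, only to show that the function runs
torch.manual_seed(0)
toy_model = nn.Sequential(nn.Flatten(), nn.Linear(3 * 8 * 8, 43)).eval()
x = torch.rand(1, 3, 8, 8)
x_adv = fgsm_sketch(x, toy_model, epsilon=0.1)
print((x_adv - x).abs().max().item())  # at most epsilon, up to rounding
```

Note that in this notebook the images are already normalized, so `epsilon` is measured in normalized units and the result is not clamped to the range 0 to 1.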


## Run FGSM attack on a random image from the test set

We will run the FGSM attack on a random image from the test set and see how our model performs on the adversarial image.

In [None]:
import random

model.eval()  # Set model to evaluation mode

# Load a random image from the test loader
test_iter = iter(test_loader)
images, labels = next(test_iter)
idx = random.randint(0, len(labels) - 1)
image, label = images[idx:idx+1], labels[idx:idx+1]
image, label = image.to(device), label.to(device)

# Print the ground truth label
print(f"Ground truth class label: {label.item()}")

# TODO: Experiment with different epsilon values
epsilons = [0, 0.1, 0.5]
ep_count = len(epsilons)

# Display original and perturbed images
PLOT_WIDTH = ep_count//2 + 1
PLOT_HEIGHT = 2

plt.figure(figsize=(PLOT_WIDTH*5, PLOT_HEIGHT*5))

# Run FGSM attack with different epsilon values
for i, eps in enumerate(epsilons):
    # Generate perturbed image
    perturbed_image = adversarial_attack(image, model, eps)
    # Classify the perturbed image
    perturbed_output = model(perturbed_image)
    _, perturbed_pred = perturbed_output.max(1)

    # Get confidence of the perturbed image using softmax
    confidence = F.softmax(perturbed_output, dim=1)[0, perturbed_pred].item()


    # Convert tensors to NumPy arrays for visualization
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # detach the image from the graph and copy it so the original tensor is untouched
    perturbed_image = perturbed_image.detach().clone()

    # unnormalize each channel of the single image in the batch
    for t, m, s in zip(perturbed_image[0], mean, std):
        t.mul_(s).add_(m)

    perturbed_image = perturbed_image.squeeze().cpu().numpy() # remove batch dimension and move to CPU

    perturbed_image = perturbed_image.clip(0,1).transpose((1, 2, 0)) # clip to [0,1] and move channel to last dimension

    plt.subplot(PLOT_HEIGHT, PLOT_WIDTH, i + 1)
    plt.title(f"Eps {eps}: Classified as {perturbed_pred.item()} with {confidence:.2f} confidence")
    plt.imshow(perturbed_image)
    plt.axis('off')

plt.show()


## Run FGSM attack on YOUR image!!!

Here you can upload your own image and run the FGSM attack on it!
Replace the value of `image_path` with the path to your own image and run the cell.

In [None]:
# Load and preprocess the image
image_path = "PATH_TO_UR_IMAGE.smth"  # Replace with the path to your image

# To ensure the image is loaded correctly, we use the same preprocessing as we used for the training data
input_image = Image.open(image_path).convert("RGB")  # force 3 channels (e.g. for PNGs with alpha)
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
input_tensor = preprocess(input_image)
input_batch = input_tensor.unsqueeze(0)  # Create a mini-batch as expected by the model
image = input_batch.to(device)

epsilons = [0.1]
ep_count = len(epsilons)

# Display original and perturbed images
PLOT_WIDTH = ep_count//2 + 1
PLOT_HEIGHT = 2

plt.figure(figsize=(PLOT_WIDTH*5, PLOT_HEIGHT*5))

# Run FGSM attack with different epsilon values
for i, eps in enumerate(epsilons):
    # Generate perturbed image
    perturbed_image = adversarial_attack(image, model, eps)
    # Classify the perturbed image
    perturbed_output = model(perturbed_image)
    _, perturbed_pred = perturbed_output.max(1)
    confidence = F.softmax(perturbed_output, dim=1)[0, perturbed_pred].item()


    # Convert tensors to NumPy arrays for visualization
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

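    # detach the image from the graph and copy it so the original tensor is untouched
    perturbed_image = perturbed_image.detach().clone()

    # unnormalize each channel of the single image in the batch
    for t, m, s in zip(perturbed_image[0], mean, std):
        t.mul_(s).add_(m)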

    perturbed_image = perturbed_image.squeeze().cpu().numpy()


    perturbed_image = perturbed_image.clip(0,1).transpose((1, 2, 0))

    plt.subplot(PLOT_HEIGHT, PLOT_WIDTH, i + 1)
    plt.title(f"Eps {eps}: Classified as {perturbed_pred.item()} with {confidence:.2f} confidence")
    plt.imshow(perturbed_image)
    plt.axis('off')

plt.show()
