<a href="https://colab.research.google.com/github/zubejda/Advanced_DL/blob/main/Light_ViT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![alternatvie text](https://www.doc.zuv.fau.de//M/FAU-Logo/01_FAU_Kernmarke/Web/FAU_Kernmarke_Q_RGB_blue.svg)


In [1]:
!pip install torchvision

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.3.0->torchvision)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.3.0->torchvision)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.3.0->torchvision)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch==2.3.0->torchvision)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch==2.3.0->torchvision)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch==2.3.0->torchvision)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12=

In [2]:
!pip install comet_ml > /dev/null 2>&1

# Assignment 2: Visual Transformers
<center><img src="https://production-media.paperswithcode.com/methods/Screen_Shot_2021-01-26_at_9.43.31_PM_uI4jjMq.png" alt="Alternative text"/></center>
<center><figcaption>Fig 1. Dosovitskiy, Alexey, et al. "An image is worth 16x16 words: Transformers for image recognition at scale."https://arxiv.org/pdf/2010.11929.pdf. </figcaption></center>                 


In the lecture, transformers have been studied in the context of sequence-to-sequence modelling applications like natural language processing (NLP). Their superior performance to LSTM-based Recurrant neural network gained them a powerful reputation, thanks to their ability to model long sequences. A couple of years ago, transformers have been adapted to the [visual domain](https://arxiv.org/abs/2010.11929) and suprisingly demonstrated better performance compared to the long standing convolutional neural networks conditioned to large-scale datasets. Thanks to their ability to capture global semantic relationships in an image, unlike, CNNs which capture local information within the vicinty of the convolutional kernel window.

In this assignment, you'll be asked first to implement the building blocks of visual transformers (LightViT). Afterwards, you'll train them on classification task using MNIST and Fashion-MNIST datasets.


In [3]:
import os
import comet_ml
from getpass import getpass
import torch.nn as nn
import torch, math
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from torchvision.transforms.functional import resize

api_key = getpass('Enter your Comet API key: ')

os.environ['COMET_API_KEY'] = api_key

# Retrieve the API key from the environment variable
COMET_API_KEY = os.getenv('COMET_API_KEY')

Enter your Comet API key: ··········


In [4]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Assuming that we are on a CUDA machine, this should print a CUDA device:

print(device)

cuda:0


In [5]:
class LightViT(nn.Module):
    def __init__(self, image_dim, n_patches=7, n_blocks=2, d=8, n_heads=2, num_classes=10):
        super(LightViT, self).__init__()

        ## Class Members
        self.image_dim = image_dim
        self.n_patches = n_patches
        self.n_blocks = n_blocks
        self.d = d
        self.n_heads = n_heads
        self.num_classes = num_classes

        ## 1B) Linear Mapping
        self.patch_size = self.image_dim // self.n_patches
        self.linear_map = nn.Linear(self.patch_size**2, self.d)

        ## 2A) Learnable Parameter
        self.cls_token = nn.Parameter(torch.randn(1, 1, d));

        ## 2B) Positional embedding
        self.pos_embed = nn.Parameter(self.generate_positional_encoding(n_patches * n_patches + 1, d), requires_grad=False)

        ## 3) Encoder blocks
        self.encoder_blocks = ViTEncoder(self.d, self.n_heads)

        # 5) Classification Head
        self.classifier = nn.Linear(d, num_classes)

    def forward(self, images):
        b = images.shape[0]

        ## Extract patches
        pat = self.patches(images, p=self.n_patches)

        ## Linear mapping
        lin_out = self.linear_map(pat)

        ## Add classification token
        add_token = self.cls_token.expand(b, -1, -1)  # Expand to batch size
        token_out = torch.cat((add_token, lin_out), dim=1)

        ## Add positional embeddings
        pos_emb = token_out + self.pos_embed[:, :token_out.size(1), :]
        # pos_emb = self.get_pos_embeddings(token_out)

        ## Pass through encoder
        encoder_out = self.encoder_blocks(pos_emb)

        ## Get classification token

        ## Pass through classifier
        res = self.classifier(encoder_out[:, 0])

        return res

    def patches(self, x, p=7):
        b, c, h, w = x.shape
        x = x.unfold(2, p, p).unfold(3, p, p)  # Unfold to patches
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()  # Rearrange dimensions
        x = x.view(b, c * p * p, -1)  # Flatten each patch
        return x

    def get_pos_embeddings(self, x):
        pe = torch.zeros(self.n_patches, self.d)
        position = torch.arange(0, self.n_patches, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d, 2).float() * (-torch.log(torch.tensor(10000.0)) / self.d))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        att = nn.Embedding(x.numel(), x.shape[1])
        ll = att(x)
        print(ll.shape)

        pe = pe.unsqueeze(0)  # Add a batch dimension
        print("embeddings:", pe.shape)
        print("image:", x.shape)
        pe = x + pe[:, :x.size(1)]
        return pe

    def generate_positional_encoding(self, length, d):
        pos_encoding = torch.zeros(length, d)
        for pos in range(length):
            for i in range(0, d, 2):
                pos_encoding[pos, i] = math.sin(pos / (10000 ** (i / d)))
                pos_encoding[pos, i + 1] = math.cos(pos / (10000 ** ((i + 1) / d)))
        return pos_encoding.unsqueeze(0)

## 1. Image Patches and Linear Mapping

### A) Image Patches
Transfomers were initially created to process sequential data. In case of images, a sequence can be created through extracting patches. To do so, a crop window should be used with a defined window height and width. The dimension of data is originally in the format of *(B,C,H,W)*, when transorfmed into patches and then flattened we get *(B, PxP, (HxC/P)x(WxC/P))*, where *B* is the batch size and *PxP* is total number of patches in an image. In this example, you can set P=7.


*Output*: A function that extracts image patches. The output format should have a shape of (B,49,16). The function will be used inside *LightViT* class.

In [6]:
def patches(x, p=7):
    b, c, h, w = x.shape
    x = x.unfold(2, p, p).unfold(3, p, p)  # Unfold to patches
    x = x.permute(0, 2, 3, 1, 4, 5).contiguous()  # Rearrange dimensions
    x = x.view(b, c * p * p, -1)  # Flatten each patch
    return x

### B) Linear Mapping

Afterwards, the input are mapped using a linear layer to an output with dimension *d* i.e. *(B, PxP, (HxC/P)x(WxC/P))* &rarr; *(B, PxP, d)*. The variable d can be freely chosen, however, we set here to 8.

*Output*: A linear layer should be added inside *LightViT* class with the correct input and output dimensions, the output from the linear layer should have a dimension of (B,49,8).

## 2. Insert Classifier Token and Positional embeddings

### A) Classifier Token

Beside the image patches, also known as tokens, an additional special token is appended to the the input to capture desired information about other tokens to learn the task at hand. Lateron, this token will be used as input to the classifier to determine the class of the input image. To add the token to the input is equivilant to concatentating a learnable parameter with a vector of the same dimension *d* to the image tokens.

*Output* A randomly initialised learnable parameter to be implemented inside *LightViT* class. You can use [PyTorch built-in function](https://pytorch.org/docs/stable/generated/torch.nn.parameter.Parameter.html) to create a PyTorch parameter.

### B) Positional Embedding

To preserve the context of an image, positional embeddings are associated with each image patch. Positional embeddings encodes the patch positions using sinusoidal waves, however, there are other techniques. We follow the definition of positional encoding in the original transformer paper of [Vaswani et. al](https://arxiv.org/abs/1706.03762), which sinusoidal waves. You'll be required to implement a function that creates embeddings for each coordinate of every image patch.

*Output* Inside *LightViT* class, implement a function that fetches the embedding and encapuslate it inside a non-learnable parameter.

In [7]:
def get_pos_embeddings(self, x):
    pe = torch.zeros(self.n_patches, self.d)
    position = torch.arange(0, self.n_patches, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, self.d, 2).float() * (-torch.log(torch.tensor(10000.0)) / self.d))

    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)

    pe = pe.unsqueeze(0)  # Add a batch dimension
    pe = x + pe[:, :x.size(1)]
    return pe

## 3. Encoder Block

<center><img src="https://machinelearningmastery.com/wp-content/uploads/2021/08/attention_research_1.png" alt="Alternative text" width="400" height="500"/></center>
<center><figcaption>Fig 2. Transformer Encoder."https://arxiv.org/pdf/2010.11929.pdf. </figcaption></center>  

This is the challenging part of the assignment as it will be required from you to implement the main elements of an encoder block. A single block contains layer normalization (LN), multi-head self-attention (MHSA), and a residual connection.  

### A) Layer Normalization
[Layer normailzation](https://arxiv.org/abs/1607.06450), similar to other techniques, normalizes an input across the layer dimension by subtracting mean and dividing by standard deviation. You can instantiate layer normalization which has a dimension *d* using [PyTorch built-in function](https://pytorch.org/docs/stable/generated/torch.nn.LayerNorm.html).
### B) MHSA
<center><img src="https://production-media.paperswithcode.com/methods/multi-head-attention_l1A3G7a.png" alt="Alternative text" width="300" height="400"/></center>
<center><figcaption>Fig 2. Multi-Head Self Attention."https://arxiv.org/pdf/1706.03762v5.pdf. </figcaption></center>  
  
 The attention module derives an attention value by measuring similarity between one patch and the other patches. To this end, an image patch with dimension *d* is linearly mapped to three vectors; query **q**, key **k**, and value **v** , hence a distint linear layer should be instantiated to get each of the three vectors. To quantify attention for a single patch, first, the dot product is computed between its **q** and all of the **k** vectors and divide by the square root of the vector dimension i.e. *d* = 8. The result is passed through a softmax layer to get *attention features* and finally multiple with **v** vectors associated with each of the **k** vectors and sum up to get the result. This allows to get an attention vector for each patch by measuring its similarity with other patches.

  Note that this process should be repeated **N** times on each of the **H** sub-vectors of the 8-dimensional patch, where **N** is the total number of attention blocks. In our case, let **N** = 2, hence, we have 2 sub-vectors, each of length 4. The first sub-vector is processed by the first head and the second sub-vector is process by the second head, each head has distinct Q,K, and V mapping functions of size 4x4.

 For more information about MHSA, you may refer to this [post](https://data-science-blog.com/blog/2021/04/07/multi-head-attention-mechanism/).

 It is highly recommended to define a seperate class for MHSA as it contains several operations.



In [8]:
class MHSA(nn.Module):
    def __init__(self, d, n_heads=2): # d: dimension of embedding spacr, n_head: dimension of attention heads
        super(MHSA, self).__init__()
        self.d = d
        self.n_heads = n_heads
        self.token_dim = d // n_heads
        self.scale = self.token_dim ** -0.5
        self.q_linear = nn.Linear(self.d, self.d)
        self.k_linear = nn.Linear(self.d, self.d)
        self.v_linear = nn.Linear(self.d, self.d)
        self.out = nn.Linear(d, d)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, sequences):
        # Sequences has shape (N, seq_length, token_dim)
        query = self.q_linear(sequences)
        key = self.k_linear(sequences)
        value = self.v_linear(sequences)
        matrix = torch.matmul(query, key.transpose(-2, -1)) * self.scale
        soft = self.softmax(matrix)
        context = torch.matmul(soft, value)
        out = self.out(context)
        return out

        # Shape is transformed to   (N, seq_length, n_heads, token_dim / n_heads)
        # And finally we return back    (N, seq_length, item_dim)  (through concatenation)

### C) Residual Connection

Residual connections (also know as skip connections) add the original input to the processed output by a network layer e.g. encoder. They have proven to be useful in deep neural networks as they mitigate problems like exploding / vanishing gradients. In transformer, the residual connection is adding the original input to the output from LN &rarr; MHSA. All of the previous operations could be implemented inside a seperate encoder class.

The last part of an encoder, is to a inser another residual connection between the input to the encoder and the output from the encoder passed through another layer of LN &rarr; MLP. The MLP consists of 2 layers with hidden size 4 times larger than *d*.

*output*: The output from a single encoder block should have the same dimension as input.

In [9]:
class ViTEncoder(nn.Module):
    def __init__(self, hidden_d, n_heads):
        super(ViTEncoder, self).__init__()
        self.hidden_d = hidden_d
        self.n_heads = n_heads

        self.norm1 = nn.LayerNorm(self.hidden_d) # Add Layer-Norm
        self.mhsa = MHSA(hidden_d, n_heads)
        self.norm2 = nn.LayerNorm(self.hidden_d) # Add another Layer-Norm
        self.mlp = nn.Sequential(
            nn.Linear(hidden_d, 4 * hidden_d),
            nn.GELU(),
            nn.Linear(4 * hidden_d, hidden_d)
        )


    def forward(self, x):
        out = x + self.mhsa(self.norm1(x)) #Residual connection here
        out = out + self.mlp(self.norm2(out))
        return out

### C) Test Encoder
It is highly recommended to test the encoder with a tensor of random values as input.

In [10]:
if __name__ == '__main__':
  model = ViTEncoder(hidden_d=8, n_heads=2)

  x = torch.randn(7, 50, 8)
  print(model(x).shape)

torch.Size([7, 50, 8])


## 4. Classification Head

The final part of implemeting a transformer is adding a classification head to the model inside *LightViT* class. You can simply use a linear classifier i.e. a linear layer that accepts input of dimension *d* and outputs logits with dimension set to the number of classes for the classification problem at hand.

## 5a. Model Train for MNIST

At this point you have completed the major challenge of the assignment. Now all you need to do is to implement a standard script for training and testing the model. We recommend to use Adam optimizer with 0.005 learning rate and train for 5 epochs.

In [11]:
class PatchDataset(Dataset):
    def __init__(self, dataset, patch_size, transform=None):
        self.dataset = dataset
        self.patch_size = patch_size
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        if self.transform:
            image = self.transform(image)
        patches = self.create_patches(image)
        return patches, label

In [27]:
params = dict(
  num_training_iterations = 30,
  batch_size = 8,
  learning_rate = 0.0005,
  momentum=1e-4,
  image_dim = 28,
)

transform = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL image to tensor
    transforms.Normalize((0.5,), (0.5,)),  # Normalize the image
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.GaussianBlur(kernel_size=(3,3), sigma=0.5),
])

train_dataset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)

## Define Dataloader
train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False)

In [28]:
### Create a Comet experiment to track our training run ###

def create_experiment():
  # end any prior experiments
  if 'experiment' in locals():
    experiment.end()

  # initiate the comet experiment for tracking
  experiment = comet_ml.Experiment(
                  api_key=COMET_API_KEY,
                  project_name="ADL_LightViT",)
  # log our hyperparameters, defined above, to the experiment
  for param, value in params.items():
    experiment.log_parameter(param, value)
  experiment.flush()

  return experiment

In [31]:
%%time
## Define Model
model = LightViT(image_dim=params["image_dim"],)
if str(device) == 'cuda:0':
    model.to(device)

# Define Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=params["learning_rate"])

# Define Loss
criterion = nn.CrossEntropyLoss()

# Train
experiment = create_experiment()
i=0
for epoch in range(params["num_training_iterations"]):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Move data to device
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        output = model(images)
        loss_value = criterion(output, labels)  # Compute the loss

        # Backward pass and optimize
        loss_value.backward()
        optimizer.step()

        running_loss += loss_value.item()
        if i % 20 == 0:
            experiment.log_metric("loss", running_loss, step=i)
        i+=1

    print(f'Epoch [{epoch+1}/{params["num_training_iterations"]}], Loss: {running_loss/len(train_loader):.4f}', loss_value.item())

experiment.flush()

[1;38;5;39mCOMET INFO:[0m ---------------------------------------------------------------------------------------
[1;38;5;39mCOMET INFO:[0m Comet.ml Experiment Summary
[1;38;5;39mCOMET INFO:[0m ---------------------------------------------------------------------------------------
[1;38;5;39mCOMET INFO:[0m   Data:
[1;38;5;39mCOMET INFO:[0m     display_summary_level : 1
[1;38;5;39mCOMET INFO:[0m     name                  : careful_detail_5010
[1;38;5;39mCOMET INFO:[0m     url                   : https://www.comet.com/zubejda/adl-lightvit/9ec49a4293384d149f9bbe83838668f0
[1;38;5;39mCOMET INFO:[0m   Metrics [count] (min, max):
[1;38;5;39mCOMET INFO:[0m     loss [774] : (0.5270895957946777, 8420.09829121828)
[1;38;5;39mCOMET INFO:[0m   Parameters:
[1;38;5;39mCOMET INFO:[0m     batch_size              : 8
[1;38;5;39mCOMET INFO:[0m     image_dim               : 28
[1;38;5;39mCOMET INFO:[0m     learning_rate           : 0.0005
[1;38;5;39mCOMET INFO:[0m     momentum

Epoch [1/30], Loss: 1.4126 1.203905463218689
Epoch [2/30], Loss: 1.0500 0.7795031666755676


KeyboardInterrupt: 

In [32]:
## Test
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test Accuracy: {100 * correct / total:.2f}%')

Test Accuracy: 64.72%


In [21]:
username = 'zubejda'
token = getpass('Enter your github api key: ')
repo = 'Advanced_DL'

# Clone the repository
!git clone https://{username}:{token}@github.com/{username}/{repo}.git
%cd /content/Advanced_DL
os.environ['COMET_GIT_DIRECTORY'] = os.getcwd()

Enter your github api key: ··········
Cloning into 'Advanced_DL'...
remote: Enumerating objects: 12, done.[K
remote: Counting objects: 100% (12/12), done.[K
remote: Compressing objects: 100% (8/8), done.[K
remote: Total 12 (delta 3), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (12/12), 14.07 KiB | 14.07 MiB/s, done.
Resolving deltas: 100% (3/3), done.
/content/Advanced_DL


## 5b. Model Training for FashionMNIST
For this task you may reuse the LightViT transformer that you already implemented before. Plot the accuracies for various hyperparameters of $0.01,0.001,0.0001$ and select the besst performing model.