<a href="https://colab.research.google.com/github/prabal5ghosh/UCA-M2-SEMESTER1/blob/main/deep%20learning/TP6_Transformer_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<center><h1>TP6: Classification with Transformers</h1></center>

# Warning :
# "File -> Save a copy in Drive" before starting to modify the notebook, otherwise changes won't be saved.

## Setup


Below, we import some standard libraries.

In [None]:
# Standard libraries
import math
import os
import urllib.request
from functools import partial
from urllib.error import HTTPError
from tqdm.notebook import tqdm
import random

# Plotting
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data


%matplotlib inline

# Path to the folder where the datasets are/should be downloaded (e.g. CIFAR10)
DATASET_PATH = "data/"
# Path to the folder where the pretrained models are saved
CHECKPOINT_PATH = "saved_models/"

# Seed every random number generator used in this notebook (PyTorch, NumPy and
# Python's `random`) so that runs are reproducible.
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
# Request deterministic cuDNN algorithms and disable the benchmark auto-tuner.
# The attribute must be spelled `deterministic`; a misspelled name is silently
# stored as an unrelated attribute and has no effect.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Prefer the first CUDA device when one is available, otherwise fall back to CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

Remember the scaled dot product? Well, here is a refresher exercise!

In [None]:
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    Args:
        q: Queries of shape [..., SeqLen_q, d_k].
        k: Keys of shape [..., SeqLen_k, d_k].
        v: Values of shape [..., SeqLen_k, d_v].
        mask: Optional tensor broadcastable to [..., SeqLen_q, SeqLen_k].
            Positions where the mask equals 0 receive (numerically) zero
            attention weight.

    Returns:
        A tuple ``(output_values, attention)`` where ``attention`` holds the
        softmax weights of shape [..., SeqLen_q, SeqLen_k] and
        ``output_values`` is the attention-weighted sum of ``v``.
    """
    d_k = q.size()[-1]

    # Compute attn_logits: similarity of every query with every key, scaled by
    # sqrt(d_k) so the logits do not grow with the feature dimension.
    attn_logits = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)

    # Apply mask if not None. The most negative finite value is used instead of
    # -inf so that a fully masked row yields a uniform distribution, not NaN.
    if mask is not None:
        attn_logits = attn_logits.masked_fill(mask == 0, torch.finfo(attn_logits.dtype).min)

    # Pass through softmax over the key axis
    attention = F.softmax(attn_logits, dim=-1)

    # Weight values accordingly
    output_values = torch.matmul(attention, v)

    return output_values, attention

How did we deal with the tricky shapes of MultiheadAttention?

In [None]:
class MultiheadAttention(nn.Module):
    """Multi-head self-attention layer.

    The input is projected to queries, keys and values with a single linear
    layer, split into ``num_heads`` heads of size ``embed_dim // num_heads``,
    attended per head with scaled dot-product attention, and finally merged
    and passed through an output projection.
    """

    def __init__(self, input_dim, embed_dim, num_heads):
        """
        Args:
            input_dim: Dimensionality of the input features.
            embed_dim: Dimensionality of the concatenated heads (and of the output).
            num_heads: Number of attention heads; must divide ``embed_dim``.
        """
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."

        self.embed_dim = embed_dim # dimension of concatenated heads
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.o_proj = nn.Linear(embed_dim, embed_dim)
        # One projection produces q, k and v for all heads at once
        self.qkv_proj = nn.Linear(input_dim, embed_dim * 3)

        self._reset_parameters()

    def _reset_parameters(self):
        # Original Transformer initialization, see PyTorch documentation
        nn.init.xavier_uniform_(self.qkv_proj.weight)
        self.qkv_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.o_proj.bias.data.fill_(0)

    def forward(self, x, mask=None, return_attention=False):
        """Apply multi-head self-attention to ``x``.

        Args:
            x: Input of shape [Batch, SeqLen, input_dim].
            mask: Optional mask broadcastable to [Batch, Head, SeqLen, SeqLen];
                a 3-D mask [Batch, SeqLen, SeqLen] gets a head axis inserted.
                Positions where the mask equals 0 are not attended to.
            return_attention: If True, also return the attention weights.

        Returns:
            The output of shape [Batch, SeqLen, embed_dim], or a tuple
            ``(output, attention)`` with attention of shape
            [Batch, Head, SeqLen, SeqLen] when ``return_attention`` is True.
        """
        batch_dim, seq_length, _ = x.shape

        # Compute linear projection for qkv and separate heads
        # QKV: [Batch, Head, SeqLen, Dims]
        qkv = self.qkv_proj(x)
        qkv = qkv.reshape(batch_dim, seq_length, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)

        # Scaled dot-product attention, computed independently for every head
        attn_logits = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            if mask.dim() == 3:
                mask = mask.unsqueeze(1)  # add the head axis
            # Most negative finite value instead of -inf: avoids NaN on fully masked rows
            attn_logits = attn_logits.masked_fill(mask == 0, torch.finfo(attn_logits.dtype).min)
        attention = F.softmax(attn_logits, dim=-1)
        attention_values = torch.matmul(attention, v)

        # Concatenate heads to [Batch, SeqLen, Embed Dim]
        attention_values = attention_values.permute(0, 2, 1, 3)
        attention_values = attention_values.reshape(batch_dim, seq_length, self.embed_dim)

        # Output projection
        o = self.o_proj(attention_values)

        if return_attention:
            return o, attention
        else:
            return o

# Smoke test for MultiheadAttention on a tiny, fixed input.
input_d = 3
seq_l = 4
embed_d = 4
n_heads = 2
b_size = 1

mh_att = MultiheadAttention(input_d, embed_d, n_heads)

# The random tensor is immediately replaced by the fixed values below, so the
# printed input is always the same; the call still advances the global RNG.
x = torch.rand(b_size, seq_l, input_d)
x = torch.tensor([[[0.3360, 0.6676, 0.6393],
         [0.2083, 0.5484, 0.1204],
         [0.3533, 0.3038, 0.9383],
         [0.0499, 0.2048, 0.0107]]])
print(f"Input x: {x}")

att_output = mh_att(x)
print(f"MhA Output {att_output}")
# Expected output shape is [b_size, seq_l, embed_d] = [1, 4, 4]
assert att_output.shape == torch.Size([1, 4, 4]), "Error in computing multi-head attention"

In [None]:
class EncoderBlock(nn.Module):
    """Post-LayerNorm Transformer encoder block: self-attention followed by an MLP.

    Each of the two sub-layers is wrapped in a residual connection with
    dropout on the sub-layer output and its own LayerNorm.
    """

    def __init__(self, input_dim, num_heads, dim_feedforward, dropout=0.0):
        """
        Args:
            input_dim: Dimensionality of the input
            num_heads: Number of heads to use in the attention block
            dim_feedforward: Dimensionality of the hidden layer in the MLP
            dropout: Dropout probability to use in the dropout layers
        """
        super().__init__()

        # Create Attention layer
        self.self_attn = MultiheadAttention(input_dim, input_dim, num_heads)

        # Create Two-layer MLP with dropout; the hidden width is dim_feedforward
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, input_dim),
        )

        # Layers to apply in between the main layers. Each residual branch gets
        # its own LayerNorm so the two do not share scale/shift parameters.
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``.

        Args:
            x: Input tensor; the last dimension must be ``input_dim``.
            mask: Optional attention mask, forwarded to the attention layer.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Compute Attention part (the mask must reach the attention layer)
        attn_out = self.self_attn(x, mask=mask)
        x = self.norm1(x + self.dropout(attn_out))

        # Compute MLP part
        mlp_out = self.mlp(x)
        x = self.norm2(x + self.dropout(mlp_out))

        return x



In [None]:
class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` encoder blocks applied one after another."""

    def __init__(self, num_layers, **block_args):
        """
        Args:
            num_layers: Number of encoder blocks to stack.
            **block_args: Keyword arguments forwarded to every ``EncoderBlock``.
        """
        super().__init__()
        self.layers = nn.ModuleList([EncoderBlock(**block_args) for _ in range(num_layers)])

    def forward(self, x, mask=None):
        """Pass ``x`` through every block, handing the same ``mask`` to each."""
        for layer in self.layers:
            x = layer(x, mask=mask)
        return x

    def get_attention_maps(self, x, mask=None):
        """Return the attention weights of every block for input ``x``.

        The input is propagated exactly as in ``forward`` (including the mask),
        so the returned maps correspond to what the model computes in a normal
        forward pass.

        Returns:
            A list with one attention tensor per block, in layer order.
        """
        attention_maps = []
        for layer in self.layers:
            _, attn_map = layer.self_attn(x, mask=mask, return_attention=True)
            attention_maps.append(attn_map)
            # Advance with the mask; dropping it here would feed later layers
            # with activations that differ from the real forward pass.
            x = layer(x, mask=mask)
        return attention_maps

Last time we used a pre-computed sine positional encoding, which is often used to this day.

This time we will see the other common positional encoding type: learned positional encodings! Initialize random embeddings for each possible position and make sure these embeddings are tracked by the model!

In [None]:
class PositionalEncoding(nn.Module):
    """Learned positional encoding: one trainable vector per sequence position."""

    def __init__(self, d_model, max_len=50):
        """
        Args
            d_model: Hidden dimensionality of the input.
            max_len: Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create random matrix of [1, SeqLen, HiddenDim] representing the positional encoding for max_len inputs.
        # Wrapping it in nn.Parameter registers it with the module, so it is
        # returned by .parameters(), trained by the optimizer and saved in the state dict.
        self.pe = nn.Parameter(torch.randn(1, max_len, d_model))

    def forward(self, x):
        """Add the positional encoding to ``x`` of shape [Batch, SeqLen, d_model].

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported length {self.pe.size(1)}"
            )

        # Only the first seq_len positions are needed; broadcasting covers the batch axis
        x = x + self.pe[:, :seq_len]

        return x

In [None]:
class TransformerPredictor(nn.Module):
    """Transformer encoder that classifies a whole sequence via a learned [CLS] token."""

    def __init__(
        self,
        input_dim,
        model_dim,
        num_classes,
        num_heads,
        num_layers,
        dropout=0.0,
        input_dropout=0.0,
    ):
        """
        Args:
            input_dim: Hidden dimensionality of the input
            model_dim: Hidden dimensionality to use inside the Transformer
            num_classes: Number of classes to predict per sequence
            num_heads: Number of heads to use in the Multi-Head Attention blocks
            num_layers: Number of encoder blocks to use.
            dropout: Dropout to apply inside the model
            input_dropout: Dropout to apply on the input features
        """
        super().__init__()
        self.input_dim = input_dim
        self.model_dim = model_dim
        self.num_classes = num_classes
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.dropout = dropout
        self.input_dropout = input_dropout

        # Create a Generic Input Encoder Input dim -> Model dim with input dropout
        self.input_net = nn.Sequential(
            nn.Linear(input_dim, model_dim),
            nn.Dropout(input_dropout)
        )

        # Create positional encoding for sequences
        self.positional_encoding = PositionalEncoding(model_dim)

        # Create transformer Encoder
        self.transformer = TransformerEncoder(num_layers, input_dim=model_dim, dim_feedforward=model_dim*2, num_heads=num_heads, dropout=dropout)

        # Create output classifier Model_dim -> num_classes (applied to the classification token)
        self.output_net = nn.Linear(model_dim, num_classes)

        # Create classification token: a single learned vector, shared across the batch
        self.cls_token = nn.Parameter(torch.randn(1, 1, model_dim))

    def forward(self, x, mask=None, add_positional_encoding=True):
        """
        Args:
            x: Input features of shape [Batch, SeqLen, input_dim]
            mask: Mask to apply on the attention outputs (optional). It is handed
                  to the encoder after the classification token has been
                  prepended, i.e. for a sequence of length SeqLen + 1.
            add_positional_encoding: If True, we add the positional encoding to the input.
                                      Might not be desired for some tasks.

        Returns:
            Class logits of shape [Batch, num_classes].
        """
        x = self.input_net(x)

        # Add the cls token at position 0 of every sequence
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)

        if add_positional_encoding:
            x = self.positional_encoding(x)
        x = self.transformer(x, mask=mask)

        # Get the output! Remember we only care about the classification token!
        x = self.output_net(x[:, 0])

        return x

    @torch.no_grad()
    def get_attention_maps(self, x, mask=None, add_positional_encoding=True):
        """Function for extracting the attention matrices of the whole Transformer for a single batch.

        Input arguments same as the forward pass.
        """
        x = self.input_net(x)
        x = torch.cat([self.cls_token.expand(x.shape[0],-1,-1), x], dim=1)
        if add_positional_encoding:
            x = self.positional_encoding(x)
        attention_maps = self.transformer.get_attention_maps(x, mask=mask)
        return attention_maps



## Experiment: Sequence Classification

Let's try to do some classification with a simple task.

The following implements a dataset that counts the number of 0s in a sequence.

In [None]:
class ZeroCountDataset(data.Dataset):
    """Random integer sequences, each labelled with the number of zeros it contains."""

    def __init__(self, num_categories, seq_len, size):
        """
        Args:
            num_categories: Stored on the instance; the sampling range below is
                the literal 10 and does not depend on it.
            seq_len: Length of every sequence.
            size: Number of sequences in the dataset.
        """
        super().__init__()
        self.num_categories, self.seq_len, self.size = num_categories, seq_len, size

        # All sequences are sampled once, up front, with values in [0, 10)
        shape = (self.size, self.seq_len)
        self.data = torch.randint(0, 10, shape)

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        """Return ``(sequence, zero_count)`` for the sample at ``idx``."""
        sequence = self.data[idx]
        zero_count = (sequence == 0).sum()
        return sequence, zero_count

We create an arbitrary number of random sequences of digits between 0 and 9 (this range is hard-coded in the dataset; `num_categories` is stored but not used for sampling).
The label is simply the number of 0s.
We can create the corresponding data loaders below.

In [None]:
# Pre-bind num_categories=16 and seq_len=16; only the dataset size is left to pass
dataset = partial(ZeroCountDataset, 16, 16)
# Training batches are shuffled and the last incomplete batch is dropped
train_dl = data.DataLoader(dataset(50000), batch_size=128, shuffle=True, drop_last=True, pin_memory=True)
# Validation and test loaders keep their order and their last (smaller) batch
val_dl = data.DataLoader(dataset(1000), batch_size=128)
test_dl = data.DataLoader(dataset(10000), batch_size=128)

Let's look at an arbitrary sample of the dataset:

In [None]:
# Index the underlying dataset directly (no batching) to inspect one sample
inp_data, labels = train_dl.dataset[0]
print("Input data:", inp_data)
print("Labels:    ", labels)

In [None]:
def train_step(model, x, y, optim):
    """Run one optimization step on a single batch.

    Args:
        model: Module called as ``model(inp, add_positional_encoding=True)`` and
            returning class logits. If it exposes ``input_dim``, that value is
            used as the one-hot width; otherwise 10 is used.
        x: Integer tensor of category indices.
        y: Integer tensor of target classes, one per sequence.
        optim: Optimizer holding the model parameters.

    Returns:
        ``(loss, acc)`` as tensors detached from the autograd graph, so callers
        can accumulate them without keeping the graph of every step alive.
    """
    model.train()

    # Fetch data and transform categories to one-hot vectors
    inp_data = F.one_hot(x, num_classes=getattr(model, "input_dim", 10)).float()

    # Perform prediction and calculate loss and accuracy
    preds = model(inp_data, add_positional_encoding=True)
    loss = F.cross_entropy(preds.view(-1, preds.size(-1)), y.view(-1))
    acc = (preds.argmax(dim=-1) == y).float().mean()

    # Backpropagate and update weights
    loss.backward()
    optim.step()
    model.zero_grad()

    return loss.detach(), acc

def eval_step(model, x, y):
    """Evaluate the model on a single batch without tracking gradients.

    Args:
        model: Module called as ``model(inp, add_positional_encoding=True)`` and
            returning class logits. If it exposes ``input_dim``, that value is
            used as the one-hot width; otherwise 10 is used.
        x: Integer tensor of category indices.
        y: Integer tensor of target classes, one per sequence.

    Returns:
        ``(loss, acc)`` as scalar tensors.
    """
    with torch.no_grad():
        model.eval()

        # Fetch data and transform categories to one-hot vectors.
        # The one-hot width is the size of the *input* vocabulary, not the
        # number of output classes, so it must not be taken from num_classes.
        inp_data = F.one_hot(x, num_classes=getattr(model, "input_dim", 10)).float()

        # Perform prediction and calculate loss and accuracy
        preds = model(inp_data, add_positional_encoding=True)
        loss = F.cross_entropy(preds.view(-1, preds.size(-1)), y.view(-1))
        acc = (preds.argmax(dim=-1) == y).float().mean()

    return loss, acc


Finally, we can create a training function similar to the one we have seen in previous laboratories. We run for $N$ epochs, printing the training and validation loss and saving our best model based on the validation accuracy.
Afterward, we test our models on the test set.

In [None]:
def train_model(model, train_loader, val_loader, test_loader,
                optim, epochs=5):
    """Train ``model``, keep the best checkpoint on validation accuracy and test it.

    Args:
        model: Model to train.
        train_loader: Iterable of ``(x, y)`` training batches.
        val_loader: Iterable of ``(x, y)`` validation batches.
        test_loader: Iterable of ``(x, y)`` test batches.
        optim: Optimizer holding the model parameters.
        epochs: Number of passes over the training data.

    Returns:
        The model with the weights of the best validation epoch loaded. The
        printed test accuracy is measured on those same weights.
    """
    # -inf so that the first epoch is always checkpointed, even at 0% accuracy
    best_acc = float("-inf")
    checkpoint_saved = False
    pbar = tqdm(range(epochs))
    for _ in range(epochs):
        train_loss, train_acc = 0., 0.
        for x, y in train_loader:
            loss, acc = train_step(model, x, y, optim)
            # float() keeps plain numbers, so no autograd history is accumulated
            train_loss += float(loss)
            train_acc += float(acc)

        val_loss, val_acc = 0., 0.
        for x, y in val_loader:
            loss, acc = eval_step(model, x, y)
            val_loss += float(loss)
            val_acc += float(acc)

        mean_val_acc = val_acc/len(val_loader)
        if mean_val_acc > best_acc:
            torch.save(model.state_dict(), "best_model.pt")
            best_acc = mean_val_acc
            checkpoint_saved = True

        pbar.update()
        pbar.set_description(f"Train Acc: {train_acc/len(train_loader)* 100:.2f} "
                            f"Train Loss: {train_loss/len(train_loader):.2f} "
                            f"Val Acc: {val_acc/len(val_loader)* 100 :.2f}  "
                            f"Val loss: {val_loss/len(val_loader):.2f} ")
    pbar.close()

    # Restore the best weights *before* testing so the reported accuracy belongs
    # to the returned model. Only load a checkpoint written by this call; the
    # file may otherwise be missing or left over from a different model.
    if checkpoint_saved:
        model.load_state_dict(torch.load("best_model.pt"))

    test_acc = 0.
    for x, y in test_loader:
        _, acc = eval_step(model, x, y)
        test_acc += float(acc)

    print(f"Test accuracy: {test_acc/len(test_loader)*100 :.2f}")

    return model

Finally, we can train the model.
In this setup, we will use a single encoder block and a single head in the Multi-Head Attention.
This is chosen because of the simplicity of the task, and in this case, the attention can actually be interpreted
as an "explanation" of the predictions (compared to the other papers above dealing with deep Transformers).

In [None]:
# Single-layer, single-head model for the zero-counting task.
# input_dim=10 matches the one-hot width used in the train/eval steps.
# NOTE(review): labels are zero counts of length-16 sequences, so a count
# above 9 would fall outside the 10 output classes — rare, but worth confirming.
count0_model = TransformerPredictor(
    input_dim=10,
    model_dim=32,
    num_heads=1,
    num_classes=10,
    num_layers=1,
    dropout=0.0,
)
optimizer = optim.AdamW(count0_model.parameters(), lr=0.001)

# train_model returns the model it was given, so rebinding the name is safe
count0_model = train_model(count0_model, train_dl, val_dl, test_dl, optimizer)

As we would have expected, the Transformer can correctly solve the task.
However, what does the attention in the Multi-Head Attention block look like for an arbitrary input?
Let's try to visualize it below.

In [None]:
#######################
### YOUR CODE HERE! ###
#######################

## Bonus: 0 detection

Let's go further and detect the position and the number of 0s in a sentence that contains only one block of 0s.

The following dataset implements sentences of digits that contain one block of 0s (between 1 and 3 consecutive 0s).

In [None]:
class ZeroDetectDataset(data.Dataset):
    """Sequences of non-zero digits in which one contiguous block is set to 0."""

    def __init__(self, seq_len, size):
        """
        Args:
            seq_len: Length of every sequence.
            size: Number of sequences in the dataset.
        """
        super().__init__()
        self.seq_len = seq_len
        self.size = size

        n_samples = self.size
        # Digits are drawn from [1, 10), so the raw data contains no zeros
        self.data = torch.randint(1, 10, size=(n_samples, self.seq_len))
        # Block start in [0, seq_len - 4) and block length in [1, 4)
        self.starts = torch.randint(self.seq_len - 4, size=(n_samples,))
        self.lengths = torch.randint(1, 4, size=(n_samples,))

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        """Return ``(sequence, start, length)`` for the sample at ``idx``.

        The zero block is written into the stored row when the sample is
        accessed, so ``self.data`` itself is modified.
        """
        sequence, start, length = self.data[idx], self.starts[idx], self.lengths[idx]

        stop = start + length
        sequence[start:stop] = 0
        return sequence, start, length

We create an arbitrary number of random sequences of digits between 1 and 9, each containing a single block of 0s.
The labels are the start position and the length of that block.
We can create the corresponding data loaders below.

In [None]:
# Pre-bind seq_len=16; only the dataset size is left to pass.
# These assignments rebind dataset, train_dl, val_dl and test_dl from the counting task.
dataset = partial(ZeroDetectDataset, 16)
# Training batches are shuffled and the last incomplete batch is dropped
train_dl = data.DataLoader(dataset(50000), batch_size=128, shuffle=True, drop_last=True, pin_memory=True)
# Validation and test loaders keep their order and their last (smaller) batch
val_dl = data.DataLoader(dataset(1000), batch_size=128)
test_dl = data.DataLoader(dataset(10000), batch_size=128)

Let's look at an arbitrary sample of the dataset:

In [None]:
# Index the underlying dataset directly (no batching) to inspect one sample:
# the sequence, the start index of its zero block and the block length
inp_data, inp_start, inp_length = train_dl.dataset[0]
print("Input data:", inp_data)
print("Input start:", inp_start)
print("Input length:", inp_length)

We now have two labels: the position of our "0" block, and the number of 0s to detect. This is basically detecting a bounding box in the sentence!

To predict this, we are going to need to modify slightly the transformer model

In [None]:
class TransformerPredictor(nn.Module):
    """Transformer encoder with a learned [CLS] token and two classification heads.

    One head predicts the start position of the zero block, the other its length.
    """

    def __init__(
        self,
        input_dim,
        model_dim,
        num_start,
        num_length,
        num_heads,
        num_layers,
        dropout=0.0,
        input_dropout=0.0,
    ):
        """
        Args:
            input_dim: Hidden dimensionality of the input
            model_dim: Hidden dimensionality to use inside the Transformer
            num_start: Number of classes of the start-position head
            num_length: Number of classes of the block-length head
            num_heads: Number of heads to use in the Multi-Head Attention blocks
            num_layers: Number of encoder blocks to use.
            dropout: Dropout to apply inside the model
            input_dropout: Dropout to apply on the input features
        """
        super().__init__()
        self.input_dim = input_dim
        self.model_dim = model_dim
        self.num_start = num_start
        self.num_length = num_length
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.dropout = dropout
        self.input_dropout = input_dropout

        # Create a Generic Input Encoder Input dim -> Model dim with input dropout
        self.input_net = nn.Sequential(
            nn.Linear(input_dim, model_dim),
            nn.Dropout(input_dropout)
        )

        # Create positional encoding for sequences
        self.positional_encoding = PositionalEncoding(model_dim)

        # Create transformer Encoder
        self.transformer = TransformerEncoder(num_layers, input_dim=model_dim, dim_feedforward=model_dim*2, num_heads=num_heads, dropout=dropout)

        # Create the two output classifiers, both applied to the classification token:
        # Model_dim -> num_start and Model_dim -> num_length
        self.start_net = nn.Linear(model_dim, num_start)
        self.length_net = nn.Linear(model_dim, num_length)

        # Create classification token: a single learned vector, shared across the batch
        self.cls_token = nn.Parameter(torch.randn(1, 1, model_dim))

    def forward(self, x, mask=None, add_positional_encoding=True):
        """
        Args:
            x: Input features of shape [Batch, SeqLen, input_dim]
            mask: Mask to apply on the attention outputs (optional). It is handed
                  to the encoder after the classification token has been
                  prepended, i.e. for a sequence of length SeqLen + 1.
            add_positional_encoding: If True, we add the positional encoding to the input.
                                      Might not be desired for some tasks.

        Returns:
            Tuple ``(start, length)`` of logits with shapes [Batch, num_start]
            and [Batch, num_length].
        """
        x = self.input_net(x)

        # Add the cls token at position 0 of every sequence
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)

        if add_positional_encoding:
            x = self.positional_encoding(x)
        x = self.transformer(x, mask=mask)

        # Get the output! Remember we only care about the classification token!
        cls_features = x[:, 0]
        start = self.start_net(cls_features)
        length = self.length_net(cls_features)

        return start, length

    @torch.no_grad()
    def get_attention_maps(self, x, mask=None, add_positional_encoding=True):
        """Function for extracting the attention matrices of the whole Transformer for a single batch.

        Input arguments same as the forward pass.
        """
        x = self.input_net(x)
        x = torch.cat([self.cls_token.expand(x.shape[0],-1,-1), x], dim=1)
        if add_positional_encoding:
            x = self.positional_encoding(x)
        attention_maps = self.transformer.get_attention_maps(x, mask=mask)
        return attention_maps



In [None]:
def train_step(model, x, y_start, y_length, optim):
    """Run one optimization step on a single batch of the detection task.

    Args:
        model: Module called as ``model(inp, add_positional_encoding=True)`` and
            returning a tuple ``(start_logits, length_logits)``.
        x: Integer tensor of digit indices (one-hot encoded with 10 classes).
        y_start: Integer tensor with the target start position per sequence.
        y_length: Integer tensor with the target block length per sequence.
        optim: Optimizer holding the model parameters.

    Returns:
        ``(loss, acc)`` where ``loss`` is the sum of both cross-entropy losses,
        detached from the autograd graph so callers can accumulate it safely,
        and ``acc`` is the mean of the two head accuracies.
    """
    model.train()

    # Fetch data and transform categories to one-hot vectors
    inp_data = F.one_hot(x, num_classes=10).float()

    # Perform prediction and calculate loss and accuracy
    preds_start, preds_length = model(inp_data, add_positional_encoding=True)
    loss = F.cross_entropy(preds_start.view(-1, preds_start.size(-1)), y_start.view(-1))
    loss += F.cross_entropy(preds_length.view(-1, preds_length.size(-1)), y_length.view(-1))

    acc = (preds_start.argmax(dim=-1) == y_start).float().mean()
    acc += (preds_length.argmax(dim=-1) == y_length).float().mean()

    # Backpropagate and update weights
    loss.backward()
    optim.step()
    model.zero_grad()

    return loss.detach(), acc/2

def eval_step(model, x, y_start, y_length):
    """Evaluate the detection model on a single batch without tracking gradients.

    Returns:
        ``(loss, acc)``: the sum of the start and length cross-entropy losses,
        and the mean of the two head accuracies.
    """
    model.eval()
    with torch.no_grad():
        # One-hot encode the digit indices (10 classes)
        one_hot = F.one_hot(x, num_classes=10).float()

        start_logits, length_logits = model(one_hot, add_positional_encoding=True)

        # One cross-entropy term per head
        start_loss = F.cross_entropy(start_logits.view(-1, start_logits.size(-1)), y_start.view(-1))
        length_loss = F.cross_entropy(length_logits.view(-1, length_logits.size(-1)), y_length.view(-1))
        loss = start_loss + length_loss

        # One accuracy per head, averaged on return
        start_acc = (start_logits.argmax(dim=-1) == y_start).float().mean()
        length_acc = (length_logits.argmax(dim=-1) == y_length).float().mean()
        acc = start_acc + length_acc

    return loss, acc/2


Finally, we can create a training function similar to the one we have seen in previous laboratories. We run for $N$ epochs, printing the training and validation loss and saving our best model based on the validation accuracy.
Afterward, we test our models on the test set.

In [None]:
def train_model(model, train_loader, val_loader, test_loader,
                optim, epochs=10):
    """Train the detection model, keep the best checkpoint on validation accuracy and test it.

    Args:
        model: Model to train.
        train_loader: Iterable of ``(x, y_start, y_length)`` training batches.
        val_loader: Iterable of ``(x, y_start, y_length)`` validation batches.
        test_loader: Iterable of ``(x, y_start, y_length)`` test batches.
        optim: Optimizer holding the model parameters.
        epochs: Number of passes over the training data.

    Returns:
        The model with the weights of the best validation epoch loaded. The
        printed test accuracy is measured on those same weights.
    """
    # -inf so that the first epoch is always checkpointed, even at 0% accuracy
    best_acc = float("-inf")
    checkpoint_saved = False
    pbar = tqdm(range(epochs))
    for _ in range(epochs):
        train_loss, train_acc = 0., 0.
        for x, y_start, y_length in train_loader:
            loss, acc = train_step(model, x, y_start, y_length, optim)
            # float() keeps plain numbers, so no autograd history is accumulated
            train_loss += float(loss)
            train_acc += float(acc)

        val_loss, val_acc = 0., 0.
        for x, y_start, y_length in val_loader:
            loss, acc = eval_step(model, x, y_start, y_length)
            val_loss += float(loss)
            val_acc += float(acc)

        mean_val_acc = val_acc/len(val_loader)
        if mean_val_acc > best_acc:
            torch.save(model.state_dict(), "best_model.pt")
            best_acc = mean_val_acc
            checkpoint_saved = True

        pbar.update()
        pbar.set_description(f"Train Acc: {train_acc/len(train_loader)* 100:.2f} "
                            f"Train Loss: {train_loss/len(train_loader):.2f} "
                            f"Val Acc: {val_acc/len(val_loader)* 100 :.2f}  "
                            f"Val loss: {val_loss/len(val_loader):.2f} ")
    pbar.close()

    # Restore the best weights *before* testing so the reported accuracy belongs
    # to the returned model. Only load a checkpoint written by this call; the
    # file may otherwise be missing or left over from a different model.
    if checkpoint_saved:
        model.load_state_dict(torch.load("best_model.pt"))

    test_acc = 0.
    for x, y_start, y_length in test_loader:
        _, acc = eval_step(model, x, y_start, y_length)
        test_acc += float(acc)

    print(f"Test accuracy: {test_acc/len(test_loader)*100 :.2f}")

    return model

Finally, we can train the model.
In this setup, we will use a single encoder block and a single head in the Multi-Head Attention.
This is chosen because of the simplicity of the task, and in this case, the attention can actually be interpreted
as an "explanation" of the predictions (compared to the other papers above dealing with deep Transformers).

In [None]:
# Single-layer, single-head model for the zero-block detection task.
# input_dim=10 matches the one-hot width used in the train/eval steps.
# num_start=12 covers start indices drawn from [0, seq_len - 4) with seq_len=16;
# num_length=4 covers the block lengths 1..3 (class 0 is never a target).
detect0_model = TransformerPredictor(
    input_dim=10,
    model_dim=32,
    num_heads=1,
    num_start=12,
    num_length=4,
    num_layers=1,
    dropout=0.0,
)
optimizer = optim.AdamW(detect0_model.parameters(), lr=0.001)

# train_model returns the model it was given, so rebinding the name is safe
detect0_model = train_model(detect0_model, train_dl, val_dl, test_dl, optimizer)

Can you check the model works?

In [None]:
#######################
### YOUR CODE HERE! ###
#######################

# Conclusion

And that's it! Remember that you can use dedicated learnable tokens to accumulate features useful for a global task.

You can try to complexify the task if you are interested! For instance you could try:
*   Checking if a particular digit is in the list
*   Finding out what the highest digit is
*   Finding out what the most frequent digit is

