# 5. Transformers on MNIST

### About this notebook

This notebook was used in the 50.039 Deep Learning course at the Singapore University of Technology and Design.

**Author:** Matthieu DE MARI (matthieu_demari@sutd.edu.sg)

**Version:** 1.1 (29/08/2023)

**Requirements:**
- Python 3 (tested on v3.11.4)
- Matplotlib (tested on v3.7.2)
- Numpy (tested on v1.25.2)
- Torch (tested on v2.0.1+cu118)
- Torchvision (tested on v0.15.2+cu118)
- We also strongly recommend setting up CUDA on your machine! (At this point, honestly, it is almost mandatory).

### Imports and CUDA

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.datasets as datasets
CUDA = torch.cuda.is_available()
device = torch.device("cuda" if CUDA else "cpu")

### Load MNIST

At this point, do I really need to explain what this does?

In [10]:
from DatasetGenerator import DatasetGenerator

In [11]:
pathDirData = '../raw_data/archive'
pathFileTrain = './dataset/train_1.txt'
pathFileVal = './dataset/val_1.txt'

transResize = 256
transCrop = 224
trBatchSize = 4
num_class = 14

normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

transformList = []
# transformList.append(transforms.Resize(transResize))
transformList.append(transforms.RandomResizedCrop(transCrop))
transformList.append(transforms.RandomHorizontalFlip())
transformList.append(transforms.ToTensor())
transformList.append(normalize)      
transformSequence=transforms.Compose(transformList)

datasetTrain = DatasetGenerator(pathImageDirectory=pathDirData, pathDatasetFile=pathFileTrain, transform=transformSequence)
datasetVal =   DatasetGenerator(pathImageDirectory=pathDirData, pathDatasetFile=pathFileVal, transform=transformSequence)
train_loader = DataLoader(dataset=datasetTrain, batch_size=trBatchSize, shuffle=True,  num_workers=12, pin_memory=True)
val_loader = DataLoader(dataset=datasetVal, batch_size=trBatchSize, shuffle=False, num_workers=12, pin_memory=True)

### Define self-attention layer, and Transformer model

We will have to flatten the images to process them with Linear operations and attention operations.

In [12]:
# Define a self-attention layer implementation
class SelfAttentionLayer(nn.Module):
    def __init__(self, in_features):
        super(SelfAttentionLayer, self).__init__()
        self.in_features = in_features
        self.query = nn.Linear(in_features, in_features)
        self.key = nn.Linear(in_features, in_features)
        self.value = nn.Linear(in_features, in_features)

    def forward(self, x):
        batch_size = x.size(0)
        query = self.query(x).view(batch_size, -1, self.in_features)
        key = self.key(x).view(batch_size, -1, self.in_features)
        value = self.value(x).view(batch_size, -1, self.in_features)
        attention_weights = F.softmax(torch.bmm(query, key.transpose(1, 2))/(self.in_features**0.5), dim = 2)
        out = torch.bmm(attention_weights, value).view(batch_size, -1)
        return out

In [13]:
# Neural network definition using self-attention
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.avg_pool = nn.AvgPool2d(8)
        self.dim_reduce = nn.Conv2d(in_channels = 3, out_channels=1, kernel_size = 1, stride = 1, padding = 0, bias = False)
        self.fc1 = nn.Linear(28*28, 128)
        self.attention1 = SelfAttentionLayer(128)
        self.fc2 = nn.Linear(128, 64)
        self.attention2 = SelfAttentionLayer(64)
        self.fc3 = nn.Linear(64, 14)

    def forward(self, x):
        x = self.avg_pool(x)
        x = self.dim_reduce(x)
        x = x.view(-1, 28*28)
        x = F.relu(self.fc1(x))
        x = self.attention1(x)
        x = F.relu(self.fc2(x))
        x = self.attention2(x)
        x = self.fc3(x)
        # x = nn.Softmax(x)

        return x

### Try out our model

Create model and see its structure

In [14]:
# Create model
model = Transformer()
print(model)

Transformer(
  (avg_pool): AvgPool2d(kernel_size=8, stride=8, padding=0)
  (dim_reduce): Conv2d(3, 1, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (fc1): Linear(in_features=784, out_features=128, bias=True)
  (attention1): SelfAttentionLayer(
    (query): Linear(in_features=128, out_features=128, bias=True)
    (key): Linear(in_features=128, out_features=128, bias=True)
    (value): Linear(in_features=128, out_features=128, bias=True)
  )
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (attention2): SelfAttentionLayer(
    (query): Linear(in_features=64, out_features=64, bias=True)
    (key): Linear(in_features=64, out_features=64, bias=True)
    (value): Linear(in_features=64, out_features=64, bias=True)
  )
  (fc3): Linear(in_features=64, out_features=14, bias=True)
)


### Simple trainer like before

Again, very similar to what we have done in Week 4...

In [15]:
## function to calculate the F1 score
def f1_score(tp, fp, fn):
    return 2 * (tp) / (2 * tp + fp + fn)

In [16]:
# Create model
model = Transformer()
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

# Train the model
n_epochs = 5
for epoch in range(n_epochs):
    tp_array = [0 for x in range(num_class)]
    fp_array = [0 for x in range(num_class)]
    fn_array = [0 for x in range(num_class)]
    for i, (images, labels) in enumerate(train_loader):
        # Flatten image
        # print(images.shape)
        # images = images.reshape(-1, 28*28)
        # Forward pass
        outputs = model(images)
        # print("shape of input: "+ str(images.shape))
        # print("shape of output " + str(outputs.shape))
        # print("shape of labels: " + str(labels.shape))
        loss = criterion(outputs, labels)
        # calculate statistics
        pred_labels = (nn.Softmax(dim=1)(outputs) > 0.5).long()
        tp_array += sum(torch.logical_and(pred_labels, labels))
        fp_array += sum(torch.logical_and(torch.logical_xor(pred_labels, labels).long(), pred_labels))
        fn_array += sum(torch.logical_and(torch.logical_xor(pred_labels, labels).long(), labels))
        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Display
        if (i + 1) % 100 == 0:
            print("Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, tp_sum: {:.4f}, fp_sum: {:.4f}, fn_sum: {:.4f}, cumulative_f1_score: {:.4f}".format(epoch + 1, \
                                                                     n_epochs, \
                                                                     i + 1, \
                                                                     len(train_loader), \
                                                                     loss.item(),\
                                                                     sum(tp_array), \
                                                                     sum(fp_array),\
                                                                     sum(fn_array),\
                                                                     f1_score(sum(tp_array), sum(fp_array), sum(fn_array))))

Epoch [1/5], Step [100/19617], Loss: 1.4938, tp_sum: 0.0000, fp_sum: 8.0000, fn_sum: 299.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [200/19617], Loss: 2.5127, tp_sum: 0.0000, fp_sum: 11.0000, fn_sum: 593.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [300/19617], Loss: 1.9752, tp_sum: 0.0000, fp_sum: 11.0000, fn_sum: 870.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [400/19617], Loss: 1.4754, tp_sum: 0.0000, fp_sum: 12.0000, fn_sum: 1141.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [500/19617], Loss: -0.0000, tp_sum: 0.0000, fp_sum: 13.0000, fn_sum: 1423.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [600/19617], Loss: 1.8551, tp_sum: 0.0000, fp_sum: 13.0000, fn_sum: 1699.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [700/19617], Loss: 1.1193, tp_sum: 0.0000, fp_sum: 13.0000, fn_sum: 1973.0000, cumulative_f1_score: 0.0000
Epoch [1/5], Step [800/19617], Loss: 3.4640, tp_sum: 0.0000, fp_sum: 13.0000, fn_sum: 2279.0000, cumulative_f1_score: 0.0000
Epo

KeyboardInterrupt: 

In [None]:
torch.randn(2, 3)

tensor([[ 1.4702, -0.2531, -1.2555],
        [-0.2202,  1.0028, -0.4013]])

In [None]:
(nn.Softmax(dim=1)(input) > 0.5).long()


tensor([[0, 0, 0],
        [0, 1, 0]])

In [None]:
nn.Softmax(dim=)

Softmax(
  dim=tensor([[ 1.0308e+00, -5.6095e-01,  7.5387e-01,  1.2724e+00,  2.4425e-01,
            2.5905e-01, -1.5240e+00,  1.3018e-01,  4.2109e-02, -6.5810e-01,
           -6.1871e-01, -8.5326e-01, -5.8720e-01, -2.7311e+00],
          [ 8.8267e-01, -5.9323e-01,  7.6515e-01,  1.0403e+00,  2.5159e-03,
            8.5370e-03, -1.2536e+00, -6.8686e-02,  9.9156e-02, -4.8709e-01,
           -7.8932e-01, -9.6005e-01, -4.1589e-01, -2.3021e+00],
          [ 8.0129e-01, -4.6816e-01,  6.3613e-01,  1.0682e+00,  1.2404e-01,
            1.4091e-01, -1.1497e+00,  4.5737e-02, -2.4764e-02, -4.4636e-01,
           -4.6768e-01, -7.2844e-01, -4.1407e-01, -2.1323e+00],
          [ 1.0949e+00, -4.7864e-01,  9.8542e-01,  1.4261e+00,  2.3810e-01,
            9.3057e-02, -1.7310e+00, -1.6389e-01, -8.2908e-02, -5.7505e-01,
           -7.9433e-01, -9.7229e-01, -5.2190e-01, -2.9390e+00]],
         grad_fn=<AddmmBackward0>)
)

In [None]:
torch.max(outputs.data, 1)

torch.return_types.max(
values=tensor([1.2724, 1.0403, 1.0682, 1.4261]),
indices=tensor([3, 3, 3, 3]))

In [None]:
import torch

In [None]:
a = torch.tensor([[0,1,0,0],[1,0,0,0]])
b = torch.tensor([[0,1,0,0],[0,1,0,0]])
print(torch.logical_xor(a,b).long())
print(sum(sum(torch.logical_xor(a,b))))

tensor([[0, 0, 0, 0],
        [1, 1, 0, 0]])
tensor(2)


### Evaluate model

We get a 97% test accuracy, after only 5 iterations of training!

In [None]:
# Test the model
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in train_loader:
        # Flatten images
        # images = images.reshape(-1, 28 * 28)
        # Forward pass and accuracy calculation
        outputs = model(images)
        # _, predicted = torch.max(outputs.data, 1)
        
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    # Final display
    print("Test Accuracy: {} %".format(100*correct/total))

RuntimeError: The size of tensor a (4) must match the size of tensor b (14) at non-singleton dimension 1

In [None]:
outputs 

NameError: name 'outputs' is not defined

### Quick question

Could we obtain a better performance could be obtained by combining Convolutional operations and Attention ones?

Would the layer below do the trick?

In [None]:
# Define a convolutional attention layer implementation
class ConvAttentionLayer(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3):
        super(ConvAttentionLayer, self).__init__()
        self.query_conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2)
        self.key_conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2)
        self.value_conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2)

    def forward(self, x):
        query = self.query_conv(x)
        key = self.key_conv(x)
        value = self.value_conv(x)
        batch_size, channels, height, width = query.size()
        query = query.view(batch_size, channels, -1)
        key = key.view(batch_size, channels, -1)
        value = value.view(batch_size, channels, -1)
        attention_weights = F.softmax(torch.bmm(query.transpose(1, 2), key), dim=2)
        out = torch.bmm(value, attention_weights).view(batch_size, channels, height, width)
        return out

Could we then use it to assemble a Convolutional Transformer?

In [None]:
# Neural network definition using convolutional attention
class ConvTransformer(nn.Module):
    def __init__(self):
        super(ConvTransformer, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size = 3, padding = 1)
        self.attention1 = ConvAttentionLayer(16, 16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size = 3, padding = 1)
        self.attention2 = ConvAttentionLayer(32, 32)
        self.fc = nn.Linear(32*28*28, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.attention1(x)
        x = F.relu(self.conv2(x))
        x = self.attention2(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Create model
conv_model = ConvTransformer()
print(conv_model)

ConvTransformer(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (attention1): ConvAttentionLayer(
    (query_conv): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (key_conv): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (value_conv): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (attention2): ConvAttentionLayer(
    (query_conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (key_conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (value_conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
  (fc): Linear(in_features=25088, out_features=10, bias=True)
)


**Open question:** Would that train and obtain better performance than the "Linear" transformer we trained earlier?