In [3]:
import numpy as np

from tqdm import tqdm, trange

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader

from torchvision.transforms import ToTensor
from torchvision.datasets.mnist import MNIST

In [4]:
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x7f2dd9999b50>

In [8]:
transform = ToTensor()
train_set = MNIST(root='./datasets', train=True, download=True, transform=transform)
test_set = MNIST(root='./datasets', train=False, download=True, transform=transform)


In [9]:
train_loader = DataLoader(train_set, shuffle=True, batch_size=128)
test_loader = DataLoader(test_set, shuffle=False, batch_size=128)

In [11]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device: ' + str(device))

Device: cuda


In [67]:
model = MyViT((1, 28, 28), n_patches = 7, n_blocks = 2, hidden_d = 8, n_heads = 2, out_d = 10).to(device)

In [68]:
N_EPOCHS = 5
LR = 0.005

In [69]:
optimizer = Adam(model.parameters(), lr = LR)
criterion = CrossEntropyLoss()

In [72]:
# Training
for epoch in trange(N_EPOCHS, desc='Training'):
    train_loss = 0.0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=False):
        x, y = batch
        x, y = x.to(device), y.to(device)
        y_hat = model(x)
        loss = criterion(y_hat, y)
        train_loss += loss.detach().cpu() / len(train_loader) # faccio la media delle loss?

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}/{N_EPOCHS} loss: {train_loss:.2f}")
return train_loss

Training:   0%|          | 0/5 [00:00<?, ?it/s]
Epoch 1 in training:   0%|          | 0/469 [00:00<?, ?it/s][A
Epoch 1 in training:   0%|          | 1/469 [00:00<06:42,  1.16it/s][A
Epoch 1 in training:   0%|          | 2/469 [00:01<06:05,  1.28it/s][A
Epoch 1 in training:   1%|          | 3/469 [00:02<06:21,  1.22it/s][A
Epoch 1 in training:   1%|          | 4/469 [00:03<06:09,  1.26it/s][A
Epoch 1 in training:   1%|          | 5/469 [00:03<06:02,  1.28it/s][A
Epoch 1 in training:   1%|▏         | 6/469 [00:04<05:45,  1.34it/s][A
Epoch 1 in training:   1%|▏         | 7/469 [00:05<05:43,  1.34it/s][A
Epoch 1 in training:   2%|▏         | 8/469 [00:06<05:36,  1.37it/s][A
Epoch 1 in training:   2%|▏         | 9/469 [00:06<05:52,  1.30it/s][A
Epoch 1 in training:   2%|▏         | 10/469 [00:07<05:59,  1.28it/s][A
Training:   0%|          | 0/5 [00:08<?, ?it/s]                      [A


KeyboardInterrupt: 

In [None]:
# test
with torch.no_grad():
    correct, total = 0, 0
    test_loss = 0.0
    for batch in tqdm(test_loader, desc='Testing'):
        x, y = batch
        x, y = x.to(device), y.to(device)
        y_hat = model(x)

        loss = criterion(y_hat, y)
        test_loss += loss.detach().cpu() / len(test_loader)

        correct += torch.sum(torch.argmax(y_hat, dim=1) == y).detach().cpu().item()
        total += len(x)
    print(f"Test loss: {test_loss:.2f}")
    print(f"Test accuracy: {correct / total * 100:.2f}%")

In [27]:
# Devo passare da avere immagini che hanno dimensione (N, C, H, W), 
# rispettivamente: numero di elementi nel batch, canali, altezza e 
# larghezza dell'immagine, ad oggetti di dimensione (N, N_PATCHES, PATCHES_DIM)
# Il numero di patch (N_PATCHES) è parametro del modello.
# Il reshape sarà: (N, PxP, HxC/P x WxC/P), dove P è il numero di patch su una 
# sola dimensione, quindi il numero totale di patches sarà PxP. La dimensione delle
# patch su ogni dimensione sarà H/P e W/P, ed ognuna di esse dovrà essere moltiplicata
# per il numero di canali dell'immagine. La PATCHES_DIM sarà il prodotto di questi 
# due valori

def patchify(images, n_patches):
    # n_patches è il numero di patch su una dimensione dell'immagine
    n, c, h, w = images.shape

    assert h == w, "Images must be squares"

    patches = torch.zeros(n, n_patches ** 2, h*c*w // n_patches ** 2) # Cos'è l'ultima dimensione?
    patch_size = h // n_patches

    for idx, image in enumerate(images):
        for i in range(n_patches):
            for j in range(n_patches):
                # rompo le immagini in patch (la prima dimensione è il numero di canali, li prendo tutti)
                # facendo scorrere i vari patch sopra l'immagine.
                patch = image[:, i*patch_size : (i+1) * patch_size, j*patch_size : (j+1) * patch_size]
                # salvo le patch schiacciate in una matrice, rispecchiando
                # nella matrice le posizioni che le patch avevano nell'immagine
                patches[idx, i*n_patches+j] = patch.flatten()

    return patches

In [38]:
def get_positional_embeddings(sequence_length, d):
    result = torch.ones(sequence_length, d)
    for i in range(sequence_length):
        for j in range(d):
            result[i][j] = np.sin(i / (10000 ** (j / d))) if j % 2 == 0 else np.cos(i / (10000 ** ((j - 1) / d)))
    return result

In [65]:
class MyViT(nn.Module):
    def __init__(self, chw = (1, 28, 28), n_patches = 7, n_blocks=2, hidden_d=8, n_heads=2, out_d=10):
        super(MyViT, self).__init__()
        
        # Attributi
        self.chw = chw
        self.n_patches = n_patches
        self.hidden_d = hidden_d
        self.n_blocks = n_blocks
        self.n_heads = n_heads
        
        self.patch_size = (chw[1] / n_patches, chw[2] / n_patches)
        
        # Primo mapping lineare: necessario perchè non posso dare in input
        # al transformer un patch a due dimensioni. Uso una trasformazione lineare
        # che mi porta da una matrice 4x4 (singolo patch) ad un mapping lungo 8.
        # La dimensione di output del mapping è arbitraria e fissa.
        
        # Dimensione in input per la trasformazione lineare, in questo caso è 16 (4x4)
        self.input_d = int(chw[0] * self.patch_size[0] * self.patch_size[1])
        
        # Trasformazione lineare (è solo una matrice)
        self.linear_mapper = nn.Linear(self.input_d, self.hidden_d)
        
        # Aggiungo il token CLS alla sequenza: è un token posto all'inizio della sequenza dei
        # patch schiacciati e trasformati linearmente. Voglio che, durante il training,
        # a tale token venga 'assegnata' l'informazione globale contenuta in tutti gli
        # altri patch della sequenza, in modo tale da poter fare classificazione.
        # Il valore di tale token deve essere apprendibile dal modello, per questo
        # è di tipo nn.Parameter (ovvero un tensore che viene aggiunto al modello e
        # su cui calcolo la discesa del gradiente affinchè contenga l'informazione
        # necessaria al task che voglio che faccia). Molto interessante: posso far fare
        # ai vari token i task che voglio, fintanto che sono di tipo nn.Parameter
        # perchè il training farà in modo che apprendano l'informazione necessaria al
        # task che gli assegno.
        
        self.class_token = nn.Parameter(torch.rand(1, self.hidden_d)) # Inizializzato a caso.
        
        # Aggiungo i positional embeddings
        # self.pos_embed = nn.Parameter(torch.Tensor(get_positional_embeddings(self.n_patches ** 2 + 1, self.hidden_d)))
        # self.pos_embed.requires_grad = False # Uso solo seno e coseno per calcolarli, non sono valori da apprendere
        self.register_buffer('positional_embeddings', get_positional_embeddings(n_patches ** 2 + 1, hidden_d), persistent=False)
        
        # Blocchi di encoder
        self.blocks = nn.ModuleList([MyViTBlock(hidden_d, n_heads) for _ in range(n_blocks)])
        
        # MLP per la classificazione
        self.mlp = nn.Sequential(
            nn.Linear(self.hidden_d, out_d),
            nn.Softmax(dim=-1)
        )
        
    def forward(self, images):
        
        n, c, h, w = images.shape
        patches = patchify(images, self.n_patches).to(self.positional_embeddings.device)
        tokens = self.linear_mapper(patches)
        
        # Aggiungo il CLS alla sequenza
        # tokens = torch.stack([torch.vstack((self.class_token, tokens[i])) for i in range(len(tokens))])
        tokens = torch.cat((self.class_token.expand(n, 1, -1), tokens), dim=1)
        
        # Aggiungo ai token l'informazione posizionale
        # pos_embed = self.pos_embed.repeat(images.shape[0], 1, 1)
        # out = tokens + pos_embed
        out = tokens + self.positional_embeddings.repeat(n, 1, 1)
        
        # Itero sui vari blocchi del transformer
        for block in self.blocks:
            out = block(out)
        
        # Estraggo solo il token CLS (che sta all'inizio di ogni sequenza)
        out = out[:, 0]
        
        return self.mlp(out) # faccio classificazione su tale token

In [55]:
class MyMSA(nn.Module):
    def __init__(self, d, n_heads = 2):
        super(MyMSA, self).__init__()
        self.d = d
        self.n_heads = n_heads
        
        d_head = int(d / n_heads)
        
        # Tutte e tre K, Q, V sono matrici, dunque trasformazioni lineari
        self.q_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        self.k_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        self.v_mappings = nn.ModuleList([nn.Linear(d_head, d_head) for _ in range(self.n_heads)])
        
        self.d_head = d_head
        self.softmax = nn.Softmax(-1)
        
    def forward(self, sequences):
        # La sequenza di token ha shape (N, seq_length, token_dim)
        # La portiamo alla shape (N, seq_length, n_heads, token_dim / n_heads)
        # E poi la riportiamo a 
        
        result = []
        for sequence in sequences:
            seq_result = []
            for head in range(self.n_heads):
                q_mapping = self.q_mappings[head]
                k_mapping = self.k_mappings[head]
                v_mapping = self.v_mappings[head]
                
                seq = sequence[:, head * self.d_head : (head + 1) * self.d_head]
                q, k, v = q_mapping(seq), k_mapping(seq), v_mapping(seq)
                
                attention = self.softmax(q @ k.T / (self.d_head ** 0.5))
                seq_result.append(attention @ v)
            result.append(torch.hstack(seq_result))
            
        return torch.cat([torch.unsqueeze(r, dim=0) for r in result])

In [52]:
class MyViTBlock(nn.Module):
    def __init__(self, hidden_d, n_heads, mlp_ratio=4):
        super(MyViTBlock, self).__init__()
        self.hidden_d = hidden_d
        self.n_heads = n_heads

        self.norm1 = nn.LayerNorm(hidden_d)
        self.mhsa = MyMSA(hidden_d, n_heads)
        self.norm2 = nn.LayerNorm(hidden_d)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_d, mlp_ratio * hidden_d),
            nn.GELU(),
            nn.Linear(mlp_ratio * hidden_d, hidden_d)
        )

    def forward(self, x):
        out = x + self.mhsa(self.norm1(x))
        out = out + self.mlp(self.norm2(out))
        return out

In [56]:
model = MyViTBlock(hidden_d=8, n_heads=2)

x = torch.randn(7, 50, 8)  # Dummy sequences
print(model(x).shape)      # torch.Size([7, 50, 8])

torch.Size([7, 50, 8])


In [66]:
model = MyViT(chw=(1, 28, 28), n_patches=7)
x = torch.randn(7, 1, 28, 28)
print(model(x).shape)

torch.Size([7, 10])
