# Traduction en français du notebook *15* du cours ***Deep Learning*** d'Alfredo Canziani, professeur assistant à la *New York University* :
https://github.com/Atcold/pytorch-Deep-Learning/blob/master/15-transformer.ipynb

# Transformers

Pour comprendre tout ce qui se passe ci-dessous, consultez d'abord les notes de cours / la leçon vidéo.

In [1]:
import torch 
from torch import nn
import torch.nn.functional as f
import numpy as np 

In [2]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
nn_Softargmax = nn.Softmax  # alias: the notebook refers to softmax as "soft(arg)max"

## Attention multi-têtes

In [3]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected to ``d_model`` features,
    split into ``num_heads`` heads of ``d_model // num_heads`` features,
    attended independently, concatenated again and passed through a final
    linear layer.

    Args:
        d_model: Model (embedding) dimension; must be a multiple of
            ``num_heads``.
        num_heads: Number of attention heads.
        p: Accepted for interface compatibility; not used by this module.
        d_input: Optional ``(d_xq, d_xk, d_xv)`` input feature sizes of the
            query, key and value projections. When ``None`` all three
            default to ``d_model``.
    """

    def __init__(self, d_model, num_heads, p, d_input=None):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        if d_input is None:
            d_xq = d_xk = d_xv = d_model
        else:
            d_xq, d_xk, d_xv = d_input

        # The embedding dimension must split evenly across the heads
        assert d_model % self.num_heads == 0, "d_model must be a multiple of num_heads"

        self.d_k = d_model // self.num_heads

        # These projections still output d_model features; the result is
        # split into heads afterwards
        self.W_q = nn.Linear(d_xq, d_model, bias=False)
        self.W_k = nn.Linear(d_xk, d_model, bias=False)
        self.W_v = nn.Linear(d_xv, d_model, bias=False)

        # The output of every sub-layer must have d_model features
        self.W_h = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V):
        """Attend over ``V`` using the similarity between ``Q`` and ``K``.

        Args:
            Q: Queries, ``(..., q_length, dim_per_head)``.
            K: Keys, ``(..., k_length, dim_per_head)``.
            V: Values, ``(..., k_length, dim_per_head)``.

        Returns:
            Tuple ``(H, A)`` where ``H`` is the weighted average of the
            values, ``(..., q_length, dim_per_head)``, and ``A`` holds the
            attention weights, ``(..., q_length, k_length)``.
        """
        # Scale by sqrt(d_k) so that the soft(arg)max does not saturate
        Q = Q / np.sqrt(self.d_k)

        # Transpose the last two axes rather than fixed axes 2 and 3 so that
        # inputs with or without leading batch/head axes are both accepted
        scores = torch.matmul(Q, K.transpose(-2, -1))   # (..., q_length, k_length)

        # Soft(arg)max over the keys; the functional form avoids building a
        # new module on every call
        A = f.softmax(scores, dim=-1)                   # (..., q_length, k_length)

        # Weighted average of the values
        H = torch.matmul(A, V)                          # (..., q_length, dim_per_head)

        return H, A

    def split_heads(self, x, batch_size):
        """
        Split the last dimension into (heads X depth).
        Returns the result transposed to (batch_size X num_heads X seq_length X d_k).
        """
        return x.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

    def group_heads(self, x, batch_size):
        """
        Merge the heads back to get (batch_size X seq_length X (num_heads times d_k)).
        """
        return x.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)

    def forward(self, X_q, X_k, X_v):
        """Apply multi-head attention.

        Args:
            X_q: Query inputs, ``(bs, q_length, d_xq)``.
            X_k: Key inputs, ``(bs, k_length, d_xk)``.
            X_v: Value inputs, ``(bs, k_length, d_xv)``.

        Returns:
            Tuple ``(H, A)``: the attended output ``(bs, q_length, d_model)``
            and the per-head attention weights
            ``(bs, n_heads, q_length, k_length)``.
        """
        batch_size = X_q.size(0)

        # Project, then split into num_heads
        Q = self.split_heads(self.W_q(X_q), batch_size)  # (bs, n_heads, q_length, dim_per_head)
        K = self.split_heads(self.W_k(X_k), batch_size)  # (bs, n_heads, k_length, dim_per_head)
        V = self.split_heads(self.W_v(X_v), batch_size)  # (bs, n_heads, v_length, dim_per_head)

        # Attention weights and outputs for each head
        H_cat, A = self.scaled_dot_product_attention(Q, K, V)

        # Put all the heads back together by concatenation
        H_cat = self.group_heads(H_cat, batch_size)    # (bs, q_length, dim)

        # Final linear layer
        H = self.W_h(H_cat)          # (bs, q_length, dim)

        return H, A

### Quelques contrôles :

In [4]:
# Scratch attention module used only by the sanity checks that follow
temp_mha = MultiHeadAttention(d_model=512, num_heads=8, p=0)
def print_out(Q, K, V):
    """Run ``temp_mha``'s scaled dot-product attention and print the squeezed weights and output."""
    output, weights = temp_mha.scaled_dot_product_attention(Q, K, V)
    print('Les poids d attention sont:', weights.squeeze())
    print('La sortie est:', output.squeeze())

Pour vérifier que notre auto-attention fonctionne : si la requête correspond à l'une des valeurs clés, toute l'attention doit y être portée, la valeur renvoyée étant celle de cet indice

In [5]:
# Four keys with three features each; the last two keys are identical.
# Indexing with [None,None] prepends two axes of size 1.
test_K = torch.tensor(
    [[10, 0, 0],
     [ 0,10, 0],
     [ 0, 0,10],
     [ 0, 0,10]]
).float()[None,None]

# One value row per key
test_V = torch.tensor(
    [[   1,0,0],
     [  10,0,0],
     [ 100,5,0],
     [1000,6,0]]
).float()[None,None]

# A single query pointing in the same direction as the second key
test_Q = torch.tensor(
    [[0, 10, 0]]
).float()[None,None]
print_out(test_Q, test_K, test_V)

Les poids d attention sont: tensor([3.7266e-06, 9.9999e-01, 3.7266e-06, 3.7266e-06])
La sortie est: tensor([1.0004e+01, 4.0993e-05, 0.0000e+00])


Super ! On peut voir qu'il se concentre sur la deuxième clé et renvoie la deuxième valeur. 

Si nous donnons une requête qui correspond exactement à deux clés, elle devrait retourner la valeur moyenne des deux valeurs pour ces deux clés. 

In [6]:
# A single query that matches the third and fourth keys equally.
# The two leading axes are added explicitly, like in the other checks,
# instead of relying on matmul broadcasting of a 2-D query.
test_Q = torch.tensor([[0, 0, 10]]).float()[None,None]
print_out(test_Q, test_K, test_V)

Les poids d attention sont: tensor([1.8633e-06, 1.8633e-06, 5.0000e-01, 5.0000e-01])
La sortie est: tensor([549.9979,   5.5000,   0.0000])


Nous constatons qu'il se concentre à parts égales sur la troisième et la quatrième clé et renvoie la moyenne de leurs valeurs.

Donnons maintenant toutes les requêtes en même temps :

In [7]:
# Three queries at once, one per row
test_Q = torch.tensor(
    [[0, 0, 10], [0, 10, 0], [10, 10, 0]]
).float()[None,None]
print_out(test_Q, test_K, test_V)

Les poids d attention sont: tensor([[1.8633e-06, 1.8633e-06, 5.0000e-01, 5.0000e-01],
        [3.7266e-06, 9.9999e-01, 3.7266e-06, 3.7266e-06],
        [5.0000e-01, 5.0000e-01, 1.8633e-06, 1.8633e-06]])
La sortie est: tensor([[5.5000e+02, 5.5000e+00, 0.0000e+00],
        [1.0004e+01, 4.0993e-05, 0.0000e+00],
        [5.5020e+00, 2.0497e-05, 0.0000e+00]])


## Convolution 1D  avec `kernel_size = 1`

Il s'agit essentiellement d'un MLP avec une couche cachée et une activation ReLU appliquée à chaque élément de l'ensemble.

In [8]:
class CNN(nn.Module):
    """Two linear layers with a ReLU in between, applied along the last axis.

    Args:
        d_model: Size of the input and output features.
        hidden_dim: Size of the hidden layer.
        p: Accepted for interface compatibility; not used by this module.
    """

    def __init__(self, d_model, hidden_dim, p):
        super().__init__()
        # Expand to the hidden size, then project back to d_model
        self.k1convL1 = nn.Linear(in_features=d_model, out_features=hidden_dim)
        self.k1convL2 = nn.Linear(in_features=hidden_dim, out_features=d_model)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Return ``k1convL2(relu(k1convL1(x)))``."""
        hidden = self.activation(self.k1convL1(x))
        return self.k1convL2(hidden)

## Bloc encodeur du Transformer

Nous avons maintenant tous les composants pour le bloc encodeur du Transformer ci-dessous !!!

In [9]:
class EncoderLayer(nn.Module):
    """One Transformer encoder block.

    Self-attention followed by a position-wise network; each sub-layer is
    wrapped in a residual connection and then layer-normalised.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        conv_hidden_dim: Hidden size of the position-wise network.
        p: Forwarded to the sub-modules.
    """

    def __init__(self, d_model, num_heads, conv_hidden_dim, p=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads, p)
        self.cnn = CNN(d_model, conv_hidden_dim, p)

        self.layernorm1 = nn.LayerNorm(d_model, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(d_model, eps=1e-6)

    def forward(self, x):
        """Map ``x`` of shape (batch_size, input_seq_len, d_model) to a tensor of the same shape."""
        # Self-attention: x is used as query, key and value; the weights are discarded
        attended = self.mha(x, x, x)[0]

        # Residual connection, then normalisation
        normed = self.layernorm1(x + attended)

        # Position-wise network with its own residual connection and normalisation
        return self.layernorm2(normed + self.cnn(normed))

### Encodeur 
#### Blocs de N couches d'encodeurs + encodage positionnel + enchâssement des entrées

L'auto-attention en elle-même n'a pas de récurrence ou de convolutions, donc pour la rendre sensible à la position, nous devons fournir des codages de position supplémentaires. Ceux-ci sont calculés comme suit :

\begin{aligned}
E(p, 2i)    &= \sin(p / 10000^{2i / d}) \\
E(p, 2i+1) &= \cos(p / 10000^{2i / d})
\end{aligned}

In [10]:
def create_sinusoidal_embeddings(nb_p, dim, E):
    """Fill ``E`` in place with fixed sinusoidal position encodings.

    For position ``p`` and column ``j`` the angle is
    ``p / 10000 ** (2 * (j // 2) / dim)``. Even columns receive its sine and
    odd columns its cosine. ``E`` is then detached and marked as not
    requiring gradients.

    Args:
        nb_p: Number of positions (rows of ``E``).
        dim: Encoding dimension (columns of ``E``).
        E: Tensor of shape ``(nb_p, dim)`` to overwrite, e.g. the weight of
            an ``nn.Embedding``.

    Returns:
        None. ``E`` is modified in place.
    """
    positions = np.arange(nb_p)[:, None]              # (nb_p, 1)
    exponents = 2 * (np.arange(dim) // 2) / dim       # (dim,)
    theta = positions / np.power(10000, exponents)    # (nb_p, dim)

    # Writing into a leaf tensor that still requires grad is rejected by
    # autograd, so the in-place fill is done with gradient tracking disabled.
    with torch.no_grad():
        E[:, 0::2] = torch.as_tensor(np.sin(theta[:, 0::2]), dtype=E.dtype, device=E.device)
        E[:, 1::2] = torch.as_tensor(np.cos(theta[:, 1::2]), dtype=E.dtype, device=E.device)

    # The table is fixed: exclude it from training
    E.detach_()
    E.requires_grad = False

class Embeddings(nn.Module):
    """Sum of word and position embeddings, followed by layer normalisation.

    Args:
        d_model: Embedding dimension.
        vocab_size: Number of rows of the word-embedding table.
        max_position_embeddings: Number of rows of the position table.
        p: Accepted for interface compatibility; not used by this module.
    """

    def __init__(self, d_model, vocab_size, max_position_embeddings, p):
        super().__init__()
        self.word_embeddings = nn.Embedding(vocab_size, d_model, padding_idx=1)

        # The position table is overwritten with sinusoidal values
        self.position_embeddings = nn.Embedding(max_position_embeddings, d_model)
        create_sinusoidal_embeddings(
            nb_p=max_position_embeddings, dim=d_model, E=self.position_embeddings.weight
        )

        self.LayerNorm = nn.LayerNorm(d_model, eps=1e-12)

    def forward(self, input_ids):
        """Embed ``input_ids`` of shape (bs, seq_length) into (bs, seq_length, dim)."""
        n_positions = input_ids.size(1)

        # Position indices 0..seq_length-1, repeated for every row of the batch
        positions = torch.arange(n_positions, dtype=torch.long, device=input_ids.device)  # (seq_length)
        positions = positions.unsqueeze(0).expand_as(input_ids)                           # (bs, seq_length)

        # Word embedding of each input id plus the embedding of its position
        summed = self.word_embeddings(input_ids) + self.position_embeddings(positions)    # (bs, seq_length, dim)

        # Layer normalisation
        return self.LayerNorm(summed)                                                     # (bs, seq_length, dim)

In [11]:
class Encoder(nn.Module):
    """Input embeddings followed by a stack of ``num_layers`` encoder layers.

    Args:
        num_layers: Number of stacked ``EncoderLayer`` blocks.
        d_model: Embedding / feature dimension.
        num_heads: Number of attention heads in each layer.
        ff_hidden_dim: Hidden size of each layer's position-wise network.
        input_vocab_size: Size of the word-embedding table.
        maximum_position_encoding: Number of rows of the position table.
        p: Forwarded to the embeddings and to every layer.
    """

    def __init__(self, num_layers, d_model, num_heads, ff_hidden_dim, input_vocab_size,
               maximum_position_encoding, p=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embeddings(d_model, input_vocab_size, maximum_position_encoding, p)

        self.enc_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, ff_hidden_dim, p) for _ in range(num_layers)]
        )

    def forward(self, x):
        """Embed ``x`` and pass the result through every encoder layer in order."""
        hidden = self.embedding(x)  # (batch_size, input_seq_length, d_model)

        for layer in self.enc_layers:
            hidden = layer(hidden)

        return hidden  # (batch_size, input_seq_len, d_model)

In [12]:
import torchtext.data as data
import torchtext.datasets as datasets

In [13]:
# Sequence length passed to the text field as fix_length
max_len = 200
# Text field: lower-cased, batch-first, integer ids
text = data.Field(sequential=True, fix_length=max_len, batch_first=True, lower=True, dtype=torch.long)
# Label field: one non-sequential integer per example
label = data.LabelField(sequential=False, dtype=torch.long)
# Download IMDB into the current directory, then build the train and test splits from it
datasets.IMDB.download('./')
ds_train, ds_test = datasets.IMDB.splits(text, label, path='./imdb/aclImdb/')
print('train : ', len(ds_train))
print('test : ', len(ds_test))
print('train.fields :', ds_train.fields)

train :  25000
test :  25000
train.fields : {'text': <torchtext.data.field.Field object at 0x0000020960520A90>, 'label': <torchtext.data.field.LabelField object at 0x0000020960520AC8>}


In [14]:
# Carve a validation set out of the training split (split ratio 0.9)
ds_train, ds_valid = ds_train.split(0.9)
print('train : ', len(ds_train))
print('valid : ', len(ds_valid))
print('test : ', len(ds_test))

train :  22500
valid :  2500
test :  25000


In [15]:
# Cap on the vocabulary size (passed as max_size below)
num_words = 50_000
# Build the token vocabulary from the training split only, with explicit padding/unknown specials
text.build_vocab(ds_train, max_size=num_words, specials=['<pad>','<unk>'])
# Build the label vocabulary from the training split
label.build_vocab(ds_train)
vocab = text.vocab

In [16]:
batch_size = 164
# One iterator per split, all with the same batch size; sort_key ranks examples by the length of their text
train_loader, valid_loader, test_loader = data.BucketIterator.splits(
    (ds_train, ds_valid, ds_test), batch_size=batch_size, sort_key=lambda x: len(x.text), repeat=False)

In [17]:
class TransformerClassifier(nn.Module):
    """Sequence classifier: Transformer encoder, max over axis 1, linear head.

    Args:
        num_layers: Number of encoder layers.
        d_model: Embedding / feature dimension.
        num_heads: Number of attention heads per layer.
        conv_hidden_dim: Hidden size of each layer's position-wise network.
        input_vocab_size: Size of the word-embedding table.
        num_answers: Number of output classes.
    """

    def __init__(self, num_layers, d_model, num_heads, conv_hidden_dim, input_vocab_size, num_answers):
        super().__init__()

        self.encoder = Encoder(
            num_layers,
            d_model,
            num_heads,
            conv_hidden_dim,
            input_vocab_size,
            maximum_position_encoding=10000,
        )
        self.dense = nn.Linear(d_model, num_answers)

    def forward(self, x):
        """Return one score per class for every row of ``x``."""
        encoded = self.encoder(x)

        # Keep the maximum over axis 1 for each feature, dropping the argmax indices
        pooled = encoded.max(dim=1).values
        return self.dense(pooled)

In [18]:
# Small model: one encoder layer, 32 features, 2 heads, 128 hidden units, 2 output classes.
# NOTE(review): input_vocab_size=50002 is hard-coded; it looks like num_words (50_000)
# plus the two special tokens - confirm against len(vocab).
model = TransformerClassifier(num_layers=1, d_model=32, num_heads=2, 
                         conv_hidden_dim=128, input_vocab_size=50002, num_answers=2)
model.to(device)

TransformerClassifier(
  (encoder): Encoder(
    (embedding): Embeddings(
      (word_embeddings): Embedding(50002, 32, padding_idx=1)
      (position_embeddings): Embedding(10000, 32)
      (LayerNorm): LayerNorm((32,), eps=1e-12, elementwise_affine=True)
    )
    (enc_layers): ModuleList(
      (0): EncoderLayer(
        (mha): MultiHeadAttention(
          (W_q): Linear(in_features=32, out_features=32, bias=False)
          (W_k): Linear(in_features=32, out_features=32, bias=False)
          (W_v): Linear(in_features=32, out_features=32, bias=False)
          (W_h): Linear(in_features=32, out_features=32, bias=True)
        )
        (cnn): CNN(
          (k1convL1): Linear(in_features=32, out_features=128, bias=True)
          (k1convL2): Linear(in_features=128, out_features=32, bias=True)
          (activation): ReLU()
        )
        (layernorm1): LayerNorm((32,), eps=1e-06, elementwise_affine=True)
        (layernorm2): LayerNorm((32,), eps=1e-06, elementwise_affine=True)
   

In [19]:
# AdamW over all model parameters with a learning rate of 1e-3
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
# Number of passes over the training loader; read as a global by train()
epochs = 10
# Batches per epoch times epochs; not referenced anywhere else in the visible code
t_total = len(train_loader) * epochs

In [20]:
def train(train_loader, valid_loader):
    """Train the global ``model`` for ``epochs`` epochs.

    After every epoch the mean training loss and accuracy are printed and
    the model is evaluated on ``valid_loader``. Relies on the module-level
    ``model``, ``optimizer``, ``epochs`` and ``device``.

    Args:
        train_loader: Iterable of batches exposing ``.text`` and ``.label``;
            must support ``len()``.
        valid_loader: Loader handed to ``evaluate`` after each epoch.
    """
    nb_batches_train = len(train_loader)

    for epoch in range(epochs):
        # evaluate() switches the model to eval mode, so re-enable training mode every epoch
        model.train()
        train_acc = 0
        losses = 0.0

        for batch in train_loader:
            x = batch.text.to(device)
            y = batch.label.to(device)

            out = model(x)  # ①

            loss = f.cross_entropy(out, y)  # ②

            model.zero_grad()  # ③

            loss.backward()  # ④
            losses += loss.item()

            optimizer.step()  # ⑤

            # Accuracy of this batch; averaged over batches below
            train_acc += (out.argmax(1) == y).cpu().numpy().mean()

        print(f"Training loss at epoch {epoch} is {losses / nb_batches_train}")
        print(f"Training accuracy: {train_acc / nb_batches_train}")
        print('Evaluating on validation:')
        evaluate(valid_loader)

In [21]:
def evaluate(data_loader):
    """Print the accuracy of the global ``model`` on ``data_loader``.

    The reported figure is the mean of the per-batch accuracies. Relies on
    the module-level ``model`` and ``device``, and leaves the model in eval
    mode.

    Args:
        data_loader: Iterable of batches exposing ``.text`` and ``.label``;
            must support ``len()``.
    """
    nb_batches = len(data_loader)
    model.eval()
    acc = 0

    # No gradients are needed for evaluation; skipping them avoids building
    # the autograd graph for every batch
    with torch.no_grad():
        for batch in data_loader:
            x = batch.text.to(device)
            y = batch.label.to(device)

            out = model(x)
            acc += (out.argmax(1) == y).cpu().numpy().mean()

    print(f"Eval accuracy: {acc / nb_batches}")

In [22]:
# Running this cell can take a while on a CPU
train(train_loader, valid_loader)

Training loss at epoch 0 is 0.6654672907746356
Training accuracy: 0.59256804524567
Evaluating on validation:
Eval accuracy: 0.6500762195121951
Training loss at epoch 1 is 0.5884601322637089
Training accuracy: 0.6856994521032167
Evaluating on validation:
Eval accuracy: 0.7198170731707317
Training loss at epoch 2 is 0.5068715074356052
Training accuracy: 0.7531261046306116
Evaluating on validation:
Eval accuracy: 0.7416539634146341
Training loss at epoch 3 is 0.42528726894786395
Training accuracy: 0.8040827147401908
Evaluating on validation:
Eval accuracy: 0.7926448170731708
Training loss at epoch 4 is 0.3583308711000111
Training accuracy: 0.8450092788971366
Evaluating on validation:
Eval accuracy: 0.8034679878048779
Training loss at epoch 5 is 0.3021336953709091
Training accuracy: 0.8747127960410039
Evaluating on validation:
Eval accuracy: 0.8129192073170731
Training loss at epoch 6 is 0.2558224143973295
Training accuracy: 0.8986998497702369
Evaluating on validation:
Eval accuracy: 0.821

In [23]:
# Accuracy on the test split
evaluate(test_loader)

Eval accuracy: 0.8077541314628836
