# **Transformer Architecture**

### **Encoder-Decoder Architecture**

The encoder-decoder architecture is used for **sequence-to-sequence** tasks such as machine translation.

It is the architecture proposed in the original Transformer paper, [Attention Is All You Need](https://arxiv.org/abs/1706.03762), by Vaswani et al. (2017).

<img src="assets/encoder_decoder.png" alt="Encoder-Decoder Architecture" style="background-color:white;" height="500" />

### **Decoder-Only Architecture**

The decoder-only architecture is used for **sequence generation** tasks such as language modeling.

This notebook implements this simpler architecture from scratch.

<img src="assets/decoder_only.png" alt="Decoder-Only Architecture" style="background-color:white;" height="500" />

In [None]:
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

torch.manual_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

cuda


## **Scaled Dot-Product Attention**

Let $Q \in \mathbb{R}^{m \times d_k}$, $K \in \mathbb{R}^{n \times d_k}$ and $V \in \mathbb{R}^{n \times d_v}$ be the query, key and value matrices, respectively. The scaled dot-product attention is defined as:

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V \in \mathbb{R}^{m \times d_v}
$$

<img src="assets/self_attention.png" alt="Scaled Dot-Product Attention" style="background-color:white;" height="500" />
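The formula can be traced step by step on small random matrices. The sketch below is only an illustration: the sizes `m`, `n`, `d_k`, `d_v` and the local generator `g` are arbitrary choices, not values used elsewhere in this notebook, and no causal mask is applied yet.

```python
import torch
import torch.nn.functional as F

# Local generator so the notebook's global seed is left untouched
g = torch.Generator().manual_seed(0)
m, n, d_k, d_v = 3, 4, 8, 5               # illustrative sizes (assumptions)
Q = torch.randn(m, d_k, generator=g)
K = torch.randn(n, d_k, generator=g)
V = torch.randn(n, d_v, generator=g)

scores = Q @ K.T / d_k ** 0.5             # (m, n): similarity of each query to each key
attn = F.softmax(scores, dim=-1)          # each row is a probability distribution over keys
output = attn @ V                         # (m, d_v): weighted average of the value rows

print(output.shape)
print(attn.sum(dim=-1))                   # every row sums to 1
```

Dividing by $\sqrt{d_k}$ keeps the scores at a scale where the softmax does not saturate as $d_k$ grows.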

In [None]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self,
                 sequenceLength: int,
                 embeddingSize: int,
                 headSize: int,
                 dropout: float) -> None:
        super().__init__()
        self.key = nn.Linear(embeddingSize, headSize, bias=False)
        self.query = nn.Linear(embeddingSize, headSize, bias=False)
        self.value = nn.Linear(embeddingSize, headSize, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(sequenceLength, sequenceLength)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        batchSize, sequenceLength, embeddingSize = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)
        # Scale by sqrt(d_k), the head size, as in the formula above (not the embedding size)
        weights = q @ k.transpose(-2, -1) / (k.shape[-1] ** 0.5)
        weights = weights.masked_fill(self.tril[:sequenceLength, :sequenceLength] == 0, float("-inf"))
        weights = F.softmax(weights, dim=-1)
        weights = self.dropout(weights)
        x = weights @ v
        return x
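The `tril` buffer and `masked_fill` call in the class above are what make the attention causal. A minimal sketch of that step in isolation, using all-zero scores so the effect of the mask is easy to read (the length `T` is an arbitrary illustrative value):

```python
import torch
import torch.nn.functional as F

T = 4                                          # illustrative sequence length (assumption)
scores = torch.zeros(T, T)                     # equal raw scores for every pair of positions
tril = torch.tril(torch.ones(T, T))            # ones on and below the diagonal
masked = scores.masked_fill(tril == 0, float("-inf"))
causal_weights = F.softmax(masked, dim=-1)     # exp(-inf) = 0, so future positions get weight 0
print(causal_weights)
```

Row `i` spreads its weight uniformly over positions `0..i` and assigns exactly zero to later positions, so a token never attends to the future.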

## **Multi-Head Attention**

Let $h$ be the number of heads. The multi-head attention is defined as:

$$
\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \ldots, \text{head}_h)W^O \in \mathbb{R}^{m \times d_{model}}
$$

where $\text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)$ and $W_i^Q \in \mathbb{R}^{d_{model} \times d_k}$, $W_i^K \in \mathbb{R}^{d_{model} \times d_k}$, $W_i^V \in \mathbb{R}^{d_{model} \times d_v}$ and $W^O \in \mathbb{R}^{hd_v \times d_{model}}$ are the learnable parameters. Here the inputs $Q$, $K$ and $V$ have $d_{model}$ columns, and in this notebook $d_k = d_v = d_{model} / h$.

<img src="assets/multi_head_attention.png" alt="Multi-Head Attention" style="background-color:white;" height="500" />
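The shape bookkeeping behind the concatenation can be checked with stand-in tensors. This is a sketch under assumed sizes (`batch`, `T`, `d_model`, `h` are illustrative), with random tensors in place of real per-head attention outputs:

```python
import torch

g = torch.Generator().manual_seed(0)
batch, T, d_model, h = 2, 5, 12, 3             # illustrative sizes (assumptions)
head_size = d_model // h                       # 4 features per head

# Stand-ins for the h per-head outputs, each of shape (batch, T, head_size)
head_outputs = [torch.randn(batch, T, head_size, generator=g) for _ in range(h)]
concat = torch.cat(head_outputs, dim=-1)       # (batch, T, h * head_size) = (batch, T, d_model)

W_O = torch.randn(h * head_size, d_model, generator=g)   # plays the role of W^O
projected = concat @ W_O                       # (batch, T, d_model)
print(concat.shape, projected.shape)
```

Because each head works on `d_model // h` features, concatenating the heads restores the model dimension, which is why the output projection can be a square linear layer.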

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self,
                 nHeads: int,
                 sequenceLength: int,
                 embeddingSize: int,
                 dropout: float) -> None:
        super().__init__()
        self.heads = nn.ModuleList([ScaledDotProductAttention(sequenceLength,
                                                              embeddingSize,
                                                              embeddingSize // nHeads,
                                                              dropout) for _ in range(nHeads)])
        self.projection = nn.Linear(embeddingSize, embeddingSize)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.cat([head(x) for head in self.heads], dim=-1)
        x = self.dropout(self.projection(x))  # apply the dropout layer defined in __init__
        return x

## **Position-wise Feed-Forward Networks**

The position-wise feed-forward networks are defined as:

$$
\text{FFN}(x) = \text{ReLU}(xW_1 + b_1)W_2 + b_2
$$

where $W_1 \in \mathbb{R}^{d_{model} \times d_{ff}}$, $b_1 \in \mathbb{R}^{d_{ff}}$, $W_2 \in \mathbb{R}^{d_{ff} \times d_{model}}$ and $b_2 \in \mathbb{R}^{d_{model}}$ are the learnable parameters.
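To make "position-wise" concrete, the sketch below evaluates the formula with explicit matrices, taking $W_1$ of shape $(d_{model}, d_{ff})$ so that $xW_1$ is defined, and checks that each position is transformed independently of the others. The sizes are illustrative assumptions; `nn.Linear` stores its weight transposed, as `(out_features, in_features)`, but computes the same product.

```python
import torch

g = torch.Generator().manual_seed(0)
d_model, d_ff = 4, 16                          # d_ff = 4 * d_model, illustrative sizes
x = torch.randn(3, d_model, generator=g)       # three positions of one sequence
W1 = torch.randn(d_model, d_ff, generator=g)
b1 = torch.zeros(d_ff)
W2 = torch.randn(d_ff, d_model, generator=g)
b2 = torch.zeros(d_model)

ffn_all = torch.relu(x @ W1 + b1) @ W2 + b2    # (3, d_model): all positions at once

# Applying the same network to a single position gives the same row
ffn_row0 = torch.relu(x[0] @ W1 + b1) @ W2 + b2
print(ffn_all.shape, torch.allclose(ffn_all[0], ffn_row0, atol=1e-4))
```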

In [None]:
class FeedForward(nn.Module):
    def __init__(self,
                 embeddingSize: int,
                 dropout: float) -> None:
        super().__init__()
        self.feedForward = nn.Sequential(
            nn.Linear(embeddingSize, 4 * embeddingSize),
            nn.ReLU(),
            nn.Linear(4 * embeddingSize, embeddingSize),
            nn.Dropout(dropout)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.feedForward(x)

## **Layer Normalization**

The layer normalization is defined as:

$$
\text{LayerNorm}(x) = \gamma \odot \frac{x - \mu}{\sigma} + \beta
$$

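where $\gamma \in \mathbb{R}^{d_{model}}$ and $\beta \in \mathbb{R}^{d_{model}}$ are the learnable parameters, and $\mu$ and $\sigma$ are the mean and standard deviation of $x$ over its $d_{model}$ features, computed independently for each position. In practice a small $\epsilon$ is added to the variance for numerical stability.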
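As a sanity check, the definition can be compared against `nn.LayerNorm` on a random tensor. This sketch assumes illustrative sizes; it relies on two documented details of `nn.LayerNorm`: it uses the biased variance and adds `eps` inside the square root.

```python
import torch
import torch.nn as nn

g = torch.Generator().manual_seed(0)
d_model = 6                                     # illustrative size (assumption)
x = torch.randn(2, 3, d_model, generator=g)     # (batch, sequence, features)

layer_norm = nn.LayerNorm(d_model)              # gamma = 1 and beta = 0 at initialization
with torch.no_grad():
    mu = x.mean(dim=-1, keepdim=True)
    var = x.var(dim=-1, keepdim=True, unbiased=False)   # biased variance
    manual = (x - mu) / torch.sqrt(var + layer_norm.eps)
    manual = layer_norm.weight * manual + layer_norm.bias
    reference = layer_norm(x)

print(torch.allclose(manual, reference, atol=1e-5))
```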

In the original paper, layer normalization is applied after each residual connection (post-norm), whereas most modern implementations apply it to the input of each sub-layer, before the residual addition. This is known as the **pre-norm formulation**, and it is the one used in this notebook.

```python
# Original paper
x = LayerNorm(x + MultiHeadAttention(x))
x = LayerNorm(x + FeedForward(x))

# Pre-norm formulation
x = x + MultiHeadAttention(LayerNorm(x))
x = x + FeedForward(LayerNorm(x))
```

In [None]:
class Layer(nn.Module):
    def __init__(self,
                 nHeads: int,
                 sequenceLength: int,
                 embeddingSize: int,
                 dropout: float) -> None:
        super().__init__()
        self.multiHeadAttention = MultiHeadAttention(nHeads,
                                                     sequenceLength,
                                                     embeddingSize,
                                                     dropout)
        self.feedForward = FeedForward(embeddingSize, dropout)
        self.layerNorm1 = nn.LayerNorm(embeddingSize)
        self.layerNorm2 = nn.LayerNorm(embeddingSize)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.multiHeadAttention(self.layerNorm1(x))
        x = x + self.feedForward(self.layerNorm2(x))
        return x

## **Language Model**

In [None]:
class GPTLanguageModel(nn.Module):
    def __init__(self,
                 vocabularySize: int,
                 nLayers: int,
                 nHeads: int,
                 sequenceLength: int,
                 embeddingSize: int,
                 dropout: float) -> None:
        super().__init__()
        self.tokenEmbeddingTable = nn.Embedding(vocabularySize, embeddingSize)
        self.positionEmbeddingTable = nn.Embedding(sequenceLength, embeddingSize)
        self.layers = nn.Sequential(*[Layer(nHeads,
                                            sequenceLength,
                                            embeddingSize,
                                            dropout) for _ in range(nLayers)])
        self.layerNorm = nn.LayerNorm(embeddingSize)
        self.linearModelHead = nn.Linear(embeddingSize, vocabularySize)
        self.sequenceLength = sequenceLength
        self.apply(self.initWeights)

    def initWeights(self, module: nn.Module) -> None:
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, x: torch.Tensor, targets: torch.Tensor = None) -> tuple[torch.Tensor, torch.Tensor]:
        batchSize, sequenceLength = x.shape
        tokenEmbeddings = self.tokenEmbeddingTable(x)
        positionEmbeddings = self.positionEmbeddingTable(torch.arange(sequenceLength, device=x.device))
        x = tokenEmbeddings + positionEmbeddings
        x = self.layers(x)
        x = self.layerNorm(x)
        logits = self.linearModelHead(x)
        if targets is None:
            loss = None
        else:
            batchSize, sequenceLength, vocabularySize = logits.shape  # last dimension is the vocabulary
            logits = logits.view(batchSize * sequenceLength, vocabularySize)
            targets = targets.view(batchSize * sequenceLength)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, x: torch.Tensor, nTokens: int) -> torch.Tensor:
        self.eval()
        with torch.no_grad():
            for _ in range(nTokens):
                logits, loss = self(x[:, -self.sequenceLength:])
                probabilities = F.softmax(logits[:, -1, :], dim=-1)
                nextToken = torch.multinomial(probabilities, num_samples=1)
                x = torch.cat([x, nextToken], dim=1)
        self.train()
        return x
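The reshaping in `forward` exists because `F.cross_entropy` expects logits of shape `(N, C)` and targets of shape `(N,)`. The sketch below, with illustrative sizes `B`, `T`, `V` (assumptions, not the notebook's values), shows that the flattened call equals the mean negative log-probability of the correct next token over every position in the batch.

```python
import torch
import torch.nn.functional as F

g = torch.Generator().manual_seed(0)
B, T, V = 2, 3, 5                               # illustrative batch, sequence, vocabulary sizes
toy_logits = torch.randn(B, T, V, generator=g)
toy_targets = torch.randint(0, V, (B, T), generator=g)

flat_loss = F.cross_entropy(toy_logits.view(B * T, V), toy_targets.view(B * T))

# Same quantity by hand: pick the log-probability of each target token and average
log_probs = F.log_softmax(toy_logits, dim=-1)
manual_loss = -log_probs.gather(-1, toy_targets.unsqueeze(-1)).mean()
print(flat_loss.item(), manual_loss.item())
```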

## **Text Generation**

In [None]:
dataPath = Path("data") / "martin_fierro.txt" # "shakespeare.txt"
with open(dataPath, "r", encoding="utf-8") as f:  # the text contains accented characters
    text = f.read()
print(f"Text length: {len(text)}")
print(text[:250])

Text length: 187095
I

Aquí me pongo a cantar
al compás de la vigüela,
que el hombre que lo desvela
una pena estrordinaria,
como la ave solitaria
con el cantar se consuela.

Pido a los santos del cielo
que ayuden mi pensamiento:
les pido en este momento
que voy a cantar


In [None]:
vocabulary = sorted(set(text))
vocabularySize = len(vocabulary)
print(f"Vocabulary size: {vocabularySize}")
print("".join(vocabulary))

Vocabulary size: 72

 !"'(),-.:;?ABCDEFGHIJLMNOPQRSTUVXYZabcdefghijklmnopqrstuvxyz¡¿Ñáéíñóúü


In [None]:
stringToToken = {ch: i for i, ch in enumerate(vocabulary)}
tokenToString = {i: ch for i, ch in enumerate(vocabulary)}
encode = lambda string: [stringToToken[char] for char in string]
decode = lambda tokens: "".join([tokenToString[token] for token in tokens])
print(encode("Los hermanos sean unidos"))
print(decode(encode("Los hermanos sean unidos")))

[23, 51, 55, 1, 44, 41, 54, 49, 37, 50, 51, 55, 1, 55, 41, 37, 50, 1, 57, 50, 45, 40, 51, 55]
Los hermanos sean unidos
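The character-level tokenizer is easy to follow on a toy corpus. The names `toy_text`, `toy_encode` and `toy_decode` below are hypothetical, chosen so that nothing defined in the notebook is overwritten.

```python
toy_text = "hola mundo"                           # toy corpus, not the notebook's data
toy_vocabulary = sorted(set(toy_text))            # [' ', 'a', 'd', 'h', 'l', 'm', 'n', 'o', 'u']
to_token = {ch: i for i, ch in enumerate(toy_vocabulary)}
to_char = {i: ch for ch, i in to_token.items()}


def toy_encode(string):
    return [to_token[ch] for ch in string]


def toy_decode(tokens):
    return "".join(to_char[t] for t in tokens)


toy_tokens = toy_encode("hola")
print(toy_tokens)                                 # [3, 7, 4, 1]
print(toy_decode(toy_tokens))                     # hola

# A character that never appears in the corpus has no token id
try:
    toy_encode("z")
except KeyError as error:
    print(f"unknown character: {error}")
```

One consequence of this scheme is that the model can only be prompted with characters present in the training text.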


In [None]:
data = torch.tensor(encode(text), dtype=torch.long)
print(f"Data shape: {data.shape}")
print(f"Data type: {data.dtype}")
print(data[:100])

Data shape: torch.Size([187095])
Data type: torch.int64
tensor([21,  0,  0, 13, 53, 57, 67,  1, 49, 41,  1, 52, 51, 50, 43, 51,  1, 37,
         1, 39, 37, 50, 56, 37, 54,  0, 37, 48,  1, 39, 51, 49, 52, 65, 55,  1,
        40, 41,  1, 48, 37,  1, 58, 45, 43, 71, 41, 48, 37,  7,  0, 53, 57, 41,
         1, 41, 48,  1, 44, 51, 49, 38, 54, 41,  1, 53, 57, 41,  1, 48, 51,  1,
        40, 41, 55, 58, 41, 48, 37,  0, 57, 50, 37,  1, 52, 41, 50, 37,  1, 41,
        55, 56, 54, 51, 54, 40, 45, 50, 37, 54])


In [None]:
trainValSplit = 0.8
trainSize = int(len(data) * trainValSplit)
trainData = data[:trainSize]
valData = data[trainSize:]
print(f"Train data shape: {trainData.shape}")
print(f"Validation data shape: {valData.shape}")

Train data shape: torch.Size([149676])
Validation data shape: torch.Size([37419])


In [None]:
def getBatch(data: torch.Tensor,
             batchSize: int,
             sequenceLength: int) -> tuple[torch.Tensor, torch.Tensor]:
    ix = torch.randint(0, data.size(0) - sequenceLength, (batchSize,))
    x = torch.stack([data[i:i+sequenceLength] for i in ix])
    y = torch.stack([data[i+1:i+sequenceLength+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y


batchSize = 4
sequenceLength = 8
xBatch, yBatch = getBatch(trainData, batchSize, sequenceLength)
print(f"Context batch shape: {xBatch.shape}")
print(xBatch)
print("Target batch shape:", yBatch.shape)
print(yBatch)
for batch in range(batchSize):
    for token in range(sequenceLength):
        context = xBatch[batch, :token+1].tolist()
        target = yBatch[batch, token].item()
        print(f"Context: {context} -> Target: {target}")

Context batch shape: torch.Size([4, 8])
tensor([[57, 50, 51,  1, 55, 41,  1, 40],
        [17, 55,  1, 40, 41,  1, 37, 48],
        [55, 37, 54, 50, 51, 55, 51,  0],
        [48, 53, 57, 45, 54, 45, 40, 51]], device='cuda:0')
Target batch shape: torch.Size([4, 8])
tensor([[50, 51,  1, 55, 41,  1, 40, 37],
        [55,  1, 40, 41,  1, 37, 48, 49],
        [37, 54, 50, 51, 55, 51,  0, 60],
        [53, 57, 45, 54, 45, 40, 51,  9]], device='cuda:0')
Context: [57] -> Target: 50
Context: [57, 50] -> Target: 51
Context: [57, 50, 51] -> Target: 1
Context: [57, 50, 51, 1] -> Target: 55
Context: [57, 50, 51, 1, 55] -> Target: 41
Context: [57, 50, 51, 1, 55, 41] -> Target: 1
Context: [57, 50, 51, 1, 55, 41, 1] -> Target: 40
Context: [57, 50, 51, 1, 55, 41, 1, 40] -> Target: 37
Context: [17] -> Target: 55
Context: [17, 55] -> Target: 1
Context: [17, 55, 1] -> Target: 40
Context: [17, 55, 1, 40] -> Target: 41
Context: [17, 55, 1, 40, 41] -> Target: 1
Context: [17, 55, 1, 40, 41, 1] -> Target: 37
...
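The relation between a context and its targets is a shift by one position. A minimal sketch on a stand-in tensor, with a fixed start index instead of a random one (`toy_data`, `T` and `start` are illustrative assumptions):

```python
import torch

toy_data = torch.arange(10)                    # stand-in for the encoded text: 0, 1, ..., 9
T = 4                                          # illustrative sequence length
start = 2                                      # fixed start index instead of a random draw
x_toy = toy_data[start:start + T]              # tensor([2, 3, 4, 5])
y_toy = toy_data[start + 1:start + T + 1]      # tensor([3, 4, 5, 6])
print(x_toy, y_toy)

# torch.randint excludes its upper bound, so the largest start index drawn by
# getBatch is len(data) - T - 1 and the target slice always stays in range.
largest_start = len(toy_data) - T - 1
print(toy_data[largest_start + 1:largest_start + T + 1])   # tensor([6, 7, 8, 9])
```

Each row therefore yields `T` training examples at once: position `t` of the target is the token that follows the first `t + 1` tokens of the context.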

In [None]:
nLayers = 6
nHeads = 6
sequenceLength = 256
embeddingSize = 384
dropout = 0.2
model = GPTLanguageModel(vocabularySize,
                         nLayers,
                         nHeads,
                         sequenceLength,
                         embeddingSize,
                         dropout).to(device)
print(f"{sum(p.numel() for p in model.parameters())/1e6:.2f}M parameters")

10.79M parameters
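The reported count can be reproduced by hand from the layer definitions. The snake_case names below are local to this sketch; the numbers are the hyperparameters set above and the vocabulary size of 72 printed earlier. The `tril` masks are buffers, not parameters, so they do not contribute.

```python
vocabulary_size, n_layers, n_heads = 72, 6, 6
sequence_length, embedding_size = 256, 384
head_size = embedding_size // n_heads                         # 64

embeddings = vocabulary_size * embedding_size + sequence_length * embedding_size
attention = n_heads * 3 * embedding_size * head_size          # key, query, value (no bias)
projection = embedding_size * embedding_size + embedding_size
feed_forward = (embedding_size * 4 * embedding_size + 4 * embedding_size
                + 4 * embedding_size * embedding_size + embedding_size)
layer_norms = 2 * 2 * embedding_size                          # two LayerNorms, gamma and beta each
per_layer = attention + projection + feed_forward + layer_norms

final_norm = 2 * embedding_size
lm_head = embedding_size * vocabulary_size + vocabulary_size
total = embeddings + n_layers * per_layer + final_norm + lm_head
print(f"{total / 1e6:.2f}M parameters")                       # 10.79M parameters
```

Almost all of the parameters sit in the six Transformer layers; the embeddings and output head account for roughly 0.15M.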


In [None]:
@torch.no_grad()
def estimateLoss(model: nn.Module,
                 data: torch.Tensor,
                 evalIter: int,
                 batchSize: int,
                 sequenceLength: int) -> float:
    model.eval()
    losses = torch.zeros(evalIter)
    for i in range(evalIter):
        xBatch, yBatch = getBatch(data, batchSize, sequenceLength)
        logits, loss = model(xBatch, yBatch)
        losses[i] = loss.item()
    model.train()
    return losses.mean().item()


modelPath = Path("models") / "martin_fierro.pt" # "shakespeare.pt"

# Load trained model
# model.load_state_dict(torch.load(modelPath, map_location=device, weights_only=True))

# Train model from scratch
batchSize = 64
learningRate = 3e-4
maxIter = 800
evalInterval = 100
evalIter = 100
optimizer = optim.Adam(model.parameters(), lr=learningRate)
for iter in range(maxIter + 1):
    if iter % evalInterval == 0 or iter == maxIter - 1:
        trainLoss = estimateLoss(model,
                                 trainData,
                                 evalIter,
                                 batchSize,
                                 sequenceLength)
        valLoss = estimateLoss(model, valData, evalIter, batchSize, sequenceLength)
        print(f"Iter: {iter}, Train loss: {trainLoss}, Val loss: {valLoss}")
    xBatch, yBatch = getBatch(trainData, batchSize, sequenceLength)
    logits, loss = model(xBatch, yBatch)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
modelPath.parent.mkdir(parents=True, exist_ok=True)  # make sure models/ exists before saving
torch.save(model.state_dict(), modelPath)

Iter: 0, Train loss: 4.339381217956543, Val loss: 4.339980602264404
Iter: 100, Train loss: 2.331040620803833, Val loss: 2.331016778945923
Iter: 200, Train loss: 2.2477424144744873, Val loss: 2.255774974822998
Iter: 300, Train loss: 2.1533539295196533, Val loss: 2.1672592163085938
Iter: 400, Train loss: 2.0055882930755615, Val loss: 2.0298171043395996
Iter: 500, Train loss: 1.8037731647491455, Val loss: 1.864970088005066
Iter: 600, Train loss: 1.6102136373519897, Val loss: 1.723291277885437
Iter: 700, Train loss: 1.4209532737731934, Val loss: 1.624464988708496
Iter: 799, Train loss: 1.2423194646835327, Val loss: 1.5971603393554688
Iter: 800, Train loss: 1.2341572046279907, Val loss: 1.5955504179000854
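A quick way to sanity-check the first line of this log: a model that assigns equal probability to every character has a cross-entropy of $\ln(\text{vocabularySize})$. The sketch below computes that baseline for the vocabulary size of 72 printed earlier.

```python
import math

vocabulary_size = 72                            # value printed by the vocabulary cell
uniform_loss = math.log(vocabulary_size)        # loss of a uniform guess over 72 characters
print(f"{uniform_loss:.4f}")                    # 4.2767
```

The loss at iteration 0 (about 4.34) is close to this baseline, as expected with the small initial weights. By the final iterations the training loss (about 1.23) is clearly below the validation loss (about 1.60), which suggests the model is starting to overfit this small corpus.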


In [None]:
context = "Los hermanos sean unidos"
context = torch.tensor(encode(context), dtype=torch.long).unsqueeze(0).to(device)
generatedText = decode(model.generate(context, nTokens=10000)[0].tolist())
print(generatedText)
with open("martin_fierro_output.txt", "w") as f:
    f.write(generatedText)

Los hermanos sean unidos.

Los astimos las juridos
que enel roron sustumbures;
y en caminias en otan risa,
debe aqueles pampas mi foro,
cuando, da el encaque en supal
eran de boliadas lo sol.

Cuanto el desapulto
nos cretestaban las campas
viendo a al indio y levanto;
pero a un hombre tenía
como al tenerro campañó
si lo puse lo ec"
levaran letó el trastoún yito.
yo ya me hale alma suelo viles
con las los yos, copletos
y sin perros y hablaridos
y indao las infiecias.

Con laste cuitó con las tílas
habías é rastó los de día
émpre una: venos ¡Para!
que el trabios que había
con el arrogallé seguillo!
Y Na tía estaba me rancal
era ganaba las canas arganas
de un punto cantón
es la vistas sen traillan
de uno que al nochina
en al indio a entre asiona.

Una prece esa comentenda
como la enperranzan
pero pan en dar estrasquiandas
pue.Ña..
no hizos tras en las pare¿
Y algunas de hombre al áen potro;
me he dicé que afligarse el entreo
en medio del cancelanto;

que he dé por cuitabullao
en monitaré 