# Atencja oparta o iloczyn skalarny i architektura Transformer

##Atencja oparta o iloczyn skalarny

**Atencja oparta o iloczyn skalarny** (*dot-product attention*) jest mechanizmem atencji stosowanym w architekturze Transformer.
Mechanizm atencji pozwala przekształcić każdy wektor reprezentacji (osadzenie) biorąc pod uwagę szerszy kontekst, czyli wszystkie pozostałe wektory w sekwencji.
W przypadku **auto-atencji** (*auto-attention*) brane są pod uwagę wektory w tej samej sekwencji. W przypadku **atencji krzyżowej** (*cross-attention*) kontekst stanowi inna sekwencja.
W zastosowaniach związanych z przetwarzaniem tekstów w języku naturalnym umożliwia to tworzenie kontekstowych reprezentacji słów/tokenów biorących pod uwagę kontekst w jakim występują.

Załóżmy, że mamy sekwencję $n$ wektorów, na przykład wektorowych reprezentacji słów/tokenów, $x_1, \ldots, x_n$, gdzie $x_i \in \mathbb{R}^{d}$.

**W mechanizmie auto-atencji opartej o iloczyn skalarny** dla każdego wektora $x_i$ wyznaczane są wartości:
*   **zapytania** $q_i = x_i W_q$
*   **klucza** $k_i = x_i W_k$
*   **wartości** $v_i = x_i W_v$

$W_q, W_k, W_v \in \mathbb{R}^{d \times d}$ są macierzami projekcji. Macierze projekcji są parametrami (wagami) warstwy atencji - są inicjalizowane losowo i optymalizowane w procesie uczenia modelu.

Wyznaczanie zapytania, klucza i wartości dla każdego elementu sekwencji możemy zapisać w formie macierzowej.
Niech $X \in \mathbb{R}^{n \times d}$ będzie macierzą złożoną z wejściowych wektorów:
$$
X = \left(
\begin{align}
x_1 \\
\vdots \\
x_n \\
\end{align}
\right)
$$
**Wówczas macierze złożone z wektorów zapytań, kluczy i wartości** wyznaczamy jako:
$$
Q = X W_q \\
K = X W_k \\
V = X W_v
$$

**Atencja oparta o iloczyn skalarny** zdefiniowana jest wzorem:
$$
\textrm{Attention}(Q, K, V) = \textrm{softmax} \left( \frac{QK^T}{\sqrt d} \right) V
$$
Wynik $\textrm{Attention}(Q, K, V)$ jest macierzą identycznego rozmiaru jak wejściowa macierz $X$ i zawiera zaktualizowane wartości wektorów z wejściowej sekwencji z uwzględnieniem kontekstu (wartości wszystkich pozostałych wektorów w sekwencji).

Wyznaczanie wyniku atencji składa się następujących kroków:
*  Krok 1: Wyznaczenie **współczynników atencji** (*attention scores*) mierzących podobieństwo między wektorami zapytań ($Q$) i kluczy ($K$) jako skalowanego iloczynu skalarnego $\frac{QK^T}{\sqrt d_k}$. Współczynnik skalujący $\frac{1}{\sqrt d_k}$ ogranicza wartości będące argumentami funkcji softmax. $d$ oznacza rozmiar wektora klucza czyli liczbę kolumn macierzy $K$.
*  Krok 2: Zastosowanie funkcji softmax aby wyznaczyć macierz **wag atencji** której wiersze sumują się do jedności.
*  Krok 3: Wyznaczenie wynikowych wartości, jako sum wektorów wartości ($V$) ważonych wagami atencji.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F

In [None]:
torch.manual_seed(44)

seq_length = 4
d = 6

# Utwórz losowy tensor wejściowy zawierający sekwencję seq_length=4 złożoną z wektorów rozmiaru d=6
print("Macierz X")
X = torch.rand(seq_length, d)
print(X)
print(f"{X.shape=}\n")

# Do celów poglądowych utworzymy losowe macierze projekcji W_q, W_k, W_v zainicjalizowne rozkładem normalnym
W_q = torch.randn(d, d)
W_k = torch.randn(d, d)
W_v = torch.randn(d, d)

# Wyznacz macierze Q, K, V
Q = torch.matmul(X, W_q)
K = torch.matmul(X, W_k)
V = torch.matmul(X, W_v)

print(f"{Q.shape=}")
print(f"{K.shape=}")
print(f"{V.shape=}")

**Krok 1:** Wyznacz współczynniki atencji

In [None]:
d = Q.size(-1)
att_scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d))

In [None]:
print(f"Współczynniki atencji")
print(att_scores)

print(f"Suma elementów w wierszach: {att_scores.sum(dim=-1)}")
print(f"{att_scores.shape=}")

# Sprawdzenie poprawności wyznaczenia współczynników atencji
assert att_scores.shape == (seq_length, seq_length)
assert torch.isclose(att_scores[0,0], torch.tensor(2.1263), atol=1e-04)
assert torch.isclose(att_scores[1,2], torch.tensor(1.1623), atol=1e-04)

**Krok 2**: Zastosuj softmax aby wyznaczyć macierz wag atencji której wiersze sumują się do jedności.

In [None]:
att_weights = F.softmax(att_scores, dim=-1)

In [None]:
print(f"Wagi atencji")
print(att_weights)

print(f"Suma elementów w wierszach: {att_weights.sum(dim=-1)}")
print(f"{att_weights.shape=}")

# Sprawdzenie poprawności wyznaczenia współczynników atencji
assert att_weights.shape == (seq_length, seq_length)
assert torch.allclose(att_weights.sum(dim=-1), torch.ones_like(att_weights.sum(dim=-1)))

**Krok 3**:

Wyznaczenie wynikowych wartości, jako sum wektorów wartości (V) ważonych wagami atencji.

In [None]:
Z = torch.matmul(att_weights, V)

In [None]:
print(f"Wejściowa macierz X")
print(X)
print(f"\nMacierz wartości V")
print(V)
print(f"\nWynikowa macierz Z")
print(Z)

# Sprawdzenie poprawności wyznaczenia współczynników atencji
assert Z.shape == X.shape
assert torch.isclose(Z[0,0], torch.tensor(0.1211), atol=1e-04)
assert torch.isclose(Z[1,2], torch.tensor(1.5165), atol=1e-04)

Połącz napisane wcześniej fragmenty kodu w jedną funkcję `dot_product_attention` wyznaczającą wartości atencji opartej o iloczyn skalarny.
*   Na wejściu funkcja otrzyma zapytania, klucze i wartości jako tensory `Q`, `K`, `V`
*   Zwróci parę tensorów: wynikową wartość oraz wagi atencji

**Uwaga**: Funkcja powinna operować zarówno na dwuwymiarowych tensorach (macierzach) `(seq_len, d)` oraz trójwymiarowych tensorach zawierających jako pierwszy wymiar wsadu `(batch_size, seq_len, d)`. Aby funkcja poprawnie działała na macierzach o trzech wymiarach, wykorzystaj metodę `transpose` do transpozycji dwóch ostatnich wymiarów tensora `K`.

In [None]:
def dot_product_attention(Q: Tensor, K: Tensor, V: Tensor) -> tuple[Tensor, Tensor]:
    d = Q.size(-1)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d, dtype=torch.float32))
    attention_weights = F.softmax(scores, dim=-1)
    output = torch.matmul(attention_weights, V)
    return output, attention_weights

In [None]:
# Spradzenie działania funkcji dot_product_attention

batch_size = 2
seq_length = 4
d = 6

# Sprawdzenie działania dla dwuywymiarowego tensora
X = torch.rand(seq_length, d)
Q = torch.matmul(X, W_q)
K = torch.matmul(X, W_k)
V = torch.matmul(X, W_v)

output, attention_weights = dot_product_attention(Q, K, V)
print(f"{output.shape=}")
print(f"{attention_weights.shape=}")

assert output.shape == X.shape
assert attention_weights.shape == (seq_length, seq_length)

In [None]:
# Sprawdzenie działania dla trójwymiarowego tensora
X = torch.rand(batch_size, seq_length, d)
Q = torch.matmul(X, W_q)
K = torch.matmul(X, W_k)
V = torch.matmul(X, W_v)

output, attention_weights = dot_product_attention(Q, K, V)
print(f"{output.shape=}")
print(f"{attention_weights.shape=}")

assert output.shape == X.shape
assert attention_weights.shape == (batch_size, seq_length, seq_length)

##Implementacja uproszczonej architektury dwukierunkowego Transformera

W dalszej części notatnika zaimplementujemy uproszczoną architekturę dwukierunkowego Transformera i zastosujemy do zadania klasyfikacji elementów wejściowej sekwencji.

**Dwukierunkowy Transformer**, inaczej **tylko-koder**, wyznaczając wartość atencji dla elementu sekwencji bierze pod uwagę wszystkie elementy z sekwencji, zarówne te poprzedzające jak i następujące później. Jest stosowany w modelach językowych klasy tylko-koder, takich jak BERT.
W odróżnieniu od pełnej architektury Transformera nasz model będzie wykorzystywał tylko jedną głowicę atencji.

**Jednokierunkowy Transformer**, inaczej **tylko-dekoder**,
wyznaczając wartość atencji dla elemetu sekwencji bierze pod uwagę tylko elementy występujące nie później w sekwencji.
Podczas wyznaczania wartości atencji stosowana jest maska atencji przyczynowej (*causal attention mask*), która dla każdego elementu w sekwencji maskuje dostęp do elementów po nim następujących.
Są stosowane w generatywnych modelach językowych takich, jak GPT.


Klasa `SingleHeadAttention` implementuje jednogłowicową warstwę auto-atencji bez maskowania.

In [None]:
class SingleHeadAttention(nn.Module):
    def __init__(self, d_model: int):
        super().__init__()

        # Macierze projekcji zaimplementujemy jako warstwa liniowa bez wektora obciążenia (bias)
        self.Q_w = nn.Linear(d_model, d_model, bias=False)
        self.K_w = nn.Linear(d_model, d_model, bias=False)
        self.V_w = nn.Linear(d_model, d_model, bias=False)

    def forward(self, X: Tensor) -> tuple[Tensor, Tensor]:
        Q = self.Q_w(X)
        K = self.K_w(X)
        V = self.V_w(X)

        values, attention = dot_product_attention(Q, K, V)
        return values, attention

In [None]:
att = SingleHeadAttention(d_model=d)

values, attention = att(X)
print(values.shape)
print(attention.shape)

Implementacja pojedynczej warstwy (pojedycznego bloku) kodera Transformer.

In [None]:
class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model: int, dim_feedforward: int):
        """
        Inputs:
            d_model - Dimensionality of the input
            dim_feedforward - Dimensionality of the hidden layer in the MLP
        """
        super().__init__()

        self.self_attn = SingleHeadAttention(d_model)

        self.linear_net = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, d_model)
        )

        # Layers to apply in between the main layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        # Attention part
        attn_out, _ = self.self_attn(x)

        # Połączenie rezydualne
        x = x + attn_out
        x = self.norm1(x)

        # MLP part
        linear_out = self.linear_net(x)
        x = x + linear_out
        x = self.norm2(x)

        return x

In [None]:
encoder = TransformerEncoderLayer(d_model=d, dim_feedforward=2*d)
print(encoder)

Sprawdzenie działania warstwy kodera Transformer. Na wejściu podajemy tensor o rozmiarach `(batch_size, seq_length, d)` - złożony z `batch_size` sekwencji o `seq_length` elementach/wektorach rozmiaru `d` każdy.
Zauważmy, że na wyjściu otrzymujemy tensor o identycznym kształcie

In [None]:
x = torch.rand(batch_size, seq_length, d)
print(f"{x.shape=}")

y = encoder(x)
print(f"{y.shape=}")

Implementacja zestawu `num_layers` sekwencyjnie połączonych  warstw kodera Transformer.

In [None]:
class TransformerEncoder(nn.Module):

    def __init__(self, num_layers: int, d_model: int, dim_feedforward: int):
        super().__init__()
        self.layers = nn.ModuleList([TransformerEncoderLayer(d_model, dim_feedforward) for _ in range(num_layers)])

    def forward(self, x):
        for l in self.layers:
            x = l(x)
        return x

    def get_attention_maps(self, x):
        attention_maps = []
        for l in self.layers:
            _, attn_map = l.self_attn(x)
            attention_maps.append(attn_map)
            x = l(x)
        return attention_maps

In [None]:
encoder = TransformerEncoder(num_layers=2, d_model=d, dim_feedforward=2048)
print(encoder)

In [None]:
x = torch.rand(batch_size, seq_length, d)
print(f"{x.shape=}")

y = encoder(x)
print(f"{y.shape=}")

Dodatkowe elementy - kodowanie pozycyjne

In [None]:
import math


class PositionalEncoding(nn.Module):

    def __init__(self, d_model, max_len=5000):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            max_len - Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create matrix of [SeqLen, HiddenDim] representing the positional encoding for max_len inputs
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)

        # register_buffer => Tensor which is not a parameter, but should be part of the modules state.
        # Used for tensors that need to be on the same device as the module.
        # persistent=False tells PyTorch to not add the buffer to the state dict (e.g. when we save the model)
        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x

Kompletna architektura uproszczonego Transformera klasy tylko-koder. Model zostanie wykorzystany do klasyfikacji każdego elementu z sekwencji wejściowej. W tym celu zdefiniujemy głowicę klasyfikacyjną (klasyfikator liniowy) `self.classification = nn.Linear(d_model, num_classes)`. Głowica klasyfikacyjna będzie estymowała rozkład prawdopodobieństwa klas dla każdego wektora w sekwencji wejściowej na podstawie ich kontekstowych reprezentacji.



In [None]:
class MyTransformer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int, num_classes: int, num_layers: int):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = TransformerEncoder(num_layers, d_model, 2*d_model)
        self.classification = nn.Linear(d_model, num_classes)

    def forward(self, x):
        x = self.embeddings(x)
        x = self.positional_encoding(x)
        x = self.transformer(x)

        assert x.dim() == 3
        # Jako reprezentację całej sekwencji bierzemy uśrednione wektory reprezentacji
        # x = x.mean(dim=1)
        x = self.classification(x)
        return x

    def get_attention_maps(self, x):
        """
        Function for extracting the attention matrices of the whole Transformer for a single batch.
        Input arguments same as the forward pass.
        """
        with torch.no_grad():
            x = self.embeddings(x)
            x = self.positional_encoding(x)
            attention_maps = self.transformer.get_attention_maps(x)
        return attention_maps

In [None]:
model = MyTransformer(vocab_size=10, d_model=32, num_classes=10, num_layers=1)
print(model)

Do wytrenowania modelu wykorzystamy bibliotekę PyTorch Lightning.

In [None]:
!pip install -q lightning
!pip install -q torchmetrics

In [None]:
import lightning as L
import torchmetrics
import torch.optim as optim


class LitNet(L.LightningModule):
    def __init__(self, classifier: nn.Module):
        super().__init__()
        self.classifier = classifier
        self.criterion = nn.CrossEntropyLoss()

        # TODO: Change hard coded number of classes
        self.metric_train_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=10)
        self.metric_val_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=10)
        self.metric_test_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=10)

    def training_step(self, batch, batch_idx):
        # training_step implementuje jeden krok pętli treningowej
        x, target = batch
        logits = self.classifier(x)

        logits = logits.view(-1, logits.shape[-1])
        target = target.view(-1)

        loss = self.criterion(logits, target)
        self.log("train/loss", loss, prog_bar=True)

        _, preds = torch.max(logits, dim=1)
        self.metric_train_acc(preds, target)
        self.log('train/accuracy', self.metric_train_acc, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, target = batch
        logits = self.classifier(x)

        logits = logits.view(-1, logits.shape[-1])
        target = target.view(-1)

        loss = self.criterion(logits, target)
        self.log("val/loss", loss, prog_bar=True)

        _, preds = torch.max(logits, dim=1)
        self.metric_val_acc(preds, target)
        self.log('val/accuracy', self.metric_val_acc, on_step=False, on_epoch=True, prog_bar=True)

    def test_step(self, batch, batch_idx):
        x, target = batch
        logits = self.classifier(x)

        logits = logits.view(-1, logits.shape[-1])
        target = target.view(-1)

        _, preds = torch.max(logits, dim=1)
        self.metric_test_acc(preds, target)
        self.log('test/accuracy', self.metric_test_acc, on_step=False, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.classifier.parameters(), lr=1e-4, weight_decay=1e-5)
        lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3, total_steps=self.trainer.estimated_stepping_batches)
        return [optimizer], [{'scheduler': lr_scheduler, 'interval': 'step'}]


Definicja hiper-parametrów procesu uczenia.



In [None]:
vocab_size = 10
d_model = 32
num_classes = vocab_size
num_layers = 1

In [None]:
# Init the network
my_transformer = MyTransformer(vocab_size=vocab_size, d_model=d_model, num_classes=num_classes, num_layers=num_layers)
# Wrap in a lightning module
lit_model = LitNet(my_transformer)

##Zadanie: Odwrócenie kolejności elementów w sekwencji

###Zbiór danych
Wygenerujemy syntetyczny zbiór danych zawierający sekwencje liczb od 0 do 9.
Naszym **zadaniem będzie odwrócenie kolejności liczb w sekwencji wejściowej**.
Potraktujemy to jako problem klasyfikacji każdego elementu sekwencji - chcemy aby model każdemu elementowi sekwencji wejściowej przypisał klasę oczekiwanego na wyjściu elementu.
Na przykład, kolejne elementy sekwencji $1,7,3,4,2$ powinny zostać sklasyfikowane jako odpowiednio $2,4,3,7,1$ - dzięki czemu uzyskamy odwróconą sekwencję.

In [None]:
from torch.utils.data import Dataset, DataLoader


class ReverseDataset(Dataset):

    def __init__(self, num_categories, seq_len, size):
        super().__init__()
        self.num_categories = num_categories
        self.seq_len = seq_len
        self.size = size

        self.data = torch.randint(self.num_categories, size=(self.size, self.seq_len))

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        inp_data = self.data[idx]
        labels = torch.flip(inp_data, dims=(0,))
        return inp_data, labels

In [None]:
train_loader = DataLoader(ReverseDataset(num_categories=10, seq_len=8, size=50000), batch_size=128, shuffle=True, drop_last=True, pin_memory=True)
val_loader   = DataLoader(ReverseDataset(num_categories=10, seq_len=8, size=1000), batch_size=128)
test_loader  = DataLoader(ReverseDataset(num_categories=10, seq_len=8, size=10000), batch_size=128)

Przykładowa sekwencja wejściowa i etykiety określające oczekiwane wyjście - klasy które powinny zostać przypisane przez model każdemu elementowi sekwencji wejściowej.

In [None]:
inp_data, labels = train_loader.dataset[0]
print("Wejście:  ", inp_data)
print("Etykiety: ", labels)

In [None]:
batch = next(iter(train_loader))
inputs, targets = batch

print(inputs.shape)
print(targets.shape)

y = my_transformer(inputs)
print(y.shape)

Sprawdzenie działania modelu z losowo zainicjalizowanymi wagami.

In [None]:
for x, target, predicted in zip(inputs[0], targets[0], y[0]):
    print(f"Element z wejściowej sekwencji : {x}   Etykieta: {target}   Predykowana klasa: {predicted.detach().argmax(dim=0)}")

Modele sieci neuronowych inicjalizowane są z losowymi wagami. PyTorch domyślnie inicjalizuje wagi model korzystając z metody Xaviera lub He. Więcej informacji: [link](https://www.deeplearning.ai/ai-notes/initialization/index.html).

Sprawdzimy maskę wag atencji wyznaczoną przez niewytrenowany model. Nie widać specjalnych zależności. Dla każdego elementu w sekwencji (wiersza) wagi atencji względem pozostałych elementów (kolumny) wyglądają losowo.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

data_input, labels = next(iter(val_loader))
data_input = data_input.to(device)
attention_maps = my_transformer.get_attention_maps(data_input)

In [None]:
import seaborn as sns
#import matplotlib.pyplot as plt

def visualize_attention_map(attention_weights):
    # Wizualizacja pojedycznej mapy atencji
    assert attention_weights.dim() == 2
    plt.figure(figsize=(6, 4))
    sns.heatmap(attention_weights.detach().cpu().numpy(), cmap="Blues")
    plt.xlabel("Keys")
    plt.ylabel("Queries")
    plt.show()

In [None]:
# Wizualizacja mapy atencji z pierwszej warstwy kodera Transformer dla pierwszej sekwencji ze wsadu
visualize_attention_map(attention_maps[0][0])

###Trenowanie modelu
Rozpoczęcie treningu modelu.

In [None]:
import os

CHECKPOINT_PATH = "saved_models/"

max_epochs = 3
lr = 5e-4
warmup = 50
max_iters = max_epochs*len(train_loader)

trainer = L.Trainer(max_epochs=2, log_every_n_steps=50)
trainer.fit(
    lit_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
    )

###Ewaluacja modelu
Ewaluacja wytrenowanego modelu na zbiorze testowym.

In [None]:
trainer.test(dataloaders=test_loader)

Sprawdźmy wynik działania wytrenowanego modelu dla przykładowego wejścia.

In [None]:
batch = next(iter(test_loader))
inputs, targets = batch

print(inputs.shape)
print(targets.shape)

y = my_transformer(inputs)
print(y.shape)

In [None]:
for x, target, predicted in zip(inputs[0], targets[0], y[0]):
    print(f"Element z wejściowej sekwencji : {x}   Etykieta: {target}   Predykowana klasa: {predicted.detach().argmax(dim=0)}")

Wizualizacja mapy atencji dla przykładowego wejścia. Dla $i$-tego elementu w sekwencji model nauczył się zwracać uwagę na $i$-ty od końca element.
I o to chodziło, wyznaczając wartość $i$-tego elementu w odwróconej sekwencji należy wziąć wartość $i$-tego od końca elementu w wejściowej sekwencji.

In [None]:
attention_maps = my_transformer.get_attention_maps(data_input)
# Odwołanie do map atencji z pierwszej warstwy dla pierwszej sekwencji ze wsadu
visualize_attention_map(attention_maps[0][0])

Zauważmy, że model nauczył się przekształcać sekwencje liczb poprzez demonstrację par złożonych z przykładowych sekwencji wejściowych i oczekiwanego wejścia.

##Zadanie: Zbalansowane nawiasy

Problem odwrócenia kolejności ciągu był bardzo prosty. Model nie musiał brać pod uwagę wartości elementów w sekwencji - wystarczyło, że nauczył się aby jako wartość $i$-tego elementu wyjścia brać $n-i$-ty element wejścia.

Spróbujmy trudniejszego problemu - sprawdzenia czy sekwencja nawiasów jest zbalansowana. Rozwiązanie tego problemy wymaga wzięcia pod uwagę zarówno pozycji elementów (nawiasów) jak i ich wartości (otwierający czy zamykający).

W odróżnieniu od poprzedniego problemu klasyfikujemy całą sekwencję - czy zawiera zrównoważone nawiasy czy nie. Czyli klasyfikator musimy oprzeć na wektorze reprezentującym całą sekwencję, a nie pojedyczy element.
Aby uzyskać **reprezentację o stałej długości dla całej sekwencji** można zastosować następujące podejścia:
- Na początku sekwencji dodać specjalny token `CLS` i kontekstową reprezentację tego tokenu traktować jak reprezentację całej sekwencji wejściowej. To podejście jest typowo stosowane w modelach klasy tylko-koder, np. BERT.
- Uśrednić kontekstowe reprezentacje wszystkich tokenów w sekwencji. W dalszej części notatnika zastosujemy to podejście.

###Zbiór danych

Pomocnicze funkcje tworzące zbiór danych złożony z napisów ze zbalansowanymi i nie zbalansowanymi nawiasami.

In [None]:
import random

def generate_balanced_brackets(n: int) -> list[str]:
    """Generate all balanced brackets string with n pairs."""
    result = []

    def backtrack(current, open_count, close_count):
        if len(current) == 2 * n:
            result.append(current)
            return

        if open_count < n:
            backtrack(current + "(", open_count + 1, close_count)
        if close_count < open_count:
            backtrack(current + ")", open_count, close_count + 1)

    backtrack("", 0, 0)
    return result

In [None]:
def generate_random_brackets(length) -> str:
    """Generate a random string of open or close brackets."""
    return "".join(random.choice(["(", ")"]) for _ in range(length))

In [None]:
def is_balanced(s: str) -> bool:
    """Check if a string of brackets is balanced."""
    balance = 0
    for char in s:
        if char == "(":
            balance += 1
        elif char == ")":
            balance -= 1
        if balance < 0:
            return False
    return balance == 0

In [None]:
class BracketDataset(Dataset):

    def __init__(self, num_pairs: int):
        super().__init__()
        self.num_pairs = num_pairs
        self.balanced_brackets = generate_balanced_brackets(num_pairs)

    def __len__(self):
        return len(self.balanced_brackets) * 2

    def __getitem__(self, idx):
        if idx % 2 == 0:
            s = self.balanced_brackets[idx // 2]
            label = 1
        else:
            s = generate_random_brackets(self.num_pairs*2)
            label = 0

        # Kodowanie: 0="(", 1=")"
        x = torch.tensor([0 if ch=="(" else 1 for ch in s])
        label = torch.tensor(label)
        return x, label

Przykładowe elementy ze zbioru danych - `0` oznacza nawias otwierający a `1` nawias zamykający. Etykieta `1` oznacza, że sekwencja nawiasów jest zbalansowana, a `0` przeciwnie.

In [None]:
ds = BracketDataset(num_pairs=10)
print(len(ds))
print(ds[0])
print(ds[1])
print(ds[2])
print(ds[3])
print(ds[4])

In [None]:
def decode_dataset_element(e):
    # Zdekoduj elementy zbioru danych
    x, label = e
    s = "".join(["(" if ch==0 else ")" for ch in x])
    s = f'{s}   {"balanced" if label==1 else "unbalanced"}'
    return s

In [None]:
print(decode_dataset_element(ds[0]))
print(decode_dataset_element(ds[1]))
print(decode_dataset_element(ds[2]))
print(decode_dataset_element(ds[3]))

In [None]:
from torch.utils.data import random_split

train_size = int(0.8 * len(ds))
val_size = int(0.1 * len(ds))
test_size = len(ds) - train_size - val_size

train_ds, val_ds, test_ds = random_split(ds, [train_size, val_size, test_size])

print(f"Rozmiar zbioru trningowego: {len(train_ds)}")
print(f"Rozmiar zbioru walidacyjnego: {len(val_ds)}")
print(f"Rozmiar zbioru testowego: {len(test_ds)}")

In [None]:
train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=128)
test_loader  = DataLoader(test_ds, batch_size=128)

###Definicja architektury modelu

Definicja modelu o architekturze Transformer klasy tylko-koder z liniową głowicą klasyfikującą sekwencję. Głowica klasyfikacyjna `self.classification` oparta jest o uśrednione kontekstowe reprezentacje elementów w sekwencji.

In [None]:
class MyTransformer(nn.Module):
    def __init__(self, vocab_size: int, d_model: int, num_classes: int, num_layers: int):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = TransformerEncoder(num_layers, d_model, 2*d_model)
        self.classification = nn.Linear(d_model, num_classes)

    def forward(self, x):
        x = self.embeddings(x)
        x = self.positional_encoding(x)
        x = self.transformer(x)

        assert x.dim() == 3

        # Jako reprezentację całej sekwencji bierzemy uśrednione wektory reprezentacji
        x = x.mean(dim=1)
        x = self.classification(x)
        return x

    def get_attention_maps(self, x):
        """
        Function for extracting the attention matrices of the whole Transformer for a single batch.
        Input arguments same as the forward pass.
        """
        with torch.no_grad():
            x = self.embeddings(x)
            x = self.positional_encoding(x)
            attention_maps = self.transformer.get_attention_maps(x)
        return attention_maps

In [None]:
class LitNet(L.LightningModule):
    def __init__(self, classifier: nn.Module):
        super().__init__()
        self.classifier = classifier
        self.criterion = nn.CrossEntropyLoss()

        # TODO: Change hard coded number of classes
        self.metric_train_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=2)
        self.metric_val_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=2)
        self.metric_test_acc = torchmetrics.classification.Accuracy(task="multiclass", num_classes=2)

    def training_step(self, batch, batch_idx):
        # training_step implementuje jeden krok pętli treningowej
        x, target = batch
        logits = self.classifier(x)
        loss = self.criterion(logits, target)
        self.log("train/loss", loss, prog_bar=True)

        _, preds = torch.max(logits, dim=1)
        self.metric_train_acc(preds, target)
        self.log('train/accuracy', self.metric_train_acc, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, target = batch
        logits = self.classifier(x)
        loss = self.criterion(logits, target)
        self.log("val/loss", loss, prog_bar=True)

        _, preds = torch.max(logits, dim=1)
        self.metric_val_acc(preds, target)
        self.log('val/accuracy', self.metric_val_acc, on_step=False, on_epoch=True, prog_bar=True)

    def test_step(self, batch, batch_idx):
        x, target = batch
        logits = self.classifier(x)

        _, preds = torch.max(logits, dim=1)
        self.metric_test_acc(preds, target)
        self.log('test/accuracy', self.metric_test_acc, on_step=False, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.classifier.parameters(), lr=1e-4, weight_decay=1e-5)
        lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3, total_steps=self.trainer.estimated_stepping_batches)
        return [optimizer], [{'scheduler': lr_scheduler, 'interval': 'step'}]

Utworzenie instancji modelu sieci MyTransformer złożonej z jednej warstwy kodera Transformer (`num_layers=1`).

In [None]:
# Init the network
my_transformer = MyTransformer(vocab_size=2, d_model=32, num_classes=2, num_layers=1)
# Wrap in a lightning module
lit_model = LitNet(my_transformer)

###Trenowanie modelu

Uruchom trenowanie modelu.

In [None]:
import os

CHECKPOINT_PATH = "saved_models/"

max_epochs = 6
lr = 5e-4
warmup = 50
max_iters = max_epochs*len(train_loader)

#trainer = L.Trainer(max_epochs=num_epochs, logger=wandb_logger)
trainer = L.Trainer(max_epochs=2, log_every_n_steps=50)
trainer.fit(
    lit_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
    )

###Ewaluacja modelu

In [None]:
trainer.test(dataloaders=test_loader)

Skuteczność klasyfikacji jest ograniczona (dokładność rzędu 93%), wytrenowany model nie jest w stanie poprawnie sprawdzić zbalansowania nawiasów dla każdej sekwencji wejściowej.

Wizualizacja mapy atencji dla przykładowego wejścia.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

data_input, labels = next(iter(val_loader))
data_input = data_input.to(device)
attention_maps = my_transformer.get_attention_maps(data_input)

# Odwołanie do map atencji z dla pierwszej sekwencji ze wsadu
visualize_attention_map(attention_maps[0][0])
print(data_input[1])

Sprawdzimy czy większy model, złożony z trzech warstw kodera Transformer (`num_layers=3`) pozwoli osiągnąć lepsze wyniki.

In [None]:
# Init the network
my_transformer = MyTransformer(vocab_size=2, d_model=32, num_classes=2, num_layers=3)
# Wrap in a lightning module
lit_model = LitNet(my_transformer)

In [None]:
import os

CHECKPOINT_PATH = "saved_models/"

max_epochs = 6
lr = 5e-4
warmup = 50
max_iters = max_epochs*len(train_loader)

#trainer = L.Trainer(max_epochs=num_epochs, logger=wandb_logger)
trainer = L.Trainer(max_epochs=2, log_every_n_steps=50)
trainer.fit(
    lit_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
    )

In [None]:
trainer.test(dataloaders=test_loader)

Zwiększenie liczy warstw kodera do trzech pozwoliło lepiej uchwycić zależności między elementami sekwencji. Skuteczność klasyfikacji na zbiorze testowym wzrosła z 93 do 97%.