In [1]:
import math
import torch
from torch import nn
from tokenizers import Tokenizer
from tokenizers.models import BPE
from torch.nn import functional as F
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace


# I. Prerequisits

## BPE (Byte Pair Encoding) Tokenization

1. Initialize vocabulary: all individual characters (or bytes).

2. Count frequencies of all adjacent symbol pairs.

3. Merge the most frequent pair into a new symbol.

4. Repeat steps 2–3 until reaching the desired vocabulary size.

In [2]:
VOCAB_SIZE = 2000

tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
tokenizer.pre_tokenizer = Whitespace()

trainer = BpeTrainer(
    vocab_size=VOCAB_SIZE,
    special_tokens=["[UNK]", "[PAD]", "[CLS]", "[SEP]", "[MASK]"]
)

tokenizer.train(files=["train.txt"], trainer=trainer)






In [3]:
# 3) Use it
enc = tokenizer.encode("hello words tokenization")
print(enc.tokens)
print(enc.ids)


['hel', 'l', 'o', 'words', 'tokenization']
[78, 15, 18, 59, 136]


## Embedding

`nn.Embedding` - **trainable** lookup table that maps token IDs to dense vectors.

It is initialized with random weights (_Xavier_) and learns them during training.

Text is tokenized $\to$ converted to IDs $\to$ passed through nn.Embedding $\to$ turned into embeddings for the model.


In [4]:
vocab_size = VOCAB_SIZE
embedding_dim = 512

emb = nn.Embedding(vocab_size, embedding_dim)

emb.weight      # shape: (vocab_size, embedding_dim)

Parameter containing:
tensor([[ 0.3215, -1.5927,  0.7108,  ...,  2.0676,  0.0083,  0.4260],
        [-1.7744,  0.1643, -0.4902,  ...,  1.4040,  0.2221,  0.2700],
        [-0.5170, -0.4645,  0.6339,  ..., -0.1314,  0.2753,  0.8073],
        ...,
        [-2.6290,  0.5621, -0.4083,  ..., -0.4444,  1.5147, -0.2869],
        [ 1.9268,  0.6434, -1.1821,  ...,  0.0537,  1.0854,  0.1743],
        [-0.0758, -1.6519,  2.5185,  ..., -0.7546,  1.6106, -0.4367]],
       requires_grad=True)

In [5]:
token_ids = torch.tensor([78, 15, 18, 59, 136])
out = emb(token_ids)

out             # shape: (3, embedding_dim)

tensor([[-0.5533,  0.1166, -0.4037,  ...,  0.4473,  1.9527,  2.4356],
        [ 1.0676,  0.7293,  0.0139,  ..., -0.5089,  0.1529,  1.7803],
        [ 0.1888,  0.6157,  0.2174,  ...,  0.2663,  1.2283, -1.0712],
        [-0.3183, -0.7785, -2.7941,  ...,  0.9561,  1.8599,  1.2893],
        [ 0.7906,  0.4859,  1.5388,  ..., -1.2838, -1.1070, -0.8138]],
       grad_fn=<EmbeddingBackward0>)

# II. Transformer

## 1. Vanilla Encoder–Decoder (Sutskever et al., 2014)

**Architecture:** RNN encoder $\to$ fixed vector $\to$ RNN decoder.  
No attention.

### Math

- Encoder reads inputs $ x_1, \dots, x_T $:

$$
h_t = \mathrm{RNN}_{\text{enc}}(h_{t-1}, x_t), \quad h_0 = 0
$$

- Final hidden state $ h_T $ is the **context vector** $ c $.

- Decoder generates outputs sequentially:

$$
s_t = \mathrm{RNN}_{\text{dec}}(s_{t-1}, y_{t-1}, c)
$$
$$
P(y_t \mid y_{<t}, x) = \mathrm{Softmax}(W_o s_t + b_o)
$$

## 2. Bahdanau (Additive) Attention (Bahdanau et al., 2014)

**Architecture:** RNN encoder–decoder with additive attention.  
Attention is computed **before** the decoder RNN step (_pre-RNN attention_).

## 3. Luong (Multiplicative) Attention (Luong et al., 2015)

**Architecture:** RNN encoder–decoder with multiplicative attention.  
Attention is computed **after** the decoder RNN step (_post-RNN attention_).  

## 4. Transformer Attention ([Attention Is All You Need [2017]](https://arxiv.org/abs/1706.03762))

**Architecture:** Encoder–decoder model without any RNN / LSTM / CNN / ..., do only attention.

So 2. and 3. did attention only _between_ encoder & decoder (that were RNN / ...), now encoder & decoder are attention themselves (and we also keep doing attention between).

### Step 1: Input Representations

$$
X \in \mathbb{R}^{n_{sequence} \times d_{\text{embedding}}}
$$
where row $X_i$ is the embedding of token $i$.  

### Step 2: $Q, K, V$

The Transformer learns **three linear projection matrices per head**:

$$
W^Q \in \mathbb{R}^{d_{\text{model}} \times d_k}, \quad
W^K \in \mathbb{R}^{d_{\text{model}} \times d_k}, \quad
W^V \in \mathbb{R}^{d_{\text{model}} \times d_v}
$$

Then:

$$
Q = X W^Q, \quad K = X W^K, \quad V = X W^V
$$

- $Q \in \mathbb{R}^{n \times d_k}$ (queries) - "What am I looking for?"

- $K \in \mathbb{R}^{n \times d_k}$ (keys) - "What properties do I have?"

- $V \in \mathbb{R}^{n \times d_v}$ (values) - "What information should I pass on if selected?"

### Step 3: Scoring Queries Against Keys

How much should token $i$ pay attention to token $j$?

The raw **attention score** between query $Q_i$ and key $K_j$ is:

$$
\text{score}(i,j) = Q_i \cdot K_j^T = \sum_{m=1}^{d_k} Q_{i,m} K_{j,m}
$$

$$
\text{Scores} = Q K^T \in \mathbb{R}^{n \times n}
$$

Row $i$ = how much token $i$ attends to all tokens.

### Step 4: Scaling by $\sqrt{d_k}$

#### **Assumptions**

1. $$
    \mathbb{E}[k_i] = \mathbb{E}[q_i] = 0, \quad \mathrm{Var}[k_i] = \mathrm{Var}[q_i] = 1
    $$

    - Under standard weight initialization (e.g., Xavier) that is okay

2. $ k_i, q_i $ are _independent_ across dimensions

Then:

$$
\mathrm{Var}[K^\top Q]
= \sum_{i=1}^{d_k} \mathrm{Var}[k_i q_i]
= \sum_{i=1}^{d_k} \mathrm{Var}[k_i] \mathrm{Var}[q_i]
= d_k
$$

#### **Scaling**

Because large values push softmax into saturation, leading to vanishing gradients, we scale by $\sqrt{d_k}$ (like temperature):

$$
\text{ScaledScores} = \frac{Q K^T}{\sqrt{d_k}}
$$

### Step 5: Attention

**Attention weights (scores)**:

$$
A = \text{softmax}\!\left(\frac{Q K^T}{\sqrt{d_k}}\right) \in \mathbb{R}^{n \times n}
$$

- Row $i$: distribution over which tokens $i$ attends to.  
- Each row sums to 1.  

**Attention output (pattern)**:

$$
\text{Attention}(Q, K, V) = AV
$$

### Step 6: Multi-Head Attention

> _multi_-head attention instead of a _single_ head to focus on different representation subspaces: Syntactic structure, Positional patterns, Semantic roles, ...

For $h$ heads, we repeat the above with different projection matrices:

$$
\text{head}_i = \text{Attention}(X W_i^Q, \, X W_i^K, \, X W_i^V)
$$

Then concatenate:

$$
\text{MHA}(X) = \text{Concat}(\text{head}_1, \dots, \text{head}_h) W^O
$$

with $W^O \in \mathbb{R}^{hd_v \times d_{\text{model}}}$.


## Transformer Architecture Overview

<p align="center">
  <img src="transformer.png" alt="transformer" />
</p>

- **Encoder** (left block): processes the input sequence into a contextual representation.

- **Decoder** (right block): generates the output sequence one token at a time.

  Attends both to previously generated tokens and the encoder’s representation:

  - Queries $\{Q_i\}$ come from the decoder.
  - Keys $\{K_i\}$ & Values $\{V_i\}$ come from encoder output.

  Allows decoder to focus on relevant input parts when generating output.

> Embeddings flow from the last layer of Encoder block into all Decoder block's attention (not from the respective (соответствующих) layers).

### Masked Multi-Head Attention

The mask is just 0&1 matrix for decoder not to look into future.

## BERT-GPT-BART

1. BERT (encoder-only)

  Bidirectional Encoder Representations from Transformers.

2. GPT-x (decoder-only)

  Generative Pretrained Transformer.

3. BART (encoder-decoder)

  Encoder reads corrupted text in full (like BERT).

  Decoder generates clean text autoregressively (like GPT).

### Why GPT-x, but not BART?


In [6]:
class DotProductAttentionLayer(nn.Module):
    def __init__(self, enc_size, dec_size, hid_size):
        """
        Scaled dot-product attention with projections
        :param enc_size: num units in encoder state
        :param dec_size: num units in decoder state
        :param hid_size: dimension of query/key space (d_k)
        """
        super().__init__()

        # Define projection layers
        # W_q, W_k, W_v project decoder states into query, key, value
        self.wq = nn.Linear(dec_size, hid_size, bias=False)
        self.wk = nn.Linear(enc_size, hid_size, bias=False)
        self.wv = nn.Linear(enc_size, hid_size, bias=False)

    def forward(self, enc, dec, input_mask):
        """
        # B - batch_size
        # T - seq_len
        # H - hid_dim

        Compute attention response and weights
        :param enc: encoder sequence [B, T, enc_size]
        :param dec: decoder state [B, dec_size]
        :param input_mask: mask [B, T] (0 = padding)
        :returns: attn [B, H], probs [B, T]
        """

        q = self.wq(dec).unsqueeze(1)                           # [B, 1, H]; unsqueeze, so we could multiply with k.T
        k = self.wk(enc)                                        # [B, T, H]
        v = self.wv(enc)                                        # [B, T, H]

        logits = torch.bmm(q, k.transpose(1, 2)).squeeze(1)     # [B, T]
        scaled_logits = logits / math.sqrt(self.hid_size)
        masked_logits = scaled_logits.masked_fill(~input_mask, float('-inf'))

        probs = torch.softmax(masked_logits, dim=-1)            # [B, T]

        attention = torch.bmm(probs.unsqueeze(1), v).squeeze(1) # [B, H]

        return attention, probs


In [7]:
class Transformer(nn.Module):
    """
    Minimal GPT-style (decoder-only) Transformer using torch.nn building blocks.
    - Token + positional embeddings
    - Stack of TransformerEncoderLayer with a causal (future-masking) attention mask
    - LM head tied to token embeddings (optional but typical for GPT-style)
    """

    def __init__(
        self,
        vocab_size: int,
        max_seq_len: int,
        d_model: int = 512,
        n_heads: int = 8,
        n_layers: int = 6,
        d_ff: int = 2048,
        dropout: float = 0.1,
        tie_weights: bool = True,
    ) -> None:
        super().__init__()

        if d_model % n_heads != 0:
            raise ValueError("d_model must be divisible by n_heads")

        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.d_model = d_model

        self.tok_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_seq_len, d_model)
        self.drop = nn.Dropout(dropout)

        # EncoderLayer works fine for GPT-style decoder-only if we apply a causal mask.
        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            activation="gelu",
            batch_first=True,   # (B, T, C)
            norm_first=True,    # closer to many GPT variants
        )
        self.blocks = nn.TransformerEncoder(layer, num_layers=n_layers)

        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)

        if tie_weights:
            self.lm_head.weight = self.tok_emb.weight

        # Precompute a boolean causal mask for max_seq_len.
        # True = masked out (disallowed attention) for Transformer modules when using bool masks.
        causal = torch.triu(torch.ones(max_seq_len, max_seq_len, dtype=torch.bool), diagonal=1)
        self.register_buffer("causal_mask", causal, persistent=False)

    def forward(
        self,
        idx: torch.Tensor,             # (B, T) token ids
        targets: torch.Tensor | None = None,  # (B, T) token ids
    ) -> tuple[torch.Tensor, torch.Tensor | None]:
        """
        Returns:
          logits: (B, T, vocab_size)
          loss:   scalar or None
        """
        if idx.dim() != 2:
            raise ValueError("idx must have shape (B, T)")
        bsz, T = idx.shape
        if T > self.max_seq_len:
            raise ValueError(f"Sequence length {T} exceeds max_seq_len={self.max_seq_len}")

        device = idx.device
        pos = torch.arange(T, device=device)  # (T,)

        x = self.tok_emb(idx) + self.pos_emb(pos)[None, :, :]
        x = self.drop(x)

        # Slice causal mask to current sequence length.
        attn_mask = self.causal_mask[:T, :T]  # (T, T), bool

        x = self.blocks(x, mask=attn_mask)
        x = self.ln_f(x)

        logits = self.lm_head(x)

        return logits

    @torch.no_grad()
    def generate(
        self,
        idx: torch.Tensor,          # (B, T)
        max_new_tokens: int,
        temperature: float = 1.0,
        top_k: int | None = None,
    ) -> torch.Tensor:
        """
        Simple autoregressive sampling.
        """
        self.eval()

        for _ in range(max_new_tokens):
            # Crop context if needed
            idx_cond = idx[:, -self.max_seq_len :]

            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]  # (B, V)

            if temperature <= 0:
                # greedy
                next_token = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                logits = logits / temperature
                if top_k is not None and top_k > 0:
                    v, _ = torch.topk(logits, k=min(top_k, logits.size(-1)), dim=-1)
                    cutoff = v[:, -1].unsqueeze(-1)
                    logits = torch.where(logits < cutoff, torch.full_like(logits, -float("inf")), logits)

                probs = F.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

            idx = torch.cat([idx, next_token], dim=1)

        return idx
