# RoFormer: Enhanced Transformer with Rotary Position Embedding.

Su, Jianlin, Yu Lu, Shengfeng Pan, Ahmed Murtadha, Bo Wen, and Yunfeng Liu. "RoFormer: Enhanced Transformer with Rotary Position Embedding." arXiv:2104.09864. Preprint, arXiv, November 8, 2023. https://doi.org/10.48550/arXiv.2104.09864.


In [None]:
!pip install torchinfo

In [56]:
from typing import Optional

import math
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import functional as F

from torchinfo import summary

In [57]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
torch.manual_seed(42)

In [59]:
@dataclass
class ModelArgs:
    block_size: int = 1024              # seq_len
    vocab_size: int = 32_000
    hidden_size: int = 768              # embedding_size
    type_vocab_size: int = 2
    intermediate_size: int = 3072       # 4 x hidden size
    num_attention_heads: int = 8
    n_layers: int = 4
    n_heads: int = 4
    dim: int = 1024
    head_dim: int = 64
    rope_base: float = 100_0000.0
    norm_eps: float = 1e-5
    dropout: float = 0.1
    attn_dropout: float = 0.1
    layer_norm_eps: float = 1e-12

In [60]:
config = ModelArgs()

In [61]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.c_fc = nn.Linear(config.hidden_size, config.intermediate_size)
        self.c_proj = nn.Linear(config.intermediate_size, config.hidden_size)
        self.act = nn.GELU()

        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        hidden_states = x
        x = self.c_fc(x)
        x = self.act(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        x = self.layer_norm(x + hidden_states)
        return x

Legyen az $\mathbb{S}_N=\left\{w_i\right\}_{i=1}^N$ egy $N$ bemeneti tokenből álló szekvencia, ahol $w_i$ az $i$-edik elem.

Az $\mathbb{S}_N$-hez tartozó (szó)beágyazást jelölje a $\mathbb{E}_N=\left\{\mathbf{x}_i\right\}_{i=1}^N$, ahol $\mathbf{x}_i \in \mathbb{R}^d$ a $w_i$ token $d$ dimenziós (szó)beágyazás vektora *pozicionális információ nélkül*.

Az önfigyelem mechanizmus először a szóbeágyazások $q$,$k$, és $v$ reprezentációkká alakítja.

$$
\begin{aligned}
\mathbf{q}_m & =f_q\left(\mathbf{x}_m, m\right) = \mathbf{W}_q \mathbf{x}_m \\
\mathbf{k}_n & =f_k\left(\mathbf{x}_n, n\right) = \mathbf{W}_k \mathbf{x}_n \\
\mathbf{v}_n & =f_v\left(\mathbf{x}_n, n\right)
\end{aligned}
$$

ahol a $\mathbf{q}_m, \mathbf{k}_n$ és $\mathbf{v}_n$ az $m$-edik és $n$-edik pozíciót jelöli.

A önfigyelem képletében a $$\mathbf{q}_m^{\top} \mathbf{k}_n$$ rész jelenti a tudás átadását különböző tokenek között a különböző pozíciókban.

A korábbi (abszolút és relatív) pozícionális beágyazások során a figyelem mechanizmus előtt történik meg a művelet, azaz

$$
f_{t: t \in\{q, k, v\}}\left(\mathbf{x}_i, i\right) := \mathbf{W}_{t: t \in\{q, k, v\}}\left(\mathbf{x}_i+\mathbf{p}_i\right),
$$

ahol $\mathbf{p}$ adja a pozícionális infromációt és $\mathbf{p}_i \in \mathbb{R}^d$ egy $d$ dimenziós vektor, a az $\mathbf{x}_i$ token pozíciója.

A rotációs pozíció beágyazás (rotary position embedding, röviden RoPE) a vektorok geometriai tulajdonságait a két dimenziós síkon és komplex számként is értelmezhető alakját felhasználva az alábbi megoldást javasolják:

$$
\begin{aligned}
f_q\left(\mathbf{x}_m, m\right) & =\left(\mathbf{W}_q \mathbf{x}_m\right) e^{i m \theta} \\
f_k\left(\mathbf{x}_n, n\right) & =\left(\mathbf{W}_k \mathbf{x}_n\right) e^{i n \theta} \\
\end{aligned}
$$

Adott $(x_1, x_2)$ vektort fel lehet írni a $z = x_1 + x_2 i$ komplex számmal (algebrai alak). Tetszőleges $z \in \mathbb{C}$ adott $\theta$ szöggel való forgatását az $z \times e^{i \theta}$ adja.

Ha $m \theta$ a pozíciófüggő (szög)frekvencia, akkor a forgatást a komplex számísíkon a $e^{i m \theta}$ forgatás jelöli.

Az $e^{i m \theta}$ az Euler-képlet alapján kifejezhető:
$$
e^{i m \theta} = \cos(m \theta) +i\sin (m \theta)
$$

Ezért, tetszőleges $z \in \mathbb{C}$ szám forgatása felírható:
$$
z^{\prime}=z \cdot e^{i m \theta}=\left(x_1+i x_2\right)(\cos (m \theta)+i \sin (m \theta)) .
$$

$$
z^{\prime}=\left(x_1 \cos (m \theta)-x_2 \sin (m \theta)\right)+i\left(x_1 \sin (m \theta)+x_2 \cos (m \theta)\right)
$$

A RoPE során a $\cos(m \theta)$ és $\sin(m \theta)$ értékek tárolásra kerülnek egy rotációs mátrixban ($\mathbf{R}_{\Theta}$) a
$$
e^{i m \theta} = \cos(m \theta) +i\sin (m \theta)
$$

alapján, ahol $\Theta$ a szögfrekvenciák skálája

$$
\Theta=\left\{\theta_i=10000^{-2(i-1) / d}, i \in[1,2, \ldots, d / 2]\right\}.
$$

Mivel az eredményeket a két dimenziós sík alapján akarták általánosítani bármely $ \mathbf{x}_i \in \mathbb{R}^d$ vektorra, ahol $d$ páros, ezért a dimenziót kettővel osztva, $d/2$ altér lesz.

A számítást a `precompute_freqs_cis` függvény valósítja meg.

A $f_q$ és $f_k$ függvények felírhatóak az alábbi alakban:

$$
f_{\{q, k\}}\left(\mathbf{x}_m, m\right) =
\mathbf{R}_{\Theta, m}^d \mathbf{W}_{\{q, k\}} \mathbf{x}_m
$$

A komputációs hatékonyság miatt a $\mathbf{R}_{\Theta, m}^d$ és $\boldsymbol{x} \in \mathbb{R}^d$ realizációja a

$$
\mathbf{R}_{\Theta, m}^d \mathbf{x}=\left(\begin{array}{c}
x_1 \\
x_2 \\
x_3 \\
x_4 \\
\vdots \\
x_{d-1} \\
x_d
\end{array}\right) \otimes\left(\begin{array}{c}
\cos m \theta_1 \\
\cos m \theta_1 \\
\cos m \theta_2 \\
\cos m \theta_2 \\
\vdots \\
\cos m \theta_{d / 2} \\
\cos m \theta_{d / 2}
\end{array}\right)+\left(\begin{array}{c}
-x_2 \\
x_1 \\
-x_4 \\
x_3 \\
\vdots \\
-x_d \\
x_{d-1}
\end{array}\right) \otimes\left(\begin{array}{c}
\sin m \theta_1 \\
\sin m \theta_1 \\
\sin m \theta_2 \\
\sin m \theta_2 \\
\vdots \\
\sin m \theta_{d / 2} \\
\sin m \theta_{d / 2}
\end{array}\right)
$$

amelyet az `apply_rotary_emb` függvény valósít meg.

Megjegyzések

- A rotációs mátrix olyan transzformációs mátrix, amely euklideszi térben forgatás végrehajtására szolgál.

- Két complex vektor, $\mathbf{a}$ és $\mathbf{b}$ skaláris szorzata (dot product)
$$
\mathbf{a} \cdot \mathbf{b}=\sum_i a_i \overline{b_i},
$$

- Komplex számokon az összeadás és szorás művelet értelmezése
$$
\begin{aligned}
&(a, b)+(c, d) \doteq(a+c, b+d), \\
&(a, b)(c, d) \doteq(a c-b d, b c+a d) .
\end{aligned}
$$

- Az Euler-képlet a komplex matematikai analízis egy formulája, mely megmutatja, hogy szoros kapcsolat van a szögfüggvények és a komplex exponenciális függvény között. Az Euler-képlet azt állítja, hogy minden valós $x$ számra igaz (és ahol $e$ az Euler-féle szám, a természetes logaritmus alapja, e=2,718):

$$
e^{ix} = \cos(x) +i\sin (x)
$$



----

In [62]:
def precompute_freqs_cis(
        seq_len: int,
        head_dim: int,
        base: int = 10_000
) -> torch.Tensor:

    freqs = 1.0 / (base ** (torch.arange(0, head_dim, 2)[: (head_dim // 2)].float() / head_dim))    # (d // 2)

    t = torch.arange(seq_len, device=freqs.device)                                                 # [0, 1, 2, ..., seq_len-1], (seql_len)

    freqs = torch.outer(t, freqs)                                                                  # (seq_len, d // 2)

    # x' = x * cos(theta) + x * sin(theta) * i, ahol i az imaginárius szám
    freqs_cis = torch.polar(torch.ones_like(freqs), freqs)                                         # (seq_len, d // 2)

    cache = torch.stack([freqs_cis.real, freqs_cis.imag], dim=-1)                                  # (seq_len, d // 2, 2)

    return cache.to(dtype=torch.bfloat16)

In [63]:
def apply_rotary_emb(
    x: Tensor,                                                                          # (bs, block_size, n_heads, head_dim)
    freqs_cis: Tensor                                                                   # (block_size, head_dim // 2, 2)
) -> torch.Tensor:
    xshaped = x.float().reshape(*x.shape[:-1], -1, 2)                                    # (bs, block_size, n_heads, head_dim // 2, 2)
    freqs_cis = freqs_cis.view(1, xshaped.size(1), 1, xshaped.size(3), 2)               # (1, block_size, 1, head_dim // 2, 2)
    x_out = torch.stack(
        [
            # első komponens rotáció: x1 * cos(theta) - x2 * sin(theta)
            xshaped[..., 0] * freqs_cis[..., 0] - xshaped[..., 1] * freqs_cis[..., 1],
            # második komponens rotáció: x2 * cos(theta) + x1 * sin(theta)
            xshaped[..., 1] * freqs_cis[..., 0] + xshaped[..., 0] * freqs_cis[..., 1],
        ],
        -1,
    )
    x_out = x_out.flatten(3)
    return x_out.type_as(x)

In [64]:
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()

        assert config.hidden_size % config.n_heads == 0, "hidden size is indivisible by n_heads (number of attention heads)"

        self.n_heads = config.n_heads                               # h
        self.head_dim = config.hidden_size // config.n_heads         # d_h
        self.all_head_size = config.n_heads * self.head_dim

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attn_dropout)

        self.w_o = nn.Linear(config.hidden_size, config.hidden_size) # (d, d)

    def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor, attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:

        batch_size, seq_len, hidden_size = x.shape # (b,t,d)

        q = self.query(x).view(batch_size, seq_len, self.n_heads, self.head_dim)
        k = self.key(x).view(batch_size, seq_len, self.n_heads, self.head_dim)
        v = self.value(x).view(batch_size, seq_len, self.n_heads, self.head_dim)

        q = apply_rotary_emb(q, freqs_cis)
        k = apply_rotary_emb(k, freqs_cis)

        # (b, h, t, d_h) <- (b, t, h, d_h)
        q, k, v = map(lambda x: x.transpose(1, 2), (q, k, v))

        # (b, h, t, t) = (b, h, t, d_h) @ (b, h, d_h, t)
        attn_scores = torch.matmul(q, k.transpose(2, 3)) / math.sqrt(self.head_dim)

        if attention_mask is not None:
            attn_scores = attn_scores + attention_mask

        attention_probs = F.softmax(attn_scores, dim=-1)

        attention_probs = self.dropout(attention_probs)

        # (b, h, t, d_v) = (b, h, t, t) @ (b, h, t, d_v)
        # (b, t, h, d_v) <- (b, h, t, d_v)
        context = torch.matmul(attention_probs, v).transpose(1, 2).contiguous()

        context = context.view(batch_size, seq_len, hidden_size)

        return self.w_o(context)


In [65]:
class EncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        self.ffn_norm = nn.LayerNorm(config.hidden_size, config.layer_norm_eps)
        self.attention_norm = nn.LayerNorm(config.hidden_size, config.layer_norm_eps)

    def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor, attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        h = x + self.attention(self.attention_norm(x), freqs_cis, attention_mask)
        out = h + self.feed_forward(self.ffn_norm(h))
        return out

In [66]:
class RoFormer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config

        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([EncoderLayer(config) for i in range(config.n_layers)])
        self.output = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        self.freqs_cis = precompute_freqs_cis(config.block_size, config.hidden_size // config.n_heads, config.rope_base)

    def forward(
            self,
            x: torch.Tensor,
            attention_mask: Optional[torch.Tensor] = None,
            input_pos: Optional[torch.Tensor] = None # (0, seq_len-1)
    ) -> torch.Tensor:

        batch_size, seq_length = x.size()
        freqs_cis = self.freqs_cis[input_pos]

        if attention_mask is None:
            attention_mask = torch.ones(((batch_size, seq_length)), device=x.device)
        attention_mask = self.get_extended_attention_mask(attention_mask, x.shape)

        x = self.token_embeddings(x)

        for i, layer in enumerate(self.layers):
            x = layer(x, freqs_cis, attention_mask)
        logits = self.output(x)
        return logits

    def get_extended_attention_mask(self, attention_mask: torch.Tensor, input_shape) -> torch.Tensor:
        # attention_mask is 1.0 for positions we want to attend and 0.0 for masked positions
        # create attention mask of shape (batch_size, 1, 1, seq_len), where 1 will be 0 and
        # 0 will the smallest value for a given dtype (-inf)
        extended_attention_mask = attention_mask[:, None, None, :] #
        extended_attention_mask = extended_attention_mask.to(dtype=attention_mask.dtype)
        extended_attention_mask = (1.0 - extended_attention_mask) * torch.finfo(attention_mask.dtype).min
        return extended_attention_mask

In [67]:
roformer = RoFormer(config).to(device)

In [68]:
x = torch.randint(0, config.vocab_size, (4, config.block_size), device=device).to(torch.long)

In [69]:
summary(roformer, input_data=x)

Layer (type:depth-idx)                   Output Shape              Param #
RoFormer                                 [4, 1024, 32000]          --
├─Embedding: 1-1                         [4, 1024, 768]            24,576,000
├─ModuleList: 1-2                        --                        --
│    └─EncoderLayer: 2-1                 [4, 1024, 768]            --
│    │    └─LayerNorm: 3-1               [4, 1024, 768]            1,536
│    │    └─MultiHeadAttention: 3-2      [4, 1024, 768]            2,362,368
│    │    └─LayerNorm: 3-3               [4, 1024, 768]            1,536
│    │    └─FeedForward: 3-4             [4, 1024, 768]            4,723,968
│    └─EncoderLayer: 2-2                 [4, 1024, 768]            --
│    │    └─LayerNorm: 3-5               [4, 1024, 768]            1,536
│    │    └─MultiHeadAttention: 3-6      [4, 1024, 768]            2,362,368
│    │    └─LayerNorm: 3-7               [4, 1024, 768]            1,536
│    │    └─FeedForward: 3-8             [4,