# Mistral


![](https://ucarecdn.com/0b4b9601-ccc9-4062-a62b-cca67a82f82b/)


1-е поколение Mistral вышло в сентябре 2023 года.

Mistral получил те же улучшения, что и у Llama: RMSNorm, SwiGLU и RoPE. Но мистраль пошел дальше и добавил еще две оптимизационные фишки:


- Grouped-Query Attention (GQA)
- Sliding Window Attention (SWA)


Обе модифицируют механизм внимания. И обе предназначены для экономии памяти и ускорения вычислений.

# Masked Multi-Head Attention, ver. 2.0

В текущей реализации **Multi-Head Attention** у нас каждая голова живет своей жизнью и обрабатывается по отдельности:

<p align="center">
  <img src="https://ucarecdn.com/e4d8fadc-9817-4147-9b91-520855ba0d19/" alt="multi_head_1" width="1000" height="331">
</p>

* Класс `MultiHeadAttention` получает на вход тензор размером `batch_size × seq_len × emb_size` и передает его в каждую голову.
* В голове тензор перемножается с тремя матрицами весов: `W_k`, `W_q`, `W_v`, каждая размером `emb_size × head_size`.
* В результате получаются три матрицы: запроса (query), ключа (key) и значения (value). Каждая из них имеет размер `batch_size × seq_len × head_size`.
* Матрицы запроса (query) и ключа (key) мы поворачиваем с помощью техники RoPE.
* Матрицу ключа (key) транспонируем и перемножаем с матрицей запроса (query). В результате получается матрица внимания.
* Далее перемножаем матрицу внимания и матрицу значения (value).
* На выходе из головы получается тензор размера `batch_size × seq_len × head_size`.
* Выходы из всех голов конкатенируются и умножаются на выходные веса, что уменьшает их размер.
* На выходе из `MultiHeadAttention` у нас получается тензор такого же размера, какой поступил на вход: `batch_size × seq_len × emb_size`.

Теперь нам нужно оптимизировать вычисления и сделать так, чтобы все головы вычислялись одновременно в классе `MultiHeadAttention`. Для этого изменим алгоритм следующим образом:

![multi\_head\_2](https://ucarecdn.com/1686165f-7632-4b94-89bc-e0ed7e2ffe07/)

* Класс `MultiHeadAttention` получает на вход тензор размером `batch_size × seq_len × emb_size`.
* Тензор перемножается с тремя матрицами весов: `W_q`, `W_k`, `W_v`. Но на этот раз они имеют размер `emb_size × (num_heads * head_size)`.
  То есть, мы как бы расширили каждую матрицу весов по горизонтали на число голов.
* После перемножения получаются три матрицы: запроса (query), ключа (key) и значения (value). Каждая из них также стала шире на количество голов: `batch_size × seq_len × (num_heads * head_size)`.
* Переводим матрицы запроса (query), ключа (key) и значения (value) в форму четырехмерного тензора:
  `batch_size × num_heads × seq_len × head_size`. Это необходимо для дальнейших матричных операций.
* Матрицы запроса (query) и ключа (key) мы поворачиваем с помощью техники RoPE.
* Транспонируем тензор ключа и перемножаем его с тензором запроса. Получится матрица внимания, которая будет иметь размер
  `batch_size × num_heads × seq_len × seq_len`.
* Далее перемножаем матрицу внимания и тензор значения (value). Получается тензор размером
  `batch_size × num_heads × seq_len × head_size`. Переводим тензор в «плоский» вид:
  `batch_size × seq_len × (num_heads * head_size)`.
* Пропускаем тензор через выходную проекцию (`batch_size × (num_heads * head_size) × emb_size`), чтобы уменьшить его размер.
* На выходе из класса получается тензор точно такого же размера, какой поступил на вход:
  `batch_size × seq_len × emb_size`.

Ну и также версия с кэшем (когда на вход приходит только один токен):

![multi\_head\_3](https://ucarecdn.com/067ce912-2932-418f-9249-09a3564ca82b/)

Единственное изменение: после выполнения поворота мы объединяем текущий тензор с тензором кэшей (для векторов ключа и значения).

# RoPE, ver. 2.0 (разработка)

Первым делом нам нужно подредактировать класс `RoPE`. Сейчас он используется внутри класса `HeadAttention`, а будет использоваться внутри `MultiHeadAttention`.

Единственное явное отличие старой версии от новой — что подается на вход (в метод `forward`):

* Сейчас в него приходит тензор размера `batch_size × seq_len × head_size`.
* А будет приходить тензор размера `batch_size × num_heads × seq_len × head_size`.

<p align="center">
  <img src="https://ucarecdn.com/3aefbeed-a4e8-49a2-a950-db7d4f413d3d/" alt="rope" width="250" height="328">
</p>


In [1]:
import torch
from torch import nn
from typing import Optional


class RoPE(nn.Module):

    def __init__(self, head_size: int, max_seq_len: int, base: int = 10_000):
        super().__init__()
        assert head_size % 2 == 0, "head_size должен быть четным"

        # Вычисление частот: θ_i = base^(-2i/d) для i ∈ [0, d/2-1]
        freqs = 1.0 / (base ** (2 * torch.arange(head_size // 2).float() / head_size))

        # Позиции от 0 до max_seq_len-1
        positions = torch.arange(max_seq_len).float()

        # Внешнее произведение: m * θ_i для всех позиций и частот
        freq_matrix = positions.unsqueeze(1) * freqs.unsqueeze(0)

        # Предвычисление матриц косинусов и синусов
        self.register_buffer("cos_matrix", torch.cos(freq_matrix))
        self.register_buffer("sin_matrix", torch.sin(freq_matrix))

    def forward(self, x: torch.Tensor, start_pos: int = 0) -> torch.Tensor: # [batch_size × seq_len × head_size] [batch_size × num_heads × seq_len × head_size]
        batch_size, num_heads, seq_len, head_size = x.shape

        # Берем нужную часть матриц и приводим к типу x
        cos = self.cos_matrix[start_pos:start_pos+seq_len].to(x.dtype)  # [seq_len, head_size//2]
        sin = self.sin_matrix[start_pos:start_pos+seq_len].to(x.dtype)  # [seq_len, head_size//2]

        # Явное изменение формы для broadcasting
        cos = cos.reshape(1, 1, seq_len, head_size // 2)
        sin = sin.reshape(1, 1, seq_len, head_size // 2)

        # Разделяем на четные и нечетные компоненты по ПОСЛЕДНЕМУ измерению
        x_even = x[..., 0::2]  # [batch_size, num_heads, seq_len, head_size//2]
        x_odd = x[..., 1::2]   # [batch_size, num_heads, seq_len, head_size//2]

        # Применяем поворот: q' = q * cos(mθ) + rotate(q) * sin(mθ)
        x_rotated_even = x_even * cos - x_odd * sin
        x_rotated_odd = x_even * sin + x_odd * cos

        # Объединяем обратно в исходную размерность
        x_rotated = torch.stack([x_rotated_even, x_rotated_odd], dim=-1)
        x_rotated = x_rotated.flatten(-2)  # [batch_size, seq_len, head_size]

        return x_rotated


In [2]:
rope = RoPE(head_size=64, max_seq_len=512)
x = torch.randn(2, 8, 128, 64)  # batch=2, heads=8, seq=128, dim=64
output = rope(x)
assert output.shape == x.shape
print("✓ Форма корректна")

✓ Форма корректна


## MultiHeadAttention v2

In [3]:
import torch
from torch import nn
import torch.nn.functional as F
from typing import Optional, Tuple


class MultiHeadAttentionV2(nn.Module):

    def __init__(
        self,
        num_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        rope: RoPE = None,
        dropout: float = 0.1,
    ):
        super().__init__()
        self._num_heads = num_heads
        self._head_size = head_size
        self._max_seq_len = max_seq_len
        self._rope = rope

        self._q = nn.Linear(emb_size, num_heads * head_size)
        self._k = nn.Linear(emb_size, num_heads * head_size)
        self._v = nn.Linear(emb_size, num_heads * head_size)

        # Создание causal маски
        mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
        self.register_buffer(
            "_tril_mask", mask.bool() if hasattr(torch, "bool") else mask.byte()
        )
        
        self._layer = nn.Linear(head_size * num_heads, emb_size)
        self._dropout = nn.Dropout(dropout)

    def forward(
        self,
        x: torch.Tensor,
        mask: torch.Tensor = None,
        use_cache: bool = True,
        cache: list = None,
    ):
        batch_size, seq_len, emb_size = x.shape

        if seq_len > self._max_seq_len:
            raise ValueError(
                f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}"
            )

        # Пропустите тензор x через матрицы Wq, Wk , Wv, чтобы получить матрицы запроса, ключа и значения.
        k = self._k(x)  # [B, T, hs]
        q = self._q(x)  # [B, T, hs]
        v = self._v(x)  # [B, T, hs]

        # Шаг 2: Изменение формы для multi-head
        # [batch_size, seq_len, num_heads * head_size] 
        # -> [batch_size, seq_len, num_heads, head_size]
        q = q.reshape(batch_size, seq_len, self._num_heads, self._head_size)
        k = k.reshape(batch_size, seq_len, self._num_heads, self._head_size)
        v = v.reshape(batch_size, seq_len, self._num_heads, self._head_size)
        

        # 3. Transpose: [B, T, H, hs] -> [B, H, T, hs]
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        start_pos = 0
        if cache is not None:
            k_cache, v_cache = cache
            cache_len = k_cache.shape[2]
            start_pos = cache_len
        
        # Пропустите матрицы запроса и ключа через экземпляр rope, чтобы выполнить поворот.
        if self._rope is not None:
            # ✅ Применяем RoPE к Q и K (НЕ к V!)
            q = self._rope(q, start_pos=start_pos)  # [B, T, hs]
            k = self._rope(k, start_pos=start_pos)  # [B, T, hs]

        # Если cache пришел, то объединяем кэш и одну строку из ключа и значения. Это будут новые key и value  для последующих вычислений.
        # 5. Кэширование (для autoregressive generation)
        if cache is not None:
            k_cache, v_cache = cache
            k = torch.cat([k_cache, k], dim=2)  # Concat по seq_len (dim=2)
            v = torch.cat([v_cache, v], dim=2)

        # Перемножим матрицы запроса и ключа (транспонированную), чтобы вычислить матрицу внимания.
        # И разделить все значения в матрице внимания на корень из head_size.
        scores = q @ k.transpose(-2, -1) / (self._head_size ** 0.5)

        # Если cache пришел, то маску не накладываем. Иначе наложите на матрицу внимания треугольную маску, созданную при инициализации. Все скрытые значения должны быть приведены к минус бесконечности: float('-inf').
        if cache is None:
            scores = scores.masked_fill(
                ~self._tril_mask[:seq_len, :seq_len], float("-inf")
            )

        # Применить к матрице внимания (построчно) функцию Softmax.
        weights = F.softmax(scores, dim=-1)

        # Перемножим матрицу внимания и матрицу значения.
        x_out = weights @ v  # [B, T, hs]

        # Измените форму тензора на batch_size × seq_len × num_heads*head_size.
        # Transpose обратно и concatenate heads
        x_out = x_out.transpose(1, 2)  # [B, T_q, H, hs]
        x_out = x_out.contiguous()  # Важно для reshape!
        concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        #concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        # Пропустите получившийся тензор через последний линейный слой.
        # 3. Проецируем в пространство эмбеддингов
        projected_output = self._layer(concatenated_attention)

        # 4. Применяем dropout для регуляризации
        final_output = self._dropout(projected_output)

        if use_cache is True:
            return (final_output, (k, v))
        else:
            return (final_output, None)

In [4]:

# Параметры
batch_size = 2
seq_len = 10
emb_size = 512
num_heads = 8
head_size = 64
max_seq_len = 512

 # Создание модели
rope = RoPE(head_size=head_size, max_seq_len=max_seq_len)
mha = MultiHeadAttentionV2(
    num_heads=num_heads,
    emb_size=emb_size,
    head_size=head_size,
    max_seq_len=max_seq_len,
    rope=rope,
    dropout=0.1,
)

 # Тест 1: Обычный forward pass
x = torch.randn(batch_size, seq_len, emb_size)
output, cache = mha(x, use_cache=False)
print(f"✅ Test 1 - Output shape: {output.shape}")  # [2, 10, 512]
assert output.shape == (batch_size, seq_len, emb_size)

 # Тест 2: С кэшированием
x1 = torch.randn(batch_size, 5, emb_size)
output1, cache1 = mha(x1, use_cache=True)
print(f"✅ Test 2 - First output shape: {output1.shape}")  # [2, 5, 512]

x2 = torch.randn(batch_size, 1, emb_size)
output2, cache2 = mha(x2, use_cache=True, cache=cache1)
print(f"✅ Test 2 - Second output shape: {output2.shape}")  # [2, 1, 512]

print("\n✅ Все тесты пройдены!")

✅ Test 1 - Output shape: torch.Size([2, 10, 512])
✅ Test 2 - First output shape: torch.Size([2, 5, 512])
✅ Test 2 - Second output shape: torch.Size([2, 1, 512])

✅ Все тесты пройдены!


### Промежуточный вариант Mistral

In [5]:
import torch
from torch import nn
from torch import Tensor
import torch.nn.functional as F
from math import sqrt



class SiLU(nn.Module):
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        return torch.sigmoid(x) * x
    
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self._eps = eps
        self._w = nn.Parameter(torch.ones(dim))
    
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        rms = (x.pow(2).mean(-1, keepdim=True) + self._eps) ** 0.5
        norm_x = x / rms
        return self._w * norm_x

class SwiGLU(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()

        self._gate = nn.Linear(emb_size, 4 * emb_size)
        self._up = nn.Linear(emb_size, 4 * emb_size)
        self._down = nn.Linear(4 * emb_size, emb_size)
        self._activation = SiLU()
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size].
        gate_out = self._gate(x)                          # [batch, seq, 4*emb]
        activation_out = self._activation(gate_out)       # [batch, seq, 4*emb]
        up_out = self._up(x)                              # [batch, seq, 4*emb]
        out = up_out * activation_out                     # поэлементное!
        out = self._down(out)                             # [batch, seq, emb]
        return self._dropout(out)


class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super().__init__()
        self._embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_size
        )

    def forward(self, x: Tensor) -> Tensor:
        return self._embedding(x)

    @property
    def num_embeddings(self) -> int:
        return self._embedding.num_embeddings

    @property
    def embedding_dim(self) -> int:
        return self._embedding.embedding_dim


class GELU(nn.Module):
    def __init__(self):
        super().__init__()
        self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 0.5 * x * (1 + torch.tanh(
            self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
        ))
    
class Decoder(nn.Module):
    def __init__(self, 
        num_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        rope: RoPE,
        dropout: float = 0.1
    ):
        super().__init__()
        self._heads = MultiHeadAttentionV2(
            num_heads=num_heads, 
            emb_size=emb_size, 
            head_size=head_size, 
            max_seq_len=max_seq_len,
            rope=rope,
            dropout=dropout
        )
        self._ff = SwiGLU(emb_size=emb_size, dropout=dropout)
        self._norm1 = RMSNorm(emb_size)
        self._norm2 = RMSNorm(emb_size)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None) -> torch.Tensor:
        norm1_out = self._norm1(x)
        attention, kv_caches = self._heads(norm1_out, mask, use_cache=use_cache, cache=cache)
        out = attention + x
        
        norm2_out = self._norm2(out)
        ffn_out = self._ff(norm2_out)

        if use_cache is True:
            return (ffn_out + out, kv_caches)
        else:
            return (ffn_out + out, None)



from torch import nn
import torch
import torch.nn.functional as F

class Mistral(nn.Module):
    def __init__(self,
        vocab_size: int,
        max_seq_len: int,
        emb_size: int,
        num_heads: int,
        head_size: int,
        num_layers: int,
        dropout: float = 0.1,
        device: str = 'cpu'
    ):
        super().__init__()
        self._vocab_size = vocab_size
        self._max_seq_len = max_seq_len
        self._emb_size = emb_size
        self._num_heads = num_heads
        self._head_size = head_size
        self._num_layers = num_layers
        self._dropout = dropout
        self._device = device
        
        self.validation_loss = None

        # Инициализация слоев
        self._token_embeddings = TokenEmbeddings(
            vocab_size=vocab_size, 
            emb_size=emb_size
        )
        self._position_embeddings = RoPE(
            head_size=head_size,
            max_seq_len=max_seq_len
        )
        #self._position_embeddings = PositionalEmbeddings(
        #    max_seq_len=max_seq_len, 
        #    emb_size=emb_size
        #)
        self._dropout = nn.Dropout(dropout)
        self._decoders = nn.ModuleList([Decoder(
            num_heads=num_heads,
            emb_size=emb_size,
            head_size=head_size,
            max_seq_len=max_seq_len,
            rope=self._position_embeddings,
            dropout=dropout 
        ) for _ in range(num_layers)])
        self._norm = RMSNorm(emb_size)
        self._linear = nn.Linear(emb_size, vocab_size)

    def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
        # Проверка длины последовательности (только при отсутствии кэша)
        if cache is None and x.size(1) > self._max_seq_len:
            raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
        
        # Эмбеддинги токенов и позиций
        tok_out = self._token_embeddings(x)  # [batch, seq_len, emb_size]
       #pos_out = self._position_embeddings(x)  # [batch, seq_len, emb_size]
        
        # Комбинирование
        out = self._dropout(tok_out)  # [batch, seq_len, emb_size]
        
        # Стек декодеров с передачей кэша
        new_cache = []
        for i, decoder in enumerate(self._decoders):
            decoder_cache = cache[i] if cache is not None else None
            decoder_result = decoder(out, use_cache=use_cache, cache=decoder_cache)

            # Извлекаем результат из кортежа
            if use_cache:
                out, decoder_new_cache = decoder_result
                new_cache.append(decoder_new_cache)
            else:
                out = decoder_result[0]

        out = self._norm(out)
        logits = self._linear(out)
            
        # Возвращаем результат с учетом use_cache
        if use_cache:
            return (logits, new_cache)
        else:
            return (logits, None)

    def generate(self,
        x: torch.Tensor, 
        max_new_tokens: int, 
        do_sample: bool,
        temperature: float = 1.0,
        top_k: int = None,
        top_p: float = None,
        use_cache: bool = True
    ) -> torch.Tensor:
        cache = None

        for _ in range(max_new_tokens):
            if use_cache and cache is not None:
                # Используем кэш - передаем только последний токен
                x_input = x[:, -1:]  # [batch_size, 1]
            else:
                # Первая итерация или кэш отключен - передаем всю последовательность
                x_input = x
            
            # Прямой проход с кэшем
            logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
            
            # Обновляем кэш для следующей итерации
            if use_cache:
                cache = new_cache

            last_logits = logits[:, -1, :]  # [batch_size, vocab_size]

            # Масштабируем логиты температурой
            if temperature > 0:
                logits_scaled = last_logits / temperature
            else:
                logits_scaled = last_logits

            if do_sample == True and top_k != None:
                _, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)

                # # Заменим все НЕ top-k логиты на -inf
                masked_logits = logits_scaled.clone()
                vocab_size = logits_scaled.size(-1)

                # создаём маску: 1, если токен НЕ в topk_indices
                mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
                mask.scatter_(1, topk_indices, 0)  # 0 там, где top-k индексы
                masked_logits[mask.byte()] = float('-inf')

                logits_scaled = masked_logits

            if do_sample == True and top_p != None:
                # 1. Применим softmax, чтобы получить вероятности:
                probs = F.softmax(logits_scaled, dim=-1)  # [B, vocab_size]
                # 2. Отсортируем токены по убыванию вероятностей:
                sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
                # 3. Посчитаем кумулятивную сумму вероятностей:
                cum_probs = torch.cumsum(sorted_probs, dim=-1)  # [B, vocab_size]
                # 4. Определим маску: оставить токены, пока сумма < top_p
                sorted_mask = (cum_probs <= top_p).byte()  # [B, vocab_size]
                # Гарантируем, что хотя бы первый токен останется
                sorted_mask[:, 0] = 1
                # 5. Преобразуем маску обратно в оригинальный порядок:
                # Создаём полную маску из 0
                mask = torch.zeros_like(probs, dtype=torch.uint8)
                # Устанавливаем 1 в местах нужных токенов
                mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
                # 6. Зануляем логиты токенов вне топ-p:
                logits_scaled[~mask] = float('-inf')

            # 4. Применяем Softmax
            probs = F.softmax(logits_scaled, dim=-1)  # [batch_size, vocab_size]


            if do_sample == True:
                # 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
                next_token = torch.multinomial(probs, num_samples=1)  # [batch_size, 1]
            else:
                # 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
                next_token = torch.argmax(probs, dim=-1, keepdim=True)  # [batch_size, 1]
            
            # 6. Добавляем его к последовательности
            x = torch.cat([x, next_token], dim=1)  # [batch_size, seq_len+1]
        return x

    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self._vocab_size,
            'max_seq_len': self._max_seq_len,
            'emb_size': self._emb_size,
            'num_heads': self._num_heads,
            'head_size': self._head_size,
            'num_layers': self._num_layers
        }, path)

    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

    @property
    def max_seq_len(self) -> int:
        return self._max_seq_len






# Grouped-Query Attention

**Grouped-Query Attention (GQA)** — это оптимизированный механизм внимания.

В чем суть: в классическом **Multi-Head Attention (MHA)** на каждую голову приходится по три вектора: запроса, ключа и значения. Эти вектора существуют только внутри голов, где они взаимодействуют между собой.

<p align="center">
  <img src="https://ucarecdn.com/12f3e161-dbc8-4bf5-acb2-4f78cebfb3ee/" alt="gqa_1" width="399" height="237">
</p>

А в **GQA** предложили сэкономить на матрицах: разделить головы на группы и на каждую группу назначить по одному вектору ключа и значения.
При этом на каждую голову по-прежнему приходится один вектор запроса.

<p align="center">
  <img src="https://ucarecdn.com/3e3dce50-29ee-4705-9e2d-84478d581c34/" alt="gqa_2" width="399" height="237">
</p>

Что мы в результате получаем:

* **Скорость:** генерация текста происходит на 30–40% быстрее, чем в MHA.
* **Память:** экономия места в Q/G раз (где Q — количество векторов запроса, G — количество групп). Также снижается трафик памяти до 8–10 раз по сравнению с MHA.
* **Качество:** близко к MHA.

> В первом Mistral было 32 Query Heads и 8 K/V Heads.

---

### Как это работает технически?

На первых шагах этого урока мы переделали механизм внимания.
Избавились от отдельных голов и сделали единое пространство для вычислений всех голов одновременно.
Каждая голова теперь представлена отдельными измерениями в одном длинном тензоре.
Вот как это выглядит (здесь представлена часть механизма внимания):

<p align="center">
  <img src="https://ucarecdn.com/ff92d0ec-987e-48e3-b108-dad75e55866e/" alt="gqa_3" width="800" height="278">
</p>

* На вход мы получаем тензор размером `batch_size × seq_len × emb_size`.
* Тензор перемножается с тремя матрицами весов: $W_q$, $W_k$, $W_v$, каждая размером `emb_size × num_heads * head_size`.
* После перемножения получаются три матрицы: запроса (query), ключа (key) и значения (value), каждая размером `batch_size × seq_len × num_heads * head_size`.
* Переводим матрицы запроса (query), ключа (key) и значения (value) в форму четырехмерного тензора:
  `batch_size × num_heads × seq_len × head_size`.
* Выполняем поворот тензоров запроса (query) и ключа (key).
* Дальше ничего не меняется...

---

И вот как нам надо это переделать:

<p align="center">
  <img src="https://ucarecdn.com/4b5bd1e6-3aaa-4fd9-8ceb-6f521f5a23a6/" alt="gqa_4" width="800" height="279">
</p>

* На вход мы получаем тензор размером `batch_size × seq_len × emb_size`.
* Тензор перемножается с тремя матрицами весов: $W_q$, $W_k$, $W_v$:

  * $W_q$ — такого же размера, как и раньше: `emb_size × num_q_heads * head_size`.
  * А вот $W_k$ и $W_v$ уменьшились на количество K/V голов: `emb_size × num_kv_heads * head_size`.
* После перемножения получаются три матрицы:

  * **Запрос (query)** — `batch_size × seq_len × num_q_heads * head_size`.
  * **Ключ (key)** и **значение (value)** — `batch_size × seq_len × num_kv_heads * head_size`.
* Переводим их в форму четырехмерного тензора:

  * **Query:** `batch_size × num_q_heads × seq_len × head_size`.
  * **Key, Value:** `batch_size × num_kv_heads × seq_len × head_size`.
* Выполняем поворот тензоров запроса (query) и ключа (key).
* Затем проводим **уникальную операцию — размножение**.
  Нам нужно произвести матричные операции с тензорами, но у них разный размер, что делает перемножение невозможным.
  Чтобы исправить это, нужно продублировать головы в тензорах **query** и **key**, чтобы их размер стал одинаковым:
  `batch_size × num_q_heads × seq_len × head_size`.
  Копии располагаются последовательно — после каждой головы идут её дубликаты.
* Дальнейшие операции остаются без изменений.

> Может показаться, что с точки зрения использования памяти мы пришли к тому, с чего начали.
> У нас тензор K и V получился такого же размера, как и тензор Q.
> Но это только по внешнему виду. Расширение происходит **виртуально** — в памяти место не дублируется.

---

Ну и версия для кэша:

<p align="center">
  <img src="https://ucarecdn.com/1f66d02b-ff97-4a33-ae76-297eb002533d/" alt="gqa_5" width="800" height="281">
</p>

Единственное отличие: после операции поворота и до размножения голов мы склеиваем текущий токен с кэшем.

---

### Почему именно K и V?

Любопытный читатель спросит: а почему мы сократили количество именно **K** и **V**?
Почему не **Q и V**, или не **Q и K**?

Дело в роли, которую играют вектора. Уже знакомая нам аналогия с библиотекой:

* **Query** — это читатели с разными запросами (один ищет научную книгу, другой — художественную).
* **Key** — это каталог карточек (индексы книг).
* **Value** — это сами книги на полках.

У каждого читателя свой уникальный запрос (**Q**), очевидно, их нельзя копировать на других читателей.
Одни и те же каталог (**K**) и книги (**V**) разделены на секции (группы).
Несколько читателей могут использовать одну секцию каталога/книг, но их запросы остаются уникальными.



### Grouped-Query Attention (разработка)

In [6]:
import torch
from torch import nn
import torch.nn.functional as F
from typing import Optional, Tuple


class GroupedQueryAttention(nn.Module):

    def __init__(
        self,
        num_heads: int,
        num_kv_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        rope: RoPE = None,
        dropout: float = 0.1,
    ):
        super().__init__()
        self._num_heads = num_heads
        self._num_kv_heads = num_kv_heads
        self._head_size = head_size
        self._max_seq_len = max_seq_len
        self._rope = rope

        self._q = nn.Linear(emb_size, num_heads * head_size)
        self._k = nn.Linear(emb_size, num_kv_heads * head_size)
        self._v = nn.Linear(emb_size, num_kv_heads * head_size)

        # Создание causal маски
        mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
        self.register_buffer(
            "_tril_mask", mask.bool() if hasattr(torch, "bool") else mask.byte()
        )
        
        self._layer = nn.Linear(head_size * num_heads, emb_size)
        self._dropout = nn.Dropout(dropout)

    def forward(
        self,
        x: torch.Tensor,
        mask: torch.Tensor = None,
        use_cache: bool = True,
        cache: list = None,
    ):
        batch_size, seq_len, emb_size = x.shape

        if seq_len > self._max_seq_len:
            raise ValueError(
                f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}"
            )

        # Пропустите тензор x через матрицы Wq, Wk , Wv, чтобы получить матрицы запроса, ключа и значения.
        k = self._k(x)  # [B, T, hs]
        q = self._q(x)  # [B, T, hs]
        v = self._v(x)  # [B, T, hs]

        # Шаг 2: Изменение формы для multi-head
        # [batch_size, seq_len, num_heads * head_size] 
        # -> [batch_size, seq_len, num_heads, head_size]
        # Измените форму запроса (query) на batch_size × num_q_heads × seq_len × head_size.
        q = q.reshape(batch_size, seq_len, self._num_heads, self._head_size)

        # Измените форму ключа (key) и значения (value) на batch_size × num_kv_heads × seq_len × head_size.
        k = k.reshape(batch_size, seq_len, self._num_kv_heads, self._head_size)
        v = v.reshape(batch_size, seq_len, self._num_kv_heads, self._head_size)
        

        # 3. Transpose: [B, T, H, hs] -> [B, H, T, hs]
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        start_pos = 0
        if cache is not None:
            k_cache, v_cache = cache
            cache_len = k_cache.shape[2]
            start_pos = cache_len
        
        # Пропустите матрицы запроса и ключа через экземпляр rope, чтобы выполнить поворот.
        if self._rope is not None:
            # ✅ Применяем RoPE к Q и K (НЕ к V!)
            q = self._rope(q, start_pos=start_pos)  # [B, T, hs]
            k = self._rope(k, start_pos=start_pos)  # [B, T, hs]

        # Если cache пришел, то объединяем кэш и одну строку из ключа и значения. Это будут новые key и value  для последующих вычислений.
        # 5. Кэширование (для autoregressive generation)
        if cache is not None:
            k_cache, v_cache = cache
            k = torch.cat([k_cache, k], dim=2)  # Concat по seq_len (dim=2)
            v = torch.cat([v_cache, v], dim=2)

        # Если use_cache == True, то сохраните матрицы ключа и значения для кэша (это нужно сделать до дублирования голов).
        if use_cache == True:
            kv_cache = (k, v)

        # Продублируйте головы в тензорах ключа (key) и значения (value), чтобы получился тензор размера на batch_size × num_q_heads × seq_len × head_size.
        k = self._repeat_kv_heads(k, self._num_heads, self._num_kv_heads)
        v = self._repeat_kv_heads(v, self._num_heads, self._num_kv_heads)

        # Перемножим матрицы запроса и ключа (транспонированную), чтобы вычислить матрицу внимания.
        # И разделить все значения в матрице внимания на корень из head_size.
        scores = q @ k.transpose(-2, -1) / (self._head_size ** 0.5)

        # Если cache пришел, то маску не накладываем. Иначе наложите на матрицу внимания треугольную маску, созданную при инициализации. Все скрытые значения должны быть приведены к минус бесконечности: float('-inf').
        if cache is None:
            scores = scores.masked_fill(
                ~self._tril_mask[:seq_len, :seq_len], float("-inf")
            )

        # Применить к матрице внимания (построчно) функцию Softmax.
        weights = F.softmax(scores, dim=-1)

        # Перемножим матрицу внимания и матрицу значения.
        x_out = weights @ v  # [B, T, hs]

        # Измените форму тензора на batch_size × seq_len × num_heads*head_size.
        # Transpose обратно и concatenate heads
        x_out = x_out.transpose(1, 2)  # [B, T_q, H, hs]
        x_out = x_out.contiguous()  # Важно для reshape!
        concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        #concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        # Пропустите получившийся тензор через последний линейный слой.
        # 3. Проецируем в пространство эмбеддингов
        projected_output = self._layer(concatenated_attention)

        # 4. Применяем dropout для регуляризации
        final_output = self._dropout(projected_output)

        if use_cache is True:
            return (final_output, kv_cache)
        else:
            return (final_output, None)

    def _repeat_kv_heads(
        self,
        kv: torch.Tensor,
        num_q_heads: int,
        num_kv_heads: int
    ) -> torch.Tensor:
        """
        Дублирует головы K/V для соответствия количеству голов Q.

        Args:
            kv: [batch_size, num_kv_heads, seq_len, head_size]
            num_q_heads: Количество голов Query (например, 8)
            num_kv_heads: Количество голов Key/Value (например, 2)

        Returns:
            [batch_size, num_q_heads, seq_len, head_size]

        Example:
            num_q_heads=8, num_kv_heads=2
            Каждая голова KV дублируется 4 раза:
            [KV0, KV1] -> [KV0, KV0, KV0, KV0, KV1, KV1, KV1, KV1]
        """
        batch_size, num_kv_heads, seq_len, head_size = kv.shape

        if num_q_heads == num_kv_heads:
            # Нет необходимости дублировать
            return kv

        # Вычисляем сколько раз нужно повторить каждую голову
        num_repeats = num_q_heads // num_kv_heads

        # repeat_interleave дублирует каждую голову num_repeats раз
        # [B, num_kv_heads, S, hs] -> [B, num_q_heads, S, hs]
        # [B, num_kv_heads, S, hs] -> [B, num_kv_heads, 1, S, hs]
        kv = kv.unsqueeze(2)
        
        # [B, num_kv_heads, 1, S, hs] -> [B, num_kv_heads, num_repeats, S, hs]
        kv = kv.repeat(1, 1, num_repeats, 1, 1)
        
        # [B, num_kv_heads, num_repeats, S, hs] -> [B, num_q_heads, S, hs]
        kv = kv.reshape(batch_size, num_q_heads, seq_len, head_size)
        

        return kv

### Промежуточный вариант Mistral

In [7]:
import torch
from torch import nn
from torch import Tensor
import torch.nn.functional as F
from math import sqrt



class SiLU(nn.Module):
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        return torch.sigmoid(x) * x
    
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self._eps = eps
        self._w = nn.Parameter(torch.ones(dim))
    
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        rms = (x.pow(2).mean(-1, keepdim=True) + self._eps) ** 0.5
        norm_x = x / rms
        return self._w * norm_x

class SwiGLU(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()

        self._gate = nn.Linear(emb_size, 4 * emb_size)
        self._up = nn.Linear(emb_size, 4 * emb_size)
        self._down = nn.Linear(4 * emb_size, emb_size)
        self._activation = SiLU()
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size].
        gate_out = self._gate(x)                          # [batch, seq, 4*emb]
        activation_out = self._activation(gate_out)       # [batch, seq, 4*emb]
        up_out = self._up(x)                              # [batch, seq, 4*emb]
        out = up_out * activation_out                     # поэлементное!
        out = self._down(out)                             # [batch, seq, emb]
        return self._dropout(out)


class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super().__init__()
        self._embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_size
        )

    def forward(self, x: Tensor) -> Tensor:
        return self._embedding(x)

    @property
    def num_embeddings(self) -> int:
        return self._embedding.num_embeddings

    @property
    def embedding_dim(self) -> int:
        return self._embedding.embedding_dim


class GELU(nn.Module):
    def __init__(self):
        super().__init__()
        self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 0.5 * x * (1 + torch.tanh(
            self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
        ))


class Decoder(nn.Module):
    def __init__(self, 
        num_q_heads: int,
        num_kv_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        rope: RoPE,
        dropout: float = 0.1
    ):
        super().__init__()
        self._heads = GroupedQueryAttention(
            num_heads=num_q_heads, 
            num_kv_heads=num_kv_heads,
            emb_size=emb_size, 
            head_size=head_size, 
            max_seq_len=max_seq_len,
            rope=rope,
            dropout=dropout
        )
        self._ff = SwiGLU(emb_size=emb_size, dropout=dropout)
        self._norm1 = RMSNorm(emb_size)
        self._norm2 = RMSNorm(emb_size)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None) -> torch.Tensor:
        norm1_out = self._norm1(x)
        attention, kv_caches = self._heads(norm1_out, mask, use_cache=use_cache, cache=cache)
        out = attention + x
        
        norm2_out = self._norm2(out)
        ffn_out = self._ff(norm2_out)

        if use_cache is True:
            return (ffn_out + out, kv_caches)
        else:
            return (ffn_out + out, None)



from torch import nn
import torch
import torch.nn.functional as F

class Mistral(nn.Module):
    def __init__(self,
        vocab_size: int,
        max_seq_len: int,
        emb_size: int,
        num_q_heads: int,
        num_kv_heads: int,
        head_size: int,
        num_layers: int,
        dropout: float = 0.1,
        device: str = 'cpu'
    ):
        super().__init__()
        self._vocab_size = vocab_size
        self._max_seq_len = max_seq_len
        self._emb_size = emb_size
        self._num_q_heads = num_q_heads
        self._num_kv_heads = num_kv_heads
        self._head_size = head_size
        self._num_layers = num_layers
        self._dropout = dropout
        self._device = device
        
        self.validation_loss = None

        # Инициализация слоев
        self._token_embeddings = TokenEmbeddings(
            vocab_size=vocab_size, 
            emb_size=emb_size
        )
        self._position_embeddings = RoPE(
            head_size=head_size,
            max_seq_len=max_seq_len
        )
        #self._position_embeddings = PositionalEmbeddings(
        #    max_seq_len=max_seq_len, 
        #    emb_size=emb_size
        #)
        self._dropout = nn.Dropout(dropout)
        self._decoders = nn.ModuleList([Decoder(
            num_q_heads=num_q_heads,
            num_kv_heads=num_kv_heads,
            emb_size=emb_size,
            head_size=head_size,
            max_seq_len=max_seq_len,
            rope=self._position_embeddings,
            dropout=dropout 
        ) for _ in range(num_layers)])
        self._norm = RMSNorm(emb_size)
        self._linear = nn.Linear(emb_size, vocab_size)

    def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
        # Проверка длины последовательности (только при отсутствии кэша)
        if cache is None and x.size(1) > self._max_seq_len:
            raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
        
        # Эмбеддинги токенов и позиций
        tok_out = self._token_embeddings(x)  # [batch, seq_len, emb_size]
       #pos_out = self._position_embeddings(x)  # [batch, seq_len, emb_size]
        
        # Комбинирование
        out = self._dropout(tok_out)  # [batch, seq_len, emb_size]
        
        # Стек декодеров с передачей кэша
        new_cache = []
        for i, decoder in enumerate(self._decoders):
            decoder_cache = cache[i] if cache is not None else None
            decoder_result = decoder(out, use_cache=use_cache, cache=decoder_cache)

            # Извлекаем результат из кортежа
            if use_cache:
                out, decoder_new_cache = decoder_result
                new_cache.append(decoder_new_cache)
            else:
                out = decoder_result[0]

        out = self._norm(out)
        logits = self._linear(out)
            
        # Возвращаем результат с учетом use_cache
        if use_cache:
            return (logits, new_cache)
        else:
            return (logits, None)

    def generate(self,
        x: torch.Tensor, 
        max_new_tokens: int, 
        do_sample: bool,
        temperature: float = 1.0,
        top_k: int = None,
        top_p: float = None,
        use_cache: bool = True
    ) -> torch.Tensor:
        cache = None

        for _ in range(max_new_tokens):
            if use_cache and cache is not None:
                # Используем кэш - передаем только последний токен
                x_input = x[:, -1:]  # [batch_size, 1]
            else:
                # Первая итерация или кэш отключен - передаем всю последовательность
                x_input = x
            
            # Прямой проход с кэшем
            logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
            
            # Обновляем кэш для следующей итерации
            if use_cache:
                cache = new_cache

            last_logits = logits[:, -1, :]  # [batch_size, vocab_size]

            # Масштабируем логиты температурой
            if temperature > 0:
                logits_scaled = last_logits / temperature
            else:
                logits_scaled = last_logits

            if do_sample == True and top_k != None:
                _, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)

                # # Заменим все НЕ top-k логиты на -inf
                masked_logits = logits_scaled.clone()
                vocab_size = logits_scaled.size(-1)

                # создаём маску: 1, если токен НЕ в topk_indices
                mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
                mask.scatter_(1, topk_indices, 0)  # 0 там, где top-k индексы
                masked_logits[mask.byte()] = float('-inf')

                logits_scaled = masked_logits

            if do_sample == True and top_p != None:
                # 1. Применим softmax, чтобы получить вероятности:
                probs = F.softmax(logits_scaled, dim=-1)  # [B, vocab_size]
                # 2. Отсортируем токены по убыванию вероятностей:
                sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
                # 3. Посчитаем кумулятивную сумму вероятностей:
                cum_probs = torch.cumsum(sorted_probs, dim=-1)  # [B, vocab_size]
                # 4. Определим маску: оставить токены, пока сумма < top_p
                sorted_mask = (cum_probs <= top_p).byte()  # [B, vocab_size]
                # Гарантируем, что хотя бы первый токен останется
                sorted_mask[:, 0] = 1
                # 5. Преобразуем маску обратно в оригинальный порядок:
                # Создаём полную маску из 0
                mask = torch.zeros_like(probs, dtype=torch.uint8)
                # Устанавливаем 1 в местах нужных токенов
                mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
                # 6. Зануляем логиты токенов вне топ-p:
                logits_scaled[~mask] = float('-inf')

            # 4. Применяем Softmax
            probs = F.softmax(logits_scaled, dim=-1)  # [batch_size, vocab_size]


            if do_sample == True:
                # 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
                next_token = torch.multinomial(probs, num_samples=1)  # [batch_size, 1]
            else:
                # 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
                next_token = torch.argmax(probs, dim=-1, keepdim=True)  # [batch_size, 1]
            
            # 6. Добавляем его к последовательности
            x = torch.cat([x, next_token], dim=1)  # [batch_size, seq_len+1]
        return x

    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self._vocab_size,
            'max_seq_len': self._max_seq_len,
            'emb_size': self._emb_size,
            'num_heads': self._num_heads,
            'head_size': self._head_size,
            'num_layers': self._num_layers
        }, path)

    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

    @property
    def max_seq_len(self) -> int:
        return self._max_seq_len






# Sliding Window Attention

Sliding Window Attention (SWA) – это еще один из вариантов сэкономить на механизме внимания.

Суть его проста: **SWA ограничивает длину видимого контекста** (в механизме внимания) для оптимизации вычислений.

Чтобы стало понятнее, вспомним урок про внимание — а точнее *матрицу внимания*. Мы накладывали на нее треугольную маску, чтобы каждый токен мог видеть только предыдущие токены. Это нужно было, чтобы воспроизвести реальный инференс, когда модель не видит будущие токены. Но назад эта видимость ограничивалась только максимальной длиной контекста.

**SWA** же предлагает ограничить еще и видимость токенов назад.
Теперь токены не видят ничего вперед и видят только `n` токенов назад.

<p align="center">
  <img src="https://ucarecdn.com/dffea3a2-04a5-4153-a345-a102e8afce8c/" alt="swa" width="650" height="340">
</p>

У такого решения есть три преимущества:

* **Концентрация:** по идее, чем дальше от токена контекст, тем меньше он на него влияет и тем больше шума. И теория гласит, что ограничивая внимание определенным окном, мы тем самым помогаем модели сконцентрироваться на более важных вещах.
* **Вычисления:** для подобных масок разрабатываются специальные CUDA-ядра, благодаря которым в вычислениях участвуют только значения, свободные от маски. В результате мы получаем значительную экономию при инференсе. Нам такая разработка не светит 🙂, но имейте в виду, что в промышленных моделях так и поступают.
* **Кэш:** чем больше мы генерируем текста, тем больше разрастается кэш. И это может стать проблемой. Но при SWA мы смотрим только на определенное количество токенов назад. А значит, нам нужно хранить в кэше не больше токенов, чем задана видимость в SWA.



# Sliding Window Attention (разработка)

В класс `GroupedQueryAttention` необходимо внести следующие изменения:

* Добавьте (перед `dropout`) новый параметр:

  * `window_size` (тип `int`) — определяет, как далеко токены смогут смотреть в прошлое.
* Замените предварительно созданную маску на новую: в ней каждый токен должен видеть только себя и `window_size` предыдущих токенов.
* **Кэш:** при формировании кэша ключа и значения для возврата необходимо обрезать тензор, чтобы остались только последние `window_size` строк.
* **Применение маски.** Теперь у нас есть две версии:

  * Если пришел пустой кэш, то накладывается полная (квадратная) маска.
  * Если на вход пришел кэш, то у нас тензор матрицы внимания будет в виде одной строки. Поэтому наложите на матрицу внимания маску размером `[k_seq_len, :k_seq_len]`, где `k_seq_len` — количество строк в матрице ключа после объединения ее с кэшем.
    **З.Ы.** Раньше мы оставляли одну строку как есть, т.к. это была последняя строка и она должна была видеть все токены. Но теперь и на одну строку надо также накладывать маску, чтобы ограничить видимость прошлых токенов.

In [8]:
import torch
from torch import nn
import torch.nn.functional as F
from typing import Optional, Tuple


    
class GroupedQueryAttention(nn.Module):

    def __init__(
        self,
        num_q_heads: int,
        num_kv_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        window_size: int,
        rope: RoPE = None,
        dropout: float = 0.1,
    ):
        super().__init__()
        self._num_heads = num_q_heads
        self._num_kv_heads = num_kv_heads
        self._head_size = head_size
        self._max_seq_len = max_seq_len
        self._rope = rope
        self._window_size = window_size

        self._q = nn.Linear(emb_size, self._num_heads * head_size)
        self._k = nn.Linear(emb_size, num_kv_heads * head_size)
        self._v = nn.Linear(emb_size, num_kv_heads * head_size)

        # Создание causal маски
        mask = self._create_sliding_window_mask(max_seq_len, self._window_size)
        self.register_buffer(
            "_tril_mask", mask.bool() if hasattr(torch, "bool") else mask.byte()
        )
        
        self._layer = nn.Linear(head_size * self._num_heads, emb_size)
        self._dropout = nn.Dropout(dropout)

    def forward(
        self,
        x: torch.Tensor,
        mask: torch.Tensor = None,
        use_cache: bool = True,
        cache: list = None,
    ):
        batch_size, seq_len, emb_size = x.shape

        if seq_len > self._max_seq_len:
            raise ValueError(
                f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}"
            )

        # Пропустите тензор x через матрицы Wq, Wk , Wv, чтобы получить матрицы запроса, ключа и значения.
        k = self._k(x)  # [B, T, hs]
        q = self._q(x)  # [B, T, hs]
        v = self._v(x)  # [B, T, hs]

        # Шаг 2: Изменение формы для multi-head
        # [batch_size, seq_len, num_heads * head_size] 
        # -> [batch_size, seq_len, num_heads, head_size]
        # Измените форму запроса (query) на batch_size × num_q_heads × seq_len × head_size.
        q = q.reshape(batch_size, seq_len, self._num_heads, self._head_size)

        # Измените форму ключа (key) и значения (value) на batch_size × num_kv_heads × seq_len × head_size.
        k = k.reshape(batch_size, seq_len, self._num_kv_heads, self._head_size)
        v = v.reshape(batch_size, seq_len, self._num_kv_heads, self._head_size)
     

        # 3. Transpose: [B, T, H, hs] -> [B, H, T, hs]
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        start_pos = 0
        if cache is not None:
            k_cache, v_cache = cache
            cache_len = k_cache.shape[2]
            start_pos = cache_len
        
        # Пропустите матрицы запроса и ключа через экземпляр rope, чтобы выполнить поворот.
        if self._rope is not None:
            # Применяем RoPE к Q и K (НЕ к V!)
            q = self._rope(q, start_pos=start_pos)  # [B, T, hs]
            k = self._rope(k, start_pos=start_pos)  # [B, T, hs]

        # Если cache пришел, то объединяем кэш и одну строку из ключа и значения. Это будут новые key и value  для последующих вычислений.
        # 5. Кэширование (для autoregressive generation)
        if cache is not None:
            k_cache, v_cache = cache
            k = torch.cat([k_cache, k], dim=2)  # Concat по seq_len (dim=2)
            v = torch.cat([v_cache, v], dim=2)

        # Если use_cache == True, то сохраните матрицы ключа и значения для кэша (это нужно сделать до дублирования голов).
        #if use_cache == True:
        #    # Обрезаем до последних window_size токенов
        #    k_to_cache = k[:, :, -self._window_size:, :]
        #    v_to_cache = v[:, :, -self._window_size:, :]
        #    kv_cache = (k_to_cache, v_to_cache)

        # Продублируйте головы в тензорах ключа (key) и значения (value), чтобы получился тензор размера на batch_size × num_q_heads × seq_len × head_size.
        #k = self._repeat_kv_heads(k, self._num_heads, self._num_kv_heads)
        #v = self._repeat_kv_heads(v, self._num_heads, self._num_kv_heads)
        k_expanded = self._repeat_kv_heads(k, self._num_heads, self._num_kv_heads)
        v_expanded = self._repeat_kv_heads(v, self._num_heads, self._num_kv_heads)
        
        # Перемножим матрицы запроса и ключа (транспонированную), чтобы вычислить матрицу внимания.
        # И разделить все значения в матрице внимания на корень из head_size.
        scores = q @ k_expanded.transpose(-2, -1) / (self._head_size ** 0.5)

        # 8. Применение маски
        k_seq_len = k_expanded.size(2)  # Длина K после concat с кэшем
    
        if cache is None:
            # Случай 1: Без кэша - полная квадратная маска
            # scores: [B, H, seq_len, seq_len]
            # Применяем маску [:seq_len, :seq_len]
            scores = scores.masked_fill(
                ~self._tril_mask[:seq_len, :seq_len], 
                float("-inf")
            )

        # Применить к матрице внимания (построчно) функцию Softmax.
        weights = F.softmax(scores, dim=-1)

        # Перемножим матрицу внимания и матрицу значения.
        x_out = weights @ v_expanded  # [B, T, hs]

        # Измените форму тензора на batch_size × seq_len × num_heads*head_size.
        # Transpose обратно и concatenate heads
        x_out = x_out.transpose(1, 2)  # [B, T_q, H, hs]
        x_out = x_out.contiguous()  # Важно для reshape!
        concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        #concatenated_attention = x_out.reshape(batch_size, seq_len, self._num_heads * self._head_size)

        # Пропустите получившийся тензор через последний линейный слой.
        # 3. Проецируем в пространство эмбеддингов
        projected_output = self._layer(concatenated_attention)

        # 4. Применяем dropout для регуляризации
        output = self._dropout(projected_output)

        if use_cache:
            # Обрезаем оригинальный K и V (до дублирования)
            k_to_cache = k[:, :, -self._window_size:, :]
            v_to_cache = v[:, :, -self._window_size:, :]
            kv_cache = (k_to_cache, v_to_cache)
            return output, kv_cache
        else:
            return output, None

    def _repeat_kv_heads(
        self,
        kv: torch.Tensor,
        num_q_heads: int,
        num_kv_heads: int
    ) -> torch.Tensor:
        """
        Дублирует головы K/V для соответствия количеству голов Q.

        Args:
            kv: [batch_size, num_kv_heads, seq_len, head_size]
            num_q_heads: Количество голов Query (например, 8)
            num_kv_heads: Количество голов Key/Value (например, 2)

        Returns:
            [batch_size, num_q_heads, seq_len, head_size]

        Example:
            num_q_heads=8, num_kv_heads=2
            Каждая голова KV дублируется 4 раза:
            [KV0, KV1] -> [KV0, KV0, KV0, KV0, KV1, KV1, KV1, KV1]
        """
        batch_size, num_kv_heads, seq_len, head_size = kv.shape

        if num_q_heads == num_kv_heads:
            # Нет необходимости дублировать
            return kv

        # Вычисляем сколько раз нужно повторить каждую голову
        num_repeats = num_q_heads // num_kv_heads

        # repeat_interleave дублирует каждую голову num_repeats раз
        # [B, num_kv_heads, S, hs] -> [B, num_q_heads, S, hs]
        # [B, num_kv_heads, S, hs] -> [B, num_kv_heads, 1, S, hs]
        kv = kv.unsqueeze(2)
        
        # [B, num_kv_heads, 1, S, hs] -> [B, num_kv_heads, num_repeats, S, hs]
        kv = kv.repeat(1, 1, num_repeats, 1, 1)
        
        # [B, num_kv_heads, num_repeats, S, hs] -> [B, num_q_heads, S, hs]
        kv = kv.reshape(batch_size, num_q_heads, seq_len, head_size)
        

        return kv

    def _create_sliding_window_mask(
        self,
        max_seq_len: int,
        window_size: int,
        device: torch.device = None
    ) -> torch.Tensor:
        """
        Создает маску для Sliding Window Attention.

        Args:
            max_seq_len: Максимальная длина последовательности
            window_size: Размер окна внимания
            device: Устройство для размещения тензора

        Returns:
            Маска формы [max_seq_len, max_seq_len], где True = разрешено

        Example:
            >>> mask = create_sliding_window_mask(8, 3)
            >>> print(mask.int())
            tensor([[1, 0, 0, 0, 0, 0, 0, 0],
                    [1, 1, 0, 0, 0, 0, 0, 0],
                    [1, 1, 1, 0, 0, 0, 0, 0],
                    [0, 1, 1, 1, 0, 0, 0, 0],
                    [0, 0, 1, 1, 1, 0, 0, 0],
                    [0, 0, 0, 1, 1, 1, 0, 0],
                    [0, 0, 0, 0, 1, 1, 1, 0],
                    [0, 0, 0, 0, 0, 1, 1, 1]])
        """
        row_indices = torch.arange(max_seq_len, device=device).unsqueeze(1)  # [max_seq_len, 1]
        col_indices = torch.arange(max_seq_len, device=device).unsqueeze(0)  # [1, max_seq_len]

        causal_mask = col_indices <= row_indices

        window_mask = (row_indices - col_indices) <= window_size

        mask = causal_mask & window_mask
        
        return mask

In [9]:
import torch
from torch import nn
import torch.nn.functional as F
from typing import Optional, Tuple


# ... (классы RoPE и GroupedQueryAttention как выше) ...

# Параметры
batch_size = 1
emb_size = 64
head_size = 16
num_q_heads = 4
num_kv_heads = 2
max_seq_len = 20
window_size = 2

# Создаем модель
rope = RoPE(head_size=head_size, max_seq_len=max_seq_len)
gqa = GroupedQueryAttention(
    num_q_heads=num_q_heads,
    num_kv_heads=num_kv_heads,
    emb_size=emb_size,
    head_size=head_size,
    max_seq_len=max_seq_len,
    window_size=window_size,
    rope=rope,
    dropout=0.0,
)

print("="*60)
print("Тест 1: Без кэша (prefill)")
print("="*60)

x1 = torch.randn(batch_size, 5, emb_size)
output1, cache1 = gqa(x1, use_cache=True)

print(f"Input:  {x1.shape}")
print(f"Output: {output1.shape}")
print(f"Cache K: {cache1[0].shape}")  # [1, 2, 5, 16]
print(f"Cache V: {cache1[1].shape}")  # [1, 2, 5, 16]

# Проверяем маску
print(f"\nМаска применена: [:5, :5]")
print(gqa._tril_mask[:5, :5].int())

print("\n" + "="*60)
print("Тест 2: С кэшем (generation)")
print("="*60)

x2 = torch.randn(batch_size, 1, emb_size)
output2, cache2 = gqa(x2, use_cache=True, cache=cache1)

print(f"Input:  {x2.shape}")
print(f"Output: {output2.shape}")
print(f"Cache K: {cache2[0].shape}")  # [1, 2, 6, 16]
print(f"Cache V: {cache2[1].shape}")  # [1, 2, 6, 16]

# Проверяем маску
k_seq_len = 6
seq_len = 1
start_pos = k_seq_len - seq_len  # 5
print(f"\nМаска применена: [{start_pos}:{k_seq_len}, :{k_seq_len}]")
print(gqa._tril_mask[start_pos:k_seq_len, :k_seq_len].int())

print("\n" + "="*60)
print("Тест 3: Генерация еще одного токена")
print("="*60)

x3 = torch.randn(batch_size, 1, emb_size)
output3, cache3 = gqa(x3, use_cache=True, cache=cache2)

print(f"Input:  {x3.shape}")
print(f"Output: {output3.shape}")
print(f"Cache K: {cache3[0].shape}")  # [1, 2, 7, 16]
print(f"Cache V: {cache3[1].shape}")  # [1, 2, 7, 16]

# Проверяем маску
k_seq_len = 7
seq_len = 1
start_pos = k_seq_len - seq_len  # 6
print(f"\nМаска применена: [{start_pos}:{k_seq_len}, :{k_seq_len}]")
print(gqa._tril_mask[start_pos:k_seq_len, :k_seq_len].int())

print("\n" + "="*60)
print("Тест 4: Генерация нескольких токенов сразу")
print("="*60)

x4 = torch.randn(batch_size, 3, emb_size)
output4, cache4 = gqa(x4, use_cache=True, cache=cache3)

print(f"Input:  {x4.shape}")
print(f"Output: {output4.shape}")
print(f"Cache K: {cache4[0].shape}")  # [1, 2, 10, 16]
print(f"Cache V: {cache4[1].shape}")  # [1, 2, 10, 16]

# Проверяем маску
k_seq_len = 10
seq_len = 3
start_pos = k_seq_len - seq_len  # 7
print(f"\nМаска применена: [{start_pos}:{k_seq_len}, :{k_seq_len}]")
print(gqa._tril_mask[start_pos:k_seq_len, :k_seq_len].int())

print("\n✅ Все тесты пройдены!")

Тест 1: Без кэша (prefill)
Input:  torch.Size([1, 5, 64])
Output: torch.Size([1, 5, 64])
Cache K: torch.Size([1, 2, 2, 16])
Cache V: torch.Size([1, 2, 2, 16])

Маска применена: [:5, :5]
tensor([[1, 0, 0, 0, 0],
        [1, 1, 0, 0, 0],
        [1, 1, 1, 0, 0],
        [0, 1, 1, 1, 0],
        [0, 0, 1, 1, 1]], dtype=torch.int32)

Тест 2: С кэшем (generation)
Input:  torch.Size([1, 1, 64])
Output: torch.Size([1, 1, 64])
Cache K: torch.Size([1, 2, 2, 16])
Cache V: torch.Size([1, 2, 2, 16])

Маска применена: [5:6, :6]
tensor([[0, 0, 0, 1, 1, 1]], dtype=torch.int32)

Тест 3: Генерация еще одного токена
Input:  torch.Size([1, 1, 64])
Output: torch.Size([1, 1, 64])
Cache K: torch.Size([1, 2, 2, 16])
Cache V: torch.Size([1, 2, 2, 16])

Маска применена: [6:7, :7]
tensor([[0, 0, 0, 0, 1, 1, 1]], dtype=torch.int32)

Тест 4: Генерация нескольких токенов сразу
Input:  torch.Size([1, 3, 64])
Output: torch.Size([1, 3, 64])
Cache K: torch.Size([1, 2, 2, 16])
Cache V: torch.Size([1, 2, 2, 16])

Маска 

In [10]:
import torch

# Параметры
batch_size = 1
emb_size = 64
head_size = 16
num_q_heads = 4
num_kv_heads = 2
max_seq_len = 100
window_size = 3

# Создаем модель
rope = RoPE(head_size=head_size, max_seq_len=max_seq_len)
gqa = GroupedQueryAttention(
    num_q_heads=num_q_heads,
    num_kv_heads=num_kv_heads,
    emb_size=emb_size,
    head_size=head_size,
    max_seq_len=max_seq_len,
    window_size=window_size,
    rope=rope,
    dropout=0.0,
)

print("="*70)
print("ФИНАЛЬНЫЙ ТЕСТ СООТВЕТСТВИЯ ТЗ")
print("="*70)

# Тест 1: Проверка маски
print(f"\n✅ Тест 1: Маска window_size={window_size}")
test_mask = gqa._create_sliding_window_mask(8, 3)
print(test_mask.int())

# Проверка количества видимых токенов
all_correct = True
for i in range(8):
    visible_count = test_mask[i].sum().item()
    expected = min(i + 1, window_size + 1)
    if visible_count != expected:
        all_correct = False
        print(f"❌ Токен {i}: видит {visible_count}, ожидается {expected}")

if all_correct:
    print("✅ Маска работает правильно!")

# Тест 2: Обрезка кэша при prefill
print("\n✅ Тест 2: Обрезка кэша при prefill")
x1 = torch.randn(batch_size, 10, emb_size)
output1, cache1 = gqa(x1, use_cache=True)

print(f"Prefill: 10 токенов")
print(f"K cache size: {cache1[0].shape[2]} (ожидается {window_size})")
assert cache1[0].shape[2] == window_size, f"❌ Кэш должен быть {window_size}"
print("✅ Кэш обрезан правильно!")

# Тест 3: Кэш не растет при генерации
print("\n✅ Тест 3: Кэш не растет при генерации")
cache = cache1
for i in range(10):
    x_new = torch.randn(batch_size, 1, emb_size)
    output_new, cache = gqa(x_new, use_cache=True, cache=cache)
    
    assert cache[0].shape[2] == window_size, \
        f"❌ Шаг {i+1}: кэш {cache[0].shape[2]}, ожидается {window_size}"

print(f"После 10 шагов генерации:")
print(f"K cache size: {cache[0].shape[2]} (ожидается {window_size})")
print("✅ Кэш всегда ограничен window_size!")

# Тест 4: Применение маски с кэшем
print("\n✅ Тест 4: Применение маски с кэшем")
x2 = torch.randn(batch_size, 1, emb_size)
output2, cache2 = gqa(x2, use_cache=True, cache=cache1)

k_seq_len = cache1[0].shape[2] + 1  # 3 + 1 = 4
start_pos = k_seq_len - 1  # 3
expected_mask = gqa._tril_mask[start_pos:k_seq_len, :k_seq_len]

print(f"K seq_len: {k_seq_len}")
print(f"Start pos: {start_pos}")
print(f"Маска применена: [{start_pos}:{k_seq_len}, :{k_seq_len}]")
print(expected_mask.int())
print("✅ Маска применяется правильно!")

# Тест 5: Без кэширования
print("\n✅ Тест 5: Без кэширования")
x_no_cache = torch.randn(batch_size, 5, emb_size)
output_no_cache, cache_no_cache = gqa(x_no_cache, use_cache=False)

assert cache_no_cache is None, "❌ Кэш должен быть None"
print("✅ Кэш не создается при use_cache=False")

print("\n" + "="*70)
print("🎉 ВСЕ ТЕСТЫ ПРОЙДЕНЫ! КОД ПОЛНОСТЬЮ СООТВЕТСТВУЕТ ТЗ!")
print("="*70)

print("\n📊 Итоговая сводка:")
print("  ✅ Параметр window_size добавлен")
print("  ✅ Sliding window маска работает корректно")
print("  ✅ Каждый токен видит себя + window_size предыдущих")
print("  ✅ Кэш обрезается до window_size токенов")
print("  ✅ Кэш не растет при генерации")
print("  ✅ Маска применяется правильно с кэшем и без")
print("  ✅ Grouped Query Attention работает")
print("  ✅ RoPE применяется корректно")

ФИНАЛЬНЫЙ ТЕСТ СООТВЕТСТВИЯ ТЗ

✅ Тест 1: Маска window_size=3
tensor([[1, 0, 0, 0, 0, 0, 0, 0],
        [1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 0, 0, 0, 0],
        [0, 1, 1, 1, 1, 0, 0, 0],
        [0, 0, 1, 1, 1, 1, 0, 0],
        [0, 0, 0, 1, 1, 1, 1, 0],
        [0, 0, 0, 0, 1, 1, 1, 1]], dtype=torch.int32)
✅ Маска работает правильно!

✅ Тест 2: Обрезка кэша при prefill
Prefill: 10 токенов
K cache size: 3 (ожидается 3)
✅ Кэш обрезан правильно!

✅ Тест 3: Кэш не растет при генерации
После 10 шагов генерации:
K cache size: 3 (ожидается 3)
✅ Кэш всегда ограничен window_size!

✅ Тест 4: Применение маски с кэшем
K seq_len: 4
Start pos: 3
Маска применена: [3:4, :4]
tensor([[1, 1, 1, 1]], dtype=torch.int32)
✅ Маска применяется правильно!

✅ Тест 5: Без кэширования
✅ Кэш не создается при use_cache=False

🎉 ВСЕ ТЕСТЫ ПРОЙДЕНЫ! КОД ПОЛНОСТЬЮ СООТВЕТСТВУЕТ ТЗ!

📊 Итоговая сводка:
  ✅ Параметр window_size добавлен
  ✅ Sliding window маска работает корре

In [11]:
import torch
import torch.nn.functional as F

torch.manual_seed(42)

# === 1️⃣ Проверка RoPE ===
def test_rope():
    print("\n=== Test RoPE ===")

    head_size = 8
    seq_len = 4
    num_heads = 2
    batch_size = 1

    rope = RoPE(head_size=head_size, max_seq_len=16)

    # [B, H, T, D]
    x = torch.randn(batch_size, num_heads, seq_len, head_size)
    x_rot = rope(x)

    print("Input shape:", x.shape)
    print("Output shape:", x_rot.shape)
    assert x.shape == x_rot.shape, "RoPE: shape mismatch!"

    # Проверим, что RoPE сохраняет норму (приблизительно)
    norm_diff = (x.norm(dim=-1) - x_rot.norm(dim=-1)).abs().mean()
    print("Average norm difference:", norm_diff.item())
    assert norm_diff < 1e-4, "RoPE: norms changed too much!"

    print("✅ RoPE test passed.")


# === 2️⃣ Проверка GroupedQueryAttention ===
def test_gqa():
    print("\n=== Test Grouped Query Attention ===")

    emb_size = 16
    num_q_heads = 4
    num_kv_heads = 2
    head_size = 8
    max_seq_len = 16
    window_size = 3
    batch_size = 2
    seq_len = 6

    rope = RoPE(head_size=head_size, max_seq_len=max_seq_len)
    gqa = GroupedQueryAttention(
        num_q_heads=num_q_heads,
        num_kv_heads=num_kv_heads,
        emb_size=emb_size,
        head_size=head_size,
        max_seq_len=max_seq_len,
        window_size=window_size,
        rope=rope,
    )

    x = torch.randn(batch_size, seq_len, emb_size)
    out, kv_cache = gqa(x, use_cache=True)

    print("Input shape:", x.shape)
    print("Output shape:", out.shape)
    assert out.shape == (batch_size, seq_len, emb_size), "GQA output shape mismatch!"

    print("Cache shapes:", kv_cache[0].shape, kv_cache[1].shape)
    assert kv_cache[0].shape == (batch_size, num_kv_heads, window_size, head_size), "K cache shape mismatch!"
    assert kv_cache[1].shape == (batch_size, num_kv_heads, window_size, head_size), "V cache shape mismatch!"

    print("✅ GQA shape test passed.")

    # === Проверка маски ===
    mask = gqa._create_sliding_window_mask(seq_len, window_size)
    print("\nSliding window mask (1=visible, 0=masked):")
    print(mask.int())

    # Проверим, что токен не видит больше, чем window_size назад
    for i in range(seq_len):
        visible_positions = mask[i].nonzero().squeeze()
    
        # Если только одна позиция видима → делаем список из одного элемента
        if visible_positions.ndim == 0:
            visible_positions = [visible_positions.item()]
        else:
            visible_positions = visible_positions.tolist()
    
        max_back = i - min(visible_positions)
        assert max_back <= window_size, f"Token {i} sees too far back!"


    print("✅ Sliding window mask test passed.")


# === 3️⃣ Проверка на автогенерацию (кэш) ===
def test_cache_behavior():
    print("\n=== Test Cache Behavior ===")

    emb_size = 16
    num_q_heads = 4
    num_kv_heads = 2
    head_size = 8
    max_seq_len = 16
    window_size = 3
    batch_size = 1

    gqa = GroupedQueryAttention(
        num_q_heads=num_q_heads,
        num_kv_heads=num_kv_heads,
        emb_size=emb_size,
        head_size=head_size,
        max_seq_len=max_seq_len,
        window_size=window_size,
        rope=None,
    )

    # Первый проход (без кэша)
    x1 = torch.randn(batch_size, 2, emb_size)
    out1, cache1 = gqa(x1, use_cache=True)

    # Второй проход (с кэшем)
    x2 = torch.randn(batch_size, 1, emb_size)
    out2, cache2 = gqa(x2, use_cache=True, cache=cache1)

    print("Cache1 K shape:", cache1[0].shape)
    print("Cache2 K shape:", cache2[0].shape)

    assert cache2[0].shape[-2] == window_size, "Cache not trimmed correctly!"
    print("✅ Cache test passed.")


if __name__ == "__main__":
    test_rope()
    test_gqa()
    test_cache_behavior()



=== Test RoPE ===
Input shape: torch.Size([1, 2, 4, 8])
Output shape: torch.Size([1, 2, 4, 8])
Average norm difference: 0.0
✅ RoPE test passed.

=== Test Grouped Query Attention ===
Input shape: torch.Size([2, 6, 16])
Output shape: torch.Size([2, 6, 16])
Cache shapes: torch.Size([2, 2, 3, 8]) torch.Size([2, 2, 3, 8])
✅ GQA shape test passed.

Sliding window mask (1=visible, 0=masked):
tensor([[1, 0, 0, 0, 0, 0],
        [1, 1, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0],
        [1, 1, 1, 1, 0, 0],
        [0, 1, 1, 1, 1, 0],
        [0, 0, 1, 1, 1, 1]], dtype=torch.int32)
✅ Sliding window mask test passed.

=== Test Cache Behavior ===
Cache1 K shape: torch.Size([1, 2, 2, 8])
Cache2 K shape: torch.Size([1, 2, 3, 8])
✅ Cache test passed.


In [12]:
if __name__ == "__main__":
    print("🧪 Тестирование исправленной маски\n")
    
    # Параметры
    batch_size = 1
    emb_size = 64
    head_size = 16
    num_q_heads = 4
    num_kv_heads = 2
    max_seq_len = 20
    window_size = 3
    
    # Создаем модель
    rope = RoPE(head_size=head_size, max_seq_len=max_seq_len)
    gqa = GroupedQueryAttention(
        num_q_heads=num_q_heads,
        num_kv_heads=num_kv_heads,
        emb_size=emb_size,
        head_size=head_size,
        max_seq_len=max_seq_len,
        window_size=window_size,
        rope=rope,
        dropout=0.0,
    )
    
    print("="*60)
    print("Проверка маски")
    print("="*60)
    print(f"\nМаска для max_seq_len=8, window_size=3:")
    test_mask = gqa._create_sliding_window_mask(8, 3)
    print(test_mask.int())
    
    print("\n" + "="*60)
    print("Тест 1: Без кэша (prefill)")
    print("="*60)
    
    x1 = torch.randn(batch_size, 5, emb_size)
    output1, cache1 = gqa(x1, use_cache=True)
    
    print(f"Input:  {x1.shape}")
    print(f"Output: {output1.shape}")
    print(f"Cache K: {cache1[0].shape}")
    print(f"Cache V: {cache1[1].shape}")
    
    print(f"\nМаска применена: [:5, :5]")
    print(gqa._tril_mask[:5, :5].int())
    
    print("\n" + "="*60)
    print("Тест 2: С кэшем (generation)")
    print("="*60)
    
    x2 = torch.randn(batch_size, 1, emb_size)
    output2, cache2 = gqa(x2, use_cache=True, cache=cache1)
    
    print(f"Input:  {x2.shape}")
    print(f"Output: {output2.shape}")
    print(f"Cache K: {cache2[0].shape}")
    print(f"Cache V: {cache2[1].shape}")
    
    k_seq_len = 6
    seq_len = 1
    start_pos = k_seq_len - seq_len
    print(f"\nМаска применена: [{start_pos}:{k_seq_len}, :{k_seq_len}]")
    print(gqa._tril_mask[start_pos:k_seq_len, :k_seq_len].int())
    
    print("\n✅ Все тесты пройдены!")

🧪 Тестирование исправленной маски

Проверка маски

Маска для max_seq_len=8, window_size=3:
tensor([[1, 0, 0, 0, 0, 0, 0, 0],
        [1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 0, 0, 0, 0],
        [0, 1, 1, 1, 1, 0, 0, 0],
        [0, 0, 1, 1, 1, 1, 0, 0],
        [0, 0, 0, 1, 1, 1, 1, 0],
        [0, 0, 0, 0, 1, 1, 1, 1]], dtype=torch.int32)

Тест 1: Без кэша (prefill)
Input:  torch.Size([1, 5, 64])
Output: torch.Size([1, 5, 64])
Cache K: torch.Size([1, 2, 3, 16])
Cache V: torch.Size([1, 2, 3, 16])

Маска применена: [:5, :5]
tensor([[1, 0, 0, 0, 0],
        [1, 1, 0, 0, 0],
        [1, 1, 1, 0, 0],
        [1, 1, 1, 1, 0],
        [0, 1, 1, 1, 1]], dtype=torch.int32)

Тест 2: С кэшем (generation)
Input:  torch.Size([1, 1, 64])
Output: torch.Size([1, 1, 64])
Cache K: torch.Size([1, 2, 3, 16])
Cache V: torch.Size([1, 2, 3, 16])

Маска применена: [5:6, :6]
tensor([[0, 0, 1, 1, 1, 1]], dtype=torch.int32)

✅ Все тесты пройдены!


# Full Model

In [13]:
import torch
from torch import nn
from torch import Tensor
import torch.nn.functional as F
from math import sqrt



class SiLU(nn.Module):
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        return torch.sigmoid(x) * x
    
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self._eps = eps
        self._w = nn.Parameter(torch.ones(dim))
    
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        rms = (x.pow(2).mean(-1, keepdim=True) + self._eps) ** 0.5
        norm_x = x / rms
        return self._w * norm_x

class SwiGLU(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()

        self._gate = nn.Linear(emb_size, 4 * emb_size)
        self._up = nn.Linear(emb_size, 4 * emb_size)
        self._down = nn.Linear(4 * emb_size, emb_size)
        self._activation = SiLU()
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size].
        gate_out = self._gate(x)                          # [batch, seq, 4*emb]
        activation_out = self._activation(gate_out)       # [batch, seq, 4*emb]
        up_out = self._up(x)                              # [batch, seq, 4*emb]
        out = up_out * activation_out                     # поэлементное!
        out = self._down(out)                             # [batch, seq, emb]
        return self._dropout(out)


class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super().__init__()
        self._embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_size
        )

    def forward(self, x: Tensor) -> Tensor:
        return self._embedding(x)

    @property
    def num_embeddings(self) -> int:
        return self._embedding.num_embeddings

    @property
    def embedding_dim(self) -> int:
        return self._embedding.embedding_dim


class GELU(nn.Module):
    def __init__(self):
        super().__init__()
        self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 0.5 * x * (1 + torch.tanh(
            self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
        ))

    
class Decoder(nn.Module):
    def __init__(self, 
        num_q_heads: int,
        num_kv_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        window_size: int,
        rope: RoPE,
        dropout: float = 0.1
    ):
        super().__init__()
        self._heads = GroupedQueryAttention(
            num_q_heads=num_q_heads, 
            num_kv_heads=num_kv_heads,
            emb_size=emb_size, 
            head_size=head_size, 
            max_seq_len=max_seq_len,
            window_size=window_size,
            rope=rope,
            dropout=dropout
        )
        self._ff = SwiGLU(emb_size=emb_size, dropout=dropout)
        self._norm1 = RMSNorm(emb_size)
        self._norm2 = RMSNorm(emb_size)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None) -> torch.Tensor:
        norm1_out = self._norm1(x)
        attention, kv_caches = self._heads(norm1_out, mask, use_cache=use_cache, cache=cache)
        out = attention + x
        
        norm2_out = self._norm2(out)
        ffn_out = self._ff(norm2_out)

        if use_cache is True:
            return (ffn_out + out, kv_caches)
        else:
            return (ffn_out + out, None)



from torch import nn
import torch
import torch.nn.functional as F

class Mistral(nn.Module):
    def __init__(self,
        vocab_size: int,
        max_seq_len: int,
        emb_size: int,
        num_q_heads: int,
        num_kv_heads: int,
        head_size: int,
        num_layers: int,
        window_size: int,
        dropout: float = 0.1,
        device: str = 'cpu'
    ):
        super().__init__()
        self._vocab_size = vocab_size
        self._max_seq_len = max_seq_len
        self._emb_size = emb_size
        self._num_q_heads = num_q_heads
        self._num_kv_heads = num_kv_heads
        self._head_size = head_size
        self._num_layers = num_layers
        self._dropout = dropout
        self._device = device
        
        self.validation_loss = None

        # Инициализация слоев
        self._token_embeddings = TokenEmbeddings(
            vocab_size=vocab_size, 
            emb_size=emb_size
        )
        self._position_embeddings = RoPE(
            head_size=head_size,
            max_seq_len=max_seq_len
        )
        #self._position_embeddings = PositionalEmbeddings(
        #    max_seq_len=max_seq_len, 
        #    emb_size=emb_size
        #)
        self._dropout = nn.Dropout(dropout)
        self._decoders = nn.ModuleList([Decoder(
            num_q_heads=num_q_heads,
            num_kv_heads=num_kv_heads,
            emb_size=emb_size,
            head_size=head_size,
            max_seq_len=max_seq_len,
            window_size=window_size,
            rope=self._position_embeddings,
            dropout=dropout 
        ) for _ in range(num_layers)])
        self._norm = RMSNorm(emb_size)
        self._linear = nn.Linear(emb_size, vocab_size)

    def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
        # Проверка длины последовательности (только при отсутствии кэша)
        if cache is None and x.size(1) > self._max_seq_len:
            raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
        
        # Эмбеддинги токенов и позиций
        tok_out = self._token_embeddings(x)  # [batch, seq_len, emb_size]
       #pos_out = self._position_embeddings(x)  # [batch, seq_len, emb_size]
        
        # Комбинирование
        out = self._dropout(tok_out)  # [batch, seq_len, emb_size]
        
        # Стек декодеров с передачей кэша
        new_cache = []
        for i, decoder in enumerate(self._decoders):
            decoder_cache = cache[i] if cache is not None else None
            decoder_result = decoder(out, use_cache=use_cache, cache=decoder_cache)

            # Извлекаем результат из кортежа
            if use_cache:
                out, decoder_new_cache = decoder_result
                new_cache.append(decoder_new_cache)
            else:
                out = decoder_result[0]

        out = self._norm(out)
        logits = self._linear(out)
            
        # Возвращаем результат с учетом use_cache
        if use_cache:
            return (logits, new_cache)
        else:
            return (logits, None)

    def generate(self,
        x: torch.Tensor, 
        max_new_tokens: int, 
        do_sample: bool,
        temperature: float = 1.0,
        top_k: int = None,
        top_p: float = None,
        use_cache: bool = True
    ) -> torch.Tensor:
        cache = None

        for _ in range(max_new_tokens):
            if use_cache and cache is not None:
                # Используем кэш - передаем только последний токен
                x_input = x[:, -1:]  # [batch_size, 1]
            else:
                # Первая итерация или кэш отключен - передаем всю последовательность
                x_input = x
            
            # Прямой проход с кэшем
            logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
            
            # Обновляем кэш для следующей итерации
            if use_cache:
                cache = new_cache

            last_logits = logits[:, -1, :]  # [batch_size, vocab_size]

            # Масштабируем логиты температурой
            if temperature > 0:
                logits_scaled = last_logits / temperature
            else:
                logits_scaled = last_logits

            if do_sample == True and top_k != None:
                _, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)

                # # Заменим все НЕ top-k логиты на -inf
                masked_logits = logits_scaled.clone()
                vocab_size = logits_scaled.size(-1)

                # создаём маску: 1, если токен НЕ в topk_indices
                mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
                mask.scatter_(1, topk_indices, 0)  # 0 там, где top-k индексы
                masked_logits[mask.byte()] = float('-inf')

                logits_scaled = masked_logits

            if do_sample == True and top_p != None:
                # 1. Применим softmax, чтобы получить вероятности:
                probs = F.softmax(logits_scaled, dim=-1)  # [B, vocab_size]
                # 2. Отсортируем токены по убыванию вероятностей:
                sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
                # 3. Посчитаем кумулятивную сумму вероятностей:
                cum_probs = torch.cumsum(sorted_probs, dim=-1)  # [B, vocab_size]
                # 4. Определим маску: оставить токены, пока сумма < top_p
                sorted_mask = (cum_probs <= top_p).byte()  # [B, vocab_size]
                # Гарантируем, что хотя бы первый токен останется
                sorted_mask[:, 0] = 1
                # 5. Преобразуем маску обратно в оригинальный порядок:
                # Создаём полную маску из 0
                mask = torch.zeros_like(probs, dtype=torch.uint8)
                # Устанавливаем 1 в местах нужных токенов
                mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
                # 6. Зануляем логиты токенов вне топ-p:
                logits_scaled[~mask] = float('-inf')

            # 4. Применяем Softmax
            probs = F.softmax(logits_scaled, dim=-1)  # [batch_size, vocab_size]


            if do_sample == True:
                # 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
                next_token = torch.multinomial(probs, num_samples=1)  # [batch_size, 1]
            else:
                # 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
                next_token = torch.argmax(probs, dim=-1, keepdim=True)  # [batch_size, 1]
            
            # 6. Добавляем его к последовательности
            x = torch.cat([x, next_token], dim=1)  # [batch_size, seq_len+1]
        return x

    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self._vocab_size,
            'max_seq_len': self._max_seq_len,
            'emb_size': self._emb_size,
            'num_heads': self._num_heads,
            'head_size': self._head_size,
            'num_layers': self._num_layers
        }, path)

    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

    @property
    def max_seq_len(self) -> int:
        return self._max_seq_len

## 2. Обучение Mistral

Mistral обучается в два этапа:

- 1️⃣ **Предобучение (Unsupervised Pretraining)**  
- 2️⃣ **Дообучение (Supervised Fine-Tuning)**



### 5.1 Предобучение

На первом этапе модель обучается без разметки: она получает большой корпус текстов и учится **предсказывать следующий токен** по предыдущим.

Функция потерь:
$$
L = - \sum_{t=1}^{T} \log P(x_t | x_1, x_2, ..., x_{t-1})
$$

Таким образом, модель учится строить вероятностную модель языка, "угадывая" продолжение текста.


Во время **предобучения** Mistral учится **предсказывать следующий токен** (language modeling task).  
Формально:  
$$ 
P(x_t ,|, x_1, x_2, \dots, x_{t-1})  
$$ 
То есть, если на вход подаётся предложение `"I love deep"`, модель должна предсказать `"learning"`.


### ✅ 5.1.1 Подготовка данных

Создадим **датасет** на основе BPE-токенизатора:

**BPE Tokenizator**

In [14]:
class BPE:
    def __init__(self, vocab_size: int):
        self.vocab_size = vocab_size
        self.id2token = {}
        self.token2id = {}

    def fit(self, text: str):
        # 1. Получаем уникальные токены (символы)
        unique_tokens = sorted(set(text))
        tokens = unique_tokens.copy()

        # 2. Разбиваем текст на токены-символы
        sequence = list(text)

        # 3. Объединяем токены до достижения нужного размера словаря
        while len(tokens) < self.vocab_size:
            #print(f'len={len(tokens)} < {self.vocab_size}')
            # Считаем частоты пар
            pair_freq = {}
            for i in range(len(sequence) - 1):
                pair = (sequence[i], sequence[i + 1])
                #print(f'pair = {pair}')
                if pair not in pair_freq:
                    pair_freq[pair] = 0
                pair_freq[pair] += 1


            #print(f'pair_freq = {pair_freq}')  
            if not pair_freq:
                break  # нет пар — выходим

            #for x in pair_freq.items():
            #    self.debug(x, sequence)

            # Находим самую частую пару (в случае равенства — та, что встретилась первой)
            most_frequent_pair = max(pair_freq.items(), key=lambda x: (x[1], -self._pair_first_index(sequence, x[0])))[0]
            #print(most_frequent_pair)
            # Создаем новый токен
            new_token = most_frequent_pair[0] + most_frequent_pair[1]
            #print(f"new token={new_token}")
            tokens.append(new_token)
            #print(f"tokens={tokens}")

            i = 0
            new_sequence = []

            while i < len(sequence):
                if i < len(sequence) - 1 and (sequence[i], sequence[i + 1]) == most_frequent_pair:
                    new_sequence.append(new_token)
                    i += 2  # пропускаем два символа — заменённую пару
                else:
                    new_sequence.append(sequence[i])
                    i += 1
            sequence = new_sequence
            #break
        
        # 4. Создаем словари
        self.vocab = tokens.copy()
        self.token2id = dict(zip(tokens, range(self.vocab_size)))
        self.id2token = dict(zip(range(self.vocab_size), tokens))

    def _pair_first_index(self, sequence, pair):
        for i in range(len(sequence) - 1):
            if (sequence[i], sequence[i + 1]) == pair:
                return i
        return float('inf')  # если пара не найдена (в теории не должно случиться)


    def encode(self, text: str):
        # 1. Разбиваем текст на токены-символы
        sequence = list(text)
        # 2. Инициализация пустого списка токенов
        tokens = []
        # 3. Установить i = 0
        i = 0
        while i < len(text):
            # 3.1 Найти все токены в словаре, начинающиеся с text[i]
            start_char = text[i]
            result = [token for token in self.vocab if token.startswith(start_char)]
            # 3.2 Выбрать самый длинный подходящий токен
            find_token = self._find_max_matching_token(text[i:], result)
            if find_token is None:
                # Обработка неизвестного символа
                tokens.append(text[i])  # Добавляем сам символ как токен
                i += 1
            else:
                # 3.3 Добавить токен в результат
                tokens.append(find_token)
                # 3.4 Увеличить i на длину токена
                i += len(find_token)

        # 4. Заменить токены на их ID
        return self._tokens_to_ids(tokens)

    def _find_max_matching_token(self, text: str, tokens: list):
        """Находит самый длинный токен из списка, с которого начинается текст"""
        matching = [token for token in tokens if text.startswith(token)]
        return max(matching, key=len) if matching else None

    def _tokens_to_ids(self, tokens):
        """Конвертирует список токенов в их ID с обработкой неизвестных токенов"""
        ids = []
        for token in tokens:
            if token in self.token2id:
                ids.append(self.token2id[token])
            else:
                ids.append(0)  # Специальное значение
        return ids


    def decode(self, ids: list) -> str:
        return ''.join(self._ids_to_tokens(ids))

    def _ids_to_tokens(self, ids: list) -> list:
        """Конвертирует список Ids в их tokens"""
        tokens = []
        for id in ids:
            if id in self.id2token:
                tokens.append(self.id2token[id])
            else:
                tokens.append('')  # Специальное значение
        return tokens


    def save(self, filename):
        with open(filename, 'wb') as f:
            dill.dump(self, f)
        print(f"Объект сохранён в {filename}")


    @classmethod
    def load(cls, filename):
        with open(filename, 'rb') as f:
            obj = dill.load(f)
                
        print(f"Объект загружен из {filename}")
        return obj

In [15]:
import torch
from torch.utils.data import Dataset, DataLoader

class GPTDataset(Dataset):
    def __init__(self, text: str, bpe: BPE, block_size: int):
        self.bpe = bpe
        self.block_size = block_size
        self.data = bpe.encode(text)
        
    def __len__(self):
        return len(self.data) - self.block_size

    def __getitem__(self, idx):
        x = torch.tensor(self.data[idx:idx+self.block_size], dtype=torch.long)
        y = torch.tensor(self.data[idx+1:idx+self.block_size+1], dtype=torch.long)
        return x, y

### ✅ 5.1.2 Цикл обучения

Для обучения создадим функцию:

In [16]:
import torch.nn.functional as F
from torch import optim

def train_mistral(model, dataset, epochs=5, batch_size=32, lr=3e-4, device='cpu'):
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    optimizer = optim.AdamW(model.parameters(), lr=lr)

    model.to(device)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)

            # Прямой проход
            logits, _ = model(x, use_cache=False)  # [B, T, vocab_size]

            # Перестроим выход под CrossEntropy
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), y.view(-1))

            # Обратное распространение
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model

### ✅ 5.1.3 Пример запуска


**🧠 Конфигурация Mistral Mini**


| Параметр        | Значение | Описание                                      |
| --------------- | -------- | --------------------------------------------- |
| **vocab_size**  | `50257`  | Размер словаря (BPE токенизатор OpenAI)       |
| **max_seq_len** | `512`   | Максимальная длина входной последовательности |
| **emb_size**    | `256`    | Размер эмбеддингов (векторное пространство)   |
| **num_heads**   | `4`     | Количество голов в multi-head attention       |
| **head_size**   | `64`     | Размерность одной головы внимания (768 / 12)  |
| **num_layers**  | `4`     | Количество блоков (декодеров)                 |
| **dropout**     | `0.1`    | Вероятность дропаута                          |


In [17]:
# 1. Исходный текст
text = "Deep learning is amazing. Transformers changed the world. Attention is all you need. GPT models revolutionized NLP."

# 2. Обучаем токенизатор
bpe = BPE(vocab_size=100)
bpe.fit(text)

# 3. Создаем датасет
dataset = GPTDataset(text, bpe, block_size=8)
print(f"Dataset length: {len(dataset)}")

# 4. Инициализируем модель
model = Mistral(
    vocab_size=len(bpe.vocab),  # размер словаря BPE
    max_seq_len=512,           # GPT-2 использует контекст в 512 токена
    emb_size=256,               # размер эмбеддингов
    num_q_heads=4,               # количество голов внимания
    num_kv_heads=2,               # количество голов внимания
    head_size=64,               # размер каждой головы (256 / 4)
    num_layers=4,              # количество блоков Transformer
    window_size=8,
    dropout=0.1                 # стандартный dropout GPT-2
)

# 5. Обучаем
train_mistral(model, dataset, epochs=100, batch_size=4)

Dataset length: 20
Epoch 1/100, Loss: 3.6991
Epoch 2/100, Loss: 1.5456
Epoch 3/100, Loss: 0.6310
Epoch 4/100, Loss: 0.3419
Epoch 5/100, Loss: 0.2353
Epoch 6/100, Loss: 0.1615
Epoch 7/100, Loss: 0.1222
Epoch 8/100, Loss: 0.1117
Epoch 9/100, Loss: 0.0890
Epoch 10/100, Loss: 0.0788
Epoch 11/100, Loss: 0.0793
Epoch 12/100, Loss: 0.0612
Epoch 13/100, Loss: 0.0724
Epoch 14/100, Loss: 0.0654
Epoch 15/100, Loss: 0.0873
Epoch 16/100, Loss: 0.0840
Epoch 17/100, Loss: 0.0755
Epoch 18/100, Loss: 0.0572
Epoch 19/100, Loss: 0.0663
Epoch 20/100, Loss: 0.0741
Epoch 21/100, Loss: 0.0635
Epoch 22/100, Loss: 0.0649
Epoch 23/100, Loss: 0.0579
Epoch 24/100, Loss: 0.0617
Epoch 25/100, Loss: 0.0626
Epoch 26/100, Loss: 0.0591
Epoch 27/100, Loss: 0.0580
Epoch 28/100, Loss: 0.0514
Epoch 29/100, Loss: 0.0572
Epoch 30/100, Loss: 0.0567
Epoch 31/100, Loss: 0.0595
Epoch 32/100, Loss: 0.0523
Epoch 33/100, Loss: 0.0508
Epoch 34/100, Loss: 0.0494
Epoch 35/100, Loss: 0.0505
Epoch 36/100, Loss: 0.0588
Epoch 37/100, Loss

Mistral(
  (_token_embeddings): TokenEmbeddings(
    (_embedding): Embedding(100, 256)
  )
  (_position_embeddings): RoPE()
  (_dropout): Dropout(p=0.1, inplace=False)
  (_decoders): ModuleList(
    (0-3): 4 x Decoder(
      (_heads): GroupedQueryAttention(
        (_rope): RoPE()
        (_q): Linear(in_features=256, out_features=256, bias=True)
        (_k): Linear(in_features=256, out_features=128, bias=True)
        (_v): Linear(in_features=256, out_features=128, bias=True)
        (_layer): Linear(in_features=256, out_features=256, bias=True)
        (_dropout): Dropout(p=0.1, inplace=False)
      )
      (_ff): SwiGLU(
        (_gate): Linear(in_features=256, out_features=1024, bias=True)
        (_up): Linear(in_features=256, out_features=1024, bias=True)
        (_down): Linear(in_features=1024, out_features=256, bias=True)
        (_activation): SiLU()
        (_dropout): Dropout(p=0.1, inplace=False)
      )
      (_norm1): RMSNorm()
      (_norm2): RMSNorm()
    )
  )
  (_no


---

### 5.2 Дообучение

После предобучения LLAMA уже знает структуру и грамматику языка.  
На втором этапе она дообучается на конкретных задачах (например, классификация, QA) с помощью размеченных данных.

Технически это почти то же обучение, только:

- Загружаем модель с уже обученными весами.
- Используем новые данные.
- Можно уменьшить скорость обучения.
- Иногда замораживают часть слоёв (например, эмбеддинги).


In [18]:
def fine_tune_mistral(model, dataset, epochs=3, batch_size=16, lr=1e-5, device='cpu', freeze_embeddings=True):
    if freeze_embeddings:
        for param in model._token_embeddings.parameters():
            param.requires_grad = False
        for param in model._position_embeddings.parameters():
            param.requires_grad = False

    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr)

    model.to(device)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            logits, _ = model(x, use_cache=False)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), y.view(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Fine-tune Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(dataloader):.4f}")

In [19]:
# Например, мы хотим дообучить модель на стиле коротких технических фраз
fine_tune_text = """
Transformers revolutionize NLP.
Deep learning enables self-attention.
GPT generates text autoregressively.
"""

dataset = GPTDataset(fine_tune_text, bpe, block_size=8)


# Запуск дообучения
fine_tune_mistral(model, dataset, epochs=10, batch_size=4, lr=1e-4)

Fine-tune Epoch 1/10, Loss: 4.8431
Fine-tune Epoch 2/10, Loss: 2.6429
Fine-tune Epoch 3/10, Loss: 1.6542
Fine-tune Epoch 4/10, Loss: 1.2143
Fine-tune Epoch 5/10, Loss: 0.9998
Fine-tune Epoch 6/10, Loss: 0.8404
Fine-tune Epoch 7/10, Loss: 0.6827
Fine-tune Epoch 8/10, Loss: 0.5871
Fine-tune Epoch 9/10, Loss: 0.5183
Fine-tune Epoch 10/10, Loss: 0.4528


## 📝 6. Генерация текста после обучения

In [20]:
def generate_text(model, bpe, prompt: str, max_new_tokens=20, device='cpu'):
    model.eval()
    ids = torch.tensor([bpe.encode(prompt)], dtype=torch.long).to(device)
    out = model.generate(ids, max_new_tokens=max_new_tokens, do_sample=True)
    text = bpe.decode(out[0].tolist())
    return text

In [21]:
print(generate_text(model, bpe, "Deep learning", max_new_tokens=20))

Deep learning ena les self ti att a
