# Task 1

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np

In [None]:
def cross_attention(q, k, v):
    attn_scores = torch.matmul(q, k.transpose(-2, -1)) / np.sqrt(k.shape[1])  # ваш код
    attn_weights = F.softmax(attn_scores, dim=-1)
    outputs = torch.matmul(attn_weights, v) # ваш код
    return outputs, attn_weights

In [None]:
class CrossAttentionLayer(nn.Module):
  def __init__(self, d_model, d_k, d_v): # добавьте нужные гиперпараметры
    super().__init__()
    # Init layers #
    self.W_q = nn.Linear(d_model, d_k)
    self.W_k = nn.Linear(d_model, d_k)
    self.W_v = nn.Linear(d_model, d_v)

  def forward(self, enc_output, dec_output):
    Q = self.W_q(dec_output)
    K = self.W_k(enc_output)
    V = self.W_v(enc_output)
    output, attn_weights = cross_attention(Q, K, V)
    return output, attn_weights

In [None]:
# Проверка
Q = [[10, 0, 0, 0],
     [0, 10, 0, 0]]
K = [[10, 0, 0, 0],
     [0, 10, 0, 0],
     [0, 0, 10, 0]]
V = [[10, 0, 0, 0],
     [0, 20, 0, 0],
     [0, 0, 30, 0]]

Q = torch.tensor(Q).to(torch.float32)
K = torch.tensor(K).to(torch.float32)
V = torch.tensor(V).to(torch.float32)

assert 30 == cross_attention(Q, K, V)[0].sum()

# Task 2

In [None]:
class EncoderDecoderWithAttention(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, start_token_id, max_len):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.encoder = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.cross_attn = CrossAttentionLayer(d_model=hidden_dim, d_k=hidden_dim, d_v=hidden_dim)
        self.lm_head = nn.Linear(hidden_dim * 2, vocab_size)  # concat(dec_output, context)
        self.start_token_id = start_token_id
        self.max_len = max_len

    def forward(self, src, tgt=None):
        batch_size = src.size(0)

        # 1. Шаг энкодера
        embedded_src = self.embedding(src)                           # [B, T_src, E]
        encoder_outputs, (h, c) = self.encoder(embedded_src)          # enc_outputs: [B, T_src, H]

        # 2. Цикл декодера (без teacher forcing)
        input_token = torch.full((batch_size,), self.start_token_id,
                                 dtype=torch.long, device=src.device)

        logits_history = []
        attn_history = []
        dec_hidden, dec_cell = h, c

        for _ in range(self.max_len):
            embedded_t = self.embedding(input_token).unsqueeze(1)     # [B, 1, E]
            dec_output, (dec_hidden, dec_cell) = self.decoder(embedded_t, (dec_hidden, dec_cell))  # [B, 1, H]

            # CrossAttention
            context, attn = self.cross_attn(encoder_outputs, dec_output)   # [B, 1, H], [B, 1, T_src]
            attn_history.append(attn)

            # Конкатенируем decoder state + context
            concat_vec = torch.cat([dec_output, context], dim=-1)     # [B, 1, 2H]
            step_logits = self.lm_head(concat_vec)                    # [B, 1, vocab]
            logits_history.append(step_logits)

            # Берём токен с максимальной вероятностью
            input_token = step_logits.argmax(dim=-1).squeeze(1)       # [B]

        logits = torch.cat(logits_history, dim=1)                     # [B, max_len, vocab]
        attn_history = torch.cat(attn_history, dim=1)                 # [B, max_len, T_src]
        return logits, attn_history

In [None]:
def test_shapes():
    vocab_size = 50
    embed_dim = 16
    hidden_dim = 32
    start_token_id = 1
    max_len = 5

    model = EncoderDecoderWithAttention(vocab_size, embed_dim, hidden_dim,
                                        start_token_id=start_token_id,
                                        max_len=max_len)

    src = torch.randint(0, vocab_size, (2, 7))   # batch=2, src_len=7

    logits, attn = model(src)

    assert logits.shape == (2, max_len, vocab_size), f"Неправильный размер логитов: {logits.shape}"
    assert attn.shape == (2, max_len, src.size(1)), f"Неправильный размер весов внимания: {attn.shape}"
    print("Shapes test passed")

In [None]:
def test_greedy_generation():
    vocab_size = 10
    embed_dim = 8
    hidden_dim = 16
    start_token_id = 0
    max_len = 3

    model = EncoderDecoderWithAttention(vocab_size, embed_dim, hidden_dim,
                                        start_token_id=start_token_id,
                                        max_len=max_len)

    src = torch.randint(0, vocab_size, (1, 4))   # batch=1
    logits, attn = model(src)

    preds = logits.argmax(dim=-1)  # \[1, max_len\]
    print("Предсказанная последовательность:", preds.tolist())
    print("Веса внимания:\\n", attn)

In [None]:
# Run tests
test_shapes()
test_greedy_generation()

Shapes test passed
Предсказанная последовательность: [[1, 1, 1]]
Веса внимания:\n tensor([[[0.2481, 0.2556, 0.2512, 0.2451],
         [0.2483, 0.2549, 0.2504, 0.2463],
         [0.2485, 0.2545, 0.2502, 0.2469]]], grad_fn=<CatBackward0>)


# Task 3

In [None]:
class EncoderDecoderWithAttention(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, start_token_id):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.encoder = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.cross_attn = CrossAttentionLayer(d_model=hidden_dim, d_k=hidden_dim, d_v=hidden_dim)
        self.lm_head = nn.Linear(hidden_dim * 2, vocab_size)
        self.start_token_id = start_token_id

    def forward(self, src, tgt):
        """
        src: [B, T_src]
        tgt: [B, T_tgt]   (целевая последовательность с <eos>-токеном в конце)
        """
        batch_size, tgt_len = tgt.shape
        # 1. Шаг энкодера
        embedded_src = self.embedding(src)
        encoder_outputs, (h, c) = self.encoder(embedded_src)

        # 2. Вход декодера
        start_tokens = torch.full((batch_size, 1), self.start_token_id,
                                  dtype=torch.long, device=src.device)  # [B, 1]
        decoder_inputs = torch.cat([start_tokens, tgt[:, :-1]], dim=1)  # shift right
        embedded_trg = self.embedding(decoder_inputs)


        # 3. Шаг декодера (с teacher forcing)
        dec_output, (dec_hidden, dec_cell) = self.decoder(embedded_trg, (h, c))

        context, attn = self.cross_attn(encoder_outputs, dec_output)

        # 4. Линейный слой (LM head)
        concat_vec = torch.cat([dec_output, context], dim=-1)
        logits = self.lm_head(concat_vec)
        return logits, attn

In [None]:
def test_teacher_forcing():
    vocab_size = 20
    embed_dim = 8
    hidden_dim = 16
    start_token_id = 0

    model = EncoderDecoderWithAttention(vocab_size, embed_dim, hidden_dim, start_token_id)

    src = torch.randint(0, vocab_size, (2, 5))   # batch=2, src_len=5
    tgt = torch.randint(0, vocab_size, (2, 6))   # batch=2, tgt_len=6

    logits, attn = model(src, tgt)

    # Check shapes
    assert logits.shape == (2, 6, vocab_size)
    assert attn.shape == (2, 6, src.size(1))
    print("Размерности логитов и весов внимания совпадают")


    # Check loss computation works
    criterion = nn.CrossEntropyLoss()
    loss = criterion(logits.view(-1, vocab_size), tgt.reshape(-1))
    print("Значение лосса:", loss.item())


# Run test
test_teacher_forcing()


Размерности логитов и весов внимания совпадают
Значение лосса: 2.994875192642212


# Lesson 2 Как работает универсальная Seq2Seq-модель T5

# Task 2-1

In [None]:
import math
import random
from typing import List, Tuple

In [None]:
random.seed(42)

MASK_RATE = 0.15
MEAN_SPAN = 3.0

In [None]:
def sample_poisson(lam: float) -> int:
    """Сэмплирование из распределения Пуассона."""
    return int(np.random.poisson(lam, 1).squeeze())

In [None]:
def span_corruption(tokens: List[str], mask_rate: float = MASK_RATE, mean_span: float = MEAN_SPAN) -> List[Tuple[int, int]]:
    """
    TODO: Верните список спанов для маскирования.
    Каждый спан — это (start, length).
    Маскируем ~mask_rate от общего числа токенов.
    """
    spans = []
    n = len(tokens)
    total_to_mask = max(1, int(round(n * mask_rate)))

    # ===== ВАШ КОД =====
    covered = 0
    i = 0
    # Двигаемся слева направо и сэмплируем спаны, пока не наберется нужная доля
    while covered < total_to_mask and i < n:
        span_len = sample_poisson(mean_span)
        spans.append((i, span_len))
        covered += span_len
        i += span_len + 1  # оставляем зазор
    return spans

In [None]:
def prepare_pair(tokens: List[str], spans: List[Tuple[int, int]]) -> Tuple[str, str]:
    """
    TODO: Построить corrupted_input и target_output.
    Правила:
      - Во входе каждый спан заменяем на <extra_id_k>.
      - В выходе: <extra_id_k> + содержимое спана (все токены).
      - Сентинелы нумеруются слева направо.
    """
    corrupted = []
    target = []
    last_idx = 0
    sentinel_id = 0


    # ===== ВАШ КОД =====
    for start, length in spans:
        # Копируем токены до спана
        corrupted.extend(tokens[last_idx:start])
        sentinel = f"<extra_id_{sentinel_id}>"
        corrupted.append(sentinel)

        # Заполняем target: сентинел + вырезанный спан
        target.append(sentinel)
        target.extend(tokens[start:start+length])

        last_idx = start + length
        sentinel_id += 1

    # Добавляем хвост после последнего спана
    corrupted.extend(tokens[last_idx:])
    # Добавляем eos
    target.append("<eos>")

    corrupted_text = " ".join(corrupted)
    target_text = " ".join(target)
    return corrupted_text, target_text

In [None]:
# ===== Пример использования =====
tokens = "Модель T5 обучается с помощью span corruption".split()
spans = span_corruption(tokens, mask_rate=0.3)
inp, out = prepare_pair(tokens, spans)

print("Tokens :", tokens)
print("Spans  :", spans)
print("Input  :", inp)
print("Target :", out)

Tokens : ['Модель', 'T5', 'обучается', 'с', 'помощью', 'span', 'corruption']
Spans  : [(0, 3)]
Input  : <extra_id_0> с помощью span corruption
Target : <extra_id_0> Модель T5 обучается <eos>


In [None]:
# >>> print("Tokens :", tokens)
# Tokens : ['Модель', 'T5', 'обучается', 'с', 'помощью', 'span', 'corruption']
# >>> print("Spans  :", spans)
# Spans  : [(0, 1), (2, 2)]
# >>> print("Input  :", inp)
# <extra_id_0> T5 <extra_id_1> помощью span corruption
# >>> print("Target :", out)
# <extra_id_0> Модель <extra_id_1> обучается с <eos>

# LEsson 3 Метрики в задачах Seq2Seq

# Task 3-1   BLEU

In [None]:
import math
from collections import Counter

In [None]:
pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.6-py3-none-any.whl.metadata (9.5 kB)
Downloading evaluate-0.4.6-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: evaluate
Successfully installed evaluate-0.4.6


In [None]:
def compute_bleu(candidate, reference, max_order=2):
    cand_tokens = candidate.split()
    ref_tokens = reference.split()

    precisions = []
    for n in range(1, max_order+1):
        # === ВАШ КОД ===
        # Посчитайте n-граммную точность для разных n
        cand_ngrams = Counter([tuple(cand_tokens[i:i+n]) for i in range(len(cand_tokens)-n+1)])
        ref_ngrams = Counter([tuple(ref_tokens[i:i+n]) for i in range(len(ref_tokens)-n+1)])

        overlap = {ng: min(count, ref_ngrams[ng]) for ng, count in cand_ngrams.items()}
        p_n = sum(overlap.values()) / max(1, sum(cand_ngrams.values()))
        precisions.append(p_n)

    # Brevity Penalty
    c, r = len(cand_tokens), len(ref_tokens)
    BP = 1 if c > r else math.exp(1 - r/c)

    bleu = BP * math.exp(sum([1/max_order * math.log(p+1e-6) for p in precisions]))
    return bleu

In [None]:
print("BLEU (ваша реализация): ", compute_bleu("Ходор держал дверь", "Ходор закрыл дверь"))

import evaluate
reference_bleu = evaluate.load("bleu")


BLEU (ваша реализация):  0.000816497193299932


In [None]:
results = reference_bleu.compute(predictions=['candidate'], references=['reference'], tokenizer=lambda x: x.split(), max_order=2)
print("BLEU (референс): ", results["bleu"])

BLEU (референс):  0.0


# Task 3-2   ROUGE

In [None]:
def lcs(X, Y):  # поиск lcs с помощью динамического программирования
    m, n = len(X), len(Y)
    dp = [[0]*(n+1) for _ in range(m+1)]
    for i in range(m):
        for j in range(n):
            if X[i] == Y[j]:
                dp[i+1][j+1] = dp[i][j] + 1
            else:
                dp[i+1][j+1] = max(dp[i][j+1], dp[i+1][j])
    return dp[m][n]

In [None]:
pip install rouge_score

Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge_score
  Building wheel for rouge_score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge_score: filename=rouge_score-0.1.2-py3-none-any.whl size=24934 sha256=1b9f6273f041b50d0035c05ccbb333824bc407e8420456fbd505338aefcb863d
  Stored in directory: /root/.cache/pip/wheels/85/9d/af/01feefbe7d55ef5468796f0c68225b6788e85d9d0a281e7a70
Successfully built rouge_score
Installing collected packages: rouge_score
Successfully installed rouge_score-0.1.2


In [None]:
def rouge_l(candidate, reference):
    cand_tokens, ref_tokens = candidate.split(), reference.split()
    lcs_len = lcs(cand_tokens, ref_tokens)

    precision = lcs_len / len(cand_tokens)
    recall = lcs_len / len(ref_tokens)
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1

candidate, reference = "Ходор держал дверь", "Ходор держал дверь, чтобы Бран мог спастись"
print("Ваш Rouge-L: ", rouge_l(candidate, reference)[-1])

reference_rouge = evaluate.load('rouge')

print("Референсный Rouge-L: ", reference_rouge.compute(predictions=[candidate], references=[reference], tokenizer=lambda x: x.split())['rougeL'])

Ваш Rouge-L:  0.4
Референсный Rouge-L:  0.4


# Task 3-3

In [None]:
import torch
from transformers import AutoTokenizer, AutoModel

In [None]:
# Загружаем модель
tokenizer = AutoTokenizer.from_pretrained("cointegrated/rubert-tiny2")
model = AutoModel.from_pretrained("cointegrated/rubert-tiny2")
model.eval()

tokenizer_config.json:   0%|          | 0.00/401 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/693 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/118M [00:00<?, ?B/s]

BertModel(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(83828, 312, padding_idx=0)
    (position_embeddings): Embedding(2048, 312)
    (token_type_embeddings): Embedding(2, 312)
    (LayerNorm): LayerNorm((312,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0-2): 3 x BertLayer(
        (attention): BertAttention(
          (self): BertSdpaSelfAttention(
            (query): Linear(in_features=312, out_features=312, bias=True)
            (key): Linear(in_features=312, out_features=312, bias=True)
            (value): Linear(in_features=312, out_features=312, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): BertSelfOutput(
            (dense): Linear(in_features=312, out_features=312, bias=True)
            (LayerNorm): LayerNorm((312,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, inplace=False)

In [None]:
def get_embeddings(text: str):
    """Возвращает эмбеддинги токенов без CLS/SEP."""
    inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
    with torch.no_grad():
        outputs = model(**inputs)
    # Последний слой: [batch, seq_len, hidden_size]
    embeddings = outputs.last_hidden_state.squeeze(0)
    return embeddings

In [None]:
def bertscore_pair(hyp, ref):
    # Получаем эмбеддинги
    h = get_embeddings(hyp)   # [len_h, d]
    r = get_embeddings(ref)   # [len_r, d]

    # Нормировка эмбеддингов
    h = torch.nn.functional.normalize(h, p=2, dim=1)
    r = torch.nn.functional.normalize(r, p=2, dim=1)

    # Косинусное расстояние
    sim = torch.matmul(h, r.T)  # [len_h, len_r]

    # Precision: для каждого токена h берем max по r
    P = sim.max(dim=1).values.mean().item()
    # Recall: для каждого токена r берем max по h
    R = sim.max(dim=0).values.mean().item()
    # F1
    F1 = 2 * P * R / (P + R + 1e-8)

    return P, R, F1

In [None]:
# Пример
hyp = "Сегодня будет краткий дождь и прохладный ветер."
ref = "Сегодня ожидается непродолжительный дождь и прохладный ветер."

P, R, F1 = bertscore_pair(hyp, ref)
print(f"P={P:.4f}, R={R:.4f}, F1={F1:.4f}") # P=0.9045, R=0.9048, F1=0.9047

P=0.9045, R=0.9048, F1=0.9047


# Lesson 4 Машинный перевод. История, сбор данных и бенчмарки

# Task 1

In [1]:
import numpy as np
from scipy.spatial.distance import cosine
from sentence_transformers import SentenceTransformer



In [2]:
model_name = 'distiluse-base-multilingual-cased'
model_st = SentenceTransformer(model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/341 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/607 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/539M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/528 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/114 [00:00<?, ?B/s]

2_Dense/model.safetensors:   0%|          | 0.00/1.58M [00:00<?, ?B/s]

In [3]:
src = """Ветер свистел над стенами Винтерфелла, заставляя знамена домов дрожать в преддверии заката. Джон Сноу стоял на бастионе, наблюдая за тёмными лесами, где тени деревьев казались живыми. Он чувствовал надвигающуюся опасность, словно невидимые шаги Белых ходоков приближались к стенам крепости."""
trg = """The wind whistled over the walls of Winterfell, making the banners of the houses flutter in the approaching dusk. Jon Snow stood on the battlement, watching the dark forests where the shadows of the trees seemed alive. He felt the impending danger, as if the invisible footsteps of the White Walkers were drawing closer to the castle walls."""

In [4]:
srcs = src.split(".")
trgs = trg.split(".")

In [5]:
src_embeds = model_st.encode(srcs)
trg_embeds = model_st.encode(trgs)

In [6]:
def get_sim_matrix(a, b):
    sim_matrix = np.zeros((len(a), len(b)))
    for i in range(len(a)):
        for j in range(len(b)):
            sim = 1 - cosine(a[i], b[j])
            sim_matrix[i,j] = sim
    return sim_matrix

In [7]:
sim_matrix = get_sim_matrix(src_embeds, trg_embeds)

In [8]:
np.testing.assert_array_equal(sim_matrix.argmax(1), np.arange(len(src_embeds)))

In [9]:
sim_matrix.argmax(1)

array([0, 1, 2, 3])

In [10]:
sim_matrix

array([[ 0.86990279,  0.16831946,  0.39739895, -0.01088417],
       [ 0.19795883,  0.91262597,  0.24400645,  0.00389701],
       [ 0.45714658,  0.27789557,  0.85473549,  0.00680411],
       [-0.01302063, -0.00837553,  0.02163905,  1.        ]])