In [1]:
#загружаем нужные версии
!pip uninstall -y torch torchtext
!pip install torch==2.3.0
!pip install torchtext==0.18
!pip install sacrebleu

Found existing installation: torch 2.5.1+cu121
Uninstalling torch-2.5.1+cu121:
  Successfully uninstalled torch-2.5.1+cu121
[0mCollecting torch==2.3.0
  Downloading torch-2.3.0-cp310-cp310-manylinux1_x86_64.whl.metadata (26 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.3.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.3.0)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.3.0)
  Downloading nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch==2.3.0)
  Downloading nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch==2.3.0)
  Downloading nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl.metadata

In [2]:
# Основные библиотеки PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch import Tensor
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.optim.lr_scheduler import LambdaLR

# Токенизация и работа с текстом
import sentencepiece as spm
from sentencepiece import SentencePieceTrainer, SentencePieceProcessor

# Обработка данных
import numpy as np
import pandas as pd
import re
import os
from typing import Union, List, Tuple, Type, Optional, Any

# Утилиты
from tqdm import tqdm
from sklearn.model_selection import train_test_split




In [3]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import sentencepiece as spm
from sentencepiece import SentencePieceTrainer, SentencePieceProcessor

class TextDataset(Dataset):
    def __init__(self, german_file: str, english_file: str = None, 
                 tokenizer_prefix: str = "tokenizer_", vocab_capacity: int = 12000, 
                 token_model: str = "word", norm_rule: str = "nmt_nfkc_cf", 
                 seq_max_len: int = 128):
        """
        german_file: путь к файлу с предложениями на немецком
        english_file: путь к файлу с предложениями на английском (целевой язык)
        tokenizer_prefix: префикс имени файлов для сохранения обученных моделей токенизаторов
        vocab_capacity: размер vocab (количество токенов) для SentencePiece
        token_model: тип модели токенизатора ('word', 'bpe', 'unigram', etc.)
        norm_rule: правило нормализации (по умолчанию 'nmt_nfkc_cf' для Unicode NFKC)
        seq_max_len: ограничение на максимальную длину последовательности (с учетом BOS/EOS)
        """
        # Если модели токенизаторов не сохранены, обучаем их на тексте корпусов
        if not os.path.isfile(tokenizer_prefix + 'german.model'):
            # Обучаем SentencePiece для немецкого корпуса
            SentencePieceTrainer.train(
                input=german_file, model_prefix=tokenizer_prefix + 'german',
                vocab_size=vocab_capacity, model_type=token_model,
                normalization_rule_name=norm_rule,
                pad_id=0, unk_id=1, bos_id=2, eos_id=3
            )
            # Обучаем SentencePiece для английского корпуса
            SentencePieceTrainer.train(
                input=english_file, model_prefix=tokenizer_prefix + 'english',
                vocab_size=vocab_capacity, model_type=token_model,
                normalization_rule_name=norm_rule,
                pad_id=0, unk_id=1, bos_id=2, eos_id=3
            )
        # Загружаем обученные модели токенизаторов
        self.tokenizer_de = SentencePieceProcessor(model_file=tokenizer_prefix + 'german.model')
        self.tokenizer_en = SentencePieceProcessor(model_file=tokenizer_prefix + 'english.model')
        
        # Читаем все предложения из файлов
        with open(german_file, encoding='utf-8') as f:
            text_german = f.readlines()
        total_sentences = len(text_german)
        self.texts_de = text_german[:total_sentences]
        self.indices_de = self.tokenizer_de.encode(self.texts_de)  # токенизируем весь немецкий корпус
        
        if english_file is not None:
            with open(english_file, encoding='utf-8') as f:
                text_english = f.readlines()
            self.texts_en = text_english[:total_sentences]
            self.indices_en = self.tokenizer_en.encode(self.texts_en)  # токенизируем весь английский корпус
        else:
            self.texts_en = None
            self.indices_en = None
        
        # Сохраняем ID специальных токенов из токенизатора (должны совпадать с заданными выше)
        self.pad_token = self.tokenizer_en.pad_id()    # == 0
        self.unk_token = self.tokenizer_en.unk_id()    # == 1
        self.start_token = self.tokenizer_en.bos_id()  # == 2 (BOS)
        self.end_token = self.tokenizer_en.eos_id()    # == 3 (EOS)
        self.seq_max_len = seq_max_len
        # Размеры словарей исходного и целевого языков
        self.vocab_size_de = self.tokenizer_de.vocab_size()
        self.vocab_size_en = self.tokenizer_en.vocab_size()
    
    def __len__(self):
        return len(self.texts_de)
    
    def _encode_text(self, text: str, lang: str):
        """Токенизировать строку с помощью соответствующего токенизатора"""
        return (self.tokenizer_en.encode(text) if lang == 'English'
                else self.tokenizer_de.encode(text))
    
    def _process_sentence(self, sentence: str, lang: str):
        """Добавить BOS, EOS и привести предложение к списку токенов (с обрезкой по seq_max_len)."""
        tokens = [self.start_token] + self._encode_text(sentence, lang) + [self.end_token]
        # Ограничиваем длину последовательности, если она превышает максимум
        if len(tokens) > self.seq_max_len:
            tokens = tokens[:self.seq_max_len]
        return tokens
    
    def __getitem__(self, idx: int):
        # Получаем список токенов для немецкого предложения (источник)
        src_tokens = self._process_sentence(self.texts_de[idx].strip(), lang='German')
        if self.texts_en is not None:
            # Если есть целевой язык (обучение/валидация) - возвращаем пару (src, tgt)
            tgt_tokens = self._process_sentence(self.texts_en[idx].strip(), lang='English')
            return torch.tensor(src_tokens, dtype=torch.long), torch.tensor(tgt_tokens, dtype=torch.long)
        else:
            # Если target не задан (например, тестовый датасет) - возвращаем (src, пустой таргет)
            dummy_target = [self.pad_token] * len(src_tokens)
            return torch.tensor(src_tokens, dtype=torch.long), torch.tensor(dummy_target, dtype=torch.long)

def collate_fn(batch):
    """
    Функция для формирования батча из списка пар (src, tgt),
    дополняет последовательности паддингом до максимальной длины в батче.
    """
    src_batch, tgt_batch = zip(*batch)
    # Преобразуем список тензоров в один тензор [batch_size, seq_len] с padding
    src_batch = pad_sequence(src_batch, padding_value=0, batch_first=True)
    tgt_batch = pad_sequence(tgt_batch, padding_value=0, batch_first=True)
    return src_batch, tgt_batch


In [4]:
import math
import torch.nn as nn
from torch.nn import Transformer

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Модуль позиционного кодирования: добавляет информацию о позиции токена
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        # Вычисляем позиционные приучения для maxlen позиций заранее
        pos = torch.arange(0, maxlen).unsqueeze(1)           # shape: [maxlen, 1]
        i = torch.arange(0, emb_size, 2)                     # индексы 0,2,4,... для синусоид
        # Формула из работы "Attention is All You Need": PE(pos, 2i) = sin(pos/10000^(2i/emb_size))
        angle_rates = 1 / torch.pow(10000, (i.float()/emb_size))
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * angle_rates)  # синус для четных индексов
        pos_embedding[:, 1::2] = torch.cos(pos * angle_rates)  # косинус для нечетных индексов
        pos_embedding = pos_embedding.unsqueeze(1)  # shape: [maxlen, 1, emb_size]
        self.register_buffer('pos_embedding', pos_embedding)  # не обучаемый параметр
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        # token_embedding: shape [seq_len, batch_size, emb_size] (если batch_first=False)
        # Добавляем позиционные векторы к эмбеддингам и применяем Dropout
        seq_len = token_embedding.size(0)
        return self.dropout(token_embedding + self.pos_embedding[:seq_len, :])

# Модуль токен-эмбеддинга: обучаемый слой, преобразующий индексы в векторы
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size
    def forward(self, tokens: torch.Tensor) -> torch.Tensor:
        # Возвращаем эмбеддинги, масштабированные на sqrt(emb_size) для стабильности
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Полная модель Seq2Seq с механизмом Transformer
class Seq2SeqTransformer(nn.Module):
    def __init__(self, num_encoder_layers: int, num_decoder_layers: int,
                 emb_size: int, nhead: int,
                 src_vocab_size: int, tgt_vocab_size: int,
                 dim_feedforward: int = 512, dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        # Инициализируем энкодер-декодер Transformer из PyTorch
        self.transformer = Transformer(d_model=emb_size, nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        # Финальный линейный генератор вероятностей слов
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        # Обучаемые эмбеддинги для источника и цели + позиционное кодирование
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)
    
    def forward(self, src: torch.Tensor, tgt: torch.Tensor, 
                src_mask: torch.Tensor, tgt_mask: torch.Tensor,
                src_padding_mask: torch.Tensor, tgt_padding_mask: torch.Tensor,
                memory_key_padding_mask: torch.Tensor) -> torch.Tensor:
        """
        Выполняет проход (encoder + decoder) и возвращает сырые логиты размером [tgt_seq_len, batch_size, tgt_vocab_size].
        """
        # Получаем эмбеддинги входа и выхода с позиц.кодированием
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(tgt))
        # Прогоняем через Transformer: 
        # src_mask - маска для энкодера (0 будущее), tgt_mask - маска будущего для декодера
        # src_padding_mask/tgt_padding_mask - маски паддинга для энкодера и декодера
        output = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, 
                                  None,  # нет явной маски между энкодером и декодером
                                  src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        logits = self.generator(output)  # Преобразуем скрытое представление в логиты слов
        return logits
    
    def encode(self, src: torch.Tensor, src_mask: torch.Tensor):
        """Пропустить только через энкодер (для инференса)."""
        return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)
    
    def decode(self, tgt: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor):
        """Пропустить только через декодер (для инференса, принимает уже полученную память энкодера)."""
        return self.transformer.decoder(self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask)


In [5]:
def generate_square_subsequent_mask(sz: int) -> torch.Tensor:
    """Создаёт маску размера sz x sz для запрета вниманию видеть последующие элементы."""
    mask = torch.triu(torch.ones(sz, sz, device=DEVICE)) == 1  # верхнетреугольная матрица из True
    mask = mask.transpose(0, 1)  # приводим к нижнетреугольной (True на диагонали и ниже)
    mask = mask.float().masked_fill(mask == False, float('-inf')).masked_fill(mask == True, float(0.0))
    return mask

def create_mask(src: torch.Tensor, tgt: torch.Tensor):
    src_seq_len = src.shape[0]   # длина последовательности источника (seq_len)
    tgt_seq_len = tgt.shape[0]   # длина последовательности цели
    # Маска будущих слов для декодера
    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    # Маска для энкодера (нам не нужно ограничивать видимость, делаем булеву матрицу 0)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)
    # Маски заполнения (True там, где PAD) для входа и выхода (требуется форма [batch, seq_len])
    src_padding_mask = (src == PAD_IDX).transpose(0, 1)  # [batch_size, src_seq_len]
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)  # [batch_size, tgt_seq_len]
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask


In [7]:
from tqdm import tqdm

def train_epoch(model: nn.Module, optimizer, data_loader: DataLoader):
    model.train()
    total_loss = 0
    for src, tgt in tqdm(data_loader, desc="Training"):
        src = src.to(DEVICE).transpose(0, 1)   # [seq_len, batch_size]
        tgt = tgt.to(DEVICE).transpose(0, 1)   # [seq_len, batch_size]
        # Разделяем целевые последовательности: 
        tgt_input = tgt[:-1, :]   # все кроме последнего токена (вход декодеру)
        tgt_out   = tgt[1:, :]    # все кроме первого токена (ожидаемый выход)
        # Генерируем маски
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
        # Прямой проход (forward)
        logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, 
                       memory_key_padding_mask=src_padding_mask)
        # Вычисляем loss: приводим прогноз и целевые токены к размерности [-1] для функционала потерь
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        # Обратное распространение и шаг оптимизатора
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(data_loader)

def evaluate(model: nn.Module, data_loader: DataLoader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for src, tgt in tqdm(data_loader, desc="Validation"):
            src = src.to(DEVICE).transpose(0, 1)
            tgt = tgt.to(DEVICE).transpose(0, 1)
            tgt_input = tgt[:-1, :]
            tgt_out   = tgt[1:, :]
            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
            logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask,
                           memory_key_padding_mask=src_padding_mask)
            loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
            total_loss += loss.item()
    return total_loss / len(data_loader)


In [10]:
from timeit import default_timer as timer
import torch.optim.lr_scheduler as lr_scheduler

torch.manual_seed(0)

# Параметры данных
BATCH_SIZE = 128

# Инициализируем датасеты для обучения и валидации
train_dataset = TextDataset(german_file="train.de", english_file="train.en", 
                            tokenizer_prefix="tokenizer_", is_training=True)
val_dataset   = TextDataset(german_file="val.de", english_file="val.en", 
                            tokenizer_prefix="tokenizer_", is_training=False)

# DataLoader для итерации по батчам
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

# Специальные индексы (PAD, UNK, BOS, EOS) – должны соответствовать настройкам токенизатора
PAD_IDX, UNK_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3

# Размеры словарей исходного и целевого языков
SRC_VOCAB_SIZE = train_dataset.vocab_size_de
TGT_VOCAB_SIZE = train_dataset.vocab_size_en

# Гиперпараметры модели
EMB_SIZE = 512     # размер эмбеддингов и скрытого представления модели
NHEAD = 8          # количество "голов" multi-head attention
FFN_HID_DIM = 1024 # размер слоя feed-forward внутри трансформера
NUM_ENCODER_LAYERS = 6
NUM_DECODER_LAYERS = 6

# Инициализируем модель
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE, 
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
# Инициализация параметров весов
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)
# Переносим модель на устройство (CPU/GPU)
transformer = transformer.to(DEVICE)

# Функция потерь (кросс-энтропия), игнорируем индекс PAD
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Оптимизатор Adam
optimizer = torch.optim.Adam(transformer.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9, weight_decay=1e-5)

# Планировщик OneCycleLR: один цикл изменения learning rate за все эпохи
NUM_EPOCHS = 20
total_steps = NUM_EPOCHS * len(train_loader)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=5e-4, total_steps=total_steps, 
                                                pct_start=0.1, anneal_strategy='cos', final_div_factor=10)




In [None]:
from timeit import default_timer as timer

for epoch in range(1, NUM_EPOCHS + 1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer, train_loader)
    val_loss   = evaluate(transformer, val_loader)
    end_time = timer()
    scheduler.step()  # шаг изменения learning rate
    
    print(f"Эпоха {epoch}: средний loss на обучении = {train_loss:.3f}, на валидации = {val_loss:.3f}, время эпохи = {end_time - start_time:.2f}с")
    # Сохраняем модель и оптимизатор
    checkpoint_path = f"model_epoch_{epoch}.pth"
    torch.save({
        "model_state_dict": transformer.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "epoch": epoch,
        "val_loss": val_loss
    }, checkpoint_path)
    print(f"✔️ Модель сохранена в {checkpoint_path}")


In [None]:
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    model.eval()
    # Прогоняем через энкодер
    memory = model.encode(src, src_mask)
    # Начинаем с последовательности, состоящей только из BOS
    ys = torch.tensor([[start_symbol]], dtype=torch.long, device=DEVICE)  # shape [1,1]
    for i in range(max_len - 1):
        # Создаем маску для уже набранной последовательности (размер i+1)
        tgt_mask = generate_square_subsequent_mask(ys.size(0)).to(DEVICE)
        # Пропускаем через декодер текущую последовательность (ys) и память энкодера
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)  # теперь размер [batch=1, seq_len, emb_size]
        # Прогнозируем следующий токен через генератор модели
        prob = model.generator(out[:, -1])        # логиты для последнего токена
        _, next_word = torch.max(prob, dim=1)     # выбираем индекс с максимальной вероятностью
        next_word = next_word.item()
        # Добавляем предсказанный токен к последовательности
        ys = torch.cat([ys, torch.tensor([[next_word]], device=DEVICE)], dim=0)
        if next_word == EOS_IDX:
            break
    return ys

def translate_sentence(model, sentence: str, tokenizer_src, tokenizer_tgt, max_len: int = 128):
    model.eval()
    # Токенизируем входное предложение (без добавления BOS/EOS, т.к. модель ожидает чистый текст)
    src_tokens = tokenizer_src.encode(sentence)
    src_tensor = torch.tensor(src_tokens, dtype=torch.long).unsqueeze(1).to(DEVICE)  # [seq_len, 1]
    src_mask = torch.zeros(src_tensor.shape[0], src_tensor.shape[0], device=DEVICE).type(torch.bool)
    # Выполняем greedy decode
    tgt_tokens = greedy_decode(model, src_tensor, src_mask, max_len=max_len, start_symbol=BOS_IDX)
    tgt_tokens = tgt_tokens.flatten().cpu().tolist()  # в список индексов
    # Преобразуем последовательность индексов в текст, исключая BOS
    # (SentencePiece.decode самостоятельно игнорирует специальные токены)
    translated_text = tokenizer_tgt.decode(tgt_tokens)
    return translated_text


In [None]:
def translate_sentence(model, sentence, tokenizer_src, tokenizer_tgt):
    model.eval()

    src_tokens = tokenizer_src.encode(sentence)
    src_tensor = torch.tensor(src_tokens, dtype=torch.long).unsqueeze(1).to(DEVICE)

    src_mask = torch.zeros(src_tensor.shape[0], src_tensor.shape[0], device=DEVICE).type(torch.bool)

    tgt_tokens = greedy_decode(model, src_tensor, src_mask, max_len=128, start_symbol=2).flatten()

    translated_sentence = tokenizer_tgt.decode(tgt_tokens.cpu().tolist())

    return translated_sentence


In [None]:
# Загрузка обученных токенизаторов (если не сохранены ранее)
tokenizer_de = spm.SentencePieceProcessor(model_file="tokenizer_german.model")
tokenizer_en = spm.SentencePieceProcessor(model_file="tokenizer_english.model")

# Пример перевода
sentence_de = "Eine Gruppe von Menschen steht vor einem Iglu."
translation = translate_sentence(transformer, sentence_de, tokenizer_de, tokenizer_en)
print("🔹 Исходное (DE):", sentence_de)
print("🔹 Перевод (EN):", translation)
