## Архитектура модели

Я буду использовать модель, описанную в этом ноутбуке.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [None]:
def generate_causal_mask(size):
    """Build a boolean look-ahead mask of shape ``(size, size)``.

    Entry ``[i, j]`` is ``True`` when ``j > i``, i.e. when key position ``j``
    lies in the future of query position ``i`` and must be hidden from it.
    The diagonal and everything below it are ``False``.
    """
    positions = torch.arange(size)
    # Row vector holds the key index j, column vector the query index i.
    return positions.unsqueeze(0) > positions.unsqueeze(1)

In [None]:
class PositionalEncoding(nn.Module):
    """Scale embeddings by ``sqrt(d_model)`` and add fixed sinusoidal position codes.

    Args:
        d_model: Size of the embedding (last) dimension. Odd values are supported.
        max_len: Number of positions for which codes are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.d_model = d_model

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine columns,
        # so the angle table is trimmed to fit (a no-op when d_model is even).
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Leading axis of size 1 lets the table broadcast over the batch.
        pe = pe.unsqueeze(0)
        # A buffer follows the module across devices but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x * sqrt(d_model)`` plus the codes for the first ``x.size(1)`` positions.

        ``x`` is indexed as ``[batch, seq_len, d_model]``; ``seq_len`` must not
        exceed the ``max_len`` given at construction.
        """
        x = x * math.sqrt(self.d_model)
        x = x + self.pe[:, :x.size(1), :]
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V/output projections.

    ``d_model`` is split evenly across ``num_heads`` heads. Masks are boolean and
    use the convention ``True`` = "this key position is blocked".
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        per_head, leftover = divmod(d_model, num_heads)
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = per_head

        assert leftover == 0, "d_model must be divisible by num_heads"

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape ``[batch, seq, d_model]`` into ``[batch, heads, seq, head_dim]``."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K``/``V``; positions where ``mask`` is True are suppressed."""
        scores = Q @ K.transpose(-2, -1)
        scores = scores / math.sqrt(self.head_dim)

        if mask is not None:
            # A large negative score makes the softmax weight effectively zero.
            scores = scores.masked_fill(mask, -1e9)

        weights = scores.softmax(dim=-1)
        return weights @ V

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, merge the heads and apply the output projection."""
        batch_size = Q.size(0)

        queries = self._split_heads(self.W_q(Q), batch_size)
        keys = self._split_heads(self.W_k(K), batch_size)
        values = self._split_heads(self.W_v(V), batch_size)

        context = self.scaled_dot_product_attention(queries, keys, values, mask)
        # Undo the head split: [batch, heads, seq, head_dim] -> [batch, seq, d_model].
        merged = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.W_o(merged)

class FeedForward(nn.Module):
    """Position-wise MLP: ``d_model -> d_ff -> d_model`` with ReLU and dropout in between."""

    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Expand to ``d_ff``, apply ReLU then dropout, and project back to ``d_model``."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(self.dropout(hidden))

class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward net, each followed by
    dropout, a residual connection and layer normalisation (post-norm)."""

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        """Run ``x`` through the block; ``mask`` is passed unchanged to self-attention."""
        # Sub-layer 1: self-attention, where queries, keys and values are all ``x``.
        attended = self.dropout(self.self_attn(x, x, x, mask))
        x = self.norm1(x + attended)

        # Sub-layer 2: position-wise feed-forward network.
        transformed = self.dropout(self.ffn(x))
        return self.norm2(x + transformed)

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention over the encoder
    output, and a feed-forward net, each wrapped in dropout + residual + LayerNorm."""

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Decode ``x`` against ``enc_output``.

        ``tgt_mask`` is applied to the self-attention step and ``src_mask`` to the
        cross-attention step; both are built by the caller.
        """
        # Sub-layer 1: self-attention over the target sequence.
        self_context = self.dropout(self.self_attn(x, x, x, tgt_mask))
        x = self.norm1(x + self_context)

        # Sub-layer 2: queries come from the decoder, keys/values from the encoder.
        cross_context = self.dropout(self.cross_attn(x, enc_output, enc_output, src_mask))
        x = self.norm2(x + cross_context)

        # Sub-layer 3: position-wise feed-forward network.
        transformed = self.dropout(self.ffn(x))
        return self.norm3(x + transformed)

class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps source token ids to target-vocabulary logits.

    Source and target have separate embeddings but share one ``PositionalEncoding``
    module. The output projection ``fc_out`` yields one score per target-vocabulary entry.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8, num_layers=6):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.encoder_layers = nn.ModuleList(
            EncoderLayer(d_model, num_heads) for _ in range(num_layers)
        )
        self.decoder_layers = nn.ModuleList(
            DecoderLayer(d_model, num_heads) for _ in range(num_layers)
        )

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Encode ``src``, decode ``tgt`` against it and return per-position logits.

        The masks are handed to the layers as-is: ``src_mask`` goes to every encoder
        layer and to the decoder's cross-attention, ``tgt_mask`` to the decoder's
        self-attention.
        """
        # Encoder stack
        memory = self.positional_encoding(self.encoder_embedding(src))
        for encoder_layer in self.encoder_layers:
            memory = encoder_layer(memory, src_mask)

        # Decoder stack
        hidden = self.positional_encoding(self.decoder_embedding(tgt))
        for decoder_layer in self.decoder_layers:
            hidden = decoder_layer(hidden, memory, src_mask, tgt_mask)

        # Project decoder states onto the target vocabulary
        return self.fc_out(hidden)

In [None]:
import re
import pandas as pd
import numpy as np

import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from nltk.translate.bleu_score import corpus_bleu

In [None]:
# Load the tab-separated sentence pairs, discard the third column and label
# the remaining two as English ('en') and French ('fr').
df = (
    pd.read_csv('/kaggle/input/frenchenglish/fra.txt', sep="\t", header=None)
    .drop(columns=2)
    .rename(columns={0: 'en', 1: 'fr'})
)

In [None]:
# Bare expression: in a notebook cell this renders the DataFrame as the cell output.
df

In [None]:
# 1. Data preparation
class TranslationDataset(Dataset):
    """Sentence pairs read from a tab-separated file, served as token-id tensors.

    The file is expected to have three columns; the first two are labelled
    ``'en'`` and ``'fr'`` and the third is dropped. A random sample of at most
    ``max_samples`` rows is kept, then rows with missing values and duplicate
    rows are removed.

    Args:
        file_path: Path of the tab-separated file.
        src_lang: Column used as the source side (``'en'`` or ``'fr'``).
        tgt_lang: Column used as the target side (``'en'`` or ``'fr'``).
        max_samples: Upper bound on the number of rows sampled from the file.
    """

    # One token is either a run of word characters or a single punctuation mark.
    _TOKEN_PATTERN = re.compile(r"\w+|[^\w\s]")

    def __init__(self, file_path, src_lang='en', tgt_lang='fr', max_samples=50000):
        frame = pd.read_csv(file_path, sep='\t', header=None)
        # DataFrame.sample raises when asked for more rows than exist (without
        # replacement), so the request is capped at the size of the file.
        frame = frame.sample(min(max_samples, len(frame)))

        frame = frame.drop(columns=2).rename(columns={0: 'en', 1: 'fr'})
        self.data = frame.dropna().drop_duplicates()

        # Remembered so that __getitem__ reads the same columns the vocabularies
        # were built from.
        self.src_lang = src_lang
        self.tgt_lang = tgt_lang

        # Reserved ids shared by both vocabularies
        self.special_tokens = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}

        self.src_vocab = self.build_vocab(self.data[src_lang])
        self.tgt_vocab = self.build_vocab(self.data[tgt_lang])

    def _tokenize(self, sentence):
        """Lower-case ``sentence`` and split it into word and punctuation tokens."""
        return self._TOKEN_PATTERN.findall(sentence.lower())

    def build_vocab(self, sentences, min_freq=2):
        """Map every token seen at least ``min_freq`` times to an integer id.

        Word ids start right after the special-token ids and follow the order of
        first appearance. The special tokens are included in the returned dict.
        """
        word_counts = {}
        for sentence in sentences:
            for word in self._tokenize(sentence):
                word_counts[word] = word_counts.get(word, 0) + 1

        # Rare words are left out; they are encoded as '<unk>' later.
        frequent_words = [word for word, count in word_counts.items() if count >= min_freq]
        first_word_id = len(self.special_tokens)
        vocab = {word: first_word_id + i for i, word in enumerate(frequent_words)}

        vocab.update(self.special_tokens)
        return vocab

    def sentence_to_ids(self, sentence, vocab):
        """Encode ``sentence`` as a list of ids; unknown tokens map to ``vocab['<unk>']``."""
        unk_id = vocab['<unk>']
        return [vocab.get(token, unk_id) for token in self._tokenize(sentence)]

    def _encode(self, sentence, vocab):
        """Encode ``sentence`` and wrap it in ``<sos>`` ... ``<eos>`` as a LongTensor."""
        ids = [vocab['<sos>'], *self.sentence_to_ids(sentence, vocab), vocab['<eos>']]
        return torch.LongTensor(ids)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{'src': LongTensor, 'tgt': LongTensor}`` for the ``idx``-th row."""
        row = self.data.iloc[idx]
        return {
            'src': self._encode(row[self.src_lang], self.src_vocab),
            'tgt': self._encode(row[self.tgt_lang], self.tgt_vocab),
        }

# 2. Batch collation with padding
def collate_fn(batch):
    """Pad the variable-length 'src' and 'tgt' tensors of ``batch`` to common lengths.

    Shorter sequences are right-padded with 0. Returns a dict with the keys
    'src' and 'tgt', each a tensor indexed as [batch, seq_len].
    """
    padded = {}
    for key in ('src', 'tgt'):
        sequences = [sample[key] for sample in batch]
        stacked = nn.utils.rnn.pad_sequence(sequences, padding_value=0)
        # pad_sequence stacks time-major ([seq_len, batch]); flip to [batch, seq_len].
        padded[key] = stacked.transpose(0, 1)
    return padded

# 3. Data: build the dataset and split it 80/20 into training and validation parts
dataset = TranslationDataset('/kaggle/input/frenchenglish/fra.txt')
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
)

# Only the training loader shuffles; both pad their batches through collate_fn.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    collate_fn=collate_fn,
    shuffle=True,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    collate_fn=collate_fn,
)

# 4. Model: use the GPU when one is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Transformer(
    src_vocab_size=len(dataset.src_vocab),
    tgt_vocab_size=len(dataset.tgt_vocab),
    d_model=256,
    num_heads=4,
    num_layers=2,
)
model = model.to(device)

# 5. Training objects
optimizer = optim.Adam(model.parameters(), lr=3e-4)
# ignore_index=0 leaves positions whose target is id 0 ('<pad>') out of the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)

def init_weights(m):
    """Re-initialise ``m`` in place if it is a Linear or an Embedding layer.

    Linear: Xavier-normal weights and, when a bias exists, a zero bias.
    Embedding: weights drawn from a normal distribution with mean 0 and std 0.01.
    Modules of any other type are left untouched.
    """
    if isinstance(m, nn.Embedding):
        nn.init.normal_(m.weight, mean=0, std=0.01)
        return
    if not isinstance(m, nn.Linear):
        return

    nn.init.xavier_normal_(m.weight)
    if m.bias is not None:
        nn.init.zeros_(m.bias)
        
# Module.apply calls init_weights on every submodule of the model and on the
# model itself, so all Linear and Embedding layers are re-initialised in place.
model.apply(init_weights)

def train_epoch(model, loader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Args:
        model: Called as ``model(src, tgt_input, src_mask, tgt_mask)``; must
            return logits indexed as [batch, tgt_len - 1, vocab].
        loader: Iterable of dicts with 'src' and 'tgt' id tensors, [batch, seq_len],
            padded with id 0.
        optimizer: Optimiser stepped once per batch.
        criterion: Loss taking flattened logits and flattened target ids.
        device: Device the batch tensors and masks are moved to.

    Returns:
        The average of the per-batch losses, or 0.0 when ``loader`` yields nothing.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in loader:
        src = batch['src'].to(device)
        tgt = batch['tgt'].to(device)

        # Teacher forcing: the decoder sees tokens [0, n-1) and predicts tokens [1, n).
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        # The attention layers hide every position whose mask entry is True, so the
        # padding masks must be True at the padding positions (id 0), not at the
        # real tokens.
        src_mask = (src == 0).unsqueeze(1).unsqueeze(2)  # [batch, 1, 1, src_len]

        # Decoder mask: a key is hidden if it is padding OR lies in the future.
        tgt_padding_mask = (tgt_input == 0).unsqueeze(1).unsqueeze(2)  # [batch, 1, 1, tgt_len-1]
        causal_mask = generate_causal_mask(tgt_input.size(1)).to(device)
        causal_mask = causal_mask.unsqueeze(0).unsqueeze(0)  # [1, 1, tgt_len-1, tgt_len-1]
        tgt_mask = tgt_padding_mask | causal_mask

        optimizer.zero_grad()

        output = model(src, tgt_input, src_mask, tgt_mask)

        # Flatten batch and time so every target position is one classification.
        loss = criterion(
            output.reshape(-1, output.size(-1)),
            tgt_output.reshape(-1)
        )

        loss.backward()
        # Cap the global gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Guard against an empty loader instead of dividing by zero.
    return total_loss / num_batches if num_batches else 0.0

# 6. Validation
def _truncate_at(ids, stop_id):
    """Return the part of ``ids`` that precedes the first ``stop_id`` (all of it if absent)."""
    return ids[:ids.index(stop_id)] if stop_id in ids else ids


def evaluate(model, loader, criterion, device, src_vocab, tgt_vocab):
    """Return the corpus BLEU of teacher-forced predictions over ``loader``.

    For every batch the decoder is fed the gold target prefix and the arg-max
    prediction at each position is taken as the hypothesis token. Padding
    positions and everything from ``<eos>`` onward are left out of both the
    reference and the hypothesis, so BLEU is computed over real words only.

    Args:
        model: Called as ``model(src, tgt_input, src_mask, tgt_mask)``.
        loader: Iterable of dicts with 'src' and 'tgt' id tensors padded with id 0.
        criterion: Unused; accepted so existing call sites keep working.
        device: Device the batch tensors and masks are moved to.
        src_vocab: Unused; accepted so existing call sites keep working.
        tgt_vocab: Token -> id mapping of the target language.

    Returns:
        The corpus-level BLEU score, or 0.0 when ``loader`` yields nothing.
    """
    model.eval()
    references = []
    hypotheses = []

    pad_id = 0  # this function treats id 0 as padding in both src and tgt batches
    eos_id = tgt_vocab.get('<eos>', 2)
    idx_to_word = {index: word for word, index in tgt_vocab.items()}

    with torch.no_grad():
        for batch in loader:
            src = batch['src'].to(device)
            tgt = batch['tgt'].to(device)

            # Teacher forcing: feed tokens [0, n-1), compare against tokens [1, n).
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            # The attention layers hide positions whose mask entry is True, so the
            # padding masks are True at padding, not at real tokens.
            src_mask = (src == pad_id).unsqueeze(1).unsqueeze(2)
            tgt_padding_mask = (tgt_input == pad_id).unsqueeze(1).unsqueeze(2)
            causal_mask = generate_causal_mask(tgt_input.size(1)).to(device)
            tgt_mask = tgt_padding_mask | causal_mask.unsqueeze(0).unsqueeze(0)

            output = model(src, tgt_input, src_mask, tgt_mask)

            # One bulk transfer per batch instead of one .item() call per token.
            predicted = output.argmax(dim=-1).tolist()
            expected = tgt_output.tolist()

            for target_ids, pred_ids in zip(expected, predicted):
                # Padding is trailing, so the non-pad count is the true length.
                length = sum(1 for token_id in target_ids if token_id != pad_id)
                ref_ids = _truncate_at(target_ids[:length], eos_id)
                hyp_ids = _truncate_at(pred_ids[:length], eos_id)

                # corpus_bleu expects a list of alternative references per hypothesis.
                references.append([[idx_to_word.get(i, '<unk>') for i in ref_ids]])
                hypotheses.append([idx_to_word.get(i, '<unk>') for i in hyp_ids])

    if not hypotheses:
        return 0.0
    return corpus_bleu(references, hypotheses)

# 7. Main training loop
num_epochs = 20

for epoch in range(num_epochs):
    # Use the device the model was actually placed on; a hard-coded "cuda"
    # fails on machines where the fallback to CPU was taken.
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device=device)
    val_bleu = evaluate(
        model,
        val_loader,
        criterion,
        device=device,
        src_vocab=dataset.src_vocab,
        tgt_vocab=dataset.tgt_vocab
    )

    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Train Loss: {train_loss:.4f} | Val BLEU: {val_bleu:.4f}')
    print('-'*50)

In [None]:
def translate(sentence, model, src_vocab, tgt_vocab, device, max_len=50, temperature=0.7):
    """Translate ``sentence`` by decoding one token at a time.

    Args:
        sentence: Raw source text; it is lower-cased and split into word and
            punctuation tokens.
        model: Object exposing ``encoder_embedding``, ``decoder_embedding``,
            ``positional_encoding``, ``encoder_layers``, ``decoder_layers`` and
            ``fc_out``.
        src_vocab: Source token -> id mapping; must contain '<unk>'.
        tgt_vocab: Target token -> id mapping; must contain '<sos>'.
        device: Device the tensors are created on.
        max_len: Maximum number of tokens to generate.
        temperature: Divisor applied to the logits before sampling. Values
            ``<= 0`` select greedy (arg-max) decoding instead of sampling.

    Returns:
        The generated target tokens joined by single spaces, without '<sos>'
        and without the terminating '<eos>'.
    """
    model.eval()

    # Tokenise; words missing from the vocabulary become '<unk>'.
    tokens = re.findall(r"\w+|[^\w\s]", sentence.lower())
    unk_id = src_vocab['<unk>']
    src_ids = [src_vocab.get('<sos>', 1)]
    src_ids.extend(src_vocab.get(tok, unk_id) for tok in tokens)
    src_ids.append(src_vocab.get('<eos>', 2))

    src = torch.LongTensor(src_ids).unsqueeze(0).to(device)

    eos_id = tgt_vocab.get('<eos>', 2)
    tgt_ids = [tgt_vocab['<sos>']]

    with torch.no_grad():
        # The attention layers hide positions whose mask entry is True, so the
        # padding mask is True at padding (id 0), not at real tokens.
        src_mask = (src == 0).unsqueeze(1).unsqueeze(2)

        # Encode the source once; it is reused at every decoding step.
        enc_output = model.positional_encoding(model.encoder_embedding(src))
        for layer in model.encoder_layers:
            enc_output = layer(enc_output, src_mask)

        for _ in range(max_len):
            tgt = torch.LongTensor(tgt_ids).unsqueeze(0).to(device)

            # A key is hidden if it is padding OR lies in the future.
            tgt_padding_mask = (tgt == 0).unsqueeze(1).unsqueeze(2)
            causal_mask = generate_causal_mask(tgt.size(1)).to(device)
            tgt_mask = tgt_padding_mask | causal_mask.unsqueeze(0).unsqueeze(0)

            dec_output = model.positional_encoding(model.decoder_embedding(tgt))
            for layer in model.decoder_layers:
                dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)

            # Only the last position predicts the next token.
            logits = model.fc_out(dec_output[:, -1, :])

            if temperature <= 0:
                # Dividing by zero would give inf/nan; fall back to greedy choice.
                next_token = logits.argmax(dim=-1).item()
            else:
                probs = F.softmax(logits / temperature, dim=-1)
                next_token = torch.multinomial(probs, 1).item()

            if next_token == eos_id:
                break

            tgt_ids.append(next_token)

    # Convert ids back to text, skipping the leading '<sos>'.
    idx_to_word = {index: word for word, index in tgt_vocab.items()}
    return ' '.join([idx_to_word.get(idx, '<unk>') for idx in tgt_ids[1:]])

In [None]:
# Usage example: translate a short sentence with the module-level model
test_sentence = "Hello, how are you?"
print("Input:", test_sentence)
print("Translation:", translate(test_sentence, model, dataset.src_vocab, dataset.tgt_vocab, device))

In [None]:
# Show the source-vocabulary ids of the sentence; sentence_to_ids adds no
# <sos>/<eos> markers and maps out-of-vocabulary tokens to the '<unk>' id.
test_tokens = dataset.sentence_to_ids("Hello, how are you?", dataset.src_vocab)
print("Tokenized test sentence:", test_tokens)

In [None]:
# Usage example: translate a longer sentence with the module-level model
test_sentence = "Top-down economics never works, said Obama."
print("Input:", test_sentence)
print("Translation:", translate(test_sentence, model, dataset.src_vocab, dataset.tgt_vocab, device))