In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np

In [4]:
def load_pairs(file_path):
    pairs = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            pair = line.strip().split('\t')
            pairs.append(pair)
    return pairs

pairs = load_pairs('pairs.txt')


In [5]:
pairs[200:205]

[['Back off.', 'Посторонитесь.'],
 ['Be a man.', 'Будь мужчиной!'],
 ['Be brave.', 'Будь храбр.'],
 ['Be brief.', 'Будь краток.'],
 ['Be quiet.', 'Тихо.']]

In [7]:
# Определение специальных токенов
PAD_TOKEN = "<PAD>"
EOS_TOKEN = "<EOS>"
SOS_TOKEN = "<SOS>"
UNK_TOKEN = "<UNK>"

# Функция токенизации предложения: приводит все символы к нижнему регистру и разбивает предложение на слова
def tokenize(sentence):
    return sentence.lower().split()

# функция построения словаря ключ-значение для любого языка
def build_vocab(pairs):
    russian_tokens = set()
    english_tokens = set()
    for rus, eng in pairs:
        russian_tokens.update(tokenize(rus))
        english_tokens.update(tokenize(eng))
    return russian_tokens, english_tokens

def create_mappings(tokens_language):
    vocab = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + sorted(tokens_language)
    word2idx = {word: i for i, word in enumerate(vocab)}
    idx2word = {i: word for word, i in word2idx.items()}
    return word2idx, idx2word

# Создание словарей для английских и русских предложений на основе пар
english_vocab, russian_vocab = build_vocab(pairs)

# Создание отображений с добавлением специальных токенов
eng_word2int, eng_int2word = create_mappings(english_vocab)
rus_word2int, rus_int2word = create_mappings(russian_vocab)
    

In [8]:
# Печать размеров словарей (с учетом 4 специальных токенов)
print('English vocabulary size:', len(english_vocab) + 4)
print('Russian vocabulary size:', len(russian_vocab) + 4)

# Пример использования: кодирование английского и русского предложения
eng_example = "Who are you"
rus_example = "как ты"

# Кодирование с учетом UNK_TOKEN для неизвестных слов
eng_encoded = np.array([eng_word2int.get(word, eng_word2int[UNK_TOKEN])
                        for word in tokenize(eng_example)], dtype=np.int32)

rus_encoded = np.array([rus_word2int.get(word, rus_word2int[UNK_TOKEN])
                        for word in tokenize(rus_example)], dtype=np.int32)

print('English text encoded:', eng_encoded)
print('Russian text encoded:', rus_encoded)

# Декодирование: восстановление текста из кодов
decoded_eng = ' '.join([eng_int2word[i] for i in eng_encoded])
decoded_rus = ' '.join([rus_int2word[i] for i in rus_encoded])

print('Decoded English:', decoded_eng)
print('Decoded Russian:', decoded_rus)

English vocabulary size: 34195
Russian vocabulary size: 86949
English text encoded: [33425  2292 34085]
Russian text encoded: [25873 77975]
Decoded English: who are you
Decoded Russian: как ты


In [9]:
class TranslationDataset(Dataset):
    def __init__(self, pairs, eng_word2int, rus_word2int):
        self.pairs, self.eng_word2int, self.rus_word2int = pairs, eng_word2int, rus_word2int
        
    def __len__(self):
        return len(pairs)
    
    def __getitem__(self, idx):
        eng, rus = self.pairs[idx]
        
        eng_tensor = torch.tensor([self.eng_word2int.get(word, self.eng_word2int[UNK_TOKEN])
                                   for word in tokenize(eng)]
                                 + [self.eng_word2int[EOS_TOKEN]], dtype=torch.long)
        
        rus_tensor = torch.tensor([self.rus_word2int.get(word, self.rus_word2int[UNK_TOKEN])
                                   for word in tokenize(rus)]
                                  + [self.rus_word2int[EOS_TOKEN]], dtype=torch.long)
        
        return eng_tensor, rus_tensor

In [11]:
# Функция для объединения батчей: паддинг (дополнение) предложений до одной длины в батче
def collate_fn(batch):
    eng_batch, rus_batch = zip(*batch)
    eng_batch_padded = pad_sequence(eng_batch, batch_first=True,
                                    padding_value=eng_word2int[PAD_TOKEN])
    rus_batch_padded = pad_sequence(rus_batch, batch_first=True,
                                    padding_value=rus_word2int[PAD_TOKEN])
    
    return eng_batch_padded, rus_batch_padded



# Создание экземпляра датасета и загрузчика данных
translation_dataset = TranslationDataset(pairs, eng_word2int, rus_word2int)
batch_size = 64
translation_dataloader = DataLoader(translation_dataset, batch_size=batch_size,
                                    shuffle=True, drop_last=True, collate_fn=collate_fn)

# Печать информации о количестве образцов и батчей в датасете
print("Translation samples: ", len(translation_dataset))
print("Translation batches: ", len(translation_dataloader))

Translation samples:  323711
Translation batches:  5057


In [12]:
class Encoder(nn.Module):
    def __init__(self, embedding_size, hidden_size, len_vocab, layers=1):
        super(Encoder, self).__init__()
        self.embedding_size, self.hidden_size, self.len_vocab = embedding_size,
                                                                hidden_size,
                                                                len_vocab
        self.embed = nn.Embedding(num_embeddings=len_vocab,
                                  embedding_dim=embedding_size,
                                  padding_idx=0)
        
        self.rnn = nn.RNN(input_size=embedding_size,
                          hidden_size=hidden_size,
                          num_layers=1,
                          batch_first=True)
        # (N, seq_len)
    def forward(self, x):
        #x: (64, 100)
        #x: (64, 100, 512)
        embed = self.embed(x)
        # out: (64, 100, 128)
        # hidden: (1, 64, 128)
        out, hidden = self.rnn(embed)
        return out, hidden

NameError: name 'eng_vocab_size' is not defined

In [5]:
a = torch.tensor([1, 2, 3, 4])
a

tensor([1, 2, 3, 4])

In [6]:
torch.flip(a, [0])

tensor([4, 3, 2, 1])

In [53]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        self.input_linear_layer = nn.Linear(in_features=input_size,
                                            out_features=hidden_size,
                                            bias=True)
        
        self.input_hidden_layer = nn.Linear(in_features=hidden_size,
                                            out_features=hidden_size,
                                            bias=True)
        
        self.activation = nn.Tanh()
        

    def forward(self, x: torch.Tensor, state: torch.Tensor | None = None):
        if state is None:
            state = torch.zeros((x.shape[0], self.hidden_size))
        states = [state]
        
        for i in range(x.shape[1]):
            states.append(self.activation(self.input_linear_layer(x[:, i, :]) 
                        + self.input_hidden_layer(states[-1])
                        ))
            
        return torch.stack(states[1:]).permute((1, 0, 2)), states[-1].unsqueeze(0)


my_enn = RNN(10, 32)



In [61]:
class Decoder(nn.Module):
    def __init__(self, embedding_size, hidden_size, len_vocab, layers=1):
        super(Decoder, self).__init__()
        self.embedding_size, self.hidden_size, self.len_vocab = embedding_size,
                                                                hidden_size,
                                                                len_vocab
        self.embed = nn.Embedding(num_embeddings=len_vocab,
                                  embedding_dim=embedding_size,
                                  padding_idx=0)
        
        self.rnn = nn.RNN(input_size=embedding_size,
                          hidden_size=hidden_size,
                          num_layers=1,
                          batch_first=True)
        
        self.fc = nn.Linear(in_features=hidden_size,
                            out_features=len_vocab,
                            bias=True)


    def forward(self, x, hidden):
        # x: [N, 1]
        embed = self.embed(x)
        # embed: (N, 1, embedd_size)
        
        out, hidden = self.rnn(embed)
        # out: (N, 1, hidden_size)
        # hidden: (1, N, hidden_size)
        out = self.fc(out)
        # out: (N, 1, vocab_size)
        return out, hidden
    


Почему использовать torch.flip?

В некоторых реализациях двунаправленных RNN вместо явной обработки последовательности в обратном порядке используется предварительный разворот последовательности. Это упрощает архитектуру и позволяет использовать общие механизмы обработки для обоих направлений.

При генерации перевода (декодировании) модели может быть полезно, если важная информация, находящаяся в конце исходной последовательности, становится доступной на первых шагах декодирования.

Как уже упоминалось, оригинальная архитектура Seq2Seq с развёрнутой входной последовательностью показала улучшенные результаты при переводе сложных предложений.

 Здесь x — это входной тензор с размерностями (batch_size, sequence_length). Операция torch.flip(x, [1]) переворачивает последовательность вдоль измерения 1 (длина последовательности). То есть, каждая последовательность в пакете данных будет обращена задом наперёд.

In [64]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
DEVICE

device(type='cpu')

In [65]:
eng_vocab_size = len(eng_word2int)
rus_vocab_size = len(rus_word2int)
embed_size = 256
hidden_size = 512
num_layers = 1

encoder = Encoder(embedding_size=embed_size, hidden_size=hidden_size, len_vocab=eng_vocab_size)
decoder = Decoder(embedding_size=embed_size, hidden_size=hidden_size, len_vocab=rus_vocab_size)

In [66]:
embedding_size = 512
hidden_size = 128
rus_len_vocab = len(rus_word2int)
eng_len_vocab = len(eng_word2int)
layers = 1

encoder = Encoder(embedding_size, hidden_size, len_vocab, layers=1)
decoder = Decoder(embedding_size, hidden_size, len_vocab, layers=1)

device = "cuda" if torch.cuda.is_available() else "cpu"

def translate(encoder: nn.Module, decoder: nn.Module, eng_word2int,
              rus_int2word, sentence, seq_len=15):
    
    encoder.eval()
    encoder.eval()
    
    with torch.no_grad():
        
        sentence_input = [eng_word2int.get(word, eng_word2int["<UNK>"])
                          for word in tokenize(sentence) + [eng_word2int["<EOS>"]]]
        
        sentence_tensor = torch.tensor(sentence_input, dtype=torch.long, device=device)
        sentence_tensor = sentence_tensor.view(1, -1)
        
        _, hidden_encoder = encoder(sentence_tensor)
        hidden_decoder = hidden_encoder
        
        last_token = torch.full((1, 1), fill_value=eng_word2int["<SOS>"])
        
        predict_words = []
        
        for t in range(seq_len):
            logits, hidden_decoder = decoder(last_token, hidden_decoder)
            
            next_token = logits.argmax(dim=2)
            last_token = next_token
            
            if next_token.item() == rus_word2int["<EOS>"]:
                break
            
            predict_words.append(rus_int2word[next_token.item()])
        
        return " ".join(predict_words)
        
text = "Hello ml"
translate(encoder,
              decoder,
              eng_word2int,
              rus_int2word,
              text,
              seq_len=15)



'пополам микроскоп. обыкновение опавший принцессу ксерокопии. зданием. христа подъём постели, объединимся курсе. аккуратный занятая могут.'

```
Для обучения нам нужны:
- Оптимизатор
- Функция потерь
- Сама модель
- Данные / батчи

Цикл обучения:

   • Для каждой эпохи (полного прохода по обучающим данным):

     1. Установка модели в режим обучения. model.train()

     2. Инициализация переменной для отслеживания потерь. epoch_loss = 0

     3. Для каждого пакета данных: (для каждого батча)

        • Обнуление градиентов оптимизатора. zero_grad()

        • Прямой проход: получение предсказаний модели на основе входных данных. model(data)

        • Вычисление функции потерь, сравнивая предсказания с истинными значениями. crossentropyloss

        • Обратный проход: вычисление градиентов. backward()

        • Обновление параметров модели с помощью оптимизатора. optimizer.step()

        • Накопление значений потерь для мониторинга.

   • Вывод средней потери за эпоху для отслеживания прогресса.
```

In [67]:
eng_word2int["<PAD>"]

0

In [74]:
from tqdm import tqdm

eng_vocab_size = len(eng_word2int)
rus_vocab_size = len(rus_word2int)
embed_size = 256
hidden_size = 512
num_layers = 1

encoder = Encoder(embedding_size=embed_size, hidden_size=hidden_size, len_vocab=eng_vocab_size)
decoder = Decoder(embedding_size=embed_size, hidden_size=hidden_size, len_vocab=rus_vocab_size)

optimizer_encoder = torch.optim.AdamW(encoder.parameters())
optimizer_decoder = torch.optim.AdamW(encoder.parameters())

criterion = nn.CrossEntropyLoss(ignore_index=0)

epochs = 2

def train_loop(encoder, decoder, 
              optimizer_encoder, optimizer_decoder,
              criterion, epochs,
              train_dataloader,
              eng_word2int, rus_int2word):
    
    for epoch in range(epochs):
        encoder.train()
        decoder.train()
        
        for i, (input_tensor, target_tensor) in enumerate(tqdm(train_dataloader,
                                                               desc=f"Epoch {epoch}/{epochs}")):
            loss = 0.0
            input_tensor, target_tensor = input_tensor.to(device), target_tensor.to(device)
            
            optimizer_encoder.zero_grad()
            optimizer_decoder.zero_grad()
            
            _, hidden_encoder = encoder(input_tensor)
            hidden_decoder = hidden_encoder
            
            batch_size = input_tensor.size(0)
            target_length = target_tensor.size(1)
            
            decoder_input = torch.full((batch_size, 1),
                                       fill_value=eng_word2int["<SOS>"],
                                       dtype=torch.long)
            
            for t in range(target_length):
                logits, hidden_decoder = decoder(decoder_input, hidden_decoder)
                
                logits = logits.view(-1, len(rus_int2word))
                
                loss += criterion(logits, target_tensor[:, t])
                
                decoder_input = target_tensor[:, t].view(batch_size, 1)
                
            loss.backward()
            
            optimizer_decoder.step()
            optimizer_encoder.step()
            
            if i % 100 == 0:
                print(f"Epoch {epoch}, Batch {i}, Loss: {loss.item() / target_length}")
                

train_loop(encoder,
              decoder, 
              optimizer_encoder,
              optimizer_decoder,
              criterion,
              epochs,
              translation_dataloader,
              eng_word2int,
              rus_int2word)
        

Epoch 0/2:   0%|                                                                    | 1/5057 [00:02<2:51:07,  2.03s/it]

Epoch 0, Batch 0, Loss: 11.39383316040039


Epoch 0/2:   0%|▏                                                                  | 17/5057 [00:37<3:06:25,  2.22s/it]


KeyboardInterrupt: 

In [75]:
text = "Hello ml"
translate(encoder,
              decoder,
              eng_word2int,
              rus_int2word,
              text,
              seq_len=15)

'возобновилась жирной конфиденциальная стоимости. дико, мерцают? британцы, обеспеченный чрезмерной обедал. меняется, приглянулась 1939 подвергли написании'