### download data

In [6]:
!wget http://www.manythings.org/anki/rus-eng.zip -O 'data/rus-eng.zip'
!wget https://object.pouta.csc.fi/OPUS-OpenSubtitles/v1/moses/en-ru.txt.zip -O 'data/en-ru.txt.zip'

--2024-09-08 16:10:28--  http://www.manythings.org/anki/rus-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 16305013 (16M) [application/zip]
Saving to: ‘data/rus-eng.zip’


2024-09-08 16:10:52 (672 KB/s) - ‘data/rus-eng.zip’ saved [16305013/16305013]

--2024-09-08 16:10:52--  https://object.pouta.csc.fi/OPUS-OpenSubtitles/v1/moses/en-ru.txt.zip
Resolving object.pouta.csc.fi (object.pouta.csc.fi)... 86.50.254.18, 86.50.254.19
Connecting to object.pouta.csc.fi (object.pouta.csc.fi)|86.50.254.18|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 610036 (596K) [application/zip]
Saving to: ‘en-ru.txt.zip’


2024-09-08 16:10:52 (2.55 MB/s) - ‘en-ru.txt.zip’ saved [610036/610036]



In [64]:
import zipfile
from collections import Counter
import torch
from torch.nn.utils.rnn import pad_sequence
import re


### read data

In [65]:
dataset_anki_path = 'data/rus-eng.zip'
with zipfile.ZipFile(dataset_anki_path, 'r') as zip_ref:
    zip_ref.extractall('anki_data')
dataset_opensub_path = 'data/en-ru.txt.zip'
with zipfile.ZipFile(dataset_opensub_path, 'r') as zip_ref:
    zip_ref.extractall('opensubtitles_data')

BadZipFile: File is not a zip file

### prepare data

In [None]:
def load_anki_data(file_path, num_samples=100000):
    input_texts = []
    target_texts = []
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines[:num_samples]:
            en_text, ru_text = line.strip().split('\t')[:-1]
            input_texts.append(en_text)
            target_texts.append('\t' + ru_text + '\n') 
    return input_texts, target_texts

anki_input_texts, anki_target_texts = load_anki_data('anki_data/rus.txt')


In [66]:
def load_opensubtitles_data(file_path_ru, file_path_en, num_samples=100000):
    input_texts = []
    with open(file_path_en, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines[:num_samples]:
            en_text = line.strip()
            input_texts.append(en_text)
    target_texts = []
    with open(file_path_ru, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines[:num_samples]:
            ru_text = line.strip()
            target_texts.append('\t' + ru_text + '\n')
    return input_texts, target_texts

opensub_input_texts, opensub_target_texts = load_opensubtitles_data('opensubtitles_data/OpenSubtitles.en-ru.ru',
                                                                   'opensubtitles_data/OpenSubtitles.en-ru.en')


In [67]:
opensub_input_texts[0], opensub_target_texts[0]

('LOS ANGELES 2029 A. D.', '\t2029 год нашей эры.\n')

### tokenize data

In [68]:
def basic_english_tokenizer(text):
    return re.findall(r'\w+', text.lower())

def build_vocab(sentences, tokenizer):
    counter = Counter()
    for sentence in sentences:
        counter.update(tokenizer(sentence))
    vocab = {word: i for i, (word, _) in enumerate(counter.items(), 4)}
    vocab['<unk>'] = 0
    vocab['<pad>'] = 1
    vocab['<bos>'] = 2
    vocab['<eos>'] = 3
    return vocab

def tokenizer_sentences(sentences, vocab, tokenizer):
    return [[vocab['<bos>']] + [vocab.get(token, vocab['<unk>']) for token in tokenizer(sentence)]  + \
            [vocab['<eos>']] for sentence in sentences]

def pad_sentences(sequences, padding_value):
    return pad_sequence([torch.tensor(seq) for seq in sequences], padding_value=padding_value, batch_first=True)

tokenizer = basic_english_tokenizer

opensub_vocab_input = build_vocab(opensub_input_texts, tokenizer)
opensub_vocab_target = build_vocab(opensub_target_texts, tokenizer)
# anki_vocab_input = build_vocab(anki_input_texts, tokenizer)
# anki_vocab_target = build_vocab(anki_target_texts, tokenizer)

opensub_input_sequences = tokenize_sentences(opensub_input_texts, opensub_vocab_input, tokenizer)
opensub_target_sequences = tokenize_sentences(opensub_target_texts, opensub_vocab_target, tokenizer)

# anki_input_sequences = tokenize_sentences(anki_input_texts, anki_vocab_input, tokenizer)
# anki_target_sequences = tokenize_sentences(anki_target_texts, anki_vocab_target, tokenizer)

opensub_input_padded = pad_sentences(opensub_input_sequences, opensub_vocab_input['<pad>'])
opensub_target_padded = pad_sentences(opensub_target_sequences, opensub_vocab_input['<pad>'])

# anki_input_padded = pad_sentences(anki_input_sequences, anki_vocab_input['<pad>'])
# anki_target_padded = pad_sentences(anki_target_sequences, anki_vocab_input['<pad>'])


In [69]:
# from collections import Counter
# import torch
# from torch.nn.utils.rnn import pad_sequence
# import re

# # Простой токенизатор на основе регулярных выражений
# def basic_english_tokenizer(text):
#     return re.findall(r'\w+', text.lower())

# # Создание словаря
# def build_vocab(sentences, tokenizer):
#     counter = Counter()
#     for sentence in sentences:
#         counter.update(tokenizer(sentence))
#     vocab = {word: i for i, (word, _) in enumerate(counter.items(), 4)}
#     vocab['<unk>'] = 0
#     vocab['<pad>'] = 1
#     vocab['<bos>'] = 2
#     vocab['<eos>'] = 3
#     return vocab

# # Токенизация предложений с использованием созданного словаря
# def tokenize_sentences(sentences, vocab, tokenizer):
#     return [[vocab['<bos>']] + [vocab.get(token, vocab['<unk>']) for token in tokenizer(sentence)] + [vocab['<eos>']] for sentence in sentences]

# # Паддинг последовательностей
# def pad_sequences(sequences, padding_value):
#     return pad_sequence([torch.tensor(seq) for seq in sequences], padding_value=padding_value, batch_first=True)


# # Создание словарей для обоих наборов данных
# tokenizer = basic_english_tokenizer

# opensub_vocab_input = build_vocab(opensub_input_texts, tokenizer)
# opensub_vocab_target = build_vocab(opensub_target_texts, tokenizer)

# anki_vocab_input = build_vocab(anki_input_texts, tokenizer)
# anki_vocab_target = build_vocab(anki_target_texts, tokenizer)

# # Токенизация данных
# opensub_input_sequences = tokenize_sentences(opensub_input_texts, opensub_vocab_input, tokenizer)
# opensub_target_sequences = tokenize_sentences(opensub_target_texts, opensub_vocab_target, tokenizer)

# anki_input_sequences = tokenize_sentences(anki_input_texts, anki_vocab_input, tokenizer)
# anki_target_sequences = tokenize_sentences(anki_target_texts, anki_vocab_target, tokenizer)

# # Паддинг последовательностей
# opensub_input_padded = pad_sequences(opensub_input_sequences, opensub_vocab_input['<pad>'])
# opensub_target_padded = pad_sequences(opensub_target_sequences, opensub_vocab_target['<pad>'])

# anki_input_padded = pad_sequences(anki_input_sequences, anki_vocab_input['<pad>'])
# anki_target_padded = pad_sequences(anki_target_sequences, anki_vocab_target['<pad>'])


In [70]:
import torch
import torch.nn as nn
import torch.optim as optim

class Attention(nn.Module):
    def __init__(self, hidden_size, max_length):
        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.max_length = max_length
        
        # Линейные преобразования для скрытого состояния и выходов кодировщика
        self.attn = nn.Linear(hidden_size * 2, hidden_size)  # Изменено на hidden_size
        self.v = nn.Parameter(torch.rand(hidden_size))  # Изменено на hidden_size

    def forward(self, hidden, encoder_outputs):
        # hidden: (1, 1, hidden_size)
        # encoder_outputs: (input_length, hidden_size)

        timestep = encoder_outputs.size(0)  # input_length

        # Повторяем скрытое состояние для каждого шага во времени
        hidden = hidden.repeat(timestep, 1, 1).transpose(0, 1)  # (1, timestep, hidden_size)

        # Объединяем скрытое состояние с выходами кодировщика
        encoder_outputs = encoder_outputs.unsqueeze(0)  # (1, timestep, hidden_size)

        # Вычисляем "энергии" внимания
        attn_energies = self.score(hidden, encoder_outputs)  # (1, timestep)

        # Возвращаем веса внимания
        return torch.softmax(attn_energies, dim=1)

    def score(self, hidden, encoder_outputs):
        # Объединяем скрытое состояние и выходы кодировщика
        energy = torch.tanh(self.attn(torch.cat([hidden, encoder_outputs], 2)))  # (1, timestep, hidden_size)

        # Линейная проекция
        energy = energy.transpose(2, 1)  # (1, hidden_size, timestep)
        v = self.v.unsqueeze(0).unsqueeze(0)  # (1, 1, hidden_size)

        # Скаляное произведение для получения окончательных значений внимания
        attn_energies = torch.bmm(v, energy)  # (1, 1, timestep)
        return attn_energies.squeeze(1)  # (1, timestep)

class AttentionDecoder(nn.Module):
    def __init__(self, output_size, hidden_size, max_length):
        super(AttentionDecoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.max_length = max_length
        
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        self.attention = Attention(hidden_size, max_length)
        self.attn_combine = nn.Linear(hidden_size * 2, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden, cell, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)  # (1, 1, hidden_size)
        
        # Применяем внимание
        attn_weights = self.attention(hidden, encoder_outputs)  # (1, input_length)
        
        # Добавляем размерность для применения bmm
        attn_weights = attn_weights.unsqueeze(1)  # (1, 1, input_length)

        # Вставляем размерность для encoder_outputs
        encoder_outputs = encoder_outputs.unsqueeze(0)  # (1, input_length, hidden_size)
        
        # Взвешиваем скрытые состояния кодировщика через bmm
        context = attn_weights.bmm(encoder_outputs)  # (1, 1, hidden_size)

        # Соединяем контекст и эмбеддинг
        output = torch.cat((embedded[0], context[0]), 1)  # (1, hidden_size * 2)
        output = self.attn_combine(output).unsqueeze(0)  # (1, 1, hidden_size)

        # Пропускаем через LSTM
        output, (hidden, cell) = self.lstm(output, (hidden, cell))
        output = self.softmax(self.out(output[0]))  # (1, output_size)
        return output, hidden, cell, attn_weights


class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        self.hidden_size = hidden_size

    def forward(self, input, hidden, cell):
        embedded = self.embedding(input).view(1, 1, -1)  # (1, 1, hidden_size)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        return output, (hidden, cell)

    def init_hidden(self):
        return (torch.zeros(1, 1, self.hidden_size), torch.zeros(1, 1, self.hidden_size))




def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length):
    encoder_hidden, encoder_cell = encoder.init_hidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    # Создаём encoder_outputs с длиной input_length, а не max_length
    encoder_outputs = torch.zeros(input_length, encoder.hidden_size)

    # Кодирование входного тензора
    for ei in range(input_length):
        encoder_output, (encoder_hidden, encoder_cell) = encoder(input_tensor[ei], encoder_hidden, encoder_cell)
        encoder_outputs[ei] = encoder_output[0, 0]  # Сохранение выходов кодировщика

    decoder_input = torch.tensor([[opensub_vocab_target['<bos>']]])  # Токен начала
    decoder_hidden, decoder_cell = encoder_hidden, encoder_cell

    loss = 0

    # Декодирование с вниманием
    for di in range(target_length):
        decoder_output, decoder_hidden, decoder_cell, attn_weights = decoder(
            decoder_input, decoder_hidden, decoder_cell, encoder_outputs
        )
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()

        loss += criterion(decoder_output, target_tensor[di].unsqueeze(0))
        if decoder_input.item() == opensub_vocab_target['<eos>']:
            break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length


# Параметры
hidden_size = 256
learning_rate = 0.01
n_iters = 10000
print_every = 1000
max_length = 10  # Максимальная длина последовательности

# Создание моделей
encoder = Encoder(len(opensub_vocab_input), hidden_size)
decoder = AttentionDecoder(len(opensub_vocab_target), hidden_size, max_length)

# Оптимизаторы и функция потерь
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
criterion = nn.NLLLoss()

# Цикл обучения
for iter in range(1, n_iters + 1):
    training_pair = [opensub_input_padded[iter % len(opensub_input_padded)], opensub_target_padded[iter % len(opensub_target_padded)]]
    input_tensor = training_pair[0]
    target_tensor = training_pair[1]

    loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length)

    if iter % print_every == 0:
        print(f'Iteration {iter}, Loss: {loss:.4f}')


Iteration 1000, Loss: 0.4616
Iteration 2000, Loss: 0.2236
Iteration 3000, Loss: 0.2779
Iteration 4000, Loss: 0.3412
Iteration 5000, Loss: 0.2966
Iteration 6000, Loss: 0.4601
Iteration 7000, Loss: 0.2534
Iteration 8000, Loss: 0.2977
Iteration 9000, Loss: 0.3404
Iteration 10000, Loss: 0.1520


In [36]:
# Создаем обратный словарь для целевого языка
opensub_vocab_target_reverse = {index: word for word, index in opensub_vocab_target.items()}
opensub_vocab_input_reverse = {index: word for word, index in opensub_vocab_input.items()}

# Функция для выполнения перевода одного предложения
def translate_sentence(input_tensor, encoder, decoder, max_length=10):
    with torch.no_grad():
        encoder_hidden, encoder_cell = encoder.init_hidden()

        input_length = input_tensor.size(0)
        encoder_outputs = torch.zeros(input_length, encoder.hidden_size)

        for ei in range(input_length):
            encoder_output, (encoder_hidden, encoder_cell) = encoder(input_tensor[ei], encoder_hidden, encoder_cell)
            encoder_outputs[ei] = encoder_output[0, 0]  # Сохранение выходов кодировщика

        decoder_input = torch.tensor([[opensub_vocab_target['<bos>']]])
        decoder_hidden, decoder_cell = encoder_hidden, encoder_cell

        decoded_words = []

        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_cell, _ = decoder(
                decoder_input, decoder_hidden, decoder_cell, encoder_outputs
            )
            topv, topi = decoder_output.topk(1)
            if topi.item() == opensub_vocab_target['<eos>']:
                break
            else:
                decoded_words.append(opensub_vocab_target_reverse[topi.item()])

            decoder_input = topi.squeeze().detach()

        return decoded_words




# Пример для предложений разной длины
sentence_lengths = [3, 6, 10]
for length in sentence_lengths:
    input_sentence = opensub_input_padded[length]  # Предложение из определенного количества слов
    translated_sentence = translate_sentence(input_sentence, encoder, decoder, max_length=length)
    print(f'Original sentence of {length} words: {" ".join([opensub_vocab_input_reverse[word.item()] for word in input_sentence])}')
    print(f'Translated sentence: {" ".join(translated_sentence)}')
    print()


Original sentence of 3 words: <bos> lt would be fought here ln our present <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>
Translated sentence: <bos> это

Original sentence of 6 words: <bos> what the hell <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pa

In [46]:
def get_word_vector_lstm_attention(word, encoder, decoder, vocab, device='cpu'):
    # Преобразование слова в тензор
    input_tensor = torch.tensor([vocab[word]]).to(device)
    
    # Получение эмбеддинга слова
    embedded = encoder.embedding(input_tensor).view(1, 1, -1)  # (1, 1, hidden_size)
    
    # Инициализация скрытых состояний
    hidden, cell = encoder.init_hidden()  # (1, 1, hidden_size)
    
    # Прямой проход через LSTM кодировщика
    encoder_output, (hidden, cell) = encoder.lstm(embedded, (hidden, cell))
    
    # Получение выходов кодировщика
    encoder_outputs = encoder_output.squeeze(0)  # (input_length, hidden_size)
    
    # Создаем пустой тензор для декодера
    decoder_input = torch.tensor([[vocab['<bos>']]]).to(device)
    
    # Убедитесь, что hidden и cell имеют размерность (num_layers * num_directions, batch_size, hidden_size)
    hidden = hidden.unsqueeze(0) if hidden.dim() == 2 else hidden
    cell = cell.unsqueeze(0) if cell.dim() == 2 else cell
    
    # Проходим через декодер для получения векторного представления с вниманием
    decoder_output, _, _, _ = decoder(
        decoder_input, hidden, cell, encoder_outputs
    )
    
    # Возвращаем последнее скрытое состояние декодера
    return decoder_output

# Пример использования
vocab = opensub_vocab_input  # Например, словарь для кодировщика
word = 'happy'

vector_lstm_attention = get_word_vector_lstm_attention(word, encoder, decoder, vocab)

print(f"LSTM with Attention vector for '{word}': {vector_lstm_attention}")


LSTM with Attention vector for 'happy': tensor([[-1.7619e+01, -8.0208e-04, -2.6871e+01,  ..., -1.7588e+01,
         -1.7121e+01, -1.7231e+01]], grad_fn=<LogSoftmaxBackward0>)
