In [1]:
from io import open
import unicodedata
import string
import re
import random

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

import time
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
SOS_token = 0
EOS_token = 1

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}
        self.n_words = 2  # Count SOS and EOS

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?а-яА-Я]+", r" ", s)
    return s

In [18]:
lines = open('rus.txt', encoding='utf-8').read().strip().split('\n')
pairs = [[normalizeString(s) for s in l.split('\t')[:2]] for l in lines]

MAX_LENGTH = 10

eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

In [19]:
def filterPair(p):
    return len(p[0].split(' ')) < MAX_LENGTH and \
           len(p[1].split(' ')) < MAX_LENGTH and \
           p[1].startswith(eng_prefixes)

def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]

def prepareData(lang1, lang2, reverse=False):
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    return input_lang, output_lang, pairs

def readLangs(lang1, lang2, reverse=False):
    pairs = [[normalizeString(s) for s in l.split('\t')[:2]] for l in lines]
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)
    return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepareData('eng', 'rus', True)

class EncoderLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(EncoderLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)

    def forward(self, input, hidden, cell):
        embedded = self.embedding(input).view(1, 1, -1)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        return output, hidden, cell

    def initHiddenCell(self):
        hidden = torch.zeros(self.num_layers, 1, self.hidden_size, device=device)
        cell = torch.zeros(self.num_layers, 1, self.hidden_size, device=device)
        return hidden, cell

class DecoderLSTM(nn.Module):
    def __init__(self, hidden_size, output_size, num_layers=1):
        super(DecoderLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden, cell):
        output = self.embedding(input).view(1, 1, -1)
        output = F.relu(output)
        output, (hidden, cell) = self.lstm(output, (hidden, cell))
        output = self.softmax(self.out(output[0]))
        return output, hidden, cell

    def initHiddenCell(self):
        hidden = torch.zeros(self.num_layers, 1, self.hidden_size, device=device)
        cell = torch.zeros(self.num_layers, 1, self.hidden_size, device=device)
        return hidden, cell

def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(pair):
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return input_tensor, target_tensor

In [20]:
teacher_forcing_ratio = 0.5

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    encoder_hidden, encoder_cell = encoder.initHiddenCell()
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    loss = 0
    for ei in range(input_length):
        encoder_output, encoder_hidden, encoder_cell = encoder(input_tensor[ei], encoder_hidden, encoder_cell)
    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden, decoder_cell = encoder_hidden, encoder_cell
    use_teacher_forcing = random.random() < teacher_forcing_ratio
    if use_teacher_forcing:
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]
    else:
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()
            loss += criterion(decoder_output, target_tensor[di])
            if decoder_input.item() == EOS_token:
                break
    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()
    return loss.item() / target_length

def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)
        input_length = input_tensor.size()[0]
        encoder_hidden, encoder_cell = encoder.initHiddenCell()
        for ei in range(input_length):
            encoder_output, encoder_hidden, encoder_cell = encoder(input_tensor[ei], encoder_hidden, encoder_cell)
        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden, decoder_cell = encoder_hidden, encoder_cell
        decoded_words = []
        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(output_lang.index2word[topi.item()])
            decoder_input = topi.squeeze().detach()
        return decoded_words

def trainIters(encoder, decoder, n_iters, print_every=1000, learning_rate=0.01):
    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs)) for _ in range(n_iters)]
    criterion = nn.NLLLoss()
    for iter in range(1, n_iters + 1):
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
        if iter % print_every == 0:
            print(f"Iteration {iter}: Loss = {loss}")

In [21]:
hidden_size = 256
encoder1 = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder1 = DecoderLSTM(hidden_size, output_lang.n_words).to(device)

In [22]:
trainIters(encoder1, decoder1, 75000, print_every=5000)

# Функция для случайной проверки модели
def evaluateRandomly(encoder, decoder, n=10):
    for i in range(n):
        pair = random.choice(pairs)
        print('>', pair[0])
        print('=', pair[1])
        output_words = evaluate(encoder, decoder, pair[0])
        output_sentence = ' '.join(output_words)
        print('<', output_sentence)
        print('')

# Проверим качество перевода на случайных примерах
evaluateRandomly(encoder1, decoder1)

Iteration 5000: Loss = 0.017588727176189423
Iteration 10000: Loss = 0.00445779071499904
Iteration 15000: Loss = 0.002574014477431774
Iteration 20000: Loss = 0.0012947161061068375
Iteration 25000: Loss = 0.0008875545114278794
Iteration 30000: Loss = 0.0007747888254622618
Iteration 35000: Loss = 0.0006126898806542158
Iteration 40000: Loss = 0.00042873923666775227
Iteration 45000: Loss = 0.0008884982671588659
Iteration 50000: Loss = 0.0006796061061322689
Iteration 55000: Loss = 0.0005874824710190296
Iteration 60000: Loss = 0.0002721909841056913
Iteration 65000: Loss = 0.0004858759348280728
Iteration 70000: Loss = 0.00033326169941574337
Iteration 75000: Loss = 0.0002750233979895711
> it is too expensive .
= это слишком дорого .
< это слишком дорого . <EOS>

> goodbye !
= до свидания !
< до свидания ! <EOS>

> i need help .
= мне нужна помощь .
< мне нужна помощь . <EOS>

> good night !
= спокоинои ночи !
< спокоинои ночи ! <EOS>

> this is my house .
= это мои дом .
< это мои дом . <EOS>



In [27]:
sentence = "i am learning russian ."
print(evaluate(encoder1, decoder1, sentence))


['я', 'изучаю', 'русскии', 'язык', '.', '<EOS>']
