#Генерация текстов, encoder-decoder

In [23]:
# Импорт необходимых библиотек
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import numpy as np

In [24]:
# Монтируем ГуглДиск
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [123]:
# Параметры
batch_size = 64
epochs = 1
num_samples = 10000
data_path = '/content/drive/MyDrive/Нейронные сети/Фреймворк_PyTorch/data/fra.txt'

In [26]:
# Собираем из текстов токены

input_texts = []
target_texts = []

input_vocab = set()
output_vocab = set()

with open(data_path, 'r', encoding='utf-8') as f:
    lines = f.read().split('\n')
for line in lines[: min(num_samples, len(lines) - 1)]:
    input_text, target_text, _ = line.split('\t')
    target_text = '\t' + target_text + '\n'
    input_texts.append(input_text)
    for word in input_text.split():
        input_vocab.add(word.strip())
    target_texts.append(target_text)
    for word in target_text.split():
        output_vocab.add(word.strip())
    
input_vocab2index = {word: i+2 for i, word in enumerate(input_vocab)}
output_vocab2index = {word: i+2 for i, word in enumerate(output_vocab)}

In [124]:
# Создаем класс для датасета
class DataWrapper(Dataset):
    def __init__(self, data, target=None, transform=None):
        self.data = data
        if target is not None:
            self.target = target
        self.transform = transform
        
    def __getitem__(self, index):
        def indexesFromSentence(sentence, vocab):
            return [vocab.get(word.strip(), 0) for word in sentence.split(' ')]

        def tensorFromSentence(sentence, vocab):
            indexes = indexesFromSentence(sentence, vocab)
            indexes.append(1)
            return torch.tensor(indexes, dtype=torch.long).view(-1, 1)

        def tensorsFromSent(input_sentences, output_sentences):
            input_tensor = tensorFromSentence(input_sentences, input_vocab2index)
            target_tensor = tensorFromSentence(output_sentences, output_vocab2index)
            return (input_tensor, target_tensor)

        training_pairs = np.random.randint(0, len(self.data), size=10000)
        input_tensor, target_tensor = tensorsFromSent(self.data[training_pairs[i]], self.target[training_pairs[i]])

        if self.transform:
            input_tensor = self.transform(input_tensor)
            
        return input_tensor, target_tensor
    
    def __len__(self):
        return len(self.data)

train_dataset = DataWrapper(input_texts, target_texts)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

In [28]:
# Классы для энкодера и декодера
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output = embedded
        output, hidden = self.gru(output, hidden)
        return output, hidden

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size)


class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=10):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        #output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size)

In [43]:
# Функция обучения моделей
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=10):
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    encoder_outputs = torch.zeros(max_length, encoder.hidden_size)

    loss = 0

    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(
            input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0, 0]

    decoder_input = torch.tensor([[0]])

    decoder_hidden = encoder_hidden

    for di in range(target_length):
        decoder_output, decoder_hidden, decoder_attention = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()  # detach from history as input

        loss += criterion(decoder_output, target_tensor[di])
        if decoder_input.item() == 1:
            break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [126]:
# Обучаем модели
encoder = EncoderRNN(len(input_vocab2index)+2, 30)
attn_decoder1 = AttnDecoderRNN(30, len(output_vocab2index)+2, dropout_p=0.1)

encoder_optimizer = torch.optim.SGD(encoder.parameters(), lr=0.01)
decoder_optimizer = torch.optim.SGD(attn_decoder1.parameters(), lr=0.01)

criterion = nn.NLLLoss()

print_loss_total = 0
for epoch in range(1, epochs + 1):
    print(f"Train epoch {epoch}/{epochs}")
    for i, (input_tensor, target_tensor) in enumerate(train_dataset): # torch.cat(data,dim=0)
    
      loss = train(input_tensor, target_tensor,
                   encoder,
                   attn_decoder1,
                   encoder_optimizer,
                   decoder_optimizer,
                   criterion)
      print_loss_total += loss
    
      print_loss_avg = print_loss_total / 1
      print_loss_total = 0
      print('(%d %d%%) %.4f' % (i, i / 10 * 100, print_loss_avg))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(5001 50010%) 3.2409
(5002 50020%) 4.7234
(5003 50030%) 3.0661
(5004 50040%) 4.9515
(5005 50050%) 2.5278
(5006 50060%) 3.1245
(5007 50070%) 4.4812
(5008 50080%) 3.0445
(5009 50090%) 3.0580
(5010 50100%) 4.1429
(5011 50110%) 5.1620
(5012 50120%) 4.9060
(5013 50130%) 3.1201
(5014 50140%) 4.0552
(5015 50150%) 3.6970
(5016 50160%) 4.0435
(5017 50170%) 4.7996
(5018 50180%) 3.0013
(5019 50190%) 3.2986
(5020 50200%) 4.1828
(5021 50210%) 3.5226
(5022 50220%) 7.5893
(5023 50230%) 4.8113
(5024 50240%) 2.3107
(5025 50250%) 6.0353
(5026 50260%) 2.0328
(5027 50270%) 6.8442
(5028 50280%) 3.8371
(5029 50290%) 2.5169
(5030 50300%) 4.2481
(5031 50310%) 3.0575
(5032 50320%) 6.7789
(5033 50330%) 5.6155
(5034 50340%) 5.1508
(5035 50350%) 1.7214
(5036 50360%) 4.2447
(5037 50370%) 3.1176
(5038 50380%) 5.6007
(5039 50390%) 2.3812
(5040 50400%) 3.9580
(5041 50410%) 2.6687
(5042 50420%) 6.1837
(5043 50430%) 4.4018
(5044 50440%) 5.1251
(5045 50450