### Генерация поэзии с помощью нейронных сетей: шаг 1
##### Автор: [Радослав Нейчев](https://www.linkedin.com/in/radoslav-neychev/), @neychev

Ваша основная задача: научиться генерироват стихи с помощью простой рекуррентной нейронной сети (Vanilla RNN). В качестве корпуса текстов для обучения будет выступать роман в стихах "Евгений Онегин" Александра Сергеевича Пушкина.

In [1]:
# do not change the code in the block below
# __________start of block__________
import string
import os
from random import sample

import numpy as np
import torch, torch.nn as nn
import torch.nn.functional as F

from IPython.display import clear_output

import matplotlib.pyplot as plt
# __________end of block__________

In [2]:
# do not change the code in the block below
# __________start of block__________
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('{} device is available'.format(device))
# __________end of block__________

cpu device is available


#### 1. Загрузка данных.

In [3]:
# __________start of block__________
#!wget https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/onegin.txt

with open('onegin.txt', 'r', encoding='utf-8') as iofile:  # указываем кодировку utf-8
    text = iofile.readlines()
    
text = "".join([x.replace('\t\t', '').lower() for x in text])
# __________end of block__________


#### 2. Построение словаря и предобработка текста
В данном задании требуется построить языковую модель на уровне символов. Приведем весь текст к нижнему регистру и построим словарь из всех символов в доступном корпусе текстов. Также добавим токен `<sos>`.

In [4]:
# do not change the code in the block below
# __________start of block__________
tokens = sorted(set(text.lower())) + ['<sos>']
num_tokens = len(tokens)

assert num_tokens == 84, "Check the tokenization process"

token_to_idx = {x: idx for idx, x in enumerate(tokens)}
idx_to_token = {idx: x for idx, x in enumerate(tokens)}

assert len(tokens) == len(token_to_idx), "Mapping should be unique"

print("Seems fine!")


text_encoded = [token_to_idx[x] for x in text]
# __________end of block__________

Seems fine!


__Ваша задача__: обучить классическую рекуррентную нейронную сеть (Vanilla RNN) предсказывать следующий символ на полученном корпусе текстов и сгенерировать последовательность длины 100 для фиксированной начальной фразы.

Вы можете воспользоваться кодом с занятие №6 или же обратиться к следующим ссылкам:
* Замечательная статья за авторством Andrej Karpathy об использовании RNN: [link](http://karpathy.github.io/2015/05/21/rnn-effectiveness/)
* Пример char-rnn от Andrej Karpathy: [github repo](https://github.com/karpathy/char-rnn)
* Замечательный пример генерации поэзии Шекспира: [github repo](https://github.com/spro/practical-pytorch/blob/master/char-rnn-generation/char-rnn-generation.ipynb)

Данное задание является достаточно творческим. Не страшно, если поначалу оно вызывает затруднения. Последняя ссылка в списке выше может быть особенно полезна в данном случае.

Далее для вашего удобства реализована функция, которая генерирует случайный батч размера `batch_size` из строк длиной `seq_length`. Вы можете использовать его при обучении модели.

In [5]:
# do not change the code in the block below
# __________start of block__________
batch_size = 256
seq_length = 500
start_column = np.zeros((batch_size, 1), dtype=int) + token_to_idx['<sos>']

def generate_chunk():
    global text_encoded, start_column, batch_size, seq_length

    start_index = np.random.randint(0, len(text_encoded) - batch_size*seq_length - 1)
    data = np.array(text_encoded[start_index:start_index + batch_size*seq_length]).reshape((batch_size, -1))
    yield np.hstack((start_column, data))
# __________end of block__________    

Пример батча:

In [6]:
test = next(generate_chunk())

In [7]:
test.shape

(256, 501)

Далее вам предстоит написать код для обучения модели и генерации текста.

In [8]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        
        self.encoder = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_size, output_size)
    
    def forward(self, input, hidden):
        input = self.encoder(input)  # (batch_size, seq_length, hidden_size)
        output, hidden = self.gru(input, hidden)  # (batch_size, seq_length, hidden_size)
        output = self.decoder(output.reshape(-1, self.hidden_size))  # Преобразуем для Linear слоя
        return output, hidden

    def init_hidden(self, batch_size):
        return torch.zeros(self.n_layers, batch_size, self.hidden_size)

В качестве иллюстрации ниже доступен график значений функции потерь, построенный в ходе обучения авторской сети (сам код для ее обучения вам и предстоит написать).

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Параметры модели и обучения
input_size = len(token_to_idx)  # Размер словаря
hidden_size = 128  # Размер скрытого слоя
output_size = len(token_to_idx)  # Размер выходного слоя, равен размеру словаря
n_layers = 3  # Число GRU слоев
num_epochs = 1000  # Число эпох

# Инициализация модели, функции потерь и оптимизатора
model = RNN(input_size, hidden_size, output_size, n_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Определяем RNN-модель с исправлениями


# Функция для тренировки модели
def train(model, num_epochs):
    model.train()  # Устанавливаем режим обучения

    for epoch in range(num_epochs):
        total_loss = 0
        for batch in generate_chunk():  # Получаем батч из генератора

            # Подготовка входных и целевых данных
            inputs = torch.LongTensor(batch[:, :-1])  # Все токены, кроме последнего
            targets = torch.LongTensor(batch[:, 1:]).reshape(-1)  # Все токены, начиная со второго
            
            # Инициализация скрытого состояния
            hidden = model.init_hidden(batch_size)

            # Накопление градиентов
            optimizer.zero_grad()
            
            # Прогон данных через модель
            output, hidden = model(inputs, hidden)  # Получаем предсказание и обновляем скрытое состояние
            
            # Вычисляем ошибку по всем предсказаниям сразу
            loss = criterion(output, targets)
            
            # Обратное распространение и обновление параметров
            loss.backward()
            optimizer.step()

            total_loss += loss.item()  # Учитываем среднюю ошибку за батч
        
        # Выводим информацию о потере после каждой эпохи
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / batch_size:.4f}")

# Запускаем тренировку
train(model, num_epochs)


Epoch [1/1000], Loss: 0.0173
Epoch [2/1000], Loss: 0.0171
Epoch [3/1000], Loss: 0.0169
Epoch [4/1000], Loss: 0.0166
Epoch [5/1000], Loss: 0.0163
Epoch [6/1000], Loss: 0.0158
Epoch [7/1000], Loss: 0.0152
Epoch [8/1000], Loss: 0.0146
Epoch [9/1000], Loss: 0.0141
Epoch [10/1000], Loss: 0.0138
Epoch [11/1000], Loss: 0.0136
Epoch [12/1000], Loss: 0.0135
Epoch [13/1000], Loss: 0.0133
Epoch [14/1000], Loss: 0.0132
Epoch [15/1000], Loss: 0.0132
Epoch [16/1000], Loss: 0.0131
Epoch [17/1000], Loss: 0.0131
Epoch [18/1000], Loss: 0.0131
Epoch [19/1000], Loss: 0.0131
Epoch [20/1000], Loss: 0.0130
Epoch [21/1000], Loss: 0.0130
Epoch [22/1000], Loss: 0.0130
Epoch [23/1000], Loss: 0.0130
Epoch [24/1000], Loss: 0.0130
Epoch [25/1000], Loss: 0.0130
Epoch [26/1000], Loss: 0.0130
Epoch [27/1000], Loss: 0.0130
Epoch [28/1000], Loss: 0.0130
Epoch [29/1000], Loss: 0.0129
Epoch [30/1000], Loss: 0.0129
Epoch [31/1000], Loss: 0.0130
Epoch [32/1000], Loss: 0.0129
Epoch [33/1000], Loss: 0.0129
Epoch [34/1000], Lo

KeyboardInterrupt: 

Шаблон функции `generate_sample` также доступен ниже. Вы можете как дозаполнить его, так и написать свою собственную функцию с нуля. Не забывайте, что все примеры в обучающей выборке начинались с токена `<sos>`.

In [12]:
def generate_sample(char_rnn, seed_phrase=None, max_length=200, temperature=1.0, device=device):
    '''
    The function generates text given a phrase of length at least SEQ_LENGTH.
    :param seed_phrase: prefix characters. The RNN is asked to continue the phrase
    :param max_length: maximum output length, including seed_phrase
    :param temperature: coefficient for sampling.  higher temperature produces more chaotic outputs,
                        smaller temperature converges to the single most likely output
    '''
    
    # Начальная последовательность: токен начала строки + токены seed_phrase
    if seed_phrase is not None:
        x_sequence = [token_to_idx['<sos>']] + [token_to_idx[token] for token in seed_phrase]
    else: 
        x_sequence = [token_to_idx['<sos>']]

    x_sequence = torch.tensor([x_sequence], dtype=torch.int64).to(device)
    
    # Инициализация скрытого состояния
    hidden = char_rnn.init_hidden(batch_size=1)

    # Генерация начальной последовательности
    result = seed_phrase if seed_phrase is not None else ""
    
    # Прогон начальной последовательности через модель
    with torch.no_grad():  # Отключаем вычисление градиентов для генерации
        for i in range(len(x_sequence[0]) - 1):
            _, hidden = char_rnn(x_sequence[:, i:i+1], hidden)
    
    # Генерация следующих символов
    input_token = x_sequence[:, -1]  # Последний токен начальной последовательности
    for _ in range(max_length - len(result)):
        output, hidden = char_rnn(input_token.unsqueeze(1), hidden)
        output = output.squeeze(0) / temperature  # Применяем температуру

        # Применяем softmax для получения вероятностей и выбираем следующий токен
        probabilities = torch.softmax(output, dim=-1)
        next_token = torch.multinomial(probabilities, num_samples=1).item()

        # Проверка на токен конца строки (например, '<eos>')
        if tokens[next_token] == '<eos>':
            break

        # Добавляем предсказанный символ к результату
        result += tokens[next_token]

        # Обновляем входной токен для следующего шага
        input_token = torch.tensor([next_token], dtype=torch.int64).to(device)

    return result


Пример текста сгенерированного обученной моделью доступен ниже. Не страшно, что в тексте много несуществующих слов. Используемая модель очень проста: это простая классическая RNN.

In [13]:
print(generate_sample(model, ' мой дядя самых честных правил', max_length=500, temperature=0.8))

 мой дядя самых честных правил
не постещет улечаюцы.
тальной полпосенье вутра.



x

и к студет не веняская в тот,
стреетсе лидейства пустенье;
но как он прогосту залицы,
страстенье радетаньех поромный довилный;
есто мы может из волский,
да ли правневали мои.



xxx

и споклует их ле затам.
все меркливо синной себе,
моем их обезореты!
козора взор загреной срестки,
с из накрачали вден ручяй,



viii

с идет и стасудь порезь,
не безнас рашдет, клонной глененьих,
– там цех, разней их нем.



xvi

б


### Сдача задания
Сгенерируйте десять последовательностей длиной 500, используя строку ' мой дядя самых честных правил'. Температуру для генерации выберите самостоятельно на основании визуального качества генериуремого текста. Не забудьте удалить все технические токены в случае их наличия.

Сгенерированную последовательность сохрание в переменную `generated_phrase` и сдайте сгенерированный ниже файл в контест.

In [None]:
seed_phrase = ' мой дядя самых честных правил'

In [15]:
# your code here

# For example:

generated_phrases = [
     generate_sample(
         model,
         ' мой дядя самых честных правил',
         max_length=500,
         temperature=1.
     ).replace('<sos>', '')
     for _ in range(10)
 ]

In [16]:
# do not change the code in the block below
# __________start of block__________

import json
if 'generated_phrases' not in locals():
    raise ValueError("Please, save generated phrases to `generated_phrases` variable")

for phrase in generated_phrases:

    if not isinstance(phrase, str):
        raise ValueError("The generated phrase should be a string")

    if len(phrase) != 500:
        raise ValueError("The `generated_phrase` length should be equal to 500")

    assert all([x in set(tokens) for x in set(list(phrase))]), 'Unknown tokens detected, check your submission!'
    

submission_dict = {
    'token_to_idx': token_to_idx,
    'generated_phrases': generated_phrases
}

with open('submission_dict.json', 'w') as iofile:
    json.dump(submission_dict, iofile)
print('File saved to `submission_dict.json`')
# __________end of block__________

File saved to `submission_dict.json`


На этом задание завершено. Поздравляем!