<a href="https://colab.research.google.com/github/lonsst/ML_practice/blob/main/RNN_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import nltk

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from nltk.tokenize import word_tokenize, sent_tokenize
from sklearn.preprocessing import LabelEncoder
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import f1_score
from sklearn.metrics import precision_recall_fscore_support
import numpy as np
from tqdm import tqdm

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
cd drive/MyDrive/datasets

/content/drive/MyDrive/datasets


## 1. Генерирование русских имен при помощи RNN

Датасет: https://disk.yandex.ru/i/2yt18jHUgVEoIw

1.1 На основе файла name_rus.txt создайте датасет.
  * Учтите, что имена могут иметь различную длину
  * Добавьте 4 специальных токена:
    * `<PAD>` для дополнения последовательности до нужной длины;
    * `<UNK>` для корректной обработки ранее не встречавшихся токенов;
    * `<SOS>` для обозначения начала последовательности;
    * `<EOS>` для обозначения конца последовательности.
  * Преобразовывайте строку в последовательность индексов с учетом следующих замечаний:
    * в начало последовательности добавьте токен `<SOS>`;
    * в конец последовательности добавьте токен `<EOS>` и, при необходимости, несколько токенов `<PAD>`;
  * `Dataset.__get_item__` возращает две последовательности: последовательность для обучения и правильный ответ.
  
  Пример:
  ```
  s = 'The cat sat on the mat'
  # преобразуем в индексы
  s_idx = [2, 5, 1, 2, 8, 4, 7, 3, 0, 0]
  # получаем x и y (__getitem__)
  x = [2, 5, 1, 2, 8, 4, 7, 3, 0]
  y = [5, 1, 2, 8, 4, 7, 3, 0, 0]
  ```

1.2 Создайте и обучите модель для генерации фамилии.

  * Для преобразования последовательности индексов в последовательность векторов используйте `nn.Embedding`;
  * Используйте рекуррентные слои;
  * Задача ставится как предсказание следующего токена в каждом примере из пакета для каждого момента времени. Т.е. в данный момент времени по текущей подстроке предсказывает следующий символ для данной строки (задача классификации);
  * Примерная схема реализации метода `forward`:
  ```
    input_X: [batch_size x seq_len] -> nn.Embedding -> emb_X: [batch_size x seq_len x embedding_size]
    emb_X: [batch_size x seq_len x embedding_size] -> nn.RNN -> output: [batch_size x seq_len x hidden_size]
    output: [batch_size x seq_len x hidden_size] -> torch.Tensor.reshape -> output: [batch_size * seq_len x hidden_size]
    output: [batch_size * seq_len x hidden_size] -> nn.Linear -> output: [batch_size * seq_len x vocab_size]
  ```

1.3 Напишите функцию, которая генерирует фамилию при помощи обученной модели:
  * Построение начинается с последовательности единичной длины, состоящей из индекса токена `<SOS>`;
  * Начальное скрытое состояние RNN `h_t = None`;
  * В результате прогона последнего токена из построенной последовательности через модель получаете новое скрытое состояние `h_t` и распределение над всеми токенами из словаря;
  * Выбираете 1 токен пропорционально вероятности и добавляете его в последовательность (можно воспользоваться `torch.multinomial`);
  * Повторяете эти действия до тех пор, пока не сгенерирован токен `<EOS>` или не превышена максимальная длина последовательности.

При обучении каждые `k` эпох генерируйте несколько фамилий и выводите их на экран.

In [None]:
data = pd.read_csv('./name_rus.txt', encoding='cp1251', header=None, names=['surname'])
print(data.head())

      surname
0     авдокея
1     авдоким
2      авдоня
3    авдотька
4  авдотьюшка


In [None]:
class Vocab():
    def __init__(self, _data):
        self.max_surname_len = _data.surname.str.len().max()
        self.token_to_id = {}
        self.id_to_token = {}
        self.tech = ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        self.build_vocab(list('абвгдеёжзийклмнопрстуфхцчшщъыьэюя'))
        self.vocab_size = len(self.token_to_id)

    def build_vocab(self, letters):
        self.token_to_id = {token: idx for idx, token in enumerate(letters + self.tech)}
        self.id_to_token = {idx: token for token, idx in self.token_to_id.items()}

    def __len__(self):
        return self.vocab_size

    def __getitem__(self, token):
        return self.token_to_id[token]

    def __contains__(self, token):
        return token in self.token_to_id

    def to_tokens(self, ids):
        ids_sub = []
        for idx in ids:
            if self.id_to_token[int(idx)] not in self.tech:
                ids_sub.append(self.id_to_token[int(idx)])

        return ''.join(ids_sub)

    def to_ids(self, tokens):
        out = [self.token_to_id['<SOS>']] + [self.token_to_id['<PAD>']] * self.max_surname_len
        for i, token in enumerate(tokens, 1):
            if token not in self.token_to_id:
                out[i] = self.token_to_id['<UNK>']
            else:
                out[i] = self.token_to_id[token]
        out.append(self.token_to_id['<EOS>'])
        return out

In [None]:
def collate_fn(batch):
    x_batch, y_batch = zip(*batch)
    x_padded = pad_sequence(x_batch, padding_value=0, batch_first=True)
    y_padded = pad_sequence(y_batch, padding_value=0, batch_first=True)
    return x_padded, y_padded

In [None]:
vocab = Vocab(data)

In [None]:
class SurnameDataset(Dataset):
    def __init__(self, _data, _vocab):
        self.data = _data
        self.vocab = _vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _x = self.vocab.to_ids(self.data.surname.iloc[idx])
        return torch.tensor(_x[:-1]), torch.tensor(_x[1:])

In [None]:
dataset = SurnameDataset(data, vocab)

In [None]:
class RNNModel(nn.Module):
    def __init__(self, _vocab: Vocab, embedding_size, hidden_size):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(_vocab.vocab_size, embedding_size)
        self.rnn = nn.RNN(embedding_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1024)
        self.linear2 = nn.Linear(1024, vocab_size)
        self.f = torch.nn.ReLU()
        self.dropout = torch.nn.Dropout(0.25)
        self.vocab = _vocab

    def forward(self, x, h=None):
        x = self.embedding(x)
        x, h = self.rnn(x, h)
        x = self.dropout(self.linear(x))
        x = self.linear2(self.f(x))
        return x, h

In [None]:
vocab_size = len(dataset.vocab)
vocab_size

37

In [None]:
#инициализация гиперпараметров
embedding_size = 200
hidden_size = 128
learning_rate = 0.001
batch_size = 64
num_epochs = 10

In [None]:
model = RNNModel(vocab, embedding_size, hidden_size)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)

In [None]:
def sample_next(model, x, prev_state, topk=5, uniform=True):
    out, state = model(x, prev_state)
    last_out = out[0, -1, :]
    topk = topk if topk else last_out.shape[0]
    top_logit, top_ix = torch.topk(last_out, k=topk, dim=-1)
    p = None if uniform else torch.nn.functional.softmax(top_logit.detach(), dim=-1).numpy()
    sampled_ix = np.random.choice(top_ix, p=p)
    return sampled_ix, state


def sample(model, start_letters, topk=5, uniform=False, max_seqlen=15, stop_on=None):
    model.eval()
    with torch.no_grad():
        sampled_ix_list = start_letters[:]
        x = torch.tensor([start_letters])

        prev_state = None
        for t in range(max_seqlen - len(start_letters)):
            sampled_ix, prev_state = sample_next(model, x, prev_state, topk, uniform)

            sampled_ix_list.append(sampled_ix)
            x = torch.tensor([[sampled_ix]])

            if sampled_ix == stop_on:
                break

    model.train()
    return sampled_ix_list


vocab.to_tokens(sample(model, [0], stop_on=vocab.token_to_id['<EOS>']))

'ая'

In [None]:
def train(_model: torch.nn.Module, epochs=100):
    optimizer = torch.optim.Adam(_model.parameters())
    loss_fn = torch.nn.CrossEntropyLoss()
    loss_log = []
    accuracy_log = []
    precision_log = []
    recall_log = []
    f1_log = []

    loader = DataLoader(dataset, batch_size=512)

    for i in range(epochs):
        epoch_loss = 0
        correct_predictions = 0
        total_predictions = 0

        _model.train()

        for j, (batch_x, batch_y) in enumerate(loader):
            y_pred = _model(batch_x)


            y_pred_flat = y_pred[0].reshape(-1, vocab.vocab_size)
            batch_y_flat = batch_y.reshape(-1)

            running_loss = loss_fn(y_pred_flat, batch_y_flat)
            epoch_loss += running_loss.item()

            # accuracy
            _, predicted_flat = torch.max(y_pred_flat, 1)
            correct_predictions += (predicted_flat == batch_y_flat).sum().item()
            total_predictions += batch_y_flat.size(0)

            running_loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        accuracy = correct_predictions / total_predictions

        _model.eval()
        epoch_loss /= j

        if i % 25 == 0:
            print(f'EPOCH: {i + 1:3d} \t LOSS: {epoch_loss:0.4f} \t ACCURACY: {accuracy:0.4f}')

            eos = vocab.token_to_id['<EOS>']
            start = vocab.to_ids('викт')[1:5]
            samples = [vocab.to_tokens(sample(model, start, stop_on=eos)) for _ in range(3)]
            print('Викт ---> ', *samples, sep=' | ')

        loss_log.append(epoch_loss)
        accuracy_log.append(accuracy)

    return _model, loss_log, accuracy_log

model, loss_log, accuracy_log = train(model, epochs=76)

EPOCH:   1 	 LOSS: 1.7096 	 ACCURACY: 0.6611 	 F1: 0.0000
Викт --->  | виктана | виктан | виктан
EPOCH:  26 	 LOSS: 1.1784 	 ACCURACY: 0.7153 	 F1: 0.0000
Викт --->  | викта | виктана | виктаныч
EPOCH:  51 	 LOSS: 1.1105 	 ACCURACY: 0.7290 	 F1: 0.0000
Викт --->  | виктодя | викташа | виктиниан
EPOCH:  76 	 LOSS: 1.0507 	 ACCURACY: 0.7418 	 F1: 0.0000
Викт --->  | виктюша | виктана | викташа


In [None]:
eos = vocab.token_to_id['<EOS>']
start = vocab.to_ids('викт')[1:5]
samples = [vocab.to_tokens(sample(model, start, stop_on=eos)) for _ in range(10)]
print('Викт ---> ', *samples, sep=' | ')

Викт --->  | виктон | виктимыч | викта | викта | виктаня | виктиана | виктана | виктон | виктана | викта


## 2. Генерирование текста при помощи RNN

2.1 Скачайте из интернета какое-нибудь художественное произведение
  * Выбирайте достаточно крупное произведение, чтобы модель лучше обучалась;

2.2 На основе выбранного произведения создайте датасет.

Отличия от задачи 1:
  * Токены <SOS>, `<EOS>` и `<UNK>` можно не добавлять;
  * При создании датасета текст необходимо предварительно разбить на части. Выберите желаемую длину последовательности `seq_len` и разбейте текст на построки длины `seq_len` (можно без перекрытия, можно с небольшим перекрытием).

2.3 Создайте и обучите модель для генерации текста
  * Задача ставится точно так же как в 1.2;
  * При необходимости можете применить:
    * двухуровневые рекуррентные слои (`num_layers`=2)
    * [обрезку градиентов](https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html)

2.4 Напишите функцию, которая генерирует фрагмент текста при помощи обученной модели
  * Процесс генерации начинается с небольшого фрагмента текста `prime`, выбранного вами (1-2 слова)
  * Сначала вы пропускаете через модель токены из `prime` и генерируете на их основе скрытое состояние рекуррентного слоя `h_t`;
  * После этого вы генерируете строку нужной длины аналогично 1.3


In [None]:
import requests
import nltk
from nltk.tokenize import sent_tokenize
from __future__ import print_function
from keras.callbacks import LambdaCallback
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.optimizers import RMSprop
import numpy as np
import random
import sys
import io
import numpy as np

In [None]:
path = './onegin.txt'
text = open(path, encoding='cp1251').read().lower()

sentences = sent_tokenize(text)
num_sentences = len(sentences)

unique_characters = sorted(list(set(text)))
num_unique_characters = len(unique_characters)

char_indices = dict((c, i) for i, c in enumerate(unique_characters))
indices_char = dict((i, c) for i, c in enumerate(unique_characters))

maxlen = 40
step = 3
sentences = []
next_chars = []

for i in range(0, len(text) - maxlen, step):
    sentences.append(text[i: i + maxlen])
    next_chars.append(text[i + maxlen])
print('nb sequences:', len(sentences))

print('Vectorization...')
x = np.zeros((len(sentences), maxlen, len(unique_characters)), dtype=np.bool)
y = np.zeros((len(sentences), len(unique_characters)), dtype=np.bool)
for i, (sentence, next_char) in enumerate(zip(sentences, next_chars)):
    for t, char in enumerate(sentence):
        x[i, t, char_indices[char]] = 1
    y[i, char_indices[next_char]] = 1


nb sequences: 61853
Vectorization...


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  x = np.zeros((len(sentences), maxlen, len(unique_characters)), dtype=np.bool)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  y = np.zeros((len(sentences), len(unique_characters)), dtype=np.bool)


In [None]:
def sample(preds, temperature=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

In [None]:
def generate_text(model, seed_text, length, temperature):
    generated_text = seed_text
    for _ in range(length):
        x_pred = np.zeros((1, maxlen, len(unique_characters)))
        for t, char in enumerate(seed_text):
            x_pred[0, t, char_indices[char]] = 1.0
        preds = model.predict(x_pred, verbose=0)[0]
        next_index = sample(preds, temperature)
        next_char = indices_char[next_index]
        generated_text += next_char
        seed_text = seed_text[1:] + next_char
    return generated_text

In [None]:
def on_epoch_end(epoch, _):
    print()
    print(f'\n----- Эпоха {epoch + 1} завершена. Генерируем текст:')

    start_index = random.randint(0, len(text) - maxlen - 1)
    seed_text = text[start_index: start_index + maxlen]

    for temperature in [0.2, 0.5, 1.0]:
        print('----- Temperature:', temperature)
        generated_text = generate_text(model, seed_text, 400, temperature)
        print(generated_text)

In [None]:
model = Sequential()
model.add(LSTM(128, input_shape=(maxlen, len(unique_characters))))
model.add(Dense(len(unique_characters), activation='softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam')

In [None]:
model.fit(x, y, batch_size=128, epochs=10, callbacks=[LambdaCallback(on_epoch_end=on_epoch_end)])

Epoch 1/10

----- Эпоха 1 завершена. Генерируем текст:
----- Temperature: 0.2
ниц, легких вдохновений,
		незрелых и уво но о но но она но со о то де ва но  да но а со рононо в ста  о не оне ма но со о нодо но со та о во до со дото но нос ово но е сто со тона но се ме со водо ва сто сет до сте сте но во сто да ста те о о оо с се но  воле то со не во во ео пола  ост те со ода уо се о е се то о оо о та во носто со но тет со де не со се во са но со о на но то со на се нот о но нето сто во но вот со о но са са со но с
----- Temperature: 0.5
ниц, легких вдохновений,
		незрелых и ув дь олтри онати оно зно до аолатьсй а таноя ма то,
		одкдео ли хави оне стр сала	ларадотасосй  саоу враео о р
 осналол та света патаи де о ооврне н ван 	ыть нор таво т: до ко е жа вси су де ео пол де совара ны да дели 	ооно тла даво л ва сто о лме м тоне  ла со то са волго пе мей ко  толелочид  не. нос со ны вод то
.
		сту стой гро у зей бем зрета си не о оа; , сстом стода нее сива да детра лес оо
----- Temperature

KeyboardInterrupt: ignored

вариант на pytorch (работает, но с переменным успехом....)

In [None]:
with open('./onegin.txt', encoding='cp1251') as file:
    text = file.read()

text = nltk.word_tokenize(re.sub(r'[^A-Z]', '', text.lower(), -1), 'russian')
text[:3]

['александр', 'сергеевич', 'пушкин']

In [None]:
class TextDataset(Dataset):

    def __init__(self, text, seq_len):
        self.text = text
        self.seq_len = seq_len
        self.tokens = list(set(text)) + [' ', '']
        self.token_to_id = {token: idx for idx, token in enumerate(self.tokens)}
        self.id_to_token = {idx: token for idx, token in enumerate(self.tokens)}
        self.num_tokens = len(self.tokens)

    def __len__(self):
        return len(self.text) // self.seq_len

    def __getitem__(self, idx):
        start_idx = idx * self.seq_len
        end_idx = start_idx + self.seq_len + 1
        text_str = self.text[start_idx:end_idx]
        text_encoded = [self.token_to_id[token] for token in text_str]
        x = torch.tensor(text_encoded[:-1])
        y = torch.tensor(text_encoded[1:])
        return x, y

    def decode(self, text_encoded):
        out = ''
        for idx in text_encoded:
            if int(idx) in self.id_to_token.keys():
                out += self.id_to_token[int(idx)]

            out += ' '

        return out

    def encode(self, text):
        out = []
        for token in text:
            if token in self.token_to_id:
                out.append(self.token_to_id[token])
            else:
                out.append(self.token_to_id[' '])
        return torch.tensor(out)

    def collate_fn(self, batch):
        x = torch.stack([item[0] for item in batch])
        y = torch.stack([item[1] for item in batch])
        return x, y

In [None]:
class RNN(torch.nn.Module):

    def __init__(self, num_tokens, emb_size, num_layers=1, dropout=0.5):
        super().__init__()
        self.emb = torch.nn.Embedding(num_tokens, emb_size)
        self.rnn = torch.nn.LSTM(emb_size, 256, num_layers=num_layers, dropout=dropout)
        self.h_l = torch.nn.Linear(256, num_tokens)

    def forward(self, x):
        x = self.emb(x)
        x, _ = self.rnn(x)
        x = self.h_l(x)
        return x

In [None]:
def generate_sample(model, dataset, prime_str=' ', sample_len=100):
    model.eval()
    with torch.no_grad():
        x = dataset.encode(prime_str)
        x = x[None, :].to(next(model.parameters()).device)
        for _ in range(sample_len):
            logits = model(x)
            p_next = torch.nn.functional.softmax(logits[:, -1], dim=-1)
            next_token = torch.multinomial(p_next, num_samples=1)
            x = torch.cat([x, next_token], dim=1)
        return dataset.decode(x[0].cpu())

In [None]:
def train(model, dataset, num_epochs, batch_size, lr=0.001, grad_clip=5, device='cpu'):

    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()
    loader = DataLoader(dataset, batch_size=batch_size)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0

        for x, y in tqdm(loader, leave=False):
            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits.transpose(1, 2), y)
            loss.backward()
            epoch_loss += loss.item()

            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()

        if epoch % 5 == 0:
            print(f'Epoch: {epoch + 1}, Loss: {epoch_loss:.4f}')
            print(generate_sample(model, dataset, sample_len=dataset.seq_len))

In [None]:
dataset = TextDataset(text, seq_len=15)
model = RNN(num_tokens=dataset.num_tokens, emb_size=256, num_layers=3, dropout=0.25)
train(model, dataset, num_epochs=40, batch_size=512)



Epoch: 1, Loss: 45.8066
  вздохнула средь шутивший излить наполеоны бездыханна трепетный мгновенной страшные огромный слушай изменила летой xxvii роптать 




Epoch: 6, Loss: 36.0707
  прелестям озирают вралем воспоминаньем тайны дне зала звуки кольнем роскоши виясь v. creux холодна читал… 




Epoch: 11, Loss: 35.8351
  x сне долгое я… объясненье прочь часовые туз привезено разостлан считаясь рождающийся простертой веселым злодейский 




Epoch: 16, Loss: 35.8155
  порядком тяжко рядов дальней капусту близ мнимый ветреный пятую заставить между строгом повеяла нам принужденья 




Epoch: 21, Loss: 35.8113
  прочили полились одною удивлен уверен охлажденного бесконечный виновнее впечатленье крестясь избушкой пади стеклах уголь серебре 




Epoch: 26, Loss: 35.8079
  бьет милую ума описывать ломбер > равно потреплет подслушать уж русские философических семья важны byron 




Epoch: 31, Loss: 35.8052
  пропал златая семейственной шикать согласитесь 2 аристократов чадаев морем чернеет ………………………… почетный барин ждала обнажают 




Epoch: 36, Loss: 35.8017
  ходит поставлен померкшими вспыхнет муравьев снаружи серенькие рассеян памяти решился вспомнил следствием поэмы уважение густой 


