# AI Lab: Генерация текстов/последовательностей с помощью нейронных сетей


Выполнила: Власова Светлана

Группа: М8O-306Б

Вараинт: 3

**Импорт необходимых библиотек.**

In [None]:
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding, GRU, LSTM, BatchNormalization
from tensorflow.keras.layers import Dropout, Dense, SimpleRNN
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

import numpy as np
import os
import time

import re

import nltk
#nltk.download('all')

import matplotlib
from matplotlib import pyplot as plt
#from jupyterthemes import jtplot
#jupyterthemes.download(jtplot)
matplotlib.rcParams['figure.figsize'] = (15,10)
#jtplot.style('onedork')

# Создание датасета

**Работа с текстом.**

Для формирования датасета я воспользовалась сайтом https://www.gutenberg.org/.
Мой выбор пал на произведение Ф. М. Достоевского "Идиот".
Импорт текста представлен ниже.

In [None]:
book = 'idiot.txt'
path = 'https://www.gutenberg.org/files/2638/2638-0.txt'
text = open(tf.keras.utils.get_file(book, path), 'rb').read().decode(encoding='utf-8')

Взглянем на начало текста.

In [None]:
print(text[:250])

*Функционал для "чистки" текста.*

Для того, чтобы не засорять набор данных, я решила отчистить текст от ненужной информации, которая есть перед первой и последней главами - вроде биографической справки или примечаний переводчиков. Также, я посчитала нужным извлечь из текста вставки, относящиеся к иллюстрациям. Это все реализовано в виде функции *clear_text(text)*, которая представлена ниже.

In [None]:
def clear_text(text):
    text = text[text.find('PART I'):text.find('End of the Project Gutenberg')]
    ' '.join(text.split('\r\n'))
    return ''.join(text.split('[Illustration]'))

Разбиение текста по главам представлено ниже - это позволит избавить от вставок 'PART #номер_главы'.

In [None]:
def split_by_parts(text):
    parts = ""
    text = text[text.find('\n'):]
    while text[:text.find('PART ')]!= '':
        parts +=  text[:text.find('PART ')]
        text = text[text.find('PART '):]
        text = text[text.find('\n'):]
    return parts

In [None]:
text = split_by_parts(clear_text(text))
print(text[:100])

In [None]:
print ('Длина текста: ', len(text))

Посмотрим, сколько уникальных символов есть в нашем тексте. Создадим словарь.

In [None]:
vocab = sorted(set(text))
vocab_size = len(vocab)
print ('Текст содержит ', vocab_size, ' уникальных символа.')

Итак, мы приблизили текст к удобному для дальнейшего обучения виду, избавившись от "выбросов" в виде технической информации или ненужных вставок, однако, это еще не все.

Теперь **векторизуем** *текст* - так, чтобы любая последовательность могла отображаться в числовую последовательность. Для этого создадим две таблицы поиска - одна будет отображать символы в числа, а другая числа в символы.

Так, каждый уникальный символ будет иметь числовое отображение на полуинтервале $\left[0, len(vocab)\right).$

In [None]:
char_to_id = {c:i for i, c in enumerate(vocab)} #таблица поиска по символу его числового значения
id_to_char = np.array(vocab)

Векторизуем наш текст.

In [None]:
vectorized_text = np.array([char_to_id[c] for c in text])

In [None]:
print('Полученная таблица: ')
for c, count in zip(char_to_id, range(20)):
    print('  {:4s}: {:3d},'.format(repr(c), char_to_id[c]))
print('  ...\n')
    

Проиллюстрируем "процесс" векторизации текста.

In [None]:
print ('{} ---- отображение char в int ---- > {}'.format(repr(text[:15]), vectorized_text[:15]))

**Формирование датасета.**

Приведем последовательность чисел в тензорный вид с помощью функции *tf.data.Dataset.from_tensor_slices().*

In [None]:
dataset = tf.data.Dataset.from_tensor_slices(vectorized_text)

Далее, будем разбивать датасет на последовательности символов фиксированной длины **SEQ_LEN.**

In [None]:
SEQ_LEN = 150

Т.к. элементом последовательности является один символ, будем разбивать набор данных на батчи размером **SEQ_LEN + 1**, так, чтобы входная и целевая последовательности были одинаковой длины - **SEQ_LEN**, и целевая последовательность получалась из входной сдвигом на 1 символ вправо, отличалась от нее одним символом в рамках выделенного батча.

Воспользуемся *batch-*методом из *tf.data.Dataset* для формирования последовательностей нужной длины.

In [None]:
sequences = dataset.batch(SEQ_LEN + 1, drop_remainder=True)
print('Число последовательностей длины ', SEQ_LEN + 1, ': ', len(list(sequences.as_numpy_iterator())))

Взглянем на первые три последовательности в исходном виде.

In [None]:
for i in sequences.take(5):
  print(repr(''.join(id_to_char[i.numpy()])), end="")

Итак, мы имеем набор из последовательностей размером **SEQ_LEN + 1** чисел. Для обучения следует разбить этот датасет на 2 колонки:
* обучающую последовательность, состоящую из перых *SEQ_LEN* тензоров.
* тестовую последовательность, состоящую из последних *SEQ_LEN* тензоров.

In [None]:
def split_input_target(chunk):
    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

dataset = sequences.map(split_input_target)

Для ускорения обучения перемешаем последовательности в датасете и разобъем их на наборы по **BATCH_SIZE** элементов.

In [None]:
BATCH_SIZE = 64

In [None]:
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
dataset

Датасет готов. Теперь перейдем к следующему шагу - проектированию моделей.

# Проектирование моделей

**Полносвязная RNN-сеть**

Создадим директорию, где будут храниться[](http://) данные полносвязной RNN.

In [None]:
rnn_dir = "./full_rnn"

In [None]:
! mkdir -p full_rnn

Полносвязная RNN-сеть будет состоять из следующих слоев:
1. Embedding - входной слой. Обучаемая справочная таблица, которая отображает номера каждого символа в вектор с размерами embedding_dim;
2. SimpleRNN - полносвязный рекуррентный слой с функцией инициализации Ксавьера;
3. Dense - выходной слой сети с выходной размерностью *vocab_size*.

In [None]:
def build_rnn(vocab_size, embedding_dim, batch_size, rnn_units):
    model = Sequential([
      # 1 layer
        Embedding(vocab_size, embedding_dim,
                  batch_input_shape=[batch_size, None]),
      # 2 layer 
        SimpleRNN(rnn_units, return_sequences=True, stateful=False, 
          recurrent_initializer='glorot_uniform'), 
      # 3 layer
        Dense(vocab_size, kernel_initializer="glorot_uniform")
    ])
    return model

**Однослойная LSTM-сеть**

Создадим директорию, где будут храниться данные однослойной LSTM.

In [None]:
sing_dir = "./single_lstm"

In [None]:
! mkdir -p single_lstm

Однослойная LSTM-сеть будет состоять из следующих слоев:

1. Embedding - входной слой. Обучаемая справочная таблица, которая отображает номера каждого символа в вектор с размерами embedding_dim;
2. LSTM - слой с функцией инициализации Ксавьера;
3. Dense - выходной слой сети с выходной размерностью vocab_size.

In [None]:
def build_single_lstm(vocab_size, embedding_dim, batch_size, rnn_units):
    model = Sequential([
      # 1 layer
        Embedding(vocab_size, embedding_dim, 
                  batch_input_shape=[batch_size, None]),
      # 2 layer 
        LSTM(rnn_units, return_sequences=True, stateful=False, 
          recurrent_initializer='glorot_uniform'), 
      # 3 layer
        Dense(vocab_size, kernel_initializer="glorot_uniform")
    ])
    return model

**Двухслойная LSTM-сеть**

Создадим директорию, где будут храниться данные двухслойной LSTM.

In [None]:
doub_dir = "./double_lstm"

In [None]:
! mkdir -p double_lstm

Двухслойная LSTM-сеть будет состоять из следующих слоев:

1. Embedding - входной слой. Обучаемая справочная таблица, которая отображает номера каждого символа в вектор с размерами embedding_dim;
2. LSTM - слой с функцией инициализации Ксавьера;
3. LSTM - слой с функцией инициализации Ксавьера;
4. Dense - выходной слой сети с выходной размерностью vocab_size.

In [None]:
def build_double_lstm(vocab_size, embedding_dim, batch_size, rnn_units):
    model = Sequential([
      # 1 layer
        Embedding(vocab_size, embedding_dim, 
                  batch_input_shape=[batch_size, None]),
      # 2 layer 
        LSTM(rnn_units, return_sequences=True, stateful=False, 
          recurrent_initializer='glorot_uniform'),
      # 3 layer 
        LSTM(rnn_units, return_sequences=True, stateful=False, 
          recurrent_initializer='glorot_uniform'),
      # 4 layer
        Dense(vocab_size, kernel_initializer="glorot_uniform")
    ])
    return model

**Однослойная GRU-сеть**

Создадим директорию, где будут храниться данные однослойной GRU.

In [None]:
gru_dir = "./single_gru"

In [None]:
! mkdir -p single_gru

Однослойная GRU-сеть будет состоять из следующих слоев:

1. Embedding - входной слой. Обучаемая справочная таблица, которая отображает номера каждого символа в вектор с размерами embedding_dim;
2. GRU - слой с функцией инициализации Ксавьера;
3. Dense - выходной слой сети с выходной размерностью vocab_size.

In [None]:
def build_gru(vocab_size, embedding_dim, batch_size, rnn_units):
    model = Sequential([
      # 1 layer
        Embedding(vocab_size, embedding_dim, 
                  batch_input_shape=[batch_size, None]),
      # 2 layer 
        GRU(rnn_units, return_sequences=True, stateful=False, 
          recurrent_initializer='glorot_uniform'), 
      # 3 layer
        Dense(vocab_size, kernel_initializer="glorot_uniform")
    ])
    return model

# Обучение моделей

При обучении можно сохранять параметры обученной модели или же саму модель. В процессе обучения будем сохранять веса, если после очередной эпохи модель показывает лучший результат - минимальное значение ошибки.

In [None]:
def checkpoint_creator(checkpoint_dir = "./"):
    checkpoint_prefix = os.path.join(checkpoint_dir, "checkpoints/ckpt_model")
    checkpoint_callback = ModelCheckpoint(filepath=checkpoint_prefix,
                                          monitor="loss", 
                                          mode="min",
                                          save_best_only=True,
                                          save_weights_only=True)
    return checkpoint_callback

**Функция потерь**

In [None]:
def loss(labels, logits):
  return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

**Параметры моделей**

Зададим необходимые параметры моделей. 

К слову, такие параметры, как длина последовтаельности **SEQ_LEN** или размер батча **BATCH_SIZE,** уже учлись при формировании датасета.

In [None]:
EMB_DIM = 256
UNITS = 512
EPOCHS = 50

**Сборка моделей**

*Полносвязная RNN-сеть.*

In [None]:
full_rnn = build_rnn(vocab_size, EMB_DIM, BATCH_SIZE, UNITS)
full_rnn.summary()

*Однослойная LSTM-сеть.*

In [None]:
single_lstm = build_single_lstm(vocab_size, EMB_DIM, BATCH_SIZE, UNITS)
single_lstm.summary()

*Двухслойная LSTM-сеть.*

In [None]:
double_lstm = build_double_lstm(vocab_size, EMB_DIM, BATCH_SIZE, UNITS)
double_lstm.summary()

*Однослойная GRU-сеть.*

In [None]:
single_gru = build_gru(vocab_size, EMB_DIM, BATCH_SIZE, UNITS)
single_gru.summary()

Скомпилируем, используя стандартный оптимизатор Адама. В качестве функции потерь используем описанную выше кроссэнтропию.

In [None]:
full_rnn.compile(optimizer='adam', loss=loss)
single_lstm.compile(optimizer='adam', loss=loss)
double_lstm.compile(optimizer='adam', loss=loss)
single_gru.compile(optimizer='adam', loss=loss)

**Обучение.**

Обучим полносвязную *RNN* модель.

In [None]:
my_callbacks = [EarlyStopping(monitor="loss", patience=3), checkpoint_creator(rnn_dir)]
rnn_history = full_rnn.fit(dataset, epochs=EPOCHS, callbacks=my_callbacks)

Обучим однослойную *LSTM* модель.

In [None]:
my_callbacks = [EarlyStopping(monitor="loss", patience=3), checkpoint_creator(sing_dir)]
lstm_single_history = single_lstm.fit(dataset, epochs=EPOCHS, callbacks=my_callbacks)

Обучим двухслойную *LSTM* модель.

In [None]:
my_callbacks = [EarlyStopping(monitor="loss", patience=3), checkpoint_creator(doub_dir)]
lstm_double_history = double_lstm.fit(dataset, epochs=EPOCHS, callbacks=my_callbacks)

Обучим однослойную *GRU* модель.

In [None]:
my_callbacks = [EarlyStopping(monitor="loss", patience=3), checkpoint_creator(gru_dir)]
gru_single_history = single_gru.fit(dataset, epochs=EPOCHS, callbacks=my_callbacks)

Визуализируем процессы обучения всех четырех моделей с помощью графика зависимости величины *loss* от количества эпох.

In [None]:
plt.plot(rnn_history.history['loss'])
plt.plot(lstm_single_history.history['loss'])
plt.plot(lstm_double_history.history['loss'])
plt.plot(gru_single_history.history['loss'], color = 'green')
plt.title('Процесс обучения')
plt.ylabel('loss')
plt.xlabel('epochs')
plt.legend(['Полносвязная RNN', 'Однослойная LSTM', 'Двухслойная LSTM', 
            'Однослойная GRU'], loc='upper right')
plt.show()

# Тестирование моделей


Мы обучили наши модели на батчах размера **BATCH_SIZE**, однако при генерации текста мы отправляем в модель одну последовательность произвольного размера. Перестроим модели для еденичного размера батча и загрузим в них оптимальные веса обученных моделей из контрольных точек.

In [None]:
checkpoint_dir = os.path.join(rnn_dir, "checkpoints/")

full_rnn = build_rnn(vocab_size, EMB_DIM, 1, UNITS)
full_rnn.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
full_rnn.build(tf.TensorShape([1, None]))
full_rnn.summary()

In [None]:
checkpoint_dir = os.path.join(sing_dir, "checkpoints/")

single_lstm = build_single_lstm(vocab_size, EMB_DIM, 1, UNITS)
single_lstm.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
single_lstm.build(tf.TensorShape([1, None]))
single_lstm.summary()

In [None]:
checkpoint_dir = os.path.join(doub_dir, "checkpoints/")

double_lstm = build_double_lstm(vocab_size, EMB_DIM, 1, UNITS)
double_lstm.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
double_lstm.build(tf.TensorShape([1, None]))
double_lstm.summary()

In [None]:
checkpoint_dir = os.path.join(gru_dir, "checkpoints/")

single_gru = build_single_gru(vocab_size, EMB_DIM, 1, UNITS)
single_gru.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
single_gru.build(tf.TensorShape([1, None]))
single_gru.summary()

**Сохранение моделей**

In [None]:
filename = os.path.join(rnn_dir, "full_rnn.h5")
full_rnn.save(filename)

In [None]:
filename = os.path.join(sing_dir, "single_lstm.h5")
single_lstm.save(filename)

In [None]:
filename = os.path.join(doub_dir, "double_lstm.h5")
double_lstm.save(filename)

In [None]:
filename = os.path.join(gru_dir, "gru_dir.h5")
single_gru.save(filename)

**Генерация текста**

Функция генерации текста работает следующим образом:

* начальная строка векторизуется, инициализируется состояние обученой модели и устанавливается длина сгенерированной выходной последовательности;

* прогнозируется распределение следующего символа, исходя из входной строки и состояния модели;

* предсказанный символ становится следующим символом, который подается на вход модели;

* состояние, возвращаемое моделью, передается обратно в модель, так что теперь у нее больше контекста, а не только один символ. После каждой эпохи контекст модели расширяется.

In [None]:
def generate_text(model, start_string, num_generate=100):

  # Converting our start string to numbers (vectorizing)
  input_eval = [char_to_id[c] for c in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  # Empty string to store our results
  text_generated = []

  # Low temperatures results in more predictable text.
  # Higher temperatures results in more surprising text.
  # Experiment to find the best setting.
  temperature = 1.0

  # Here batch size == 1
  model.reset_states()
  for i in range(num_generate):
    predictions = model(input_eval)
    # remove the batch dimension
    predictions = tf.squeeze(predictions, 0)

    # using a categorical distribution to predict the character returned by the model
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

    # We pass the predicted character as the next input to the model
    # along with the previous hidden state
    input_eval = tf.expand_dims([predicted_id], 0)

    text_generated.append(id_to_char[predicted_id])

  return (start_string + ''.join(text_generated))

In [None]:
generate_text(full_rnn, "The result of the training is")

In [None]:
generate_text(single_lstm, "The result of the training is")

In [None]:
generate_text(double_lstm, "The result of the training is")

In [None]:
generate_text(single_gru, "The result of the training is")