<a href="https://colab.research.google.com/github/vsolodkyi/NeuralNetworks_SkillBox/blob/main/module_14/%D0%A0%D0%B5%D0%B0%D0%BB%D0%B8%D0%B7%D0%B0%D1%86%D0%B8%D1%8F_%D1%81%D0%BB%D0%BE%D1%8F_RNN_%D0%9F%D1%80%D0%B0%D0%BA%D1%82%D0%B8%D0%BA%D0%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Реализация слоя RNN




В этом уроке мы посмотрим, как реализовать простой RNN слой. Причём, сделаем это двумя способами. Сначала будем использовать готовую реализацию из Keras, а затем реализуем свой RNN слой с нуля на чистом TensorFlow, чтобы лучше понять, как он устроен внутри. Затем сравним результаты работы двух версий RNN слоя, чтобы убедиться, что они эквивалентны.

### Используем TensorFlow 2.0

На момент подготовки этих материалов в Google Colab по умолчанию используется версия TensorFlow 1.X

Переключаемся на версию 2.0 (работает только в Colab)

In [1]:
%tensorflow_version 2.x

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.


### Загрузка библиотек
TensorFlow должен иметь как минимум версию 2.0

In [2]:
import numpy as np
import tensorflow as tf
print(tf.__version__)

2.8.2


### Входной тензор
Подготовим данные, на которых будем тестировать наш RNN слой. Пока что мы не решаем никакую конкретную задачу и ничего не обучаем. Мы просто хотим посмотреть, как работает прямое распространение RNN слоя (и сравнить две версии), поэтому нам подойдут просто случайные данные (случайный входной тензор).

В работе с рекуррентными нейронными сетями на входе, как правило, используется трёхмерный тензор. Первое измерение - батч (это измерение есть почти всегда, когда мы работаем с любыми нейронными сетями). Его смысл всегда в том, что мы одновременно обрабатываем пакет (группу) данных вместо одного экземпляра. Второе измерение - измерение последовательности. То есть, если наша последовательность была длины 100, то второе измерение будет 100. И третье измерение - размер векторного представления (эмбеддинга) каждого элемента последовательности.

Если в реальной задаче цепочки в пределах одного батча имеют разную длину (например, это предложения с разным количеством слов), то обычно все цепочки предварительно приводят к равным длинам за счёт добавления нулевых векторов (или других специальных векторов, трактуемых как элементы паддинга цепочки), чтобы мы смогли собрать нужный нам тензор.

Размерности тензора `x`: (батч, длина цепочки, размер эмбеддинга)

In [18]:
BATCH_SIZE = 2
SEQ_LEN = 100 # Длина последовательности
EMB_SIZE = 16 # Размер векторного представления (эмбеддинга)

x = np.random.rand(BATCH_SIZE, SEQ_LEN, EMB_SIZE).astype(np.float32)
print(x.shape)

(2, 100, 16)


### Создание простого RNN слоя с помощью Keras
Для начала создадим и протестируем обычный RNN слой из Keras.

Для его создания нам понадобятся следующие параметры:

`H_SIZE` - количество элементов в выходном векторе `h`.

`activation` - функция активации для слоя

`return_sequences` - флаг, в зависимости от которого на выходе мы получаем цепочку векторов `h` (той же длины, что и входная цепочка) или только последний вектор `h`

В этом примере мы будем использовать такой RNN слой, который возвращает выходной вектор `h` для каждого элемента выходной последовательности. То есть, на входе последовательность длины `SEQ_LEN`, на выходе последовательность `SEQ_LEN`

In [13]:
H_SIZE = 32
rnn_layer = tf.keras.layers.SimpleRNN(H_SIZE, activation='relu', return_sequences=True)

Запускаем инференс (прямое распространение) для созданного слоя и проверяем размерность выходного тензора.

Она должна была получиться `(BATCH_SIZE, SEQ_LEN, H_SIZE)`

In [19]:
y = rnn_layer(x)
print(y.shape)

(2, 100, 32)


### Создание простого RNN слоя с помощью TensorFlow
Теперь давайте создадим свой RNN слой на чистом TensorFlow, а затем сравним результат его работы с версией из Keras.

Отдельно выделим функцию для так называемой RNN ячейки, задача которой сделать прямое распространение в данный момент времени `t`. То есть, по входу `x` и вектору `h` с предыдущего момента времени получить новый вектор `h`.

И тогда останется просто пройтись в цикле по входной последовательности и вызвать RNN ячейку для каждого её элемента (передавая при этом текущий полученный вектор `h` в следующий запуск ячейки).

Вспомним, какие обучаемые параметры нам нужны для простого RNN слоя: две матрицы и один вектор смещений (биас). Для этого в конструкторе создадим два полносвязных слоя и у второго выключим `bias` (нам будет достаточно биаса из первого слоя).

In [15]:
class RNNLayer(tf.keras.Model):
    def __init__(self, h_size):
        super().__init__()
        self.h_size = h_size
        self.fcXH = tf.keras.layers.Dense(self.h_size)
        self.fcHH = tf.keras.layers.Dense(self.h_size, use_bias=False) # биас не нужен, так как он есть в fcXH

    # RNN ячейка
    def RNN_cell(self, x, h):
        h = tf.nn.relu(self.fcXH(x) + self.fcHH(h))
        return h

    def call(self, x_all):
        batch, length, emb_size = x.shape
        h = tf.zeros((batch, self.h_size))
        h_all = [] # список всех получившихся векторов h
        
        # Цикл по входной последовательности
        for i in range(length):
            h = self.RNN_cell(x_all[:, i, :], h)
            h_all.append(h)
            
        # склеиваем все ответы и меняем размерности, чтоб получилось (batch, length, h_size)
        h_all = tf.transpose(tf.stack(h_all), [1, 0, 2])
        return h_all

rnn_layer_my = RNNLayer(H_SIZE)

Запускаем инференс (прямое распространение) для созданного слоя и проверяем размерность выходного тензора.

Она должна была получиться `(BATCH_SIZE, SEQ_LEN, H_SIZE)`

In [20]:
y = rnn_layer_my(x)
print(y.shape)

(2, 100, 32)


### Проверка эквивалентности двух реализаций
Теперь проверим эквивалентность двух созданных RNN слоёв (из Keras и нашего собственного).

Пока что сравнить их ответы мы не можем, так как у каждой версии слоя при его создании веса инициализировались случайным образом. Параметры инициализируются либо во время первого инференса, либо после вызова `model.build(...)`.

Для того, чтобы наши слои работали одинаково, надо скопировать веса из одного в другой.

Скопируем веса из `rnn_layer` в наш `rnn_layer_my`, после чего сделаем прямое распространение для обоих реализаций и сравним результаты. Разница должна получиться нулевой (или очень маленькой).

In [21]:
# Перед тем, как что-то присваивать в параметры модели, нужно чтобы они создались.
# Для этого можно вызвать либо инференс с каким-то входом, либо model.build(...)

rnn_layer_my.fcXH.kernel = rnn_layer.weights[0]
rnn_layer_my.fcHH.kernel = rnn_layer.weights[1]
rnn_layer_my.fcXH.bias = rnn_layer.weights[2]

y = rnn_layer(x)
y_my = rnn_layer_my(x)

print(np.max(np.abs(y.numpy() - y_my.numpy())))

0.0


Очень часто RNN ячейка является отдельной сущностью. Особенно это удобно, если у неё довольно сложное строение, и если мы не хотим вдаваться в детали её реализации, а просто хотим строить рекуррентную сеть из некоторых ячеек. Чтобы закрепить эту информацию, выполните следующие задания:

**[Задание 1]** Разделите реализации RNN ячейки и RNN слоя. Другими словами, реализуйте отдельный класс `RNNCell` (унаследованный от `tf.keras.Model`) и перенесите туда все обучаемые параметры RNN слоя. Задача ячейки (при инференсе) по векторам x и h предсказывать новый вектор h. А в новом классе `RNNLayer` вместо полносвязных слоёв сразу создайте экземпляр класса `RNNCell` и используйте его для обработки входной последовательности всё в том же цикле, что и раньше. 

**[Задание 2]** Сравните результаты работы нового `RNNLayer` с оригинальным RNN слоем из Keras также, как мы это делали раньше (разница предсказаний должна получиться равной нулю).

In [23]:
class RNNCell(tf.keras.Model):
      def __init__(self, h_size):
        super().__init__()
        self.h_size = h_size
        self.fcXH = tf.keras.layers.Dense(self.h_size)
        self.fcHH = tf.keras.layers.Dense(self.h_size, use_bias=False) # биас не нужен, так как он есть в fcXH

      def call(self, x, h):
        h = tf.nn.relu(self.fcXH(x) + self.fcHH(h))
        return h
  
class RNNLayerNew(tf.keras.Model):

      def __init__(self, h_size):
        super().__init__()
        self.h_size = h_size
        self.rnn_cell = RNNCell(h_size)

      

      def call(self, x_all):
        batch, length, emb_size = x.shape
        h = tf.zeros((batch, self.h_size))
        h_all = [] # список всех получившихся векторов h
        
        # Цикл по входной последовательности
        for i in range(length):
            h = self.rnn_cell(x_all[:, i, :], h)
            h_all.append(h)
            
        # склеиваем все ответы и меняем размерности, чтоб получилось (batch, length, h_size)
        h_all = tf.transpose(tf.stack(h_all), [1, 0, 2])
        return h_all 

In [26]:
rnn_layer_new = RNNLayerNew(H_SIZE)
cell_new = RNNCell(H_SIZE) 

In [27]:
y_my = rnn_layer_my(x)
print(y_my.shape)

(2, 100, 32)


In [29]:
# Перед тем, как что-то присваивать в параметры модели, нужно чтобы они создались.
# Для этого можно вызвать либо инференс с каким-то входом, либо model.build(...)

rnn_layer_new.rnn_cell.fcXH.kernel = rnn_layer.weights[0]
rnn_layer_new.rnn_cell.fcHH.kernel = rnn_layer.weights[1]
rnn_layer_new.rnn_cell.fcXH.bias = rnn_layer.weights[2]

y = rnn_layer(x)
y_my = rnn_layer_my(x)

print(np.max(np.abs(y.numpy() - y_my.numpy())))

0.0
