#  Введение в рекуррентные нейронные сети

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Николенко С., Кадурин А., Архангельская Е. Глубокое обучение.
* https://pytorch.org/docs/stable/nn.html#recurrent-layers
* https://karpathy.github.io/2015/05/21/rnn-effectiveness/
* https://pytorch.org/docs/stable/generated/torch.nn.RNNCell.html
* https://blog.floydhub.com/a-beginners-guide-on-recurrent-neural-networks-with-pytorch/
* https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html

## Задачи для совместного разбора

1\. Рассмотрите пример работы одного шага простейшего рекуррентного слоя.  

$$ h' = tanh(W_{ih}x + W_{hh}h) $$

![RNN](https://kvitajakub.github.io/img/rnn-unrolled.png)

In [None]:
import torch as th
import torch.nn as nn

In [None]:
batch_size=16
seq_len = 8
emb_dim = 32
hidden_dim = 10

X = th.rand(batch_size, seq_len, emb_dim)

cell = nn.RNNCell(input_size=emb_dim, hidden_size=hidden_dim)

In [None]:
h = th.zeros(batch_size, hidden_dim)
for s in range(seq_len):
    x_i = X[:, s, :]
    h = cell(x_i, h)
    break

In [None]:
batch_size=16
seq_len = 8
emb_dim = 32
hidden_dim = 10

X = th.rand(seq_len, batch_size, emb_dim)

cell = nn.RNNCell(input_size=emb_dim, hidden_size=hidden_dim)

In [None]:
h.shape

torch.Size([16, 10])

2\. Рассмотрите пример работы рекуррентных слоев из `torch.nn`.

In [None]:
batch_size=16
seq_len = 8
emb_dim = 32
hidden_dim = 10

X = th.rand(batch_size, seq_len, emb_dim)

In [None]:
layer = nn.RNN(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)

In [None]:
o, h = layer(X)
# h - последний
# о - все слои в т.ч h
o.shape, h.shape

(torch.Size([16, 8, 10]), torch.Size([1, 16, 10]))

In [None]:
o[0, -1, :]

tensor([-0.7585, -0.6337,  0.4746,  0.2706,  0.4587,  0.0971, -0.4634, -0.6666,
         0.2110,  0.5807], grad_fn=<SliceBackward0>)

In [None]:
h[0, 0]

tensor([-0.7585, -0.6337,  0.4746,  0.2706,  0.4587,  0.0971, -0.4634, -0.6666,
         0.2110,  0.5807], grad_fn=<SelectBackward0>)

## Задачи для самостоятельного решения

<p class="task" id="1"></p>

1\. Используя класс `nn.RNNCell` (абстракцию для отдельного временного шага RNN), реализуйте простейшую рекуррентную сеть Элмана в виде класса `RNN`. Предусмотрите возможность работы с двумя вариантами данных: где данные (x) представлены в виде (batch, seq, feature) и где данные представлены в формате (seq, batch, feature). Создайте тензор `x1` размера 16 x 8 x 32 (batch, seq, feature) и пропустите через модель `RNN`. Выведите на экран форму двух полученных тензоров. Проверьте, что тензор `output[-1]` поэлементно равен `h`.

- [ ] Проверено на семинаре

In [None]:
class RNN(nn.Module):
  def __init__(self, input_size, hidden_size, batch_first=False):
    super().__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.batch_first = batch_first
    self.cell = nn.RNNCell(input_size=input_size, hidden_size=hidden_size)

  def forward(self, x, h=None):
    '''
    x.shape = (batch_size, seq_len, feature_size) - тензор входных данных
    h.shape = (batch_size, hidden_size) - тензор со скрытым состоянием RNN
    '''
    outputs = []
    if not self.batch_first:
        x = x.permute(1, 0, 2)

    if h is None:
        h = th.zeros(size=(x.shape[0], self.hidden_size))

    seq_len = x.shape[1]
    for s in range(seq_len):
        x_i = x[:, s, :]
        h = self.cell(x_i, h)
        outputs.append(h)

    return th.stack(outputs), h
    # инициализация тензора скрытых состояний
    # h = ...

    # проход по каждому элементу последовательностей s в батче и обновление скрытого состояния
    # h = RNNCell(s_t, h)

    # вернуть тензор всех наблюдавшихся скрытых состояний размера (batch_size, seq_len, hidden_size) и тензор скрытых состояний в последний момент времени

In [None]:
rnn = RNN(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)

In [None]:
x1 = th.rand(size=(16, 8, 32))
out, h = rnn(x1)

In [None]:
out.shape

torch.Size([8, 16, 10])

In [None]:
(out[-1] == h).all()

tensor(True)

<p class="task" id="2"></p>

2\. Создайте тензор `x2` размера 8 x 16 x 32 (seq, batch, feature) и пропустите через модель `RNN`. Выведите на экран форму двух полученных тензоров. Проверьте, что тензор `output[-1]` поэлементно равен `h`.

- [ ] Проверено на семинаре

In [None]:
rnn = RNN(input_size=emb_dim, hidden_size=hidden_dim)

x1 = th.rand(size=(8, 16, 32))
out, h = rnn(x1)

In [None]:
(out[-1] == h).all()

tensor(True)

<p class="task" id="3"></p>

3\. Считайте файл `pets.csv`, приведите имена питомцев к нижнем регистру. Решите проблему с противоречивостью данных (некоторые имена встречаются в обоих классах). Разбейте набор данных на обучающую и тестовую выборку.  Создайте Vocab на основе обучающей выборки (токен - __буква__). Добавьте в словарь специальный токен `<PAD>`. Выведите на экран количество токенов в полученном словаре.

- [ ] Проверено на семинаре

In [None]:
import pandas as pd
import numpy as np

df = pd.read_csv('pets.csv')
df.head()

Unnamed: 0,имя,класс
0,Арчи,собака
1,Алекс,собака
2,Амур,собака
3,Алтaй,собака
4,Альф,собака


In [None]:
df = df.drop_duplicates(subset='имя')

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
train_dset, test_dset = train_test_split(df, test_size=0.2)

In [None]:
from torchtext.vocab import build_vocab_from_iterator

corpus = []
for name in train_dset['имя']:
    corpus.append(list(name.lower()))

vocab = build_vocab_from_iterator(corpus, specials=['pad'])

In [None]:
len(vocab.get_stoi())

39

In [None]:
df['класс'].unique()

array(['собака', 'кошка'], dtype=object)

<p class="task" id="4"></p>

4\. Создайте класс `PetsDataset`. Используя преобразования, сделайте длины наборов индексов одинаковой фиксированной длины (подходящее значение определите сами). Закодируйте целыми числами классы питомцев. Создайте два объекта класса `PetsDataset` (для обучающей и тестовой выборки). Выведите на экран их длины.

- [ ] Проверено на семинаре

In [None]:
from torch.utils.data import Dataset
import torchtext.transforms as T
from torchtext.vocab import build_vocab_from_iterator

class PetsDataset(Dataset):
    def __init__(self, df, vocab):
      self.df = df
      self.label = list(df['класс'].map(
        {'собака': 0, 'кошка': 1}
      ))
      self.vocab = vocab
      self.corpus = []
      for name in df['имя']:
          self.corpus.append(list(name.lower()))
      self.transform = T.Sequential(
          T.ToTensor(0),
          T.PadTransform(max_length=self.get_max_len(), pad_value=0)
      )

    def __getitem__(self, idx):
      t = self.corpus[idx]
      l = self.label[idx]

      if type(idx) == int:
          vectors = [self.vocab.lookup_indices([letter])[0] for letter in t]
      else:
          vectors = [[self.vocab.lookup_indices([letter])[0] for letter in word] for word in t]
      return self.transform(vectors), th.tensor(l)

    def __len__(self):
        return len(self.df)

    def get_max_len(self):
        return max(len(x) for x in self.corpus )

In [None]:
pets_train = PetsDataset(train_dset, vocab)
pets_test = PetsDataset(test_dset, vocab)

In [None]:
len(pets_train), len(pets_test)

(2233, 559)

In [None]:
pets_train[:3]

(tensor([[28,  6, 10,  2,  4,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
           0,  0,  0,  0],
         [14,  7,  6, 19,  8,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
           0,  0,  0,  0],
         [10, 13, 17,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
           0,  0,  0,  0]]),
 tensor([0, 1, 0]))

<p class="task" id="5"></p>

5\. Используя созданный класс `RNN`, решите задачу классификации категорий питомцев по их именам. Выведите на экран отчет по классификации на обучающем и тестовом множестве.

- [ ] Проверено на семинаре

In [None]:
from torch.utils.data import DataLoader

dl_train = DataLoader(pets_train, batch_size=500)
dl_test = DataLoader(pets_test, batch_size=10)

In [None]:
class Model(nn.Module):
    def __init__(self, seq_len, emb_dim, hidden_size, out_features, batch_first=False):
        super().__init__()
        self.emb = self.emb = nn.Embedding(
        num_embeddings = seq_len, embedding_dim = emb_dim
        )
        self.rnn = RNN(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=64, out_features=2)
    def forward(self, X):
        out = self.emb(X)
        out, h = self.rnn(out)
        out = self.relu(out)
        out = self.fc(h)
        out = self.fc2(out)
        return out

In [None]:
import torch.optim as optim

n_epochs = 3
lr = 0.01
model = Model(len(vocab.get_stoi()), 300, 100, 2)
crit = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr)

#model = Model(len(vocab.get_stoi()), 300, 100, 2)
losses = []

for epoch in range(n_epochs):
    train_losses = []
    print(epoch)
    for X_, y_ in dl_train:
        out = model(X_)
        loss = crit(out, y_)
        loss.backward()
        train_losses.append(loss)
        optimizer.step()
        optimizer.zero_grad()
    losses.append(th.tensor(train_losses).mean())

0
1
2


In [None]:
losses

[tensor(0.6933), tensor(0.6932), tensor(0.6932)]

In [None]:
from sklearn.metrics import classification_report

train_outs = th.tensor([])
for X_, y_ in dl_train:
    out = model(X_)
    train_outs = th.cat((train_outs, out))
print(classification_report(pets_train[:][1],
                      th.argmax(train_outs, dim=1).detach().numpy(), zero_division=True))

test_outs = th.tensor([])
for X_, y_ in dl_test:
    out = model(X_)
    test_outs = th.cat((test_outs, out))
print(classification_report(pets_test[:][1],
                      th.argmax(test_outs, dim=1).detach().numpy(), zero_division=True))

              precision    recall  f1-score   support

           0       0.51      1.00      0.67      1134
           1       1.00      0.00      0.00      1099

    accuracy                           0.51      2233
   macro avg       0.75      0.50      0.34      2233
weighted avg       0.75      0.51      0.34      2233

              precision    recall  f1-score   support

           0       0.51      1.00      0.68       285
           1       1.00      0.00      0.01       274

    accuracy                           0.51       559
   macro avg       0.76      0.50      0.34       559
weighted avg       0.75      0.51      0.35       559



<p class="task" id="6"></p>

6\. Решите предыщую задачу, заменив собственный модуль `RNN` на модули `nn.RNN`, `nn.LSTM` и `nn.GRU`. Сравните результаты работы.

- [ ] Проверено на семинаре

In [None]:
class Model(nn.Module):
    def __init__(self, seq_len, emb_dim, hidden_size, out_features, batch_first=False):
        super().__init__()
        self.emb = self.emb = nn.Embedding(
        num_embeddings = seq_len, embedding_dim = emb_dim
        )
        self.rnn = nn.RNN(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=64, out_features=2)
    def forward(self, X):
        out = self.emb(X)
        out, h = self.rnn(out)
        out = self.fc(h[0])
        out = self.relu(out)
        out = self.fc2(out)
        return out

In [None]:
import torch.optim as optim

n_epochs = 3
lr = 0.1
model = Model(len(vocab.get_stoi()), 300, 100, 2)
crit = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr)

losses = []

for epoch in range(n_epochs):
    train_losses = []
    print(epoch)
    for X_, y_ in dl_train:
        out = model(X_)
        #out = out.reshape(out.shape[1], 2)
        loss = crit(out, y_)
        loss.backward()
        train_losses.append(loss)
        optimizer.step()
        optimizer.zero_grad()
    losses.append(th.tensor(train_losses).mean())

0
1
2


In [None]:
train_outs = th.tensor([])
for X_, y_ in dl_train:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    train_outs = th.cat((train_outs, out))
print(classification_report(pets_train[:][1],
                      th.argmax(train_outs, dim=1).detach().numpy(), zero_division=True))

test_outs = th.tensor([])
for X_, y_ in dl_test:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    test_outs = th.cat((test_outs, out))
print(classification_report(pets_test[:][1],
                      th.argmax(test_outs, dim=1).detach().numpy(), zero_division=True))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00      1134
           1       0.49      1.00      0.66      1099

    accuracy                           0.49      2233
   macro avg       0.25      0.50      0.33      2233
weighted avg       0.24      0.49      0.32      2233

              precision    recall  f1-score   support

           0       0.00      0.00      0.00       285
           1       0.49      1.00      0.66       274

    accuracy                           0.49       559
   macro avg       0.24      0.50      0.33       559
weighted avg       0.24      0.49      0.32       559



In [None]:
class Model(nn.Module):
    def __init__(self, seq_len, emb_dim, hidden_size, out_features, batch_first=False):
        super().__init__()
        self.emb = self.emb = nn.Embedding(
        num_embeddings = seq_len, embedding_dim = emb_dim
        )
        self.rnn = nn.LSTM(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=64, out_features=2)
    def forward(self, X):
        out = self.emb(X)
        out, (h, c) = self.rnn(out)
        out = self.fc(h)
        out = self.relu(out)
        out = self.fc2(out)
        return out

In [None]:
n_epochs = 3
lr = 0.1
model = Model(len(vocab.get_stoi()), 300, 100, 2)
crit = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr)

losses = []

for epoch in range(n_epochs):
    train_losses = []
    print(epoch)
    for X_, y_ in dl_train:
        out = model(X_)
        out = out.reshape(out.shape[1], 2)
        loss = crit(out, y_)
        loss.backward()
        train_losses.append(loss)
        optimizer.step()
        optimizer.zero_grad()
    losses.append(th.tensor(train_losses).mean())

0
1
2


In [None]:
train_outs = th.tensor([])
for X_, y_ in dl_train:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    train_outs = th.cat((train_outs, out))
print(classification_report(pets_train[:][1],
                      th.argmax(train_outs, dim=1).detach().numpy(), zero_division=True))

test_outs = th.tensor([])
for X_, y_ in dl_test:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    test_outs = th.cat((test_outs, out))
print(classification_report(pets_test[:][1],
                      th.argmax(test_outs, dim=1).detach().numpy(), zero_division=True))

              precision    recall  f1-score   support

           0       0.51      1.00      0.67      1134
           1       1.00      0.00      0.01      1099

    accuracy                           0.51      2233
   macro avg       0.75      0.50      0.34      2233
weighted avg       0.75      0.51      0.35      2233

              precision    recall  f1-score   support

           0       0.51      0.97      0.67       285
           1       0.43      0.02      0.04       274

    accuracy                           0.51       559
   macro avg       0.47      0.50      0.35       559
weighted avg       0.47      0.51      0.36       559



In [None]:
class Model(nn.Module):
    def __init__(self, seq_len, emb_dim, hidden_size, out_features, batch_first=False):
        super().__init__()
        self.emb = self.emb = nn.Embedding(
        num_embeddings = seq_len, embedding_dim = emb_dim
        )
        self.rnn = nn.GRU(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=64, out_features=2)
    def forward(self, X):
        out = self.emb(X)
        out, h = self.rnn(out)
        out = self.fc(h)
        out = self.relu(out)
        out = self.fc2(out)
        return out

In [None]:
n_epochs = 3
lr = 0.1
model = Model(len(vocab.get_stoi()), 300, 100, 2)
crit = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr)

losses = []

for epoch in range(n_epochs):
    train_losses = []
    print(epoch)
    for X_, y_ in dl_train:
        out = model(X_)
        out = out.reshape(out.shape[1], 2)
        loss = crit(out, y_)
        loss.backward()
        train_losses.append(loss)
        optimizer.step()
        optimizer.zero_grad()
    losses.append(th.tensor(train_losses).mean())

0
1
2


In [None]:
train_outs = th.tensor([])
for X_, y_ in dl_train:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    train_outs = th.cat((train_outs, out))
print(classification_report(pets_train[:][1],
                      th.argmax(train_outs, dim=1).detach().numpy(), zero_division=True))

test_outs = th.tensor([])
for X_, y_ in dl_test:
    out = model(X_)
    out = out.reshape(out.shape[1], 2)
    test_outs = th.cat((test_outs, out))
print(classification_report(pets_test[:][1],
                      th.argmax(test_outs, dim=1).detach().numpy(), zero_division=True))

              precision    recall  f1-score   support

           0       0.57      0.69      0.62      1134
           1       0.59      0.45      0.51      1099

    accuracy                           0.57      2233
   macro avg       0.58      0.57      0.57      2233
weighted avg       0.58      0.57      0.57      2233

              precision    recall  f1-score   support

           0       0.56      0.64      0.59       285
           1       0.56      0.47      0.51       274

    accuracy                           0.56       559
   macro avg       0.56      0.55      0.55       559
weighted avg       0.56      0.56      0.55       559



## Обратная связь
- [x] Хочу получить обратную связь по решению