<!-- #  Семинар: Рекурентные нейронные сети. -->

В данной работе вам предлагается посмотреть на всю мощь рекурентных нейронных сетей решив небольшую задачу. 

 Предлагаю решить вам задачу расшифровки сообщения с помощью RNN. 
 Представьте, что вам даны сообщения зашифрованные с помощью шифра Цезаря, являющимся одним из самый простых шифров в криптографии.
 

Шифр цезаря работает следующим образом: каждая буква 
исходного алфавита сдвигается на K символов вправо: 

Пусть нам дано сообщение: message="RNN IS NOT AI", тогда наше шифрование выполняющиеся по правилу f, с K=2, даст нам результат:
f(message, K) = TPPAKUAPQVACK

Для удобство можно взять символы только одного регистра в нашей имплементации, и сказать, что все буквы не английского алфавита будут отмечены как прочерк "-".

In [26]:
import random
import torch
import torch.nn as nn

In [27]:
# Определим ключ и словарь
key = 2

vocab = [char for char in ' -ABCDEFGHIJKLMNOPQRSTUVWXYZ']
print(vocab)

[' ', '-', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']


In [28]:
# Напишем функцию, которая делает 
def encrypt(text, key):
    """Returns the encrypted form of 'text'."""
    indexes = [vocab.index(char) for char in text]
    encrypted_indexes = [(idx + key) % len(vocab) for idx in indexes]
    encrypted_chars = [vocab[idx] for idx in encrypted_indexes]
    encrypted = ''.join(encrypted_chars)
    return encrypted

print(encrypt('RNN IS NOT AI', key))

TPPAKUAPQVACK


Теперь нам необходимо нагенерировать датасет для решения задачи обучения с учителем. Нашим датасетом может быть случайно зашифрованные фразы, и тогда его структура будет следующей:
message --- encrypted message

Это пример параллельного корпуса из НЛП.

Но нам необходимо представить каждую букву в виде ее номера в словаре, чтобы далее воспользоваться Embedding слоем. 

Для простоты давайте допустим, что все строки имеют одинаковую длину seq_len

In [29]:
num_examples = 256 # размер датасета
seq_len = 18 # максимальная длина строки

def encrypted_dataset(dataset_len, k):
    """
    Return: List(Tuple(Tensor encrypted, Tensor source))
    """
    dataset = []
    for x in range(dataset_len):
        random_message  = ''.join([random.choice(vocab) for x in range(seq_len)])
        encrypt_random_message = encrypt(''.join(random_message), k)
        src = [vocab.index(x) for x in random_message]
        tgt = [vocab.index(x) for x in encrypt_random_message]
        dataset.append([torch.tensor(tgt), torch.tensor(src)])
    return dataset

In [30]:
x = encrypted_dataset(10, 10)

In [31]:
x

[[tensor([17, 21, 20,  5, 10, 10, 27,  4,  8,  0,  0, 22, 22, 14,  0,  8, 13, 16]),
  tensor([ 7, 11, 10, 23,  0,  0, 17, 22, 26, 18, 18, 12, 12,  4, 18, 26,  3,  6])],
 [tensor([ 2,  1,  3, 17,  0, 22, 18, 12, 25,  8,  8,  8, 10, 14, 14,  6,  9,  7]),
  tensor([20, 19, 21,  7, 18, 12,  8,  2, 15, 26, 26, 26,  0,  4,  4, 24, 27, 25])],
 [tensor([ 1, 15, 25, 24, 13, 23,  7,  6, 24, 10,  9, 10, 19,  6,  9, 14, 26,  2]),
  tensor([19,  5, 15, 14,  3, 13, 25, 24, 14,  0, 27,  0,  9, 24, 27,  4, 16, 20])],
 [tensor([26,  6, 17, 21, 19, 11, 27, 27, 27, 12, 18,  8, 23, 24, 17,  0,  5, 21]),
  tensor([16, 24,  7, 11,  9,  1, 17, 17, 17,  2,  8, 26, 13, 14,  7, 18, 23, 11])],
 [tensor([ 4,  7, 11,  4, 24,  7, 10, 22,  0,  5,  7,  2,  4, 13, 19,  9, 15,  5]),
  tensor([22, 25,  1, 22, 14, 25,  0, 12, 18, 23, 25, 20, 22,  3,  9, 27,  5, 23])],
 [tensor([18, 16, 25,  4, 21,  2, 13,  4, 22, 27, 21,  3, 24,  8, 23,  8, 11, 21]),
  tensor([ 8,  6, 15, 22, 11, 20,  3, 22, 12, 17, 11, 21, 14, 26, 13, 2

**Pytorch RNN:**
$$h_t = \text{tanh}(w_{ih} x_t + b_{ih} + w_{hh} h_{(t-1)} + b_{hh})$$

**where : $h_t$ is the hidden state at time $t$, $x_t$ is
    the input at time $t$, and $h_{(t-1)}$ is the hidden state of the
    previous layer at time $t-1$ or the initial hidden state at time $0$.**
    
Args: 

        input_size: The number of expected features in the input $x$
        hidden_size: The number of features in the hidden state $h$
        num_layers: Number of recurrent layers. E.g., setting

In [32]:
class Decipher(nn.Module):
    def __init__(self, vocab_size, 
                 embedding_dim, 
                 hidden_dim, 
                 rnn_type='simple'):
        """
        :params: int vocab_size 
        :params: int embedding_dim
        :params
        """
        super(Decipher, self).__init__()
        self.embed = nn.Embedding(vocab_size, embedding_dim)
        # look here: https://pytorch.org/tutorials/beginner/nlp/word_embeddings_tutorial.html
        if rnn_type == 'simple':
            self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=2)
         
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.initial_hidden = torch.zeros(2, 1, hidden_dim)
        
    def forward(self, cipher):
        # CHECK INPUT SIZE
        # Unsqueeze 1 dimension for batches
        # https://pytorch.org/docs/stable/torch.html
        embd_x = self.embed(cipher).unsqueeze(1)
        out_rnn, hidden = self.rnn(embd_x, self.initial_hidden)
        # Apply the affine transform and transpose output in appropriate way
        # because you want to get the softmax on vocabulary dimension
        # in order to get probability of every letter
        return self.fc(out_rnn).transpose(1, 2)

In [33]:
# Определим параметры нашей модели
embedding_dim = 5
hidden_dim = 10
vocab_size = len(vocab) 
lr = 1e-3

criterion = torch.nn.CrossEntropyLoss()

# Инициализируем модель
model = Decipher(vocab_size, embedding_dim, hidden_dim)

# Инициализируем оптимизатор: рекомендуется Adam
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)

num_epochs = 10

In [34]:
k = 10
for x in range(num_epochs):
    print('Epoch: {}'.format(x))
    for encrypted, original in encrypted_dataset(num_examples, k):

        scores = model(encrypted)
        original = original.unsqueeze(1)
        # Calculate loss
        loss = criterion(scores, original)
        # Zero grads
        optimizer.zero_grad()
        # Backpropagate
        loss.backward()
        # Update weights
        optimizer.step()
    print('Loss: {:6.4f}'.format(loss.item()))

    with torch.no_grad():
        matches, total = 0, 0
        for encrypted, original in encrypted_dataset(num_examples, k):
            # Compute a softmax over the outputs
            predictions = torch.nn.functional.softmax(model(encrypted), 1)
            # Choose the character with the maximum probability (greedy decoding)
            _, batch_out = predictions.max(dim=1)
            # Remove batch
            batch_out = batch_out.squeeze(1)
            # Calculate accuracy
            matches += torch.eq(batch_out, original).sum().item()
            total += torch.numel(batch_out)
        accuracy = matches / total
        print('Accuracy: {:4.2f}%'.format(accuracy * 100))

Epoch: 0
Loss: 2.8201
Accuracy: 30.47%
Epoch: 1
Loss: 1.8440
Accuracy: 60.46%
Epoch: 2
Loss: 1.3467
Accuracy: 84.46%
Epoch: 3


KeyboardInterrupt: 