In [1]:
from random import randint
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import time

In [2]:
def get_randint():
    while True:
        yield randint(0, 9)


def func(x: list) -> list:
    y_0 = x[0]
    y_all = [y_0]
    for xi in x[1:]:
        y_i = xi + y_0
        if y_i >= 10:
            y_i = y_i - 10
        y_all.append(y_i)
    return y_all


n_examples = 5000
s1 = [[next(get_randint()) for _ in range(25)] for _ in range(n_examples)]
s2 = [[next(get_randint()) for _ in range(75)] for _ in range(n_examples)]
s3 = [[next(get_randint()) for _ in range(125)] for _ in range(n_examples)]

y1 = [func(x) for x in s1]
y2 = [func(x) for x in s2]
y3 = [func(x) for x in s3]

s1, s2, s3 = torch.tensor(s1), torch.tensor(s2), torch.tensor(s3)
y1, y2, y3 = torch.tensor(y1), torch.tensor(y2), torch.tensor(y3)

# объединение последовательностей всех длин
X = [s1, s2, s3]
y = [y1, y2, y3]

In [3]:
class NeuralNetwork(nn.Module):
    def __init__(self, rnnClass, input_size, hidden_size, output_size):
        super(NeuralNetwork, self).__init__()
        self.hidden = rnnClass(input_size, hidden_size, batch_first=True)
        self.output = nn.Linear(hidden_size, output_size)
        
    def forward(self, X):   
        o, _ = self.hidden(X)
        output = self.output(o)
        return output

In [4]:
def train_model(model, num_epochs, criterion, optimizer, batch=100):
    for epoch in range(num_epochs):
            total_loss = 0
            for seq_x, seq_y in zip(X, y): # достаем по паре последовательностей (s1, y1), (s2, y2), (s3, y3)
                for i in range(int(len(seq_x) / batch)):
                    
                    inputs = seq_x[i * batch : (i + 1) * batch].unsqueeze(dim=2).to(torch.float32) # 100 x 25 x 1
                    targets = seq_y[i * batch : (i + 1) * batch].unsqueeze(dim=2).flatten() # 2500 x 1  
                    
                    optimizer.zero_grad()
                    outputs = model(inputs) # 100 x 25 x 10
                    outputs = outputs.view(-1, output_size) # 2500 x 10
                    
                    loss = criterion(outputs, targets)
                    loss.backward()
                    optimizer.step()
                    total_loss += loss.item()
                    
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

In [5]:
# так как не используются эмбеддинги, количество параметров (фич) каждого эл-та последовательности равно 1
input_size = 1 
hidden_size = 64
output_size = 10
learning_rate = 0.001

### <проверка размерностей>

In [6]:
print(f'{s1.shape=}, {y1.shape=}')
print(f'{s1.unsqueeze(dim=2).shape=}, {y1.unsqueeze(dim=2).shape=}')

s1.shape=torch.Size([5000, 25]), y1.shape=torch.Size([5000, 25])
s1.unsqueeze(dim=2).shape=torch.Size([5000, 25, 1]), y1.unsqueeze(dim=2).shape=torch.Size([5000, 25, 1])


In [7]:
s1_check = s1.unsqueeze(dim=2).to(torch.float32)
y1_check = y1.unsqueeze(dim=2).to(torch.float32)

In [8]:
hidden = nn.RNN(1, 64, batch_first=True)
hidden(s1_check[0:3])[0].shape

torch.Size([3, 25, 64])

In [9]:
output = nn.Linear(64, 10)
output(hidden(s1_check[0:3])[0]).shape

torch.Size([3, 25, 10])

### </проверка размерностей>

# RNN 

In [10]:
num_epochs = 200

In [11]:
model_RNN = NeuralNetwork(nn.RNN, input_size, hidden_size, output_size)

In [12]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_RNN.parameters(), lr=learning_rate)

In [13]:
train_model(model_RNN, num_epochs, criterion, optimizer)

Epoch 10/200, Loss: 1.9360119104385376
Epoch 20/200, Loss: 2.2969093322753906
Epoch 30/200, Loss: 1.8558143377304077
Epoch 40/200, Loss: 1.795109510421753
Epoch 50/200, Loss: 1.3494449853897095
Epoch 60/200, Loss: 2.274125099182129
Epoch 70/200, Loss: 1.9121074676513672
Epoch 80/200, Loss: 1.7885265350341797
Epoch 90/200, Loss: 1.6759361028671265
Epoch 100/200, Loss: 1.0801721811294556
Epoch 110/200, Loss: 0.989353358745575
Epoch 120/200, Loss: 0.9236520528793335
Epoch 130/200, Loss: 0.7453180551528931
Epoch 140/200, Loss: 0.9558409452438354
Epoch 150/200, Loss: 1.1549068689346313
Epoch 160/200, Loss: 0.8634561896324158
Epoch 170/200, Loss: 0.99730384349823
Epoch 180/200, Loss: 0.9914222359657288
Epoch 190/200, Loss: 0.7600827217102051
Epoch 200/200, Loss: 0.7497560381889343


### тестирование на обучающей выборке

In [14]:
s1_test, y1_test = s1[0:5], y1[0:5]
s1_test

tensor([[8, 2, 8, 9, 4, 6, 8, 6, 7, 3, 8, 9, 6, 3, 7, 0, 5, 2, 1, 7, 4, 7, 3, 8,
         3],
        [9, 4, 7, 1, 5, 5, 5, 3, 6, 5, 8, 7, 8, 4, 3, 6, 8, 2, 4, 5, 5, 9, 1, 4,
         1],
        [6, 7, 4, 2, 1, 4, 5, 2, 0, 0, 1, 2, 1, 0, 1, 1, 9, 9, 0, 9, 9, 7, 6, 3,
         1],
        [0, 9, 9, 1, 7, 6, 9, 0, 2, 5, 8, 2, 1, 9, 1, 3, 3, 0, 9, 8, 3, 9, 8, 4,
         5],
        [7, 0, 2, 5, 6, 0, 5, 8, 0, 5, 6, 5, 9, 0, 1, 8, 2, 6, 3, 7, 7, 5, 6, 0,
         6]])

In [16]:
with torch.no_grad():
    output = model_RNN(s1_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
predicted

tensor([[8, 0, 6, 7, 2, 4, 6, 4, 5, 1, 6, 7, 4, 1, 5, 7, 3, 0, 9, 5, 2, 5, 1, 6,
         1],
        [9, 2, 5, 9, 3, 3, 3, 1, 4, 3, 6, 5, 6, 2, 1, 4, 6, 0, 2, 3, 3, 7, 9, 2,
         9],
        [6, 3, 0, 8, 7, 0, 1, 8, 5, 5, 6, 8, 7, 6, 7, 7, 4, 4, 5, 4, 4, 2, 1, 8,
         7],
        [0, 9, 0, 2, 7, 7, 0, 1, 3, 6, 9, 3, 2, 0, 2, 4, 4, 1, 0, 8, 4, 0, 8, 5,
         6],
        [8, 7, 8, 3, 4, 7, 3, 6, 7, 3, 4, 3, 7, 7, 9, 6, 0, 4, 1, 5, 5, 3, 4, 7,
         4]])

In [17]:
torch.equal(predicted, y1_test)

False

# LSTM

In [18]:
num_epochs = 100

In [19]:
model_LSTM = NeuralNetwork(nn.LSTM, input_size, hidden_size, output_size)

In [20]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_LSTM.parameters(), lr=learning_rate)

In [21]:
train_model(model_LSTM, num_epochs, criterion, optimizer)

Epoch 10/100, Loss: 0.7779436111450195
Epoch 20/100, Loss: 0.22038637101650238
Epoch 30/100, Loss: 0.08809491246938705
Epoch 40/100, Loss: 0.033889077603816986
Epoch 50/100, Loss: 0.014261136762797832
Epoch 60/100, Loss: 0.006758007686585188
Epoch 70/100, Loss: 0.0033360205125063658
Epoch 80/100, Loss: 0.0016286259051412344
Epoch 90/100, Loss: 0.0012332035694271326
Epoch 100/100, Loss: 0.0008903219713829458


In [22]:
s1_test, y1_test = s1[0:5], y1[0:5]
with torch.no_grad():
    output = model_LSTM(s1_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
torch.equal(predicted, y1_test)

True

# GRU

In [23]:
num_epochs = 70

In [24]:
model_GRU = NeuralNetwork(nn.GRU, input_size, hidden_size, output_size)

In [25]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_GRU.parameters(), lr=learning_rate)

In [26]:
train_model(model_GRU, num_epochs, criterion, optimizer)

Epoch 10/70, Loss: 0.351041704416275
Epoch 20/70, Loss: 0.069420225918293
Epoch 30/70, Loss: 0.025917138904333115
Epoch 40/70, Loss: 0.0116353714838624
Epoch 50/70, Loss: 0.006219253409653902
Epoch 60/70, Loss: 0.003778202459216118
Epoch 70/70, Loss: 0.002216474385932088


In [27]:
s1_test, y1_test = s1[0:5], y1[0:5]
with torch.no_grad():
    output = model_GRU(s1_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
torch.equal(predicted, y1_test)

True

# Проверка на тестовых данных 

In [62]:
# тестирование моделей на новых тестовых датасетах 1000 х 100

n_examples = 1000
X_test = [[next(get_randint()) for _ in range(100)] for _ in range(n_examples)]
y_test = [func(x) for x in X_test]

X_test = torch.tensor(X_test)
y_test = torch.tensor(y_test)

X_test.shape, y_test.shape

(torch.Size([1000, 100]), torch.Size([1000, 100]))

In [63]:
def calculate_accuracy(x, y):
    x = x.numpy()
    y = y.numpy()
    assert x.size == y.size, 'Check dimensions!'
    return (np.sum(x == y) / x.size)

In [64]:
with torch.no_grad():
    output = model_RNN(X_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
print('RNN accuracy:', calculate_accuracy(predicted, y_test))

RNN accuracy: 0.6079


In [65]:
with torch.no_grad():
    output = model_LSTM(X_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
print('LSTM accuracy:', calculate_accuracy(predicted, y_test))

LSTM accuracy: 1.0


In [66]:
with torch.no_grad():
    output = model_GRU(X_test.unsqueeze(dim=2).to(torch.float32))
    predicted = torch.argmax(output, dim=2)
print('GRU accuracy:', calculate_accuracy(predicted, y_test))

GRU accuracy: 1.0


# Выводы 

Было проведено сравнение трех моделей рекуррентных сетей на задаче предсказывания последовательности чисел, зависящих от первого элемента последовательности.

Модель RNN не смогла до конца выучить алгоритм последовательности по результатам 200 эпох. Итоговый loss 0.7497, Test_acc 0.6;

Модель LSTM по результатам 100 эпох показала практически 100% точность. Итоговый loss 0.0008, Test_acc 1.0;

Модель GRU по результатам 70 эпох также показала практически 100% точность. Итоговый loss 0.0022, Test_acc 1.0;

В целом модели LSTM и GRU показали одинаково высокий результат
и подходят для предсказания длинных последовательностей.