https://stepik.org/lesson/1576200/step/5

https://github.com/selfedu-rus/neuro-pytorch/blob/main/solves/4.3.3

In [None]:
from collections.abc import Sequence
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim

# здесь объявляйте класс модели

x = torch.linspace(-20, 20, 2000)
y = torch.cos(x) + 0.5 * torch.sin(5*x) + 0.1 * torch.randn_like(x)

# Вспомогательные переменные:
total = len(x)      # общее количество отсчетов
train_size = 1000   # размер обучающей выборки
seq_length = 10     # число предыдущих отсчетов, по которым строится прогноз следующего значения

# Наборы данных для обучения и тестирования:
y.unsqueeze_(1) # (2000) -> (2000, 1)
'''
 torch.cat для объединения срезов y. Каждый срез — это 10 последовательных элементов (i до i+seq_length),
 и такие срезы берутся для i от 0 до train_size - seq_length - 1.
 Получается тензор с размерностями (seq_length, количество примеров).
'''
train_data_y = torch.cat([y[i:i+seq_length] for i in range(train_size-seq_length)], dim=1)
train_targets = torch.tensor([y[i+seq_length].item() for i in range(train_size-seq_length)])
print('Длина train:', len(train_targets))

test_data_y = torch.cat([y[i:i+seq_length] for i in range(train_size-seq_length, total-seq_length)], dim=1)
test_targets = torch.tensor([y[i+seq_length].item() for i in range(train_size-seq_length, total-seq_length)])
print('Длина test:', len(test_targets))
'''
permute меняет местами оси, чтобы примеры были первым измерением, как требуется в DataLoader
(seq_length, количество примеров) -> (количество примеров, seq_length):
'''
d_train = data.TensorDataset(train_data_y.permute(1, 0), train_targets)
d_test = data.TensorDataset(test_data_y.permute(1, 0), test_targets)

# Объекты датасетов и DataLoader:
train_data = data.DataLoader(d_train, batch_size=8, shuffle=True)
test_data = data.DataLoader(d_test, batch_size=len(d_test), shuffle=False)

class RNNModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.rnn = nn.RNN(input_size=1, hidden_size=5, batch_first=True)
        self.fc = nn.Linear(5, 1)

    def forward(self, x):
        _, h = self.rnn(x)
        # print(h.shape) # h.shape: (num_layers, batch_size, hidden_size) = [1, 8, 5]
        # output = self.fc(h) # плохой вариант, хоть и рабочий
        output = self.fc(h.squeeze(0)) # правильный вариант
        return output

# создание объекта модели
model = RNNModel()

optimizer = optim.RMSprop(model.parameters(), lr=0.001) # оптимизатор RMSprop с шагом обучения 0.001
loss_func = nn.MSELoss() # функция потерь - средний квадрат ошибок

epochs = 5   # число эпох
model.train() # переведите модель в режим обучения

for _e in range(epochs):
    for x_train, y_train in train_data:
        # добавляем размерность для подачи в модель, убираем лишнюю для метрики
        predict = model(x_train.unsqueeze(-1)).squeeze()
        loss = loss_func(predict, y_train)

        # выполните один шаг обучения (градиентного спуска)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# переведите модель в режим эксплуатации
model.eval()
d, t = next(iter(test_data))

# с использованием менеджера torch.no_grad вычислите прогнозы для выборки d
with torch.no_grad():
    # добавляем размерность для подачи в модель, убираем лишнюю для метрики
    predict = model(d.unsqueeze(-1)).squeeze()

# вычислите потери с помощью loss_func для predict и t; значение Q сохраните в виде вещественного числа
Q = loss_func(predict, t).item()

Q

Длина train: 990
Длина test: 1000


0.018559271469712257

RNN слой возвращает вектор h размерностью (num_layers, batch_size, hidden_size). В нашем случае [1, 8, 5].
И линейный слой принимает такую форму без модифацикаций. Полагаю, что PyTorch делает[1*8, 5] = [8,5] и всё работает.
Но при num_layers > 1 такой финт уже не пройдёт, поэтому лучше сделать выход RNN явным, дабавив .squeeze(0)

In [None]:
# @title Исправленный код (пересечение train и test)
# Тестовая выборка (исправлено!)
test_data_y = torch.cat([
    y[i:i+seq_length]
    for i in range(train_size, total-seq_length)  # ← ключевое исправление!
], dim=1)

test_targets = torch.tensor([
    y[i+seq_length].item()
    for i in range(train_size, total-seq_length)
])

In [None]:
# @title Решение https://github.com/selfedu-rus/neuro-pytorch/blob/main/solves/4.3.3
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim

# здесь объявляйте класс модели
class MyModelRNN(nn.Module):
    def __init__(self):
        super().__init__()
        self._h_size = 5
        self.rnn = nn.RNN(1, self._h_size, batch_first=True)
        self.output = nn.Linear(self._h_size, 1)

    def forward(self, x):
        _, h = self.rnn(x)
        return self.output(h)


x = torch.linspace(-20, 20, 2000)
y = torch.cos(x) + 0.5 * torch.sin(5*x) + 0.1 * torch.randn_like(x)

total = len(x)      # общее количество отсчетов
train_size = 1000   # размер обучающей выборки
seq_length = 10     # число предыдущих отсчетов, по которым строится прогноз следующего значения

y.unsqueeze_(1)
train_data_y = torch.cat([y[i:i+seq_length] for i in range(train_size-seq_length)], dim=1)
train_targets = torch.tensor([y[i+seq_length].item() for i in range(train_size-seq_length)])

test_data_y = torch.cat([y[i:i+seq_length] for i in range(train_size-seq_length, total-seq_length)], dim=1)
test_targets = torch.tensor([y[i+seq_length].item() for i in range(train_size-seq_length, total-seq_length)])

d_train = data.TensorDataset(train_data_y.permute(1, 0), train_targets)
d_test = data.TensorDataset(test_data_y.permute(1, 0), test_targets)

train_data = data.DataLoader(d_train, batch_size=8, shuffle=True)
test_data = data.DataLoader(d_test, batch_size=len(d_test), shuffle=False)

model = MyModelRNN() # создание объекта модели

optimizer = optim.RMSprop(params=model.parameters(), lr=0.001)
loss_func = nn.MSELoss()

epochs = 5 # число эпох
model.train()

for _e in range(epochs):
    for x_train, y_train in train_data:
        predict = model(x_train.unsqueeze(-1)).squeeze()
        loss = loss_func(predict, y_train)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

model.eval()
d, t = next(iter(test_data))
with torch.no_grad():
    predict = model(d.unsqueeze(-1)).squeeze()

Q = loss_func(predict, t).item()

Q

0.02265542931854725