In [79]:
# импорт библиотек
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np


In [80]:

# Шаг 1: Подготовка данных

# Функция для генерации последовательностей x и соответствующих y
def generate_sequences(length):
    x = torch.randint(0, 10, (length,))  # Генерируем случайную последовательность x с использованием torch
    y = (x + x[0]) % 10  # Рассчитываем соответствующую последовательность y
    return x, y

# Генерация последовательностей разной длины
x_10, y_10 = generate_sequences(10)
x_40, y_40 = generate_sequences(40)
x_80, y_80 = generate_sequences(80)

In [81]:
def transform(sequences):
    # Создаем новую матрицу того же размера, что и исходная, заполненную нулями
    transformed = torch.zeros_like(sequences)

    # Проходим по всем последовательностям
    for i, sequence in enumerate(sequences):
        # Присваиваем первый элемент напрямую
        transformed[i, 0] = sequence[0]
        # Вычисляем оставшиеся элементы
        for j in range(1, len(sequence)):
            transformed[i, j] = (sequence[j] + sequence[0]) % 10

    return transformed

In [82]:
# Функция для генерации набора данных
def create_datasets(num_samples, sequence_lengths):
    datasets = []
    for length in sequence_lengths:
        # Генерация данных для каждой длины последовательности
        x_data = [generate_sequences(length)[0] for _ in range(num_samples)]
        y_data = [generate_sequences(length)[1] for _ in range(num_samples)]
        
        # Преобразование в тензоры PyTorch
        x_tensor = torch.stack(x_data)
        y_tensor = torch.stack(y_data)
        
        # Создание TensorDataset
        datasets.append(TensorDataset(x_tensor, y_tensor))
    return datasets

# Создание обучающих и тестовых наборов данных
num_train_samples = 1000  # Количество образцов для обучения
num_test_samples = 200   # Количество образцов для тестирования
sequence_lengths = [10, 40, 80]  # Длины последовательностей

train_datasets = create_datasets(num_train_samples, sequence_lengths)
test_datasets = create_datasets(num_test_samples, sequence_lengths)

# Теперь у вас есть список обучающих и тестовых наборов данных для разных длин последовательностей


In [83]:
# Функция для создания даталоадера из последовательностей
def create_dataloader(x, y, batch_size=32, shuffle=True):
    dataset = TensorDataset(x, y)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

# Сначала сгенерируем последовательности
x_10, y_10 = generate_sequences(10)
x_40, y_40 = generate_sequences(40)
x_80, y_80 = generate_sequences(80)

# Применим функцию transform к каждой из них
y_10_transformed = transform(x_10.unsqueeze(0))
y_40_transformed = transform(x_40.unsqueeze(0))
y_80_transformed = transform(x_80.unsqueeze(0))

# Создадим даталоадеры для каждой пары последовательностей
dataloader_10 = create_dataloader(x_10.unsqueeze(0), y_10_transformed, batch_size=1)
dataloader_40 = create_dataloader(x_40.unsqueeze(0), y_40_transformed, batch_size=1)
dataloader_80 = create_dataloader(x_80.unsqueeze(0), y_80_transformed, batch_size=1)

# Теперь у нас есть три даталоадера, которые можно использовать для обучения модели или других целей.


In [84]:
# Шаг 2: Определение архитектур моделей

class MyRecurrentNet(nn.Module):
    def __init__(self, rnn_class, seq_len, input_size, hidden_size, num_classes):
        super(MyRecurrentNet, self).__init__()
        self.embedding = nn.Embedding(10, input_size)
        self.rnn = rnn_class(input_size=input_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
        x = self.embedding(x)
        out, _ = self.rnn(x)
        # Теперь 'out' имеет размерность (batch_size, seq_len, hidden_size)
        out = self.fc(out)
        # 'out' после этого имеет размерность (batch_size, seq_len, num_classes)
        return out


In [85]:
model_MyLSTM = MyRecurrentNet(nn.LSTM, seq_len=10, input_size=10, hidden_size=64, num_classes=25)
model_MyRNN = MyRecurrentNet(nn.RNN, seq_len=10, input_size=10, hidden_size=64, num_classes=25)
model_MyGRU = MyRecurrentNet(nn.GRU, seq_len=10, input_size=10, hidden_size=64, num_classes=25)


In [86]:

# Шаг 3: Обучение моделей

# Функция для обучения модели
def train_model(model, train_loader, criterion, optimizer, num_classes, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        for x, y in train_loader:
            optimizer.zero_grad()
            outputs = model(x)
            # Преобразование выходных данных модели для соответствия размерности целевых данных
            outputs = outputs.view(-1, num_classes)  # Размерность теперь [batch_size * sequence_length, num_classes]
            y = y.view(-1)  # Преобразование y к одномерному вектору
            # Вычисление потерь
            loss = criterion(outputs, y.long())  # Убедитесь, что y имеет тип LongTensor
            loss.backward()
            optimizer.step()


In [87]:
# Шаг 4: Оценка производительности моделей

# Создаем заголовок для таблички с результатами
print(f"{'Модель':<10}{'Длина 10':<15}{'Длина 40':<15}{'Длина 80':<15}")

# Цикл по типам моделей
for model_type in ["RNN", "LSTM", "GRU"]:
    row = f"{model_type:<10}"
    
    # Цикл по датасетам
    for i, (train_dataset, test_dataset) in enumerate(zip(train_datasets, test_datasets)):
        # Создаем train_loader для текущего набора данных
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

        # Создаем экземпляр модели в зависимости от типа
        if model_type == "RNN":
            model = MyRecurrentNet(nn.RNN, seq_len=10, input_size=10, hidden_size=64, num_classes=25)
        elif model_type == "LSTM":
            model = MyRecurrentNet(nn.LSTM, seq_len=10, input_size=10, hidden_size=64, num_classes=25)
        elif model_type == "GRU":
            model = MyRecurrentNet(nn.GRU, seq_len=10, input_size=10, hidden_size=64, num_classes=25)
        
        # Определяем функцию потерь и оптимизатор
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        
        # Обучаем модель
        # Обучаем модель, передав все необходимые аргументы
        train_model(model, train_loader, criterion, optimizer, num_classes=25)
        
        # Создаем test_loader для текущего набора данных
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
        model.eval()  # Перевод модели в режим оценки
        test_loss = 0.0
        correct = 0
        total = 0
        
        # Оценка производительности на тестовых данных
        with torch.no_grad():
            for x, y in test_loader:
                outputs = model(x)
                outputs = outputs.view(-1, outputs.size(-1))  # Преобразуем размерность, если нужно

                # Проверьте, требуется ли преобразование `y` из one-hot кодировки в индексы классов
                if y.ndim > 1 and y.size(1) > 1:  # Это условие предполагает one-hot кодировку
                    y = torch.argmax(y, dim=1)

                test_loss += criterion(outputs, y).item()
                _, predicted = torch.max(outputs.data, 1)
                total += y.size(0)
                correct += (predicted == y).sum().item()
        
        # Расчет средней потери и точности
        test_loss /= len(test_loader)
        accuracy = 100 * correct / total
        row += f"{test_loss/len(test_loader):<15.4f}{accuracy:<15.2f}"
    print(row)



Модель    Длина 10       Длина 40       Длина 80       


ValueError: Expected input batch_size (640) to match target batch_size (64).

In [64]:
print("Проверка размерностей входных данных:")
print(f"Входные данные X: {x.size()}")  # Ожидается [batch_size, sequence_length]
print(f"Целевые данные Y: {y.size()}")  # Ожидается [batch_size, sequence_length]

# Проверка размерностей выходных данных модели
test_outputs = model(x)
print(f"Выходные данные модели: {test_outputs.size()}")  # Ожидается [batch_size, sequence_length, num_classes]

num_classes = 25  # Установите количество классов для вашей задачи

# Проверка размерностей после преобразования для функции потерь
print(f"Размерность целевых данных для функции потерь: {y.view(-1).size()}")
print(f"Размерность выходных данных модели для функции потерь: {test_outputs.view(-1, num_classes).size()}")


Проверка размерностей входных данных:
Входные данные X: torch.Size([64, 10])
Целевые данные Y: torch.Size([64, 10])
Выходные данные модели: torch.Size([64, 10, 25])
Размерность целевых данных для функции потерь: torch.Size([640])
Размерность выходных данных модели для функции потерь: torch.Size([640, 25])
