In [2]:
import pandas as pd
import numpy as np
import datetime
import spacy
import re
import torch
import pickle
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from gensim.models import Word2Vec

In [3]:
# Ознакомимся с нашим набором данных
df = pd.read_csv('../data/text.csv')

In [4]:
# Загрузка списка из файла
with open('tokenized_text.pkl', 'rb') as f:
    tokenized_text = pickle.load(f)

In [5]:
# буду использовать word2vec для эмбеддинга
w2v_model = Word2Vec(tokenized_text, vector_size=100, workers=4)
# Получу векторные представления слов для каждого документа и усредняю веркторы в документе. Сохраняю индексы документов, для которых не нашлось векторных представлений слов
empty_doc_indexes = []
document_vectors = []
for index, document in enumerate(tokenized_text):
    doc_vectors = [w2v_model.wv[word] for word in document if word in w2v_model.wv]

    if doc_vectors:
        avg_doc_vector = np.mean(doc_vectors, axis=0)
        document_vectors.append(avg_doc_vector)
    else:
        empty_doc_indexes.append(index)

In [6]:
# Удалю метки для документов, слова в которых не получили векторное представление
labels_list = df['label'].values.tolist()
for index in sorted(empty_doc_indexes, reverse=True):
    labels_list.pop(index)

In [7]:
# Создаю тензоры для данных и их меток
document_vectors_np = np.array(document_vectors)
document_vectors_tensor = torch.tensor(document_vectors_np)
labels_tensor = torch.tensor(labels_list)

In [8]:
# Делю данные на тренировочные и тестовые/валидационные
train_data, test_val_data, train_labels, test_val_labels = train_test_split(document_vectors_tensor, labels_tensor, train_size=0.7, random_state=24, stratify=labels_tensor)

In [9]:
# Разделяю тестовые/валидационные на тестовые и валидационные
test_data, val_data, test_labels, val_labels = train_test_split(test_val_data, test_val_labels, train_size=0.5, random_state=24, stratify=test_val_labels)

In [10]:
# Стандартизую данные
mean = torch.mean(train_data, dim=0)
std = torch.std(train_data, dim=0)
train_data = (train_data - mean) / std
test_data = (test_data - mean) / std
val_data = (val_data - mean) / std

In [11]:
# Создаю измерение батчей
train_dataset = TensorDataset(train_data, train_labels)
val_dataset = TensorDataset(val_data, val_labels)
test_dataset = TensorDataset(test_data, test_labels)

batch_size = 64

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [12]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))

        # Получение вывода только из последнего временного шага
        out = self.fc(out[:, -1, :])
        return out

In [13]:
def training_loop(n_epochs, optimizer, model, criterion, train_loader, val_loader=None):
    train_losses = []
    val_losses = []

    for epoch in range(n_epochs):
        loss_train = 0.0
        for data, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            loss_train += loss.item()

        avg_loss_train = loss_train / len(train_loader)
        train_losses.append(avg_loss_train)

        if val_loader is not None:
            val_loss = 0.0
            with torch.no_grad():
                for val_data, val_labels in val_loader:
                    val_outputs = model(val_data)
                    val_loss += criterion(val_outputs, val_labels).item()
            avg_val_loss = val_loss / len(val_loader)
            val_losses.append(avg_val_loss)

        # Вывод средней потери на каждой эпохе
        if epoch == 0 or (epoch + 1) % 10 == 0:
            print('{} Epoch {}, Training loss: {:.4f}'.format(
                datetime.datetime.now(), epoch + 1, avg_loss_train))
            if val_loader is not None:
                print('{} Epoch {}, Validation loss: {:.4f}'.format(
                    datetime.datetime.now(), epoch + 1, avg_val_loss))

    return train_losses, val_losses

In [14]:
def plot_losses(train_losses, val_losses=None):
    plt.plot(train_losses, label='Training loss')
    if val_losses:
        plt.plot(val_losses, label='Validation loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Losses')
    plt.show()

In [15]:
# Параметры модели
input_size = 100  # Размер входного вектора
hidden_size = 128  # Размер скрытого состояния LSTM
num_layers = 2  # Количество слоев LSTM
num_classes = 6  # Количество классов
learning_rate = 0.001
n_epochs = 15

# Инициализация модели
model = LSTMModel(input_size, hidden_size, num_layers, num_classes)

# Определение функции потерь и оптимизатора
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [16]:
train_losses, val_losses = training_loop(n_epochs, optimizer, model, criterion, train_loader, val_loader)

RuntimeError: For unbatched 2-D input, hx and cx should also be 2-D but got (3-D, 3-D) tensors

In [ ]:
plot_losses(train_losses, val_losses)

In [ ]:
val_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for val_data, val_labels in val_loader:
        val_outputs = model(val_data)
        val_loss += criterion(val_outputs, val_labels).item()
        _, predicted = torch.max(val_outputs, 1)
        total += val_labels.size(0)
        correct += (predicted == val_labels).sum().item()

print('Validation loss: {:.4f}'.format(val_loss / len(val_loader)))
print('Validation accuracy: {:.2f}%'.format(100 * correct / total))

In [ ]:
test_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
    for test_data, test_labels in test_loader:
        test_outputs = model(test_data)
        test_loss += criterion(test_outputs, test_labels).item()
        _, predicted = torch.max(test_outputs, 1)
        total += test_labels.size(0)
        correct += (predicted == test_labels).sum().item()

print('Test loss: {:.4f}'.format(test_loss / len(test_loader)))
print('Test accuracy: {:.2f}%'.format(100 * correct / total))