In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from datasets import load_dataset
import matplotlib.pyplot as plt
from collections import Counter
import random
import numpy as np

In [2]:
# Seed the PyTorch (CPU and all CUDA devices), Python `random` and NumPy
# generators with the same fixed value (42).
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
random.seed(42)
np.random.seed(42)

In [3]:
# Placeholder binding for `df_summary`.
# NOTE(review): presumably overwritten by the preprocessing notebook that is
# downloaded and run further down — TODO confirm. If it is not, the later
# `df_summary.tolist()` call fails because an int has no `.tolist()`.
df_summary = 0

In [4]:
# Placeholder binding for `df_score`.
# NOTE(review): presumably overwritten by the preprocessing notebook that is
# downloaded and run further down — TODO confirm. If it is not, the later
# `df_score.astype(int)` call fails because an int has no `.astype()`.
df_score = 0

In [None]:
# This code is downloading the notebook from GitHub and running it
import requests
from pathlib import Path
url = "https://raw.githubusercontent.com/nbakas/NLP/refs/heads/main/02-Preprocessing.ipynb"
filename = url.split("/")[-1]
local_path = Path.cwd() / filename
response = requests.get(url)
response.raise_for_status()
local_path.write_bytes(response.content)
%run $local_path

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import numpy as np

# Example preprocessing: tokenize and build vocab

In [7]:
class TextDataset(Dataset):
    """Whitespace-tokenised texts with labels, served as fixed-length id tensors.

    Args:
        texts: iterable of strings; each one is lower-cased and split on
            whitespace.
        labels: indexable collection of numeric labels aligned with `texts`.
        vocab: optional word -> id mapping to reuse. When omitted, a mapping is
            built from `texts`: words are numbered from 2 in descending
            frequency, and '<PAD>' / '<UNK>' take ids 0 / 1.
        max_len: number of ids every encoded text is padded or cut to.
    """

    def __init__(self, texts, labels, vocab=None, max_len=20):
        self.max_len = max_len
        self.labels = labels
        self.texts = [sentence.lower().split() for sentence in texts]

        if vocab is not None:
            # Reuse a caller-supplied mapping (e.g. the training vocabulary).
            self.vocab = vocab
            return

        word_counts = Counter()
        for tokens in self.texts:
            word_counts.update(tokens)

        # Ids 0 and 1 are reserved, so real words start at 2.
        self.vocab = {}
        for token_id, (word, _count) in enumerate(word_counts.most_common(), start=2):
            self.vocab[word] = token_id
        self.vocab['<PAD>'] = 0
        self.vocab['<UNK>'] = 1

    def __len__(self):
        """Number of samples, taken from the label collection."""
        return len(self.labels)

    def encode_text(self, text):
        """Map a token list to a tensor of exactly `max_len` ids.

        Unknown words become the '<UNK>' id; short inputs are right-padded with
        the '<PAD>' id and long inputs are truncated.
        """
        token_ids = [self.vocab.get(token, self.vocab['<UNK>']) for token in text]
        shortfall = self.max_len - len(token_ids)
        if shortfall > 0:
            token_ids.extend([self.vocab['<PAD>']] * shortfall)
        else:
            token_ids = token_ids[:self.max_len]
        return torch.tensor(token_ids)

    def __getitem__(self, idx):
        """Return `(token_id_tensor, label_tensor)` for sample `idx`."""
        encoded = self.encode_text(self.texts[idx])
        # Assuming scores are 1-5, shift them to 0-4.
        zero_based = self.labels[idx] - 1
        return encoded, torch.tensor(zero_based)

# Model

In [8]:
class DLModel(nn.Module):
    """Embedding -> single-layer RNN/LSTM/GRU -> dropout -> linear classifier.

    The prediction is made from the recurrent layer's final hidden state. With
    `padding_idx` set (the default), each sequence is packed to its true length
    first, so that state is the one after the last real token rather than after
    a run of padding tokens.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: size of each token embedding.
        hidden_dim: size of the recurrent hidden state.
        output_dim: number of output classes (logits).
        model_type: 'RNN', 'LSTM' or 'GRU'.
        dropout_prob: dropout probability applied to the final hidden state.
        padding_idx: token id used for right-padding (0 matches TextDataset's
            '<PAD>'). Pass None to feed padded sequences to the recurrent layer
            unchanged.

    Raises:
        ValueError: if `model_type` is not one of the three supported names.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, model_type='RNN', dropout_prob=0.5, padding_idx=0):
        super().__init__()
        self.padding_idx = padding_idx
        # Word embeddings are learned during training from the token-id tensors
        # produced by TextDataset.encode_text(). With `padding_idx` set, the
        # padding row is kept at zero and receives no gradient.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)

        if model_type == 'RNN':
            self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)
        elif model_type == 'LSTM':
            self.rnn = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        elif model_type == 'GRU':
            self.rnn = nn.GRU(embed_dim, hidden_dim, batch_first=True)
        else:
            raise ValueError('model_type must be RNN, LSTM, or GRU')

        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return class logits of shape (batch, output_dim) for token ids `x` of shape (batch, seq_len)."""
        embedded = self.embedding(x)

        if self.padding_idx is None:
            _, hidden = self.rnn(embedded)
        else:
            # Count the real tokens per row (assumes padding only at the end,
            # as TextDataset.encode_text produces). Rows made entirely of
            # padding are treated as length 1 because packing rejects 0.
            lengths = (x != self.padding_idx).sum(dim=1).clamp(min=1).cpu()
            packed = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths, batch_first=True, enforce_sorted=False
            )
            _, hidden = self.rnn(packed)

        if isinstance(hidden, tuple):  # LSTM gives (hidden_state, cell_state)
            hidden = hidden[0]

        # hidden is (num_layers, batch, hidden_dim); keep the (only) last layer.
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

# Prepare data

In [None]:
# Convert `df_summary` to a plain Python list and preview the first five items.
# NOTE(review): assumes `df_summary` holds one string per sample (TextDataset
# calls `.lower()` on each entry) — TODO confirm against the preprocessing notebook.
texts = df_summary.tolist()
texts[:5]

In [None]:
# Cast the scores to int, convert them to a plain Python list and preview the
# first five. TextDataset later subtracts 1 from each label.
labels = df_score.astype(int).tolist()
labels[:5]

In [11]:
# Hold out 20% of the samples for testing; `random_state=42` fixes the shuffle
# so the same split is produced on every run.
train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

In [12]:
# Wrap each split in a TextDataset. The test split reuses the vocabulary built
# from the training texts, so words seen only in the test split map to <UNK>.
train_dataset = TextDataset(texts=train_texts, labels=train_labels)
test_dataset = TextDataset(texts=test_texts, labels=test_labels, vocab=train_dataset.vocab)

# Mini-batches of 16 samples; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=16)

# Hyperparameters

In [13]:
# Layer sizes and recurrent cell type passed to DLModel.
vocab_size = len(train_dataset.vocab)  # every word in the training vocabulary plus <PAD> and <UNK>
embed_dim = 384  # size of each word embedding vector
hidden_dim = 128  # size of the recurrent hidden state
output_dim = 5  # 5 classes (1-5 stars)
model_type = 'LSTM'  # <-- CHANGE HERE to 'RNN' or 'GRU'

# Model, loss, optimizer

In [14]:
# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Build the network, then move its parameters to the chosen device.
model = DLModel(vocab_size, embed_dim, hidden_dim, output_dim, model_type)
model = model.to(device)

# Multi-class cross-entropy on the logits, optimised by Adam with default settings.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters())

In [None]:
# Display which device was selected.
device

In [15]:
# Number of passes over the training data made by the training loop.
nof_epochs = 10

In [16]:
# Wall-clock reference for the "Elapsed" figure printed by the training loop.
# The clock starts when this cell runs, not when the training cell starts.
import time
start_time = time.time()

In [17]:
# Per-epoch history: one value is appended to each list per epoch by the
# training loop, and the lists are plotted afterwards.
train_accuracies, test_accuracies = [], []
train_losses, test_losses = [], []

# Training loop

In [None]:
def _evaluate(loader):
    """Return `(mean per-sample loss, accuracy)` of `model` on `loader`.

    Runs in eval mode without gradients, using the notebook-level `model`,
    `criterion` and `device`.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            n_batch = targets.size(0)
            # `criterion` returns the batch mean; weight it by the batch size so
            # a smaller final batch does not count as much as a full one.
            loss_sum += criterion(outputs, targets).item() * n_batch
            n_correct += (outputs.argmax(dim=1) == targets).sum().item()
            n_seen += n_batch
    return loss_sum / n_seen, n_correct / n_seen


for epoch in range(nof_epochs):
    # ---- optimisation pass over the training data ----
    model.train()
    running_loss = 0.0  # sum of per-sample training losses seen so far this epoch
    samples_seen = 0
    for idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        n_batch = targets.size(0)
        running_loss += loss.item() * n_batch
        samples_seen += n_batch

        # Progress line every 100 batches, showing the running mean loss.
        if idx % 100 == 0:
            print(f"Epoch {epoch+1}, Batch {idx+1}/{len(train_loader)}, Train Loss: {running_loss/samples_seen:.4f}, Elapsed: {(time.time() - start_time)/60:.5f} min")
    # Training-mode loss (dropout active), averaged per sample.
    train_loss = running_loss / samples_seen
    train_losses.append(train_loss)

    # ---- evaluation passes (eval mode, no gradients) ----
    # Only the accuracy of the training pass is recorded; the reported train
    # loss stays the one measured while optimising.
    _, train_accuracy = _evaluate(train_loader)
    train_accuracies.append(train_accuracy)

    test_loss, test_accuracy = _evaluate(test_loader)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")


In [None]:
import matplotlib.pyplot as plt

# One figure per metric, each overlaying the train and test curves recorded
# by the training loop (x axis is the position in the history list).
curve_specs = [
    ('Loss', train_losses, test_losses),
    ('Accuracy', train_accuracies, test_accuracies),
]
for metric, train_curve, test_curve in curve_specs:
    plt.figure(figsize=(10, 2))
    plt.plot(train_curve, label=f'Train {metric}')
    plt.plot(test_curve, label=f'Test {metric}')
    plt.title(f'{metric} over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.legend()
    plt.show()


# Test accuracy

In [None]:
from sklearn.metrics import classification_report

# Run the model over the test set in eval mode and keep every prediction
# together with its true label.
all_preds, all_targets = [], []
model.eval()

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)

        all_targets.extend(targets.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Per-class metrics to four decimals. Class ids are the zero-based labels
# produced by TextDataset (original score minus 1).
print(classification_report(y_true=all_targets, y_pred=all_preds, digits=4))
