Environment Setup and Data Download

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from collections import Counter
import re
import numpy as np
import matplotlib.pyplot as plt

# Seed every RNG this notebook draws from so a fresh "Run All" is repeatable:
# torch drives weight initialisation and DataLoader shuffling, NumPy drives the
# random rows generated for words that are missing from the GloVe file.
torch.manual_seed(42)
np.random.seed(42)

# Pick the fastest backend that is actually usable: CUDA, then Apple-silicon
# MPS, then CPU. `torch.backends.mps` is looked up defensively because older
# torch builds do not ship that submodule.
_mps = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps is not None and _mps.is_available() and _mps.is_built():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

Text Preprocessing and Word Embedding

In [None]:
# Tokenizer applied to every review before vocabulary lookup
def tokenize(text):
    """Split raw review text into lower-case word tokens.

    Steps, in order:
      1. lower-case the text;
      2. replace HTML line breaks (``<br>``, ``<br/>``, ``<br />``) with a space;
      3. delete every character that is neither a word character nor whitespace;
      4. split on whitespace.

    Args:
        text (str): raw review text.

    Returns:
        list[str]: tokens in order of appearance; ``[]`` for empty input.
    """
    lowered = text.lower()
    # Line-break tags must become a space *before* punctuation is removed;
    # otherwise "movie.<br /><br />The" collapses into "moviebr" and "brthe".
    without_breaks = re.sub(r'<br\s*/?>', ' ', lowered)
    # Punctuation is deleted (not replaced), so "don't" becomes "dont".
    cleaned = re.sub(r'[^\w\s]', '', without_breaks)
    return cleaned.split()

class IMDBDataset(Dataset):
    """Sentiment dataset read from a directory with ``neg/`` and ``pos/`` sub-folders.

    Every regular, non-hidden file inside ``<data_path>/neg`` (label 0) and
    ``<data_path>/pos`` (label 1) is read as UTF-8, tokenized with ``tokenize``
    and mapped to vocabulary indices. A sub-folder that does not exist is
    skipped, so a wrong ``data_path`` yields an empty dataset rather than an
    error.

    Args:
        data_path (str): directory containing the ``neg`` and ``pos`` folders.
        vocab (dict[str, int] | None): token -> index mapping to reuse (e.g. the
            training vocabulary for the test split). When ``None`` a vocabulary
            is built from this split: ``<pad>`` -> 0, ``<unk>`` -> 1, followed by
            every token that occurs at least 5 times.
        max_len (int): every item is truncated / right-padded to this length.

    Attributes:
        data: one list of token indices per review (un-padded).
        labels: 0/1 label per review, aligned with ``data``.
        vocab: the token -> index mapping in use.
        pad_idx: index used for right-padding (``vocab['<pad>']``, else 0).

    Raises:
        KeyError: if out-of-vocabulary handling is needed but ``vocab`` has no
            ``'<unk>'`` entry.
    """

    def __init__(self, data_path, vocab=None, max_len=500):
        self.data = []
        self.labels = []
        self.max_len = max_len

        # read data from a local file
        for label, sentiment in enumerate(['neg', 'pos']):
            path = os.path.join(data_path, sentiment)
            if not os.path.isdir(path):
                continue
            # os.listdir() order is filesystem-dependent; sorting keeps the
            # example order (and therefore the vocabulary ids and any seeded
            # random_split of this dataset) identical across machines.
            for file in sorted(os.listdir(path)):
                file_path = os.path.join(path, file)
                # Skip hidden entries such as macOS '.DS_Store' and nested
                # directories: neither is a review and both would crash open().
                if file.startswith('.') or not os.path.isfile(file_path):
                    continue
                with open(file_path, 'r', encoding='utf-8') as f:
                    self.data.append(tokenize(f.read()))
                self.labels.append(label)

        if vocab is None:
            counts = Counter(token for text in self.data for token in text)
            # filter out rare words and add pad and unk token
            kept = [token for token, count in counts.items() if count >= 5]
            self.vocab = {token: i for i, token in enumerate(['<pad>', '<unk>'] + kept)}
        else:
            self.vocab = vocab

        self.pad_idx = self.vocab.get('<pad>', 0)

        # convert tokens to indices (the <unk> lookup is hoisted out of the
        # per-token loop)
        if self.data:
            unk_idx = self.vocab['<unk>']
            self.data = [[self.vocab.get(token, unk_idx) for token in text] for text in self.data]

    def __len__(self):
        """Return the number of reviews loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` as two int64 tensors.

        ``token_ids`` has exactly ``max_len`` entries: the stored sequence is
        truncated on the right, then right-padded with ``pad_idx``. The stored
        sequence itself is never modified (slicing copies it).
        """
        indices = self.data[idx][:self.max_len]
        padding = [self.pad_idx] * (self.max_len - len(indices))
        # An explicit dtype keeps the ids integral even for an empty sequence,
        # where torch.tensor([]) would otherwise default to float32.
        return (
            torch.tensor(indices + padding, dtype=torch.long),
            torch.tensor(self.labels[idx], dtype=torch.long),
        )

# Build the training corpus (point 'aclImdb/train' at your local copy)
full_train_ds = IMDBDataset('aclImdb/train')

# Quick sanity check: corpus size and class balance
print('train size:', len(full_train_ds))
print('pos labels:', sum(full_train_ds.labels), 'total:', len(full_train_ds.labels))

# Hold out 10% of the training reviews for validation; the split uses its own
# seeded generator rather than the global torch RNG.
train_size = int(0.9 * len(full_train_ds))
val_size = len(full_train_ds) - train_size
split_generator = torch.Generator().manual_seed(42)
train_ds, val_ds = random_split(full_train_ds, [train_size, val_size], generator=split_generator)

# Only the training batches are reshuffled every epoch
train_loader = DataLoader(train_ds, shuffle=True, batch_size=64)
val_loader = DataLoader(val_ds, shuffle=False, batch_size=64)

# Test split, indexed with the vocabulary built from the training corpus
test_ds = IMDBDataset('aclImdb/test', vocab=full_train_ds.vocab)
test_loader = DataLoader(test_ds, shuffle=False, batch_size=64)

Load GloVe Pretrained Weights

In [None]:
def get_glove_matrix(glove_path, vocab, emb_dim=100, pad_token='<pad>'):
    """Build an embedding matrix for ``vocab`` from a GloVe text file.

    Each line of the file is expected to be ``word v1 v2 ... v<emb_dim>``
    separated by whitespace. Only vectors for words present in ``vocab`` are
    kept, so memory use scales with the vocabulary rather than with the size
    of the GloVe file.

    Row initialisation:
      * word found in the file -> its GloVe vector (last occurrence wins);
      * ``pad_token``          -> all zeros, so padding carries no signal;
      * any other word         -> a draw from N(0, 0.6^2) via the global
        ``np.random`` state (seed NumPy beforehand for repeatable results).

    Args:
        glove_path (str): path to the GloVe file, e.g. 'glove.6B.100d.txt'.
        vocab (dict[str, int]): token -> row index; indices must lie in
            ``range(len(vocab))``.
        emb_dim (int): vector dimension; must match the file.
        pad_token (str): vocabulary entry whose row is left at zero.

    Returns:
        torch.FloatTensor: matrix of shape ``(len(vocab), emb_dim)``.

    Raises:
        ValueError: if a needed line does not contain exactly ``emb_dim`` values.
    """
    vocab_size = len(vocab)
    embedding_matrix = np.zeros((vocab_size, emb_dim), dtype=np.float32)
    has_pretrained = np.zeros(vocab_size, dtype=bool)

    with open(glove_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                continue  # tolerate blank lines
            word = values[0]
            # Skip vectors that will never be used instead of storing them.
            if word == pad_token or word not in vocab:
                continue
            if len(values) - 1 != emb_dim:
                raise ValueError(
                    f"GloVe vector for {word!r} has {len(values) - 1} values, "
                    f"expected emb_dim={emb_dim}"
                )
            row = vocab[word]
            embedding_matrix[row] = np.asarray(values[1:], dtype='float32')
            has_pretrained[row] = True

    for word, i in vocab.items():
        if has_pretrained[i] or word == pad_token:
            continue
        # Initialize missing words randomly
        embedding_matrix[i] = np.random.normal(scale=0.6, size=(emb_dim,))

    return torch.from_numpy(embedding_matrix)

# GloVe configuration: EMB_DIM has to agree with the vector size stored in the file
EMB_DIM = 100
glove_path = 'glove.6B.100d.txt'

# One embedding row per entry of the training vocabulary
glove_weights = get_glove_matrix(
    glove_path,
    vocab=full_train_ds.vocab,
    emb_dim=EMB_DIM,
)

Define RNN Model Architecture

In [None]:
class SentimentRNN(nn.Module):
    """BiLSTM sentence classifier with pretrained embeddings and mean+max pooling.

    Pipeline: embedding -> dropout(0.2) -> bidirectional LSTM -> masked mean
    pooling and masked max pooling over time -> linear classification head.

    Padding handling: positions equal to ``pad_idx`` are excluded from both
    pooling operations, so the pooled features do not depend on how much
    right-padding a batch carries. The pad embedding row is zeroed and, through
    ``padding_idx``, receives no gradient. The LSTM itself still steps over pad
    positions (inputs are not packed), so the backward direction does see them.

    Args:
        vocab_size (int): number of rows in the embedding table.
        emb_dim (int): embedding dimension (must match ``glove_weights``).
        hid_dim (int): LSTM hidden size per direction.
        output_dim (int): number of output logits.
        glove_weights (Tensor): ``[vocab_size, emb_dim]`` initial embedding values.
        pad_idx (int): token id used for padding.
    """

    def __init__(self, vocab_size, emb_dim, hid_dim, output_dim, glove_weights, pad_idx=0):
        super().__init__()
        self.pad_idx = pad_idx

        # 1. Embedding layer, initialised from the pretrained matrix
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        with torch.no_grad():
            self.embedding.weight.copy_(glove_weights)
            # Whatever the supplied matrix holds for the pad row, padding must
            # embed to zeros.
            self.embedding.weight[pad_idx].zero_()
        self.dropout = nn.Dropout(0.2)

        # 2. BiLSTM layer
        self.rnn = nn.LSTM(emb_dim, hid_dim, batch_first=True, bidirectional=True)

        # 3. Classification head: (forward + backward) x (mean + max) = 4 * hid_dim
        self.fc = nn.Linear(hid_dim * 4, output_dim)

    def forward(self, text):
        """Return logits of shape ``[batch, output_dim]`` for ids ``text`` of shape ``[batch, seq_len]``."""
        embedded = self.dropout(self.embedding(text))  # [batch, seq_len, emb_dim]

        output, _ = self.rnn(embedded)  # [batch, seq_len, hid_dim*2]

        # True where a real token sits. A row made only of padding would leave
        # nothing to pool over (max -> -inf -> NaN), so such rows fall back to
        # pooling over every position.
        mask = text != self.pad_idx
        mask = mask | ~mask.any(dim=1, keepdim=True)
        mask = mask.unsqueeze(-1)  # [batch, seq_len, 1]

        # mean + max pooling over real tokens only
        avg_pool = (output * mask).sum(dim=1) / mask.sum(dim=1)
        max_pool, _ = output.masked_fill(~mask, float('-inf')).max(dim=1)
        feat = torch.cat((avg_pool, max_pool), dim=1)  # [batch, hid_dim*4]
        return self.fc(feat)

Train Model

In [None]:
# Hyperparameters
HIDDEN_DIM = 256

# Two output logits (negative / positive). Requires `full_train_ds`, `EMB_DIM`,
# `glove_weights` and `device` from the cells above.
model = SentimentRNN(
    vocab_size=len(full_train_ds.vocab),
    emb_dim=EMB_DIM,
    hid_dim=HIDDEN_DIM,
    output_dim=2,
    glove_weights=glove_weights,
)
model = model.to(device)

def train(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader`` and report epoch metrics.

    For every batch: forward pass, loss, backward pass, gradient-norm clipping
    to 1.0, optimizer step. Batches are moved to the device of the model's own
    parameters, so the function does not depend on any notebook-level global.

    Args:
        model (nn.Module): classifier returning logits of shape ``[batch, classes]``.
        loader: iterable of ``(inputs, labels)`` tensor pairs.
        optimizer: optimizer bound to ``model``'s parameters.
        criterion: loss callable; assumed to return the *mean* loss over the
            batch (the default for ``nn.CrossEntropyLoss``).

    Returns:
        tuple[float, float]: ``(mean loss per example, accuracy)``; ``(0.0, 0.0)``
        when the loader yields no examples.
    """
    first_param = next(model.parameters(), None)
    model.train()

    loss_sum = 0.0
    correct = 0
    total = 0

    for texts, labels in loader:
        if first_param is not None:
            texts, labels = texts.to(first_param.device), labels.to(first_param.device)

        optimizer.zero_grad()
        predictions = model(texts)
        loss = criterion(predictions, labels)

        loss.backward()
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        batch_size = labels.size(0)
        # Weight each batch mean by its size so a short final batch does not
        # count as much as a full one.
        loss_sum += loss.item() * batch_size
        correct += (predictions.argmax(dim=1) == labels).sum().item()
        total += batch_size

    if total == 0:
        return 0.0, 0.0
    return loss_sum / total, correct / total

def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` without updating it.

    The model is switched to eval mode and the pass runs under
    ``torch.no_grad()``. Batches are moved to the device of the model's own
    parameters, so the function does not depend on any notebook-level global.

    Args:
        model (nn.Module): classifier returning logits of shape ``[batch, classes]``.
        loader: iterable of ``(inputs, labels)`` tensor pairs.
        criterion: loss callable; assumed to return the *mean* loss over the
            batch (the default for ``nn.CrossEntropyLoss``).

    Returns:
        tuple[float, float]: ``(mean loss per example, accuracy)``; ``(0.0, 0.0)``
        when the loader yields no examples.
    """
    first_param = next(model.parameters(), None)
    model.eval()

    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for texts, labels in loader:
            if first_param is not None:
                texts, labels = texts.to(first_param.device), labels.to(first_param.device)
            predictions = model(texts)
            loss = criterion(predictions, labels)

            batch_size = labels.size(0)
            # Weight each batch mean by its size so a short final batch does
            # not count as much as a full one.
            loss_sum += loss.item() * batch_size
            correct += (predictions.argmax(dim=1) == labels).sum().item()
            total += batch_size

    if total == 0:
        return 0.0, 0.0
    return loss_sum / total, correct / total

# Optimisation setup: Adam over all model parameters, cross-entropy on the two logits
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Per-epoch history, consumed by the plotting cell below
train_losses, val_losses = [], []
train_accs, val_accs = [], []

for epoch in range(20):
    # One optimisation pass, then a no-grad pass over the held-out split
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    print(
        f'Epoch: {epoch+1:02}, '
        f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, '
        f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}'
    )

In [None]:
# Plot the performance: one figure per metric, train vs. validation
epochs = range(1, len(train_losses) + 1)

# (train series, train legend, val series, val legend, y-axis label, title)
curve_specs = [
    (train_losses, 'Train Loss', val_losses, 'Val Loss', 'Loss', 'Loss Curve'),
    (train_accs, 'Train Acc', val_accs, 'Val Acc', 'Accuracy', 'Accuracy Curve'),
]

for train_series, train_label, val_series, val_label, y_label, title in curve_specs:
    plt.figure(figsize=(8, 4))
    plt.plot(epochs, train_series, label=train_label)
    plt.plot(epochs, val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.show()