# Assignment 2 - Recurrent Neural Networks



## Programming (Full points: 100)

In this assignment, our goal is to use PyTorch to implement Recurrent Neural Networks (RNN) for sentiment analysis task. Sentiment analysis is to classify sentences (input) into certain sentiments (output labels), which includes positive, negative and neutral.

We will use a benckmark dataset, SST, for this assignment.
* we download the SST dataset from torchtext package, and do some preprocessing to build vocabulary and split the dataset into training/validation/test sets. You don't need to modify the code in this step.


In [31]:
import copy
import torch
from torch import nn
from torch import optim

import torchtext
from torchtext import data
from torchtext import datasets

# Set device for PyTorch operations
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

TEXT = data.Field(sequential=True, batch_first=True, lower=True)
LABEL = data.LabelField()

# load data splits
train_data, val_data, test_data = datasets.SST.splits(TEXT, LABEL)

# build dictionary
TEXT.build_vocab(train_data)
LABEL.build_vocab(train_data)

# hyperparameters
vocab_size = len(TEXT.vocab)
label_size = len(LABEL.vocab)
padding_idx = TEXT.vocab.stoi['<pad>']
embedding_dim = 128
hidden_dim = 128

# build iterators
train_iter, val_iter, test_iter = data.BucketIterator.splits(
    (train_data, val_data, test_data), 
    batch_size=32)

* define the training and evaluation function in the cell below.
### (25 points)


In [None]:
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    
    for batch in iterator:
        # Zero gradients
        optimizer.zero_grad()
        
        # Get text and labels, move to device
        text = batch.text.to(device)
        labels = batch.label.to(device)
        
        # Forward pass
        predictions = model(text)
        
        # Calculate loss
        loss = criterion(predictions, labels)
        
        # Backward pass
        loss.backward()
        
        # Update parameters
        optimizer.step()
        
        # Accumulate loss
        epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)


def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    correct_predictions = 0
    total_samples = 0
    
    with torch.no_grad():
        for batch in iterator:
            # Get text and labels, move to device
            text = batch.text.to(device)
            labels = batch.label.to(device)
            
            # Forward pass
            predictions = model(text)
            
            # Calculate loss
            loss = criterion(predictions, labels)
            epoch_loss += loss.item()
            
            # Calculate accuracy
            predicted_labels = predictions.argmax(dim=1)
            correct_predictions += (predicted_labels == labels).sum().item()
            total_samples += labels.size(0)
    
    accuracy = correct_predictions / total_samples
    avg_loss = epoch_loss / len(iterator)
    
    return avg_loss, accuracy



* build a RNN model for sentiment analysis in the cell below.
We have provided several hyperparameters we needed for building the model, including vocabulary size (vocab_size), the word embedding dimension (embedding_dim), the hidden layer dimension (hidden_dim), the number of layers (num_layers) and the number of sentence labels (label_size). Please fill in the missing codes, and implement a RNN model.
### (40 points)

In [33]:
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, label_size, padding_idx):
        super(RNNClassifier, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.label_size = label_size
        self.num_layers = 1

        # add the layers required for sentiment analysis.
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim, padding_idx=padding_idx)
        self.rnn = nn.RNN(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_dim,
            num_layers=self.num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(self.hidden_dim, self.label_size)

    def zero_state(self, batch_size):
        # return initial hidden state on the correct device
        return torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)

    def forward(self, text):
        # text: [batch_size, seq_len]
        embedding = self.embedding(text)
        hidden = self.zero_state(text.size(0))
        outputs, hidden = self.rnn(embedding, hidden)
        final_hidden = hidden[-1]
        logits = self.fc(final_hidden)
        return logits

* train the model and compute the accuracy in the cell below.
### (20 points)

In [34]:
# train baseline RNN model and report accuracies
model = RNNClassifier(vocab_size, embedding_dim, hidden_dim, label_size, padding_idx).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 5
best_val_acc = 0.0
best_state = None

for epoch in range(num_epochs):
    train_loss = train(model, train_iter, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_iter, criterion)
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_state = copy.deepcopy(model.state_dict())
    print(f"Epoch {epoch+1}/{num_epochs} | train loss: {train_loss:.4f} | val loss: {val_loss:.4f} | val acc: {val_acc:.4f}")

# load best validation model before testing
if best_state is not None:
    model.load_state_dict(best_state)

test_loss, test_acc = evaluate(model, test_iter, criterion)
print(f"Test loss: {test_loss:.4f} | Test acc: {test_acc:.4f}")


Epoch 1/5 | train loss: 1.0555 | val loss: 1.1291 | val acc: 0.3933
Epoch 2/5 | train loss: 1.0482 | val loss: 1.0757 | val acc: 0.3960
Epoch 3/5 | train loss: 1.0484 | val loss: 1.1943 | val acc: 0.2643
Epoch 4/5 | train loss: 1.0460 | val loss: 1.2473 | val acc: 0.2988
Epoch 5/5 | train loss: 1.0463 | val loss: 1.1192 | val acc: 0.3170
Test loss: 1.0917 | Test acc: 0.4059


* try to train a model with better accuracy in the cell below. For example, you can use different optimizers such as SGD and Adam. You can also compare different hyperparameters and model size.
### (15 points), to obtain FULL point in this problem, the accuracy needs to be higher than 70%

In [35]:
class AttentiveBiLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, label_size, padding_idx, num_layers=2, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.embed_drop = nn.Dropout(0.2)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )
        lstm_out_dim = hidden_dim * 2
        self.attn = nn.Linear(lstm_out_dim, 1, bias=False)
        self.drop = nn.Dropout(dropout)
        # FC input is 3x: last_hidden + attention + max_pooling
        self.fc1 = nn.Linear(lstm_out_dim * 3, lstm_out_dim)
        self.fc2 = nn.Linear(lstm_out_dim, label_size)

    def forward(self, text):
        embedded = self.embed_drop(self.embedding(text))
        outputs, (hidden, cell) = self.lstm(embedded)
        
        # Get last hidden state (forward and backward)
        last_hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)  # [batch, hidden*2]
        
        # Attention pooling
        attn_weights = torch.softmax(self.attn(outputs), dim=1)
        attn_rep = torch.sum(attn_weights * outputs, dim=1)
        
        # Max pooling
        max_rep = torch.max(outputs, dim=1)[0]
        
        # Combine all three: last hidden + attention + max pooling
        rep = torch.cat([last_hidden, attn_rep, max_rep], dim=1)
        
        # Deeper FC layers
        x = self.drop(torch.relu(self.fc1(rep)))
        logits = self.fc2(x)
        return logits

def train_with_clipping(model, iterator, optimizer, criterion, clip=1.0):
    model.train()
    total_loss = 0
    for batch in iterator:
        optimizer.zero_grad()
        text = batch.text.to(device)
        labels = batch.label.to(device)
        preds = model(text)
        loss = criterion(preds, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(iterator)

# Standard cross-entropy loss (removed label smoothing for better learning)
criterion = nn.CrossEntropyLoss()

# Fine-tuned model: Aggressive improvements to push over 70%
# Larger model with triple combination: last hidden + attention + max pooling
model_finetuned = AttentiveBiLSTM(
    vocab_size=vocab_size,
    embedding_dim=500,  # Increased
    hidden_dim=400,      # Increased  
    label_size=label_size,
    padding_idx=padding_idx,
    num_layers=4,        # 4 layers for more capacity
    dropout=0.3,         # Lower dropout for more learning
).to(device)

# Adam optimizer with optimized learning rate
optimizer_ft = optim.Adam(model_finetuned.parameters(), lr=1e-3, weight_decay=1e-5)
# ReduceLROnPlateau scheduler
scheduler_ft = optim.lr_scheduler.ReduceLROnPlateau(optimizer_ft, mode='max', factor=0.5, patience=5, min_lr=1e-6)

num_epochs_ft = 35  # More epochs
patience_ft = 10     # More patience to allow model to converge
best_val_acc_ft = 0.0
best_state_ft = None
patience_counter_ft = 0

for epoch in range(num_epochs_ft):
    train_loss = train_with_clipping(model_finetuned, train_iter, optimizer_ft, criterion, clip=1.0)
    val_loss, val_acc = evaluate(model_finetuned, val_iter, criterion)
    scheduler_ft.step(val_acc)  # ReduceLROnPlateau needs validation metric
    
    if val_acc > best_val_acc_ft:
        best_val_acc_ft = val_acc
        best_state_ft = copy.deepcopy(model_finetuned.state_dict())
        patience_counter_ft = 0
    else:
        patience_counter_ft += 1
    
    current_lr = optimizer_ft.param_groups[0]['lr']
    print(f"Epoch {epoch+1}/{num_epochs_ft} | train loss: {train_loss:.4f} | val loss: {val_loss:.4f} | val acc: {val_acc:.4f} | lr: {current_lr:.6f}")
    
    if patience_counter_ft >= patience_ft:
        print(f"Early stopping at epoch {epoch+1}")
        break

if best_state_ft is not None:
    model_finetuned.load_state_dict(best_state_ft)

test_loss_ft, test_acc_ft = evaluate(model_finetuned, test_iter, criterion)
print("FINE-TUNED MODEL RESULTS:")
print(f"\nFinal accuracy: {test_acc_ft*100:.2f}% (target: >70%)")


Epoch 1/35 | train loss: 1.0262 | val loss: 0.9497 | val acc: 0.5786 | lr: 0.001000
Epoch 2/35 | train loss: 0.8801 | val loss: 0.9375 | val acc: 0.5967 | lr: 0.001000
Epoch 3/35 | train loss: 0.7137 | val loss: 0.9295 | val acc: 0.6394 | lr: 0.001000
Epoch 4/35 | train loss: 0.5616 | val loss: 1.0763 | val acc: 0.6213 | lr: 0.001000
Epoch 5/35 | train loss: 0.4412 | val loss: 1.1159 | val acc: 0.6058 | lr: 0.001000
Epoch 6/35 | train loss: 0.3333 | val loss: 1.4733 | val acc: 0.6067 | lr: 0.001000
Epoch 7/35 | train loss: 0.2552 | val loss: 1.7194 | val acc: 0.5686 | lr: 0.001000
Epoch 8/35 | train loss: 0.1847 | val loss: 2.0243 | val acc: 0.5995 | lr: 0.001000
Epoch 9/35 | train loss: 0.1523 | val loss: 2.0363 | val acc: 0.5985 | lr: 0.000500
Epoch 10/35 | train loss: 0.0826 | val loss: 2.5871 | val acc: 0.5813 | lr: 0.000500
Epoch 11/35 | train loss: 0.0424 | val loss: 2.6602 | val acc: 0.5904 | lr: 0.000500
Epoch 12/35 | train loss: 0.0370 | val loss: 2.8595 | val acc: 0.5949 | lr