[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/mravanba/comp551-notebooks/blob/master/RNN.ipynb)

# Recurrent Neural Networks (RNNs) for Text

Our goal is to implement a Recurrent Neural Network (RNN) for sentiment analysis using PyTorch. We'll build a simple text classifier that predicts whether a movie review is positive or negative. To make training faster on CPU, we'll use a limited vocabulary and shorter sequences.

RNNs are designed to process sequential data by maintaining a hidden state that captures information from previous time steps. Unlike feedforward networks that process each input independently, RNNs:
- **Process sequences step-by-step**: Handle variable-length inputs (e.g., sentences of different lengths)
- **Maintain hidden state**: Carry information forward through the sequence
- **Share parameters across time**: Use the same weights at each time step, so the number of parameters does not grow with sequence length

For text data, RNNs can learn patterns in word sequences to understand context and meaning, making them suitable for tasks like sentiment analysis, language modeling, and machine translation.
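
The three properties above can be illustrated with a minimal sketch. It is not the model built later in this notebook: the dimensions, the random weights, and the helper name `run_rnn` are assumptions chosen only for illustration.

```python
import torch

torch.manual_seed(0)

input_dim, hidden_dim = 4, 3  # illustrative sizes, not the notebook's

# One set of weights, reused at every time step (parameter sharing).
W_ih = torch.randn(hidden_dim, input_dim) * 0.1
W_hh = torch.randn(hidden_dim, hidden_dim) * 0.1
b = torch.zeros(hidden_dim)

def run_rnn(sequence):
    """Return the final hidden state for a (seq_len, input_dim) tensor."""
    h = torch.zeros(hidden_dim)      # h_0: nothing has been seen yet
    for x_t in sequence:             # one update per sequence element
        h = torch.tanh(W_ih @ x_t + W_hh @ h + b)
    return h

short_seq = torch.randn(2, input_dim)   # 2 time steps
long_seq = torch.randn(7, input_dim)    # 7 time steps

h_short = run_rnn(short_seq)
h_long = run_rnn(long_seq)
print(h_short.shape, h_long.shape)      # both have shape (hidden_dim,)
```

Both sequences produce a hidden state of the same size even though their lengths differ, and no new weights are needed for the longer one.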

In [104]:
import numpy as np
import matplotlib.pyplot as plt
import warnings
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# PyTorch imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# For text processing
from collections import Counter
import re

# Set random seeds for reproducibility
# This ensures results are consistent across runs
np.random.seed(42)
torch.manual_seed(42)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

Using device: cpu


## Creating a Simple Sentiment Dataset

For this tutorial, we'll create a simple movie review dataset for demonstration. Our toy dataset will have:
- **Positive reviews**: Reviews with positive sentiment
- **Negative reviews**: Reviews with negative sentiment
- **Binary classification**: Predict 0 (negative) or 1 (positive)

We'll process the text by:
1. Converting to lowercase
2. Building a vocabulary from the training data
3. Converting words to integer indices
4. Padding sequences to a fixed length

In [105]:
# Simple movie review dataset
# In practice, you would load this from a file or use a real dataset
train_texts = [
    # Positive reviews
    "this movie is great and fantastic",
    "i loved this film it was amazing",
    "excellent movie with great acting",
    "wonderful story and beautiful cinematography",
    "best movie i have seen this year",
    "brilliant film highly recommend it",
    "amazing performance by the actors",
    "loved every minute of this movie",
    "fantastic plot and great direction",
    "this is an excellent film",
    "superb acting and wonderful story",
    "incredible movie worth watching",
    "great entertainment and amazing visuals",
    "perfect movie for the weekend",
    "outstanding performance and direction",
    "beautiful film with touching story",
    "impressive acting and great script",
    "enjoyed this movie very much",
    "brilliant direction and excellent cast",
    "wonderful experience watching this film",
    
    # Negative reviews
    "this movie is terrible and boring",
    "i hated this film it was awful",
    "horrible movie with bad acting",
    "terrible story and poor direction",
    "worst movie i have seen this year",
    "disappointing film waste of time",
    "awful performance by the actors",
    "disliked every minute of this movie",
    "poor plot and bad direction",
    "this is a terrible film",
    "bad acting and horrible story",
    "dreadful movie not worth watching",
    "boring entertainment and poor visuals",
    "waste of time and money",
    "disappointing performance and direction",
    "awful film with weak story",
    "poor acting and bad script",
    "did not enjoy this movie",
    "terrible direction and weak cast",
    "unpleasant experience watching this film",
]

# Labels: 1 for positive (first 20), 0 for negative (last 20)
train_labels = [1] * 20 + [0] * 20

# Test set (similar structure)
test_texts = [
    "wonderful movie highly recommended",
    "great film with amazing story",
    "excellent acting and direction",
    "loved this amazing film",
    "fantastic movie worth watching",
    "terrible movie very disappointing",
    "awful film waste of time",
    "horrible acting and poor story",
    "hated this terrible movie",
    "bad film not recommended",
]

test_labels = [1, 1, 1, 1, 1, 0, 0, 0, 0, 0]

print(f'Training samples: {len(train_texts)}')
print(f'Test samples: {len(test_texts)}')
print(f'\nExample positive review: "{train_texts[0]}"')
print(f'Example negative review: "{train_texts[20]}"')

Training samples: 40
Test samples: 10

Example positive review: "this movie is great and fantastic"
Example negative review: "this movie is terrible and boring"


## Text Preprocessing and Vocabulary Building

To feed text into a neural network, we need to convert words to numbers:

1. **Tokenization**: Split text into individual words
2. **Vocabulary**: Create a mapping from words to unique integer indices
3. **Numericalization**: Convert each word in a sentence to its index
4. **Padding**: Make all sequences the same length by adding padding tokens

We also add special tokens:
- `<PAD>`: Padding token (index 0) for shorter sequences
- `<UNK>`: Unknown token (index 1) for words not in vocabulary
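
As a small worked example of the four steps and the two special tokens, the sketch below encodes one sentence against a hypothetical five-entry vocabulary. The names `toy_word2idx` and `encode` are made up for this illustration and are separate from the functions defined in the next cell.

```python
# Hypothetical vocabulary: the two special tokens plus three words.
toy_word2idx = {'<PAD>': 0, '<UNK>': 1, 'great': 2, 'movie': 3, 'bad': 4}

def encode(text, word2idx, max_length):
    """Tokenize, map to indices, then truncate or right-pad to max_length."""
    tokens = text.lower().split()
    ids = [word2idx.get(tok, word2idx['<UNK>']) for tok in tokens]
    ids = ids[:max_length]
    return ids + [word2idx['<PAD>']] * (max_length - len(ids))

encoded = encode("Great movie overall", toy_word2idx, max_length=5)
print(encoded)  # [2, 3, 1, 0, 0]
```

Here "overall" is not in the vocabulary, so it maps to index 1 (`<UNK>`), and the two remaining positions are filled with index 0 (`<PAD>`).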

In [106]:
# Simple tokenization function
def tokenize(text):
    """
    Convert text to lowercase and split into words.
    
    Parameters:
    text: input string
    
    Returns:
    list of tokens (words)
    """
    # Convert to lowercase (punctuation is left as-is; the toy reviews contain none)
    text = text.lower()
    # Split on whitespace
    tokens = text.split()
    return tokens

# Build vocabulary from training data
def build_vocab(texts, max_vocab_size=1000):
    """
    Build a vocabulary from a list of texts.
    
    Parameters:
    texts: list of text strings
    max_vocab_size: maximum vocabulary size (most frequent words)
    
    Returns:
    word2idx: dictionary mapping words to indices
    idx2word: dictionary mapping indices to words
    """
    # Count word frequencies
    word_counts = Counter()
    for text in texts:
        tokens = tokenize(text)
        word_counts.update(tokens)
    
    # Get most common words
    # Reserve index 0 for padding and index 1 for unknown words
    most_common = word_counts.most_common(max_vocab_size - 2)
    
    # Create word to index mapping
    # Start from index 2 (0 is <PAD>, 1 is <UNK>)
    word2idx = {'<PAD>': 0, '<UNK>': 1}
    for idx, (word, count) in enumerate(most_common, start=2):
        word2idx[word] = idx
    
    # Create index to word mapping (inverse)
    idx2word = {idx: word for word, idx in word2idx.items()}
    
    return word2idx, idx2word

# Build vocabulary from training texts
word2idx, idx2word = build_vocab(train_texts, max_vocab_size=1000)
vocab_size = len(word2idx)

print(f'Vocabulary size: {vocab_size}')
print(f'\nFirst 20 words in vocabulary:')
for i in range(min(20, vocab_size)):
    print(f'{i}: {idx2word[i]}')

Vocabulary size: 76

First 20 words in vocabulary:
0: <PAD>
1: <UNK>
2: and
3: this
4: movie
5: film
6: direction
7: acting
8: story
9: great
10: is
11: i
12: with
13: performance
14: of
15: watching
16: terrible
17: bad
18: poor
19: it


In [107]:
# Convert text to sequence of indices
def text_to_sequence(text, word2idx):
    """
    Convert a text string to a sequence of word indices.
    
    Parameters:
    text: input text string
    word2idx: word to index mapping dictionary
    
    Returns:
    list of word indices
    """
    tokens = tokenize(text)
    # Convert each token to its index
    # Use index 1 (<UNK>) for words not in vocabulary
    sequence = [word2idx.get(token, 1) for token in tokens]
    return sequence

# Pad sequences to fixed length
def pad_sequences(sequences, max_length, pad_value=0):
    """
    Pad sequences to a fixed length.
    
    Parameters:
    sequences: list of sequences (lists of integers)
    max_length: target length for all sequences
    pad_value: value to use for padding (default: 0 for <PAD>)
    
    Returns:
    numpy array of shape (num_sequences, max_length)
    """
    # Start from an array filled with pad_value so the parameter is actually used
    padded = np.full((len(sequences), max_length), pad_value, dtype=np.int64)
    
    for i, seq in enumerate(sequences):
        # Truncate if longer than max_length
        seq = seq[:max_length]
        # Copy sequence to padded array
        padded[i, :len(seq)] = seq
    
    return padded

# Set maximum sequence length (number of words per review)
# Shorter sequences are padded, longer ones are truncated
MAX_LENGTH = 10

# Convert training texts to sequences
train_sequences = [text_to_sequence(text, word2idx) for text in train_texts]
test_sequences = [text_to_sequence(text, word2idx) for text in test_texts]

# Pad sequences
X_train = pad_sequences(train_sequences, MAX_LENGTH)
X_test = pad_sequences(test_sequences, MAX_LENGTH)

# Convert labels to numpy arrays
y_train = np.array(train_labels)
y_test = np.array(test_labels)

print(f'Training data shape: {X_train.shape}')
print(f'Test data shape: {X_test.shape}')
print(f'\nExample sequence (first training sample):')
print(f'Text: "{train_texts[0]}"')
print(f'Indices: {X_train[0]}')
print(f'Label: {y_train[0]} (positive)')

Training data shape: (40, 10)
Test data shape: (10, 10)

Example sequence (first training sample):
Text: "this movie is great and fantastic"
Indices: [ 3  4 10  9  2 25  0  0  0  0]
Label: 1 (positive)
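
Because padding uses index 0 and unknown words use index 1, the true length of each review and its number of out-of-vocabulary tokens can be recovered from the padded array. The sketch below uses a small hand-written array as a stand-in for `X_train`; the second row is hypothetical.

```python
import numpy as np

# Two right-padded rows in the same layout as X_train (0 is <PAD>, 1 is <UNK>).
padded = np.array([
    [3, 4, 10, 9, 2, 25, 0, 0, 0, 0],   # the first training review shown above
    [9, 5, 1, 0, 0, 0, 0, 0, 0, 0],     # hypothetical row containing one <UNK>
], dtype=np.int64)

lengths = (padded != 0).sum(axis=1)      # non-padding tokens per row
unk_counts = (padded == 1).sum(axis=1)   # out-of-vocabulary tokens per row

print(lengths)      # [6 3]
print(unk_counts)   # [0 1]
```

A count like this is one way to check how many test-set words were never seen during vocabulary building.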


## Creating PyTorch Dataset and DataLoader

We'll create a custom PyTorch Dataset class to handle our text data. The DataLoader will batch our data and shuffle it during training.

In [108]:
class SentimentDataset(Dataset):
    """
    Custom Dataset for sentiment analysis.
    
    This class wraps our preprocessed sequences and labels.
    """
    
    def __init__(self, sequences, labels):
        """
        Initialize the dataset.
        
        Parameters:
        sequences: numpy array of shape (num_samples, max_length)
        labels: numpy array of shape (num_samples,)
        """
        self.sequences = torch.LongTensor(sequences)
        self.labels = torch.LongTensor(labels)
    
    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.labels)
    
    def __getitem__(self, idx):
        """
        Get a single sample from the dataset.
        
        Parameters:
        idx: index of the sample to retrieve
        
        Returns:
        tuple of (sequence, label)
        """
        return self.sequences[idx], self.labels[idx]

# Create dataset objects
train_dataset = SentimentDataset(X_train, y_train)
test_dataset = SentimentDataset(X_test, y_test)

# Create data loaders
# batch_size=8: small batch size for CPU-friendly training
# shuffle=True: randomize training order each epoch
trainloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
testloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

print(f'Number of training batches: {len(trainloader)}')
print(f'Number of test batches: {len(testloader)}')

# Examine a batch
sample_batch = next(iter(trainloader))
sequences_batch, labels_batch = sample_batch
print(f'\nBatch shapes:')
print(f'Sequences: {sequences_batch.shape}  # (batch_size, sequence_length)')
print(f'Labels: {labels_batch.shape}  # (batch_size,)')

Number of training batches: 5
Number of test batches: 2

Batch shapes:
Sequences: torch.Size([8, 10])  # (batch_size, sequence_length)
Labels: torch.Size([8])  # (batch_size,)
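
The batch counts follow from ceiling division: 40 training samples in batches of 8 give 5 batches, and 10 test samples give 2 batches, the second of which holds only 2 samples. The sketch below reproduces the test-split case with dummy tensors and `TensorDataset`, used here as a stand-in for `SentimentDataset`.

```python
import math
import torch
from torch.utils.data import DataLoader, TensorDataset

# Dummy stand-ins with the same sizes as the test split: 10 samples of length 10.
dummy_x = torch.zeros(10, 10, dtype=torch.long)
dummy_y = torch.zeros(10, dtype=torch.long)
loader = DataLoader(TensorDataset(dummy_x, dummy_y), batch_size=8, shuffle=False)

batch_sizes = [xb.size(0) for xb, yb in loader]
print(batch_sizes)          # [8, 2]
print(math.ceil(10 / 8))    # 2
```

The smaller final batch matters whenever per-batch statistics are averaged, since each batch then carries equal weight regardless of its size.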


## Implementing a Vanilla RNN from Scratch

We'll build a simple RNN-based sentiment classifier by implementing the RNN computation manually. This helps us understand exactly how RNNs work internally.

**Architecture:**

1. **Embedding Layer**: Converts word indices to dense vectors (word embeddings)
   - Maps each word index to a learnable vector of size `embedding_dim`
   - Words that play similar roles in the task tend to end up with similar embeddings after training

2. **RNN Layer** (implemented from scratch): Processes the sequence of word embeddings
   - At each time step t, RNN computes: **h_t = tanh(W_ih @ x_t + W_hh @ h_{t-1} + b)**
   - `h_t`: hidden state at time t (captures information from all previous words)
   - `x_t`: input embedding at time t (from the embedding layer)
   - `W_ih`: weight matrix for input-to-hidden transformation
   - `W_hh`: weight matrix for hidden-to-hidden transformation (recurrent connection)
   - `b`: bias term
   - The hidden state is updated sequentially as we process each word in the sequence

3. **Fully Connected Layer**: Maps final hidden state to class scores
   - Takes the hidden state after the final time step (all `MAX_LENGTH` positions are processed, including any trailing `<PAD>` positions)
   - Outputs 2 scores (one for each class: negative/positive)

We'll implement the RNN computation manually using basic PyTorch operations to see how the sequential processing works.
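
As a sanity check on the update equation above, the sketch below compares one manual step against PyTorch's built-in `nn.RNNCell` after copying the weights across. The batch size and the random inputs are arbitrary assumptions. `nn.RNNCell` keeps two bias vectors, so the second is zeroed to match the single bias `b`.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
embedding_dim, hidden_dim, batch_size = 32, 64, 4

# Manual parameters, mirroring h_t = tanh(W_ih @ x_t + W_hh @ h_{t-1} + b).
input_to_hidden = nn.Linear(embedding_dim, hidden_dim)            # W_ih and b
hidden_to_hidden = nn.Linear(hidden_dim, hidden_dim, bias=False)  # W_hh

# Built-in cell given the same weights; its second bias is set to zero
# so that both versions use a single bias vector.
cell = nn.RNNCell(embedding_dim, hidden_dim, nonlinearity='tanh')
with torch.no_grad():
    cell.weight_ih.copy_(input_to_hidden.weight)
    cell.weight_hh.copy_(hidden_to_hidden.weight)
    cell.bias_ih.copy_(input_to_hidden.bias)
    cell.bias_hh.zero_()

x_t = torch.randn(batch_size, embedding_dim)
h_prev = torch.randn(batch_size, hidden_dim)

h_manual = torch.tanh(input_to_hidden(x_t) + hidden_to_hidden(h_prev))
h_builtin = cell(x_t, h_prev)

print(torch.allclose(h_manual, h_builtin, atol=1e-5))
```

If the two results agree up to floating-point tolerance, the manual formulation and the library cell compute the same recurrence.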

In [109]:
class VanillaRNN(nn.Module):
    """
    Simple RNN for sentiment classification, implemented from scratch.
    
    Architecture: Embedding → RNN (manual implementation) → Fully Connected
    Input: Sequences of word indices (batch_size, seq_length)
    Output: Class scores (batch_size, num_classes)
    """
    
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        """
        Initialize the RNN layers.
        
        Parameters:
        vocab_size: size of the vocabulary (number of unique words)
        embedding_dim: dimension of word embeddings (e.g., 32, 50)
        hidden_dim: dimension of RNN hidden state
        output_dim: number of output classes (2 for binary sentiment)
        """
        super().__init__()
        
        self.hidden_dim = hidden_dim
        
        # Embedding layer: converts word indices to dense vectors
        # vocab_size: number of words in vocabulary
        # embedding_dim: size of each embedding vector
        # padding_idx=0: the <PAD> token (index 0) will have zero embedding
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        
        # RNN parameters (implemented manually)
        # Instead of using nn.RNN, we define the weight matrices explicitly
        
        # W_ih: input-to-hidden weight matrix
        # Maps input x_t (embedding_dim) to hidden space (hidden_dim)
        # This linear layer computes W_ih @ x_t + b_ih
        self.input_to_hidden = nn.Linear(embedding_dim, hidden_dim)
        
        # W_hh: hidden-to-hidden weight matrix (recurrent connection)
        # Maps previous hidden state h_{t-1} (hidden_dim) to hidden space (hidden_dim)
        # This linear layer computes W_hh @ h_{t-1} (no bias term)
        # Note: We set bias=False here because input_to_hidden already supplies the single bias b
        self.hidden_to_hidden = nn.Linear(hidden_dim, hidden_dim, bias=False)
        
        # Fully connected layer: maps final hidden state to class scores
        # hidden_dim → output_dim (2 classes for binary sentiment)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        """
        Forward pass through the network.
        
        Parameters:
        x: input sequences, shape (batch_size, seq_length)
           Each element is a word index
        
        Returns:
        out: class scores, shape (batch_size, output_dim)
        """
        # Embedding layer
        # Input: (batch_size, seq_length)
        # Output: (batch_size, seq_length, embedding_dim)
        embedded = self.embedding(x)
        
        batch_size = embedded.size(0)
        seq_length = embedded.size(1)
        
        # Initialize hidden state with zeros
        # Shape: (batch_size, hidden_dim)
        # This is h_0, the initial hidden state before processing any input
        hidden = torch.zeros(batch_size, self.hidden_dim, device=x.device)
        
        # Process sequence step by step (this is the core RNN computation)
        # We manually iterate through each time step to show how RNNs work
        for t in range(seq_length):
            # Get input at current time step
            # x_t has shape: (batch_size, embedding_dim)
            x_t = embedded[:, t, :]
            
            # RNN computation: h_t = tanh(W_ih @ x_t + W_hh @ h_{t-1} + b)
            # This is the fundamental RNN equation!
            
            # Step 1: Transform input through input-to-hidden weights
            # input_to_hidden computes: W_ih @ x_t + b_ih
            # Shape: (batch_size, hidden_dim)
            input_contribution = self.input_to_hidden(x_t)
            
            # Step 2: Transform previous hidden state through hidden-to-hidden weights
            # hidden_to_hidden computes: W_hh @ h_{t-1}
            # Shape: (batch_size, hidden_dim)
            hidden_contribution = self.hidden_to_hidden(hidden)
            
            # Step 3: Combine both contributions and apply tanh activation
            # This creates the new hidden state that incorporates both:
            # - Information from current input (x_t)
            # - Information from previous hidden state (h_{t-1})
            hidden = torch.tanh(input_contribution + hidden_contribution)
            
            # The hidden state now contains information from all words seen so far
            # (from word 0 to word t)
        
        # After processing all time steps, 'hidden' contains the final hidden state
        # This final hidden state has "seen" the entire sequence
        # Shape: (batch_size, hidden_dim)
        
        # Fully connected layer: map final hidden state to class scores
        # Input: (batch_size, hidden_dim)
        # Output: (batch_size, output_dim)
        out = self.fc(hidden)
        
        return out

# Hyperparameters
EMBEDDING_DIM = 32   # Size of word embeddings (smaller for CPU efficiency)
HIDDEN_DIM = 64      # Size of RNN hidden state
OUTPUT_DIM = 2       # Binary classification (negative/positive)

# Instantiate the model and move to device
model = VanillaRNN(vocab_size, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(model)
print(f'\nTotal parameters: {total_params:,}')
print(f'Trainable parameters: {trainable_params:,}')
print(f'\nRNN weight matrices:')
print(f'W_ih (input-to-hidden): {EMBEDDING_DIM} × {HIDDEN_DIM} = {EMBEDDING_DIM * HIDDEN_DIM:,} parameters')
print(f'W_hh (hidden-to-hidden): {HIDDEN_DIM} × {HIDDEN_DIM} = {HIDDEN_DIM * HIDDEN_DIM:,} parameters')
print(f'Embedding: {vocab_size} × {EMBEDDING_DIM} = {vocab_size * EMBEDDING_DIM:,} parameters')
print(f'Output layer: {HIDDEN_DIM} × {OUTPUT_DIM} = {HIDDEN_DIM * OUTPUT_DIM:,} parameters')

VanillaRNN(
  (embedding): Embedding(76, 32, padding_idx=0)
  (input_to_hidden): Linear(in_features=32, out_features=64, bias=True)
  (hidden_to_hidden): Linear(in_features=64, out_features=64, bias=False)
  (fc): Linear(in_features=64, out_features=2, bias=True)
)

Total parameters: 8,770
Trainable parameters: 8,770

RNN weight matrices:
W_ih (input-to-hidden): 32 × 64 = 2,048 parameters
W_hh (hidden-to-hidden): 64 × 64 = 4,096 parameters
Embedding: 76 × 32 = 2,432 parameters
Output layer: 64 × 2 = 128 parameters
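
The four weight matrices listed above sum to 8,704, which is 66 short of the reported total of 8,770. The difference is the bias vectors: 64 for `input_to_hidden` and 2 for `fc` (`hidden_to_hidden` has none). The arithmetic can be checked directly:

```python
vocab_size, embedding_dim, hidden_dim, output_dim = 76, 32, 64, 2

weights = {
    'embedding': vocab_size * embedding_dim,   # 2,432
    'W_ih': embedding_dim * hidden_dim,        # 2,048
    'W_hh': hidden_dim * hidden_dim,           # 4,096
    'fc weight': hidden_dim * output_dim,      # 128
}
biases = {
    'input_to_hidden bias': hidden_dim,        # 64
    'fc bias': output_dim,                     # 2
}

total = sum(weights.values()) + sum(biases.values())
print(sum(weights.values()), sum(biases.values()), total)  # 8704 66 8770
```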


## Training the RNN

The training loop:
1. Forward pass: compute predictions
2. Compute loss: compare predictions to ground truth labels
3. Backward pass: compute gradients via backpropagation through time (BPTT)
4. Update weights: apply optimizer step

In [110]:
# Define loss function
# CrossEntropyLoss expects raw scores (logits) and applies log-softmax internally
criterion = nn.CrossEntropyLoss()

# Define optimizer
# Adam with a learning rate of 0.001 (the PyTorch default for Adam)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Number of training epochs
num_epochs = 80

# Lists to store training history
train_losses = []
train_accuracies = []

print('Starting training...')
print(f'Batches per epoch: {len(trainloader)}')
print(f'Model has {total_params:,} parameters\n')

for epoch in range(num_epochs):
    # ===== Training phase =====
    model.train()  # Set model to training mode (does not change this model's behavior: it has no dropout or batch norm)
    
    epoch_loss = 0.0
    correct = 0
    total = 0
    
    # Iterate over training batches
    for i, (sequences, labels) in enumerate(trainloader):
        # Move data to device
        sequences = sequences.to(device)
        labels = labels.to(device)
        
        # Zero the gradients from previous iteration
        optimizer.zero_grad()
        
        # Forward pass: get predictions
        outputs = model(sequences)
        
        # Compute loss
        loss = criterion(outputs, labels)
        
        # Backward pass: compute gradients via backpropagation through time (BPTT)
        loss.backward()
        
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Update weights
        optimizer.step()
        
        # Accumulate statistics
        epoch_loss += loss.item()
        _, predicted = torch.max(outputs, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    # Calculate average loss and accuracy for this epoch
    avg_loss = epoch_loss / len(trainloader)
    accuracy = 100 * correct / total
    
    # Store history
    train_losses.append(avg_loss)
    train_accuracies.append(accuracy)
    
    # Print progress every 5 epochs for cleaner output
    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(f'Epoch [{epoch+1:2d}/{num_epochs}] - Loss: {avg_loss:.4f}, Acc: {accuracy:5.2f}%')

print('\nTraining complete!')

Starting training...
Batches per epoch: 5
Model has 8,770 parameters

Epoch [ 1/80] - Loss: 0.6977, Acc: 50.00%
Epoch [ 5/80] - Loss: 0.6149, Acc: 77.50%
Epoch [10/80] - Loss: 0.2737, Acc: 82.50%
Epoch [15/80] - Loss: 0.1323, Acc: 92.50%
Epoch [20/80] - Loss: 0.0591, Acc: 97.50%
Epoch [25/80] - Loss: 0.0140, Acc: 100.00%
Epoch [30/80] - Loss: 0.0031, Acc: 100.00%
Epoch [35/80] - Loss: 0.0017, Acc: 100.00%
Epoch [40/80] - Loss: 0.0013, Acc: 100.00%
Epoch [45/80] - Loss: 0.0010, Acc: 100.00%
Epoch [50/80] - Loss: 0.0008, Acc: 100.00%
Epoch [55/80] - Loss: 0.0007, Acc: 100.00%
Epoch [60/80] - Loss: 0.0006, Acc: 100.00%
Epoch [65/80] - Loss: 0.0005, Acc: 100.00%
Epoch [70/80] - Loss: 0.0005, Acc: 100.00%
Epoch [75/80] - Loss: 0.0004, Acc: 100.00%
Epoch [80/80] - Loss: 0.0004, Acc: 100.00%

Training complete!
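
The training loop calls `clip_grad_norm_` before every optimizer step. To show what that call does in isolation, the sketch below builds deliberately large gradients on a small, hypothetical linear layer and measures the overall gradient norm before and after clipping.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
layer = nn.Linear(4, 2)  # hypothetical layer, unrelated to the RNN above

# Produce deliberately large gradients with a scaled-up loss.
loss = (layer(torch.randn(8, 4)) * 1000).pow(2).mean()
loss.backward()

# clip_grad_norm_ rescales gradients in place and returns the norm before clipping.
norm_before = torch.nn.utils.clip_grad_norm_(layer.parameters(), max_norm=1.0)
norm_after = torch.sqrt(sum(p.grad.pow(2).sum() for p in layer.parameters()))

print(f'before: {norm_before.item():.2f}, after: {norm_after.item():.4f}')
```

The direction of the gradient is preserved; only its overall magnitude is scaled down so that the combined norm is at most `max_norm`.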


## Evaluating on Test Set

Now let's evaluate our trained RNN on the test set to see how well it generalizes to unseen movie reviews.

In [112]:
# Evaluate on test set
model.eval()  # Set to evaluation mode

test_correct = 0
test_total = 0
test_loss = 0.0

# Store predictions for detailed analysis
all_predictions = []
all_labels = []

# Disable gradient computation for evaluation (saves memory)
with torch.no_grad():
    for sequences, labels in testloader:
        # Move to device
        sequences = sequences.to(device)
        labels = labels.to(device)
        
        # Get predictions
        outputs = model(sequences)
        
        # Compute loss
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        
        # Get predicted class
        _, predicted = torch.max(outputs, dim=1)
        
        # Store predictions and labels
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        
        # Count correct predictions
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

# Calculate metrics
test_accuracy = 100 * test_correct / test_total
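# Note: this averages the per-batch mean losses, so the final batch of 2 samples
# is weighted the same as the full batch of 8
avg_test_loss = test_loss / len(testloader)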

print(f'Test Loss: {avg_test_loss:.4f}')
print(f'Test Accuracy: {test_accuracy:.2f}%')
print(f'Correct: {test_correct}/{test_total}')

Test Loss: 1.7080
Test Accuracy: 80.00%
Correct: 8/10
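
A test loss well above the final training loss alongside 80% accuracy is consistent with a model that is confidently wrong on a few samples, since cross-entropy grows roughly linearly with the logit margin of a wrong prediction. The sketch below illustrates this with made-up logits; they are not the outputs of the trained model.

```python
import torch
import torch.nn.functional as F

# Hypothetical logits for 10 samples, all with true label 1:
# 8 confident correct predictions and 2 confident wrong ones.
labels = torch.ones(10, dtype=torch.long)
logits = torch.tensor([[-4.0, 4.0]] * 8 + [[4.0, -4.0]] * 2)

per_sample = F.cross_entropy(logits, labels, reduction='none')
accuracy = (logits.argmax(dim=1) == labels).float().mean().item()

print(f'accuracy: {accuracy:.2f}')
print(f'loss on a correct sample: {per_sample[0].item():.4f}')
print(f'loss on a wrong sample:   {per_sample[-1].item():.4f}')
print(f'mean loss: {per_sample.mean().item():.4f}')
```

In this illustration the two wrong samples account for nearly all of the mean loss, which is why accuracy and loss can tell quite different stories on a small test set.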
