This project aims to demanstrate how to do sentiment analysis using RNN, specifically LSTM, on the IMDB dataset. The IMDB dataset is a collection of 50,000 movie reviews that are labeled as either positive or negative.


<b>Best practice</b><br>
<br>
LSTM is often considered the default choice for RNN models in practice due to its ability to effectively capture long-term dependencies in sequential data while mitigating the vanishing gradient problem. However, GRUs are also commonly used depending on the specific task and dataset characteristics. The choice between LSTM and GRU depends on the following factors:

- <b>Model complexity</b>: LSTMs typically have more parameters than GRUs due to their additional gating mechanisms. If you have limited computational resources or are working with smaller datasets, GRUs may be more suitable due to their simpler architecture.</b>
- <b>Training speed</b>: GRUs are generally faster to train than LSTMs. If training time is a concern, GRUs might be a better choice.
- <b>Performance</b>: LSTMs tend to have better performance on tasks that require modeling long-term dependencies in sequential data. If your task involves capturing complex temporal patterns and you’re concerned about overfitting, LSTMs might be preferable.


#### Preprocessing the data


In [1]:
import pandas as pd

data_path = "movie_data.csv"
data: pd.DataFrame = pd.read_csv(data_path)
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   review     50000 non-null  object
 1   sentiment  50000 non-null  int64 
dtypes: int64(1), object(1)
memory usage: 781.4+ KB


In [2]:
from sklearn.model_selection import train_test_split

# Split the data into training and test sets
train_data, test_data = train_test_split(data, test_size=0.3, 
                                         random_state=42, stratify=data['sentiment'])

# Split the train data into training and validation sets
train_data, val_data = train_test_split(train_data, test_size=5000, 
                                         random_state=42, stratify=train_data['sentiment'])
train_data.head()

Unnamed: 0,review,sentiment
8666,"A wonderful film ahead of its time,<br /><br /...",1
2880,I saw this at the BendFilm Festival Friday ami...,1
29940,"When 'My Deja Vu, My Deja Vu' aired last seaso...",0
45022,I do not expect this film to be well understoo...,1
46498,First off let me say that this has to be on th...,0


In [3]:
# Explore the vocabulary within the training set
import re
from collections import Counter

# define a tokenization function to remove special characters and split text into words
def tokenize(text: str) -> list[str]:
    # remove HTML tags
    text = re.sub(r'<[^>]*>', '', text) 
    # find emoticons 
    emoticons: list[str] = re.findall(r'(?::|;|=)(?:-)?(?:\)|\(|D|P)', text.lower())  
    # remove non-word characters and move emoticons to the end of the string  
    text: str = re.sub(r'[\W]+', ' ', text.lower()) + ' '.join(emoticons).replace('-', '')

    return text.split()


# create a Counter object to count the frequency of each word in the training set
token_counts = Counter()
for text in train_data['review']:
    token_counts.update(tokenize(text))

print(f"Vocabulary size: {len(token_counts)}")
print(f"Labels: {Counter(train_data['sentiment'])}")

Vocabulary size: 84179
Labels: Counter({1: 15000, 0: 15000})


In [4]:
"""We will feed the word tokens into an embedding layer, nn.Embedding. The embedding layer requires integer input because 
it’s specifically designed to handle discrete categorical data, such as word indices, and transform them into continuous representations 
that a neural network can work with. Therefore, we need to first encode each token into a unique integer / index:"""


# Define a function to build the vocabulary mapping from tokens to indices and vice versa.
def build_vocabulary(token_counts: Counter, 
                     min_freq: int=1, 
                     specials: list[str]=['<pad>', '<unk>']) -> tuple[dict[str, int], dict[int, str]]:
    """
    Build a vocabulary mapping from tokens to indices
    
    Args:
        token_counts: Counter object with word frequencies
        min_freq: Minimum frequency required to include a token
        specials: List of special tokens to add
        
    Returns:
        word_to_idx: Dictionary mapping tokens to indices
        idx_to_word: Dictionary mapping indices to tokens
    """

    # Start with special tokens
    word_to_idx: dict[str, int] = {token: idx for idx, token in enumerate(specials)}
    # Sort word tokens by frequency (descending) 
    token_counts_sorted: list[tuple[str, int]] = sorted(token_counts.items(), 
                                                        key=lambda x: x[1], reverse=True)
    
    # Add tokens that meet minimum frequency
    for word, count in token_counts_sorted:
        if count >= min_freq and word not in word_to_idx:
            word_to_idx[word] = len(word_to_idx)  # new index starts with 2.
    
    # Create reverse mapping
    idx_to_word: dict[int, str] = {idx: word for word, idx in word_to_idx.items()}
    
    return word_to_idx, idx_to_word


# Build vocabulary
word_to_idx, idx_to_word = build_vocabulary(
    token_counts, 
    min_freq=1, 
    specials=['<pad>', '<unk>']
    )
print(f"Vocabulary size: {len(word_to_idx)}")
print([word_to_idx[word] for word in ['this', 'is', 'an', 'example']])

Vocabulary size: 84181
[11, 7, 35, 480]


<b>Best practice</b><br>
<br>
Using special tokens like "pad" and "unk" in RNNs is a common practice for handling variable-length sequences and out-of-vocabulary words. Here are some best practices for their usage:

- Use "pad" tokens to pad sequences to a fixed length. This ensures that all input sequences have the same length, which is necessary for efficient batch processing in neural networks. Pad sequences at the end rather than the beginning to preserve the order of the input data. When tokenizing text data, assign a unique integer index to the "pad" tokens and ensure that it corresponds to a vector of zeros in the embedding matrix.
- Use "unk" tokens to represent out-of-vocabulary words that are not present in the vocabulary of the model. During inference, replace any words that are not present in the vocabulary with the "unk" tokens to ensure that the model can process the input.
- Exclude "pad" tokens from contributing to the loss during training to avoid skewing the learning process.
- Monitor the distribution of "unk" tokens in the dataset to assess the prevalence of out-of-vocabulary words and adjust the vocabulary size accordingly.


#### Create custom Dataset and DataLoader


In [5]:
from torch.utils.data import DataLoader, Dataset
import torch
from torch import Tensor

# Create a custom dataset class for the movie reviews
class MovieReviewDataset(Dataset):
    """
    Custom dataset for movie reviews
    """
    def __init__(self, data: pd.DataFrame, word_to_idx: dict[str, int]) -> None:
        self.data: pd.DataFrame = data
        self.word_to_idx: dict[str, int] = word_to_idx

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> tuple[Tensor, Tensor, Tensor]:
        review: str = self.data.iloc[idx]['review']
        sentiment: int = self.data.iloc[idx]['sentiment']
        
        # Tokenize and convert to indices
        tokens: list[str] = tokenize(review)
        indices: list[int] = [self.word_to_idx.get(token, self.word_to_idx['<unk>']) for token in tokens]
        length: int = len(indices)
        
        return torch.tensor(indices), torch.tensor(sentiment), torch.tensor(length)


# Define the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define a custom collate function to handle variable-length sequences
def collate_fn(batch: list[tuple[Tensor, Tensor, Tensor]]) -> tuple[Tensor, Tensor, Tensor]:
    """
    Custom collate function to pad sequences and create batches
    """
    # Unzip the batch into separate lists
    reviews, sentiments, lengths = zip(*batch)
    
    # Pad sequences to the maximum length in the batch
    padded_reviews: Tensor = torch.nn.utils.rnn.pad_sequence(reviews, # reviews is a list of tensors
                                                             batch_first=True, 
                                                             padding_value=word_to_idx['<pad>'])
    # stack sentiments into a single tensor
    sentiments: Tensor = torch.stack(sentiments)
    # stack lengths into a single tensor
    lengths: Tensor = torch.stack(lengths)
    
    return padded_reviews.to(device), sentiments.to(device), lengths.to(device)


# Create DataLoader for training set
torch.manual_seed(42) 
batch_size: int = 4
train_dataset: MovieReviewDataset = MovieReviewDataset(train_data, word_to_idx)
train_loader: DataLoader = DataLoader(train_dataset, 
                                      batch_size=batch_size, 
                                      shuffle=True, 
                                      collate_fn=collate_fn)

# Check the first batch
reviews, sentiments, lengths = next(iter(train_loader))
print(reviews)
print(sentiments)
print(lengths)
print(reviews.shape)  # (batch_size, max_length)

tensor([[   10,   138,    29,  ...,     0,     0,     0],
        [ 4690,  3821,     7,  ...,     2, 18444,  1046],
        [   11,    18, 37583,  ...,     0,     0,     0],
        [   11,  1158,  3396,  ...,     0,     0,     0]], device='cuda:0')
tensor([0, 1, 0, 1], device='cuda:0')
tensor([319, 352, 170, 221], device='cuda:0')
torch.Size([4, 352])


In [6]:
# Finalize the DataSet and DataLoader for training, validation, and test sets
batch_size: int = 32
train_loader: DataLoader = DataLoader(train_dataset,
                                      batch_size=batch_size, 
                                      shuffle=True, 
                                      collate_fn=collate_fn)
val_dataset: MovieReviewDataset = MovieReviewDataset(val_data, word_to_idx)
val_loader: DataLoader = DataLoader(val_dataset, 
                                     batch_size=batch_size, 
                                     shuffle=False, 
                                     collate_fn=collate_fn)
test_dataset: MovieReviewDataset = MovieReviewDataset(test_data, word_to_idx)
test_loader: DataLoader = DataLoader(test_dataset, 
                                      batch_size=batch_size, 
                                      shuffle=False, 
                                      collate_fn=collate_fn)

#### Build a LSTM network


In [7]:
from torch import Tensor
import torch.nn as nn
from torch.nn.utils.rnn import PackedSequence

# 1, Define the network hyperparameters
vocab_size: int = len(word_to_idx)
embedding_dim: int = 128
hidden_dim: int = 64
fc_hidden_dim: int = 32

# 2, Define the model
class SentimentLSTM(nn.Module):
    """
    A simple RNN model for sentiment analysis
    """

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim: int, fc_hidden_dim: int) -> None:
        super(SentimentLSTM, self).__init__()
        self.embedding = nn.Embedding(    # convert input word indices into dense word embeddings.
            vocab_size, 
            embedding_dim,
            padding_idx=word_to_idx['<pad>']) # pad_idx=0 indicates padding tokens will be ignored during embedding.
        self.rnn = nn.LSTM(input_size=embedding_dim, 
                           hidden_size=hidden_dim, 
                           num_layers=2,   # use stacked LSTM layers
                           dropout=0.4,  # dropout between LSTM layers
                           batch_first=True)  # means that the input has a batch size as the first dimension.
        
        # add a dropout layer to prevent overfitting
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(hidden_dim, fc_hidden_dim)
        self.relu = nn.ReLU()
        # The output layer is a single neuron with sigmoid activation for binary classification.
        self.fc2 = nn.Linear(fc_hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()


    def forward(self, x: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
        # Embedding layer
        x = self.embedding(x)
        
        # Pack the sequences because the LSTM layer requires packed sequences.
        # The packed sequence is a more efficient representation of the input data.
        packed_x: PackedSequence = nn.utils.rnn.pack_padded_sequence(x, 
                                                                     lengths.cpu().numpy(), # should be a 1D CPU int64 tensor
                                                                     batch_first=True, 
                                                                     enforce_sorted=False)
        # RNN layer
        packed_output, _ = self.rnn(packed_x)
        
        # Unpack the sequences because the LSTM layer returns a packed sequence.
        # the unpacked output is a tensor of shape (batch_size, max_length, hidden_dim).
        output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, 
                                                     batch_first=True)
        
        # Get the last hidden state
        # This final hidden state vector represents the LSTM's "understanding" of the entire sequence and 
        # serves as the feature vector for subsequent classification layers.
        last_hidden_state: Tensor = output[torch.arange(output.size(0)), lengths - 1]
        
        # Apply dropout to the last hidden state
        last_hidden_state = self.dropout(last_hidden_state)

        # Fully connected layers
        x = torch.relu(self.fc1(last_hidden_state))
        x = self.fc2(x)
        # Sigmoid activation
        x = self.sigmoid(x)
        
        return x.squeeze()  # Remove the extra dimension
    
# 3, Initialize the model
model: SentimentLSTM = SentimentLSTM(vocab_size, embedding_dim, 
                                     hidden_dim, fc_hidden_dim).to(device)

In [8]:
# 4, Define the loss function and optimizer
loss_fn = nn.BCELoss()  # Binary Cross-Entropy Loss for binary classification
optimizer = torch.optim.AdamW(model.parameters(), 
                              lr=0.001, weight_decay=0.01)  # AdamW optimizer

In [9]:
# 5, Define a function to evaluate the model
def evaluate_model(model: nn.Module, test_loader: DataLoader) -> tuple[float, float]:
    """
    Evaluate the model on the test set
    """
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    total_accuracy = 0

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for batch in test_loader:
            reviews, sentiments, lengths = batch
            
            # Forward pass
            outputs = model(reviews, lengths)
            
            # Compute loss
            loss = loss_fn(outputs, sentiments.float())
            
            # Calculate the total loss for the batch
            total_loss += loss.item() * sentiments.size(0)
            # Calculate accuracy
            preds = (outputs >= 0.5).float()
            correct = (preds == sentiments.float()).float().sum().item()
            total_accuracy += correct

    # Compute average loss and accuracy for the test set
    avg_loss: float = total_loss / len(test_loader.dataset)
    avg_accuracy: float = total_accuracy / len(test_loader.dataset)
    
    return avg_loss, avg_accuracy


# 6, Define a function to train the model
def train_model(model: nn.Module, 
                train_loader: DataLoader, 
                loss_fn: nn.Module, 
                optimizer: torch.optim.Optimizer, 
                num_epochs: int) -> None:
    """
    Train the model
    """
    
    best_loss = float('inf') # Initialize best loss to infinity for early stopping
    patience = 4  # Number of epochs with no improvement after which training will be stopped
    counter = 0  # Counter for early stopping

    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        total_loss = 0
        total_accuracy = 0
        for batch in train_loader:
            reviews, sentiments, lengths = batch
            
            # Forward pass
            outputs = model(reviews, lengths)  # Get the output from the model
            
            # Compute loss
            loss = loss_fn(outputs, sentiments.float())
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Calculate the total loss for the batch
            total_loss += loss.item() * sentiments.size(0)  # Multiply by batch size to get the total loss for the batch
            # Calculate accuracy
            preds = (outputs >= 0.5).float()
            correct = (preds == sentiments.float()).float().sum().item()
            total_accuracy += correct  # Sum the correct predictions for the batch

        # Compute average loss and accuracy for the epoch
        avg_loss_train: float = total_loss / len(train_loader.dataset)
        avg_accu_train: float = total_accuracy / len(train_loader.dataset)

        # Evaluate the model on the validation set for each epoch
        avg_loss_val, avg_accu_val = evaluate_model(model, val_loader)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Training loss: {avg_loss_train:.4f}, Training accu: {avg_accu_train * 100:.2f}%," 
              f" Val loss: {avg_loss_val:.4f}, Val accu: {avg_accu_val * 100:.2f}%")

        # Apply early stopping if the loss does not improve for 3 epochs on the validation set
        if avg_loss_val < best_loss:
            best_loss: float = avg_loss_val
            # Save the model if it improves
            torch.save(model.state_dict(), 'best_SentimentLSTM.pth')
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("Early stopping")
                break

In [10]:
# 7, Train the model
num_epochs: int = 20
train_model(model, train_loader, loss_fn, optimizer, num_epochs)

Epoch [1/20], Training loss: 0.5831, Training accu: 68.67%, Val loss: 0.5144, Val accu: 74.94%
Epoch [2/20], Training loss: 0.4273, Training accu: 81.22%, Val loss: 0.4117, Val accu: 81.02%
Epoch [3/20], Training loss: 0.3068, Training accu: 87.89%, Val loss: 0.3027, Val accu: 87.62%
Epoch [4/20], Training loss: 0.2041, Training accu: 92.55%, Val loss: 0.2916, Val accu: 88.72%
Epoch [5/20], Training loss: 0.1396, Training accu: 95.31%, Val loss: 0.3201, Val accu: 88.10%
Epoch [6/20], Training loss: 0.0967, Training accu: 96.86%, Val loss: 0.5235, Val accu: 84.32%
Epoch [7/20], Training loss: 0.0703, Training accu: 97.87%, Val loss: 0.3884, Val accu: 89.22%
Epoch [8/20], Training loss: 0.0486, Training accu: 98.62%, Val loss: 0.4086, Val accu: 88.96%
Early stopping


In [11]:
# 8, Finally, evaluate the performance on the test set:
# Load the best model for evaluation
model.load_state_dict(torch.load('best_SentimentLSTM.pth', weights_only=True))

# Evaluate the model on the test set
test_loss, test_accuracy = evaluate_model(model, test_loader)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy * 100:.2f}%")

Test Loss: 0.2845, Test Accuracy: 88.80%
