## Using Bidirectional LSTMs for sentiment analysis

LSTMs, as we know, handle long sequences better than vanilla RNNs thanks to their *memory cell gates*,
which help retain important information from many time steps earlier and forget irrelevant information even if it is recent. With the exploding and vanishing gradient problems kept in check, LSTMs
should be able to perform well when processing long movie reviews, as in our case.

We will use a bidirectional model, as it broadens the context window at every time step
so that the model can make a more informed decision about the sentiment of the movie review. The RNN
model we looked at [here](./RNN.ipynb) overfitted the dataset during training, so to tackle that,
we will use *dropout* as a *regularization* mechanism in our LSTM model.

In [2]:
# requires python3.9
!pip install torch==1.9
!pip install torchtext==0.10
!pip install matplotlib==3.8.3


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import os
import time
import numpy as np
from tqdm import tqdm
from string import punctuation
from collections import Counter
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Deterministic mode makes cuBLAS-backed CUDA ops raise a RuntimeError unless a
# fixed workspace configuration is exported before they first run (CUDA >= 10.2).
# setdefault keeps whatever value the user may already have set.
os.environ.setdefault('CUBLAS_WORKSPACE_CONFIG', ':4096:8')

# Deterministic kernels alone do not make runs repeatable: weight initialisation
# and dropout masks are drawn from PyTorch's RNG, so seed it as well.
torch.manual_seed(123)
torch.use_deterministic_algorithms(True)

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
import random
from torchtext.legacy import datasets
from torchtext.legacy import data

In [5]:
# include_lengths=True makes every batch's `text` attribute a
# (token_ids, lengths) pair, which the training code unpacks before calling the model.
TEXT_FIELD = data.Field(tokenize=data.get_tokenizer("basic_english"), include_lengths=True)
LABEL_FIELD = data.LabelField(dtype=torch.float)

train_dataset, test_dataset = datasets.IMDB.splits(TEXT_FIELD, LABEL_FIELD)

# random.seed() returns None, so passing its result as `random_state` only worked
# through the seeding side effect. Seed explicitly and hand over the resulting
# generator state so the train/validation split is reproducible on purpose.
random.seed(123)
train_dataset, valid_dataset = train_dataset.split(random_state=random.getstate())

In [6]:
# Upper bound on the vocabulary size passed to build_vocab.
MAX_VOCABULARY_SIZE = 25000

# Both vocabularies are built from the training split only, so nothing from the
# validation or test reviews leaks into the token/label mappings.
TEXT_FIELD.build_vocab(train_dataset, max_size=MAX_VOCABULARY_SIZE)
LABEL_FIELD.build_vocab(train_dataset)

In [7]:
# Number of reviews per mini-batch.
B_SIZE = 64

# Same device selection as in the import cell, repeated here.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The model packs its input with pack_padded_sequence's default
# enforce_sorted=True, so each batch is requested sorted by length.
_dataset_splits = (train_dataset, valid_dataset, test_dataset)
train_data_iterator, valid_data_iterator, test_data_iterator = data.BucketIterator.splits(
    _dataset_splits,
    batch_size=B_SIZE,
    sort_within_batch=True,
    device=device,
)

#### Instantiating and training the LSTM model

In [8]:
## When training on a GPU the default tensor type is switched to CUDA below. The helper
## defined in this cell is then used instead of the stock pack_padded_sequence so that the
## lengths tensor is always moved to the CPU before packing
## (reference : https://discuss.pytorch.org/t/error-with-lengths-in-pack-padded-sequence/35517/3)
if torch.cuda.is_available():
    torch.set_default_tensor_type(torch.cuda.FloatTensor)
from torch.nn.utils.rnn import pack_padded_sequence, PackedSequence


def cuda_pack_padded_sequence(input, lengths, batch_first=False, enforce_sorted=True):
    """Pack a padded batch of variable-length sequences, forcing CPU lengths.

    Args:
        input: padded batch; the batch axis is 0 when ``batch_first`` is true,
            otherwise 1.
        lengths: per-sequence lengths (anything ``torch.as_tensor`` accepts).
        batch_first: whether ``input`` is laid out batch-major.
        enforce_sorted: when true the caller guarantees the batch is already
            sorted by decreasing length and no reordering is done; when false
            the batch is sorted here and the permutation is stored on the result.

    Returns:
        A ``PackedSequence`` built from the packed data, the per-step batch
        sizes and the sorting permutation (``None`` when ``enforce_sorted``).
    """
    # Lengths are converted to int64 and moved to the CPU regardless of where
    # the caller created them.
    lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()

    if enforce_sorted:
        sorted_indices = None
    else:
        lengths, sorted_indices = torch.sort(lengths, descending=True)
        sorted_indices = sorted_indices.to(input.device)
        # Reordering only makes sense when a permutation exists; doing it
        # unconditionally would call index_select with None and raise.
        batch_dim = 0 if batch_first else 1
        input = input.index_select(batch_dim, sorted_indices)

    packed_data, batch_sizes = torch._C._VariableFunctions._pack_padded_sequence(
        input, lengths, batch_first)
    return PackedSequence(packed_data, batch_sizes, sorted_indices)

In [9]:
class LSTM(nn.Module):
    """Single-layer bidirectional LSTM classifier over token-id sequences.

    Pipeline: embedding -> dropout -> packed bidirectional LSTM -> concatenation
    of the final forward and backward hidden states -> linear layer. The output
    is a raw score (logit) per sequence; no sigmoid is applied here.
    """

    def __init__(self, vocabulary_size, embedding_dimension, hidden_dimension, output_dimension, dropout, pad_index):
        """Build the layers.

        Args:
            vocabulary_size: number of rows in the embedding table.
            embedding_dimension: size of each token embedding.
            hidden_dimension: LSTM hidden size per direction.
            output_dimension: number of output scores per sequence.
            dropout: drop probability applied to the embedded tokens.
            pad_index: vocabulary index of the padding token.
        """
        super().__init__()
        self.embedding_layer = nn.Embedding(vocabulary_size, embedding_dimension, padding_idx=pad_index)
        # nn.LSTM applies its own `dropout` only between stacked layers, so with
        # a single layer it does nothing except emit a UserWarning. The
        # regularisation is provided by `dropout_layer` instead.
        self.lstm_layer = nn.LSTM(embedding_dimension,
                                  hidden_dimension,
                                  num_layers=1,
                                  bidirectional=True)
        # Two directions are concatenated, hence hidden_dimension * 2 inputs.
        self.fc_layer = nn.Linear(hidden_dimension * 2, output_dimension)
        self.dropout_layer = nn.Dropout(dropout)

    def forward(self, sequence, sequence_lengths=None):
        """Score a batch of sequences.

        Args:
            sequence: token ids shaped (sequence_length, batch_size).
            sequence_lengths: true length of every sequence in the batch, sorted
                in decreasing order. When omitted, every sequence is assumed to
                span the full first dimension of ``sequence`` (no padding).

        Returns:
            Tensor shaped (batch_size, output_dimension) holding raw logits.
        """
        if sequence_lengths is None:
            # One full-length entry per batch column; kept on the CPU because
            # the packing routines expect CPU lengths.
            sequence_lengths = torch.full(
                (sequence.shape[1],), sequence.shape[0], dtype=torch.int64, device='cpu')

        # sequence := (sequence_length, batch_size)
        embedded_output = self.dropout_layer(self.embedding_layer(sequence))
        # embedded_output := (sequence_length, batch_size, embedding_dimension)

        if torch.cuda.is_available():
            packed_embedded_output = cuda_pack_padded_sequence(embedded_output, sequence_lengths)
        else:
            packed_embedded_output = nn.utils.rnn.pack_padded_sequence(embedded_output, sequence_lengths)

        # Only the final hidden state is needed, so the per-step outputs and
        # the cell state are discarded without being unpacked.
        _, (hidden_state, _) = self.lstm_layer(packed_embedded_output)
        # hidden_state := (num_layers * num_directions, batch_size, hidden_dimension)

        # The last two slices are the final forward and backward states.
        hidden_output = torch.cat((hidden_state[-2, :, :], hidden_state[-1, :, :]), dim=1)
        # hidden_output := (batch_size, hidden_dimension * num_directions)

        return self.fc_layer(hidden_output)

    
# Model hyper-parameters. The embedding table gets one row per vocabulary entry,
# and the padding index is looked up from the text field's vocabulary.
INPUT_DIMENSION = len(TEXT_FIELD.vocab)
EMBEDDING_DIMENSION = 100
HIDDEN_DIMENSION = 32
OUTPUT_DIMENSION = 1
DROPOUT = 0.5
PAD_INDEX = TEXT_FIELD.vocab.stoi[TEXT_FIELD.pad_token]

# Keyword arguments make the mapping from constant to constructor parameter explicit.
lstm_model = LSTM(
    vocabulary_size=INPUT_DIMENSION,
    embedding_dimension=EMBEDDING_DIMENSION,
    hidden_dimension=HIDDEN_DIMENSION,
    output_dimension=OUTPUT_DIMENSION,
    dropout=DROPOUT,
    pad_index=PAD_INDEX,
)



In [10]:
UNK_INDEX = TEXT_FIELD.vocab.stoi[TEXT_FIELD.unk_token]

# Overwrite the embedding rows of the unknown and padding tokens with zeros,
# writing through `.data` so autograd does not record the assignment.
_embedding_weights = lstm_model.embedding_layer.weight.data
for _special_index in (UNK_INDEX, PAD_INDEX):
    _embedding_weights[_special_index] = torch.zeros(EMBEDDING_DIMENSION)

In [11]:
# Move the model and the loss to the target device first: PyTorch recommends
# constructing optimizers only after the parameters live on their final device.
lstm_model = lstm_model.to(device)
loss_func = nn.BCEWithLogitsLoss().to(device)

# NOTE: `optim` rebinds the name of the imported `torch.optim` module alias. The
# name is kept because the training cell passes `optim` to train(); refer to the
# module as `torch.optim` from here on.
optim = torch.optim.Adam(lstm_model.parameters())

In [12]:
def accuracy_metric(predictions, ground_truth):
    """Return the fraction of logits whose thresholded value matches the labels.

    Logits are passed through a sigmoid and rounded to 0/1 before being compared
    element-wise with ``ground_truth``. The result is a 0-dim tensor.
    """
    hard_labels = torch.sigmoid(predictions).round()
    matches = hard_labels.eq(ground_truth).float()
    return matches.sum() / len(matches)

In [13]:
def train(model, data_iterator, optim, loss_func):
    """Run one training epoch and return the mean per-batch loss and accuracy.

    Args:
        model: module called as ``model(sequence, sequence_lengths)``; its
            output is squeezed on dimension 1 before the loss is computed.
        data_iterator: iterable of batches exposing ``text`` (a
            ``(sequence, sequence_lengths)`` pair) and ``label``; must support ``len``.
        optim: optimizer that updates the parameters of ``model``.
        loss_func: callable taking ``(predictions, labels)``.

    Returns:
        ``(loss, accuracy)``, each averaged over the number of batches.
    """
    running_loss = 0
    running_accuracy = 0
    model.train()

    for curr_batch in data_iterator:
        optim.zero_grad()
        sequence, sequence_lengths = curr_batch.text
        # Use the model that was passed in, not a module-level global, so the
        # function trains whatever the caller asked it to train.
        preds = model(sequence, sequence_lengths).squeeze(1)

        loss_curr = loss_func(preds, curr_batch.label)
        accuracy_curr = accuracy_metric(preds, curr_batch.label)

        loss_curr.backward()
        optim.step()

        # .item() detaches the scalars so no graph is kept alive across batches.
        running_loss += loss_curr.item()
        running_accuracy += accuracy_curr.item()

    num_batches = len(data_iterator)
    return running_loss / num_batches, running_accuracy / num_batches

In [14]:
def validate(model, data_iterator, loss_func):
    """Evaluate ``model`` on every batch and return the mean loss and accuracy.

    Args:
        model: module called as ``model(sequence, sequence_lengths)``; its
            output is squeezed on dimension 1 before the loss is computed.
        data_iterator: iterable of batches exposing ``text`` (a
            ``(sequence, sequence_lengths)`` pair) and ``label``; must support ``len``.
        loss_func: callable taking ``(predictions, labels)``.

    Returns:
        ``(loss, accuracy)``, each averaged over the number of batches.
    """
    running_loss = 0
    running_accuracy = 0
    model.eval()

    # No parameter updates happen here, so gradient tracking is switched off.
    with torch.no_grad():
        for curr_batch in data_iterator:
            sequence, sequence_lengths = curr_batch.text
            # Evaluate the model that was passed in, not a module-level global.
            preds = model(sequence, sequence_lengths).squeeze(1)

            loss_curr = loss_func(preds, curr_batch.label)
            accuracy_curr = accuracy_metric(preds, curr_batch.label)

            running_loss += loss_curr.item()
            running_accuracy += accuracy_curr.item()

    num_batches = len(data_iterator)
    return running_loss / num_batches, running_accuracy / num_batches

In [None]:
# Train for a fixed number of epochs, checkpointing the weights every time the
# validation loss reaches a new minimum.
num_epochs = 10
best_validation_loss = float('inf')

for ep in range(num_epochs):
    time_start = time.time()
    training_loss, train_accuracy = train(lstm_model, train_data_iterator, optim, loss_func)
    validation_loss, validation_accuracy = validate(lstm_model, valid_data_iterator, loss_func)
    time_end = time.time()
    time_delta = time_end - time_start

    # Keep only the best-so-far weights on disk.
    if validation_loss < best_validation_loss:
        best_validation_loss = validation_loss
        torch.save(lstm_model.state_dict(), 'lstm_model.pt')

    # One report per epoch, ending with a separator line.
    print(
        f'Epoch Number: {ep+1} | Time Elapsed: {time_delta}s',
        f'Training Loss: {training_loss:.3f} | Training Accuracy: {train_accuracy*100:.2f}%',
        f'validation loss: {validation_loss:.3f} |  validation accuracy: {validation_accuracy*100:.2f}%',
        50*"-",
        sep="\n",
    )

In [None]:
# Load the best-performing checkpoint and evaluate it on the test set.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can still be loaded on a CPU-only machine.
lstm_model.load_state_dict(torch.load('./lstm_model.pt', map_location=device))

test_loss, test_accuracy = validate(lstm_model, test_data_iterator, loss_func)

print(f'Test Loss: {test_loss:.3f} | Test Accuracy: {test_accuracy*100:.2f}%')

In [None]:
def sentiment_inference(model, sentence):
    """Return the model's sigmoid score for a single raw sentence.

    The sentence is tokenised with the ``basic_english`` tokenizer, mapped to
    ids through ``TEXT_FIELD``'s vocabulary and fed to ``model`` as a batch of one.

    Args:
        model: module called as ``model(sequence, sequence_lengths)``.
        sentence: raw text to score.

    Returns:
        A float in [0, 1]: the sigmoid of the model's single output logit.

    Raises:
        ValueError: if the sentence yields no tokens, since an empty sequence
            cannot be packed for the LSTM.
    """
    model.eval()

    # text transformation
    tokens = data.get_tokenizer('basic_english')(sentence)
    if not tokens:
        raise ValueError("sentence must contain at least one token")
    token_ids = [TEXT_FIELD.vocab.stoi[t] for t in tokens]

    # model inference: shape (sequence_length, 1), i.e. a batch holding one review
    model_input = torch.LongTensor(token_ids).to(device).unsqueeze(1)
    # Pass the length explicitly rather than relying on the model's default.
    sequence_lengths = torch.LongTensor([len(token_ids)])

    # Gradients are not needed for inference.
    with torch.no_grad():
        pred = torch.sigmoid(model(model_input, sequence_lengths))

    return pred.item()

In [None]:
# Score a few hand-written reviews with the trained model, one score per line.
_sample_reviews = (
    "This film is horrible",
    "Director tried too hard but this film is bad",
    "This film will be houseful for weeks",
    "I just really loved the movie",
)
for _review in _sample_reviews:
    print(sentiment_inference(lstm_model, _review))