# Sentiment analysis using LSTM neural network and Pytorch

In [None]:
!pip install torchinfo --quiet
!pip install shap --quiet
!pip install gensim --quiet
import nltk
nltk.download('wordnet')
nltk.download('stopwords')
nltk.download('punkt')

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/540.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m532.5/540.1 kB[0m [31m18.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m540.1/540.1 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25h

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [5]:
import pandas as pd
import json, collections, time, re, string, os,  sys, random, sklearn, shap
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset
from torch import optim
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.svm import SVC
from nltk.tokenize import word_tokenize

import torch
import torch.nn as nn
from torchinfo import summary
from sklearn.model_selection import train_test_split


## Load data

In [8]:
df_reviews = pd.read_csv('https://www.dropbox.com/scl/fi/vdncgb0i4uipvnptdhg6h/imdb_reviews.csv?rlkey=n2iy9s7o750l9wodonjm00u41&st=8jrm7kub&dl=1', index_col = 0 )

In [9]:
df_reviews = df_reviews.sample(50000).reset_index(drop=True)
df_reviews['label'] = df_reviews['label'].map({'pos': 1,'neg': 0})
display(df_reviews['label'].value_counts())

print(len(df_reviews))

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
1,25000
0,25000


50000


In [10]:
label = df_reviews.label
reviews = df_reviews.review
x_train_token, x_test_token, y_train_token, y_test_token = train_test_split(reviews, label, test_size=0.2, train_size=0.8, random_state=42)


## Create vocabulary of words in text

In [None]:
# Tokenization and create vocab
tokenized_text = [word_tokenize(text.lower()) for text in x_train_token]              # Tokenize - List of words for each text
word_counts = Counter([word for text in tokenized_text for word in text])             # Count of each word

vocab = {word: idx + 1 for idx, (word, _) in enumerate(word_counts.most_common())}    # Map word -> index
index_word = {idex: word for word, idex in vocab.items()}                             # Map index -> word
max_len = max([len(word_tokenize(text.lower())) for text in reviews])                 # Find the longest text in the dataset so it can be used for padding
vocab['<PAD>'] = 0

def vectorize_text(tokenized_text, max_len):
  sequence_padded = []
  sequence_length = []

  text_sequences = [[vocab.get(word, 0) for word in text] for text in tokenized_text] # Convert text to sequences of indices

  # Pad sequences to the same length
  for seq in text_sequences:
    sequence_padded.append(seq + [0] * (max_len - len(seq)))                         # Pad with zero all sequences so they have the same length
    sequence_length.append(len(seq))                                                 # Store the non-padded (real) length of each sequence

  # Convert lists to tensors
  sequence_padded = torch.tensor(sequence_padded, dtype=torch.long)
  sequence_length = torch.tensor(sequence_length, dtype=torch.long)
  return sequence_padded, sequence_length

x_train, x_train_len = vectorize_text(tokenized_text, max_len)
x_test, x_test_len = vectorize_text([word_tokenize(text.lower()) for text in x_test_token], max_len)

y_train = torch.tensor(y_train_token.values, dtype=torch.long)
y_test = torch.tensor(y_test_token.values, dtype=torch.long)

In [None]:
class TextDataset(Dataset):                                                       # Extends Pytorch Dataset class
    def __init__(self, data_dict, label_list, x_train_len):
        self.data = data_dict.to(torch.int64)
        self.labels = label_list
        self.length = x_train_len
    def __len__(self): return(len(self.labels))
    def __getitem__(self, idx): return (self.data[idx], self.labels[idx], self.length[idx])

In [None]:
train_dataset = TextDataset(x_train,  y_train, x_train_len)                       # Convert to datatype used by Pytorch
val_dataset = TextDataset(x_test,  y_test, x_test_len)                            # Convert to datatype used by Pytorch

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

## Define model and training algorithm

Define model

In [None]:
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, pad_idx, embed_layer = None):
        super(LSTMClassifier, self).__init__()
        if embed_layer is None: self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx) # if pre-trained embedding isnt provided, word embedding is learned
        else : self.embedding = nn.Embedding.from_pretrained(embed_layer, freeze=False)     # if pre-trained embedding has been provided
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text, text_lengths):
        embedded = self.embedding(text)
        text_lengths = text_lengths.to(torch.int64).to('cpu')
        packed_embedded = pack_padded_sequence(embedded, text_lengths, batch_first=True, enforce_sorted=False)
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        output, output_lengths = pad_packed_sequence(packed_output, batch_first=True)
        # Use only the final hidden state for classification
        hidden = hidden[-1, :, :]
        return self.fc(hidden)


Define training

In [None]:
def train_model(model, train_loader, val_loader, N_EPOCHS = 10, device = 'cpu'):
  optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay = 5e-4)
  criterion = nn.BCEWithLogitsLoss()


  for i, epoch in enumerate(range(N_EPOCHS)):
    model.train()
    for batch in train_loader:
        encoded_text = batch[0].to(device)
        lengths = batch[2]
        label = batch[1].to(device)

        predictions = model(encoded_text, lengths).squeeze(1)
        loss = criterion(predictions, label.float())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    model.eval()
    y_hat_all = []
    y_true_all = []

    with torch.no_grad():
        for batch in val_loader:
          encoded_text = batch[0].to(device)
          lengths = batch[2]
          label = batch[1].to(device)
          predictions = model(encoded_text, lengths).squeeze(1)
          y_hat = (torch.sigmoid(predictions)>.5) *1
          #print(accuracy_score(label, y_hat))

          y_hat_all.extend([int(x) for x in y_hat])
          y_true_all.extend([int(x) for x in label])


        print(f"Accuracy for epoch : {i} :", accuracy_score(y_true_all, y_hat_all))



Create model instance, train model and calculate accuracy

In [None]:
# Define hyperparameters
vocab_size = len(vocab)
embedding_dim = 256
hidden_dim = 256
output_dim = 1  # Binary classification, 1 for positive sentiment, 0 for negative sentiment
pad_idx = 0     # Assuming 0 is used for padding index in vocabulary

# Create model instance
model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, pad_idx)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Training on: ", device)

model.to(device)
train_model(model, train_loader, val_loader, N_EPOCHS = 5, device = device)

Training on:  cuda
Accuracy for epoch : 0 : 0.7029
Accuracy for epoch : 1 : 0.8625
Accuracy for epoch : 2 : 0.8781
Accuracy for epoch : 3 : 0.887
Accuracy for epoch : 4 : 0.9019


## Training model using pretrained embedding

In [None]:
import gensim.downloader as api
w2v_model = api.load("glove-twitter-25")

# Initialize blank matrix of  Dim = (size of vocabulary, embedding vector dim)
embedding_matrix = torch.zeros(len(vocab),w2v_model.vector_size)
print(embedding_matrix.shape)

# Embbed the vector of each word into the empty matrix just initialized
for i, token in enumerate(range(len(vocab))):
    embedding_matrix[i] = torch.from_numpy(w2v_model[token])

torch.Size([145822, 25])


  embedding_matrix[i] = torch.from_numpy(w2v_model[token])


In [None]:
pre_trained_model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, pad_idx, embedding_matrix)
pre_trained_model.to(device)
train_model(model, train_loader, val_loader, N_EPOCHS = 5, device = device)


Accuracy for epoch : 0 : 0.9047
Accuracy for epoch : 1 : 0.9036
Accuracy for epoch : 2 : 0.9062
Accuracy for epoch : 3 : 0.9042
Accuracy for epoch : 4 : 0.9057


Using pre-trained embeddings achieves high accuracy while training on a fewer number of epochs