## Sentiment Analysis for Amazon Reviews

The goal is to predict whether an Amazon review expresses positive or negative sentiment.

This notebook follows the blog post at https://blog.floydhub.com/long-short-term-memory-from-zero-to-hero-with-pytorch/

In [1]:
# Mount Google Drive at /content/drive; the data files opened later live under this path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import re 
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize
from collections import Counter

import numpy as np
import torch

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


Helpers

In [3]:
# Run on the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def to_t(tensor):
    """Return `tensor` moved to the notebook-wide `device`."""
    moved = tensor.to(device)
    return moved

Load and preprocess data

In [178]:
# Read both review files fully into memory as lists of lines.
# The encoding is pinned to UTF-8 so decoding does not depend on the platform default.
with open('/content/drive/MyDrive/Colab Notebooks/RNNs/lstm/train.ft.txt', encoding='utf-8') as f:
    train_file = f.readlines()
with open('/content/drive/MyDrive/Colab Notebooks/RNNs/lstm/test.ft.txt', encoding='utf-8') as f:
    test_file = f.readlines()

In [179]:
# The data set provides 3,600,000 training samples and 400,000 test samples;
# only the first 300,000 / 50,000 are used to speed up training.
num_train_samples = int(3e5)
num_test_samples = int(5e4)

# Take the subset once so that sentences and labels are guaranteed to line up.
train_lines = train_file[:num_train_samples]
test_lines = test_file[:num_test_samples]

# Each line is "<label> <review text>\n": keep the lower-cased text after the
# first space, without the trailing newline.
train_sentences = [line.split(' ', 1)[1].rstrip('\n').lower() for line in train_lines]
test_sentences = [line.split(' ', 1)[1].rstrip('\n').lower() for line in test_lines]

# 0 if negative sentiment (__label__1), 1 if positive (__label__2).
# Labels come from the same subset as the sentences, so both have the same length.
train_y = np.array([0 if line.split(' ', 1)[0] == '__label__1' else 1 for line in train_lines])
test_y = np.array([0 if line.split(' ', 1)[0] == '__label__1' else 1 for line in test_lines])

In [180]:
vocabulary = Counter()

# Tokenize the training sentences and count every token; only training data
# contributes to the vocabulary.
train_x = []
for sentence in train_sentences:
    tokens = [token.lower() for token in word_tokenize(sentence)]
    vocabulary.update(tokens)
    train_x.append(tokens)

# Test sentences are tokenized the same way but are not added to the vocabulary.
test_x = [
    [token.lower() for token in word_tokenize(sentence)]
    for sentence in test_sentences
]

In [181]:
# Lookup tables between tokens and integer ids. 'PADDING_CONST' is placed in
# front of the vocabulary so that it takes id 0, the value used for padding.
id_to_word = dict(enumerate(Counter(['PADDING_CONST']) + vocabulary))
word_to_id = {word: token_id for token_id, word in id_to_word.items()}

In [182]:
# Replace every token by its vocabulary id, in place.
# Tokens that are not in the vocabulary are encoded as 0.
for token_lists in (train_x, test_x):
    for row, tokens in enumerate(token_lists):
        token_lists[row] = [word_to_id.get(token, 0) for token in tokens]

In [183]:
# Make every sentence exactly `max_sentence_length` ids long:
# shorter sentences are right-padded with 0s, longer ones are cut off.

max_sentence_length = 150


def _pad_or_truncate(sequences, length):
    """Return a (len(sequences), length) int array of zero-padded / truncated rows."""
    padded = np.zeros((len(sequences), length), dtype=int)
    for row, sequence in enumerate(sequences):
        kept = sequence[:length]
        padded[row, :len(kept)] = kept
    return padded


# One helper for both splits keeps train and test preprocessing identical.
train_x = _pad_or_truncate(train_x, max_sentence_length)
test_x = _pad_or_truncate(test_x, max_sentence_length)


In [186]:
# Show the first encoded training review: token ids followed by zero padding.
train_x[0]

array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14,  4, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 13,  2, 24, 25, 26, 27, 28, 29, 30, 31,
       12, 21, 32, 33,  4, 30, 34, 35, 36, 37, 38, 39, 38,  4, 40, 21, 32,
       41, 33, 13, 42,  4, 43, 31, 12, 13, 44, 45, 46, 47, 48, 49, 50, 51,
       52, 53, 54, 55, 56, 49, 57, 58, 29, 13, 22, 59, 60, 26, 61, 24, 62,
       12, 63,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [187]:
# Wrap the numpy arrays as torch tensors and move them to the selected device.
train_x, test_x = (to_t(torch.from_numpy(array)) for array in (train_x, test_x))
train_y, test_y = (to_t(torch.from_numpy(array)) for array in (train_y, test_y))

# data sets

class AmazonSentimentDataset(torch.utils.data.Dataset):
    """Pairs each encoded review with its sentiment label."""

    def __init__(self, tokenized_sentences, sentiments):
        self.sentiments = sentiments
        self.tokenized_sentences = tokenized_sentences

    def __len__(self):
        # The number of reviews defines the size of the data set.
        return len(self.tokenized_sentences)

    def __getitem__(self, i):
        # (encoded review, sentiment) for position i
        return self.tokenized_sentences[i], self.sentiments[i]
        
# Pair the encoded reviews with their labels for the train and test splits.
train_dataset = AmazonSentimentDataset(train_x, train_y)
test_dataset = AmazonSentimentDataset(test_x, test_y)

In [188]:
# data loaders: both splits are served in shuffled mini-batches of `batch_size`

batch_size = 250

data_loaders = {
    'train': torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
    'test': torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True),
}

# keep the individual loader names available as well
train_dl = data_loaders['train']
test_dl = data_loaders['test']

### Amazon Review Sentiment Analysis Model

In [195]:
# model architecture from https://blog.floydhub.com/long-short-term-memory-from-zero-to-hero-with-pytorch/

class ARSAModel(torch.nn.Module):
    """Embedding -> LSTM -> linear -> sigmoid sentiment classifier.

    Args:
        hidden_size: Number of features in the LSTM hidden state.
        output_size: Number of units in the final linear layer.
        num_layers: Number of stacked LSTM layers.
        num_embeddings: Number of rows in the embedding lookup table.
        embedding_dim: Length of each word embedding vector.
    """

    def __init__(self, hidden_size, output_size, num_layers, num_embeddings, embedding_dim):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers

        self.sigmoid = torch.nn.Sigmoid()

        # lookup table for word embeddings; like the other sub-modules it is
        # created on the default device and moved together with the model
        # when the caller runs `model.to(...)`
        self.embedding = torch.nn.Embedding(num_embeddings, embedding_dim)
        self.lstm = torch.nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x, h_0, c_0):
        """Score a batch of encoded reviews.

        Args:
            x: Integer tensor of token ids, shape (batch, sequence_length).
            h_0: Initial hidden state, shape (num_layers, batch, hidden_size).
            c_0: Initial cell state, same shape as `h_0`.

        Returns:
            Tuple `(out, h_0, c_0)`. `out` has shape (batch,) and holds the
            sigmoid of the last output unit at the last time step. `h_0` and
            `c_0` are the tensors that were passed in, returned unchanged; the
            LSTM's final state is not propagated.
        """
        # word embeddings are jointly learned as the model is trained
        embeddings = self.embedding(x)
        out, _ = self.lstm(embeddings, (h_0, c_0))

        # only the last time step feeds the prediction, so the linear layer and
        # the sigmoid are applied to that step alone; the batch size is taken
        # from the input instead of a global constant
        last_step = out[:, -1, :]
        probs = self.sigmoid(self.fc(last_step))

        # probability of positive sentiment for each input in the batch
        return probs[:, -1], h_0, c_0

    def init_hidden(self, num_sequences=None):
        """Return zeroed `(hidden state, cell state)` tensors for the LSTM.

        Args:
            num_sequences: Batch dimension of the returned states. When
                omitted, the notebook-level `batch_size` is used.

        Returns:
            Two new tensors of shape (num_layers, num_sequences, hidden_size)
            with the dtype and device of the model's first parameter. No model
            parameter is modified.
        """
        if num_sequences is None:
            num_sequences = batch_size

        weight = next(self.parameters())
        shape = (self.num_layers, num_sequences, self.hidden_size)

        return weight.new_zeros(shape), weight.new_zeros(shape)


In [196]:
# Build the classifier and move it to the GPU if available (`Module.to`
# returns the module itself), then switch it to training mode.
# NOTE: the embedding width reuses the value of `max_sentence_length`; the two
# quantities are otherwise independent.
model = ARSAModel(
    hidden_size=512,
    output_size=2,
    num_layers=2,
    num_embeddings=len(word_to_id),
    embedding_dim=max_sentence_length,
).to(device)
model.train()

# Binary cross-entropy on the sigmoid outputs, optimised with Adam.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
criterion = torch.nn.BCELoss()


Training model

In [197]:
num_epochs = 1

for epoch in range(num_epochs):
    # every epoch starts from zeroed hidden / cell states
    h_0, c_0 = model.init_hidden()

    for step, (batch_x, batch_y) in enumerate(data_loaders['train']):
        # clear the gradients left over from the previous step
        optimizer.zero_grad()
        model.zero_grad()

        # cut the states off from any earlier computation graph
        h_0, c_0 = h_0.detach(), c_0.detach()

        probs, h_0, c_0 = model(batch_x, h_0, c_0)

        # binary cross-entropy between predicted probabilities and 0/1 labels,
        # followed by backpropagation and one optimizer step
        loss = criterion(probs, batch_y.float())
        loss.backward()
        optimizer.step()

        # progress report every 250 steps
        if step % 250 == 0:
            print(f"Step {step}, Loss: {loss.item():.4f}")

    # the value printed here is the loss of the last batch of the epoch
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")


Step 0, Loss: 0.6941
Step 250, Loss: 0.4897
Step 500, Loss: 0.2834
Step 750, Loss: 0.1919
Step 1000, Loss: 0.2378
Epoch [1/1], Loss: 0.2425


### Results

Testing model accuracy

In [198]:
num_correct = 0
# Count the samples actually evaluated instead of assuming that every batch
# holds exactly `batch_size` items.
num_seen = 0

h_0, c_0 = model.init_hidden()

model.eval()
with torch.no_grad():
    # no graph is recorded under no_grad, so the states need no detaching
    for x, y in data_loaders['test']:
        output, h_0, c_0 = model(x, h_0, c_0)

        # round each probability to the nearest class (0 or 1)
        pred = torch.round(output)

        # tally the matches on the tensor's own device
        num_correct += pred.eq(y.float()).sum().item()
        num_seen += y.size(0)

print(f"Accuracy: {num_correct / num_seen * 100:.4f}%")

Accuracy: 90.6220%


To improve model accuracy, we could train on the complete training dataset, alter the model architecture, tune hyperparameters, use pretrained word embeddings, etc.

Running personal test

In [204]:
# Zeroed states sized by `batch_size`, the same size used for the repeated
# input batch below (previously a hard-coded 250).
h_0, c_0 = model.init_hidden()

sample_reviews = [
    'I recently bought a product from a different brand that was terrible. This product was similar.',
    'Someone got this for me as a gift. I didnt like it at first but now I need more. Its wonderful.',
    'I wish I had something good to say about this product, but I dont.'
]

model.eval()
with torch.no_grad():
    for sentence in sample_reviews:
        # same preprocessing as the data set: lower-cased tokens -> ids
        # (0 for unknown tokens) -> cut to `max_sentence_length`
        tokens = [word.lower() for word in word_tokenize(sentence)]
        token_ids = [word_to_id.get(token, 0) for token in tokens][:max_sentence_length]

        # right-pad with 0s up to the fixed sentence length
        adjusted_x = np.zeros((1, max_sentence_length), dtype=int)
        adjusted_x[0, :len(token_ids)] = token_ids

        # the model is fed `batch_size` copies of the sentence; only the first
        # prediction is read
        x = to_t(torch.from_numpy(adjusted_x)).repeat(batch_size, 1)

        output, _, _ = model(x, h_0, c_0)

        pred = torch.round(output)
        print(
            f"Sentence: {sentence} \n"
            f"Predicted Sentiment: {'Positive' if pred[0].item() == 1.0 else 'Negative'} \n"
        )

Sentence: I recently bought a product from a different brand that was terrible. This product was similar. 
Predicted Sentiment: Negative 

Sentence: Someone got this for me as a gift. I didnt like it at first but now I need more. Its wonderful. 
Predicted Sentiment: Positive 

Sentence: I wish I had something good to say about this product, but I dont. 
Predicted Sentiment: Negative 

