In [2]:
###############################################################################
#######################  1. LOAD THE TRAINING TEXT  ###########################
###############################################################################
# Slurp both corpora into memory as single strings; later cells split them
# on newlines to recover the individual reviews and labels.
with open("../data/reviews.txt") as reviews_file, open("../data/labels.txt") as labels_file:
    reviews = reviews_file.read()
    labels = labels_file.read()
    

###############################################################################
##########################  2. TEXT PRE-PROCESSING  ###########################
###############################################################################
import string

def preprocess(text):
    """Normalise raw review text and tokenise it.

    The text is lower-cased and every character listed in
    ``string.punctuation`` is removed before splitting.

    Args:
        text: Raw text. Individual reviews are separated by newline
            characters.

    Returns:
        A tuple ``(all_reviews, all_words)`` where ``all_reviews`` is the
        cleaned text split on newlines (one string per line) and
        ``all_words`` is the flat list of whitespace-separated words of the
        whole text. ``all_words`` is an empty list when the text contains no
        words.
    """
    cleaned = text.lower().translate(str.maketrans('', '', string.punctuation))
    all_reviews = cleaned.split("\n")
    # A bare split() collapses any run of whitespace (newlines included) and
    # never produces an empty-string token. The previous split/join/split(' ')
    # round-trip returned [''] for empty input, which is not a real word and
    # breaks vocabulary lookups downstream.
    all_words = cleaned.split()
    return all_reviews, all_words

all_reviews, all_words = preprocess(reviews)


###############################################################################
##################  3. CREATE DICTIONARIES & ENCODE REVIEWS  ##################
###############################################################################
from collections import Counter

# Rank the vocabulary from most to least frequent word. Ids start at 1, which
# leaves 0 unused by any word (pad_text pads short reviews with 0).
word_counts = Counter(all_words)
word_list = [word for word, _ in word_counts.most_common()]
vocab_to_int = dict(zip(word_list, range(1, len(word_list) + 1)))
int_to_vocab = dict(enumerate(word_list, start=1))

# Translate each review (one per line of the corpus) into its list of word ids.
encoded_reviews = []
for review in all_reviews:
    encoded_reviews.append([vocab_to_int[word] for word in review.split()])



###############################################################################
#############################  4. ENCODE LABELS ###############################
###############################################################################
# One label per line: "positive" becomes 1, anything else becomes 0.
all_labels = labels.split("\n")
encoded_labels = [int(label == "positive") for label in all_labels]
assert len(encoded_labels) == len(encoded_reviews), "# of encoded reivews & encoded labels must be the same!"

###############################################################################
#####################  5. GET RID OF LENGTH-0 REVIEWS   #######################
###############################################################################
import numpy as np
import torch

# Work out which rows survive before rebinding either sequence, so labels and
# reviews are filtered with the same mask and stay aligned.
has_words = [len(review) > 0 for review in encoded_reviews]
encoded_labels = np.array([label for label, keep in zip(encoded_labels, has_words) if keep])
encoded_reviews = [review for review, keep in zip(encoded_reviews, has_words) if keep]

###############################################################################
######################  6. MAKE ALL REVIEWS SAME LENGTH  #######################
###############################################################################
def pad_text(encoded_reviews, seq_length):
    """Force every encoded review to exactly ``seq_length`` ids.

    Reviews shorter than ``seq_length`` are left-padded with zeros; reviews
    that are at least ``seq_length`` long keep only their first
    ``seq_length`` ids.

    Args:
        encoded_reviews: Iterable of lists of integer word ids.
        seq_length: Target length of every row.

    Returns:
        A NumPy array built from the resized reviews, one row per review.
    """
    def _fit(review):
        missing = seq_length - len(review)
        if missing > 0:
            return [0] * missing + review
        return review[:seq_length]

    return np.array([_fit(review) for review in encoded_reviews])


padded_reviews = pad_text(encoded_reviews, seq_length=200)


###############################################################################
##############  7.SPLIT DATA & GET (REVIEW, LABEL) DATALOADER  ###############
###############################################################################
# Contiguous split in corpus order: the first 80% of rows train the model and
# the remainder is halved between validation and test.
train_ratio = 0.8
valid_ratio = (1 - train_ratio)/2
total = len(padded_reviews)
train_cutoff = int(total * train_ratio)
valid_cutoff = int(total * (1 - valid_ratio))

# Name each partition's row range once and apply it to features and labels alike.
train_rows = slice(None, train_cutoff)
valid_rows = slice(train_cutoff, valid_cutoff)
test_rows = slice(valid_cutoff, None)

train_x, train_y = padded_reviews[train_rows], encoded_labels[train_rows]
valid_x, valid_y = padded_reviews[valid_rows], encoded_labels[valid_rows]
test_x, test_y = padded_reviews[test_rows], encoded_labels[test_rows]

from torch.utils.data import TensorDataset, DataLoader
import pandas as pd 
device = "cpu"  ## DEFAULTING DEVICE TO CPU


def _to_long_tensor(array):
    """Return ``array`` (NumPy array or tensor) as an int64 tensor on ``device``."""
    # torch.as_tensor accepts both arrays and tensors, so re-running this cell
    # does not trigger the "copy construct from a tensor" warning that
    # torch.tensor(tensor) emits.
    return torch.as_tensor(array).to(device).long()


train_x, train_y = _to_long_tensor(train_x), _to_long_tensor(train_y)
valid_x, valid_y = _to_long_tensor(valid_x), _to_long_tensor(valid_y)
# The test tensors must come from the held-out test split. They were
# previously both built from train_x, so the "test" set was the training
# reviews and its "labels" were review token ids.
test_x, test_y = _to_long_tensor(test_x), _to_long_tensor(test_y)

train_data = TensorDataset(train_x, train_y)
valid_data = TensorDataset(valid_x, valid_y)
test_data = TensorDataset(test_x, test_y)

batch_size = 50
train_loader = DataLoader(train_data, batch_size = batch_size, shuffle = True)
valid_loader = DataLoader(valid_data, batch_size = batch_size, shuffle = True)
test_loader = DataLoader(test_data, batch_size = batch_size, shuffle = True)




In [4]:
###############################################################################
#########################  8.DEFINE THE LSTM MODEL  ##########################
###############################################################################
from torch import nn

class SentimentNet(nn.Module):
    """Embedding -> stacked LSTM -> dropout -> linear -> sigmoid classifier.

    For every sequence in the batch the network returns one score in
    ``[0, 1]``, taken from the LSTM output at the final timestep.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        """Build the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            output_size: Number of units in the final linear layer.
            embedding_dim: Size of each word embedding.
            hidden_dim: Size of the LSTM hidden state.
            n_layers: Number of stacked LSTM layers.
            drop_prob: Dropout probability used between LSTM layers and
                before the linear layer.
        """
        super().__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, output_size)  # fully connected
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, hidden):
        """Score a batch of encoded sequences.

        Args:
            x: Integer tensor of word ids, batch dimension first.
            hidden: ``(h, c)`` tuple of LSTM states, e.g. from ``init_hidden``.

        Returns:
            ``(out, hidden)`` where ``out`` has one sigmoid score per batch
            row and ``hidden`` is the updated LSTM state tuple.
        """
        x = x.long()
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Only the final timestep contributes to the returned score, so the
        # dense head runs on that step alone rather than on every timestep
        # with all but the last result thrown away.
        last_step = lstm_out[:, -1, :]
        out = self.sigmoid(self.fc(self.dropout(last_step)))

        # Keep the last output unit, matching the previous flatten-then-take-
        # last-column behaviour for any output_size.
        out = out[:, -1]
        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed ``(h, c)`` LSTM states for ``batch_size`` sequences.

        The states are created with the dtype and device of the model's own
        parameters, so the method does not rely on any module-level ``device``
        variable and always matches wherever the model currently lives.
        """
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.hidden_dim)
        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [5]:
###############################################################################
############ 9.PARAMETER INITIALIZATION #########################################
###############################################################################
# Word ids start at 1, so the embedding table needs one extra row for id 0,
# the value used to pad short reviews.
vocab_size = len(vocab_to_int) + 1
output_size, embedding_dim, hidden_dim, n_layers = 1, 200, 512, 2

model = SentimentNet(
    vocab_size=vocab_size,
    output_size=output_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    n_layers=n_layers,
).to(device)

# Binary cross-entropy on the sigmoid output, optimised with Adam.
lr = 0.005
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [None]:
###############################################################################
############ 10.TRAINING ######################################################
###############################################################################

epochs = 10
counter = 0
print_every = 1000   # run validation every `print_every` optimisation steps
clip = 5             # max gradient norm
valid_loss_min = np.inf   # np.Inf was removed in NumPy 2.0

model.train()
for i in range(epochs):
    h = model.init_hidden(batch_size)

    # The loop variable is named `targets` so that it does not overwrite the
    # raw label text held in the module-level `labels` variable.
    for inputs, targets in train_loader:
        counter += 1
        inputs, targets = inputs.to(device), targets.to(device)

        # A final batch smaller than batch_size needs a matching hidden state.
        if h[0].size(1) != inputs.size(0):
            h = model.init_hidden(inputs.size(0))
        # Detach the carried-over state so backprop stops at this batch.
        h = tuple(state.detach() for state in h)

        model.zero_grad()
        output, h = model(inputs, h)
        loss = criterion(output.view(-1), targets.float())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        if counter % print_every == 0:
            val_h = model.init_hidden(batch_size)
            val_losses = []
            model.eval()
            # No gradients are needed while validating.
            with torch.no_grad():
                # The validation loader is bound to `valid_loader`; the
                # previously referenced `val_loader` does not exist.
                for inp, lab in valid_loader:
                    inp, lab = inp.to(device), lab.to(device)
                    if val_h[0].size(1) != inp.size(0):
                        val_h = model.init_hidden(inp.size(0))
                    out, val_h = model(inp, val_h)
                    val_loss = criterion(out.view(-1), lab.float())
                    val_losses.append(val_loss.item())

            model.train()
            mean_val_loss = np.mean(val_losses)
            print("Epoch: {}/{}...".format(i+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()),
                  "Val Loss: {:.6f}".format(mean_val_loss))
            # Checkpoint whenever validation loss matches or beats the best so far.
            if mean_val_loss <= valid_loss_min:
                torch.save(model.state_dict(), './state_dict.pt')
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, mean_val_loss))
                valid_loss_min = mean_val_loss

In [None]:
################################################################################
############11.LOADING THE BEST MODEL AND TEST THE TRAINED MODEL ON TEST SET####
################################################################################
# Restore the checkpoint with the best validation loss; map_location keeps the
# load working even if the weights were saved from a different device.
model.load_state_dict(torch.load('./state_dict.pt', map_location=device))

test_losses = []
num_correct = 0
h = model.init_hidden(batch_size)

model.eval()
# Evaluation needs no gradients; skipping them avoids building autograd graphs.
with torch.no_grad():
    # `targets` (not `labels`) so the raw label text variable is left intact.
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # A final batch smaller than batch_size needs a matching hidden state.
        if h[0].size(1) != inputs.size(0):
            h = model.init_hidden(inputs.size(0))
        output, h = model(inputs, h)
        output = output.view(-1)
        test_loss = criterion(output, targets.float())
        test_losses.append(test_loss.item())
        pred = torch.round(output)  # Rounds the output to 0/1
        num_correct += int(pred.eq(targets.float().view_as(pred)).sum().item())

print("Test loss: {:.3f}".format(np.mean(test_losses)))
test_acc = num_correct/len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc*100))

In [None]:
###############################################################################
############  TEST THE TRAINED MODEL ON A RANDOM SINGLE REVIEW ################
###############################################################################
def predict(net, review, seq_length = 200):
    """Classify a single free-text review as positive or negative.

    Args:
        net: Trained model exposing ``init_hidden`` and a
            ``forward(x, hidden)`` that returns ``(score, hidden)``.
        review: Raw review text.
        seq_length: Length the encoded review is padded/truncated to.

    Returns:
        ``"This is a positive review."`` or ``"This is a negative review."``,
        or ``None`` (after printing a notice) when the review contains no
        word from the training vocabulary.
    """
    # Run on whatever device the model's parameters live on.
    net_device = next(net.parameters()).device

    _, words = preprocess(review)
    # Words never seen during training have no id; skip them instead of
    # failing with a KeyError.
    encoded_words = [vocab_to_int[word] for word in words if word in vocab_to_int]

    # Check BEFORE padding: the padded array always has exactly one row, so
    # testing its length can never detect an empty review.
    if not encoded_words:
        print("Your review must contain at least 1 word!")
        return None

    padded_words = torch.from_numpy(pad_text([encoded_words], seq_length)).to(net_device)

    net.eval()
    with torch.no_grad():
        h = net.init_hidden(1)
        output, h = net(padded_words, h)
    pred = torch.round(output.squeeze())
    # Labels are encoded with 1 == "positive", so a rounded score of 1 is the
    # positive class (the previous check on 0 reported every verdict inverted).
    msg = "This is a positive review." if pred == 1 else "This is a negative review."

    return msg


review1 = "It made me cry."
review2 = "It was so good it made me cry."
review3 = "It's ok."
review4 = "This movie had the best acting and the dialogue was so good. I loved it."
review5 = "Garbage"

# The trained network is bound to `model`; no variable named `net` exists at
# module level. Each result is printed explicitly because a notebook cell only
# displays the value of its final expression.
                                 ### OUTPUT ###
print(predict(model, review1))   ## negative ##
print(predict(model, review2))   ## positive ##
print(predict(model, review3))   ## negative ##
print(predict(model, review4))   ## positive ##
print(predict(model, review5))   ## negative ##