In [None]:
import numpy as np 
import pandas as pd 
import torch
import torch.nn as nn
import torch.nn.functional as F
from nltk.corpus import stopwords 
from collections import Counter
import string
import re
import seaborn as sns
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

In [None]:
# Load the raw reviews; later cells read the 'Text' and 'Sentiment' columns.
df = pd.read_csv('../../data/raw/reviews.csv')
# Preview the first rows as a quick sanity check of the load.
df.head()

In [None]:
# Split the raw review texts and their labels into train and test sets.
# stratify=y keeps the label proportions the same in both splits, and the fixed
# random_state makes the split reproducible.
X,y = df['Text'].values,df['Sentiment'].values
x_train,x_test,y_train,y_test = train_test_split(X,y,stratify=y, random_state = 4263)
print(f'shape of train data is {x_train.shape}')
print(f'shape of test data is {x_test.shape}')

In [None]:
# Class balance of the training labels.
# value_counts() orders the classes by frequency (most common first), not
# alphabetically, so the bar labels must come from its index. Hard-coding
# ['negative', 'positive'] attaches the wrong name to each bar whenever
# 'positive' is the majority class.
dd = pd.Series(y_train).value_counts()
sns.barplot(x=dd.index.to_numpy(), y=dd.values)
plt.show()

In [None]:
def preprocess_string(s):
    """Strip a token down to its non-digit word characters.

    Three regex passes run in order, each deleting its matches outright
    (nothing is replaced by a space, so the surviving characters are joined):
      1. anything that is neither a word character nor whitespace,
      2. every run of whitespace,
      3. every digit.
    """
    for pattern in (r"[^\w\s]", r"\s+", r"\d"):
        s = re.sub(pattern, '', s)
    return s

def create_word_list(x_train):
    """Collect every cleaned, non-stop-word token from the training texts.

    Each text is lower-cased, split on whitespace and cleaned with
    `preprocess_string`. Tokens that end up empty or that are English stop
    words are dropped. Duplicates are kept so the caller can count
    frequencies.

    Args:
        x_train: iterable of raw review strings.

    Returns:
        list of cleaned tokens, in order of appearance.
    """
    stop_words = set(stopwords.words('english'))
    word_list = []
    for sent in x_train:
        # split() rather than split(' '): it also breaks on tabs/newlines, which
        # is how `tokenize` splits when encoding reviews. With split(' '),
        # "good\nproduct" stayed one token and preprocess_string fused it into
        # the junk vocabulary entry "goodproduct".
        for raw_word in sent.lower().split():
            word = preprocess_string(raw_word)
            if word and word not in stop_words:
                word_list.append(word)
    return word_list
word_list = create_word_list(x_train)

In [None]:
def tokenize(x_train, y_train, x_test, y_test, vocab_limit = 1000):
    """Encode reviews as vocabulary-index lists and labels as 0/1.

    The vocabulary is the `vocab_limit` most frequent training tokens (as
    produced by `create_word_list`), numbered from 1 in descending frequency
    order. Words outside the vocabulary are dropped from the encoded reviews.

    Args:
        x_train, x_test: iterables of raw review strings.
        y_train, y_test: iterables of labels; 'positive' maps to 1, anything
            else to 0.
        vocab_limit: maximum vocabulary size (default 1000, the value that was
            previously hard-coded).

    Returns:
        (encoded_train_reviews, train_labels, encoded_test_reviews,
         test_labels, vocabulary_dict), where the review arrays have
        dtype=object and the vocabulary maps word -> index.
    """
    # Build the vocabulary from the x_train argument itself rather than from the
    # notebook-level `word_list`, so the function no longer depends on another
    # cell having run with the same data.
    corpus = Counter(create_word_list(x_train))
    corpus_ = sorted(corpus.items(), key = lambda x: x[1], reverse=True)[:vocab_limit]
    onehot_dict = {w[0]:i+1 for i, w in enumerate(corpus_)}

    def encode(sent):
        # Clean each word once (it used to be cleaned twice: once for the
        # membership test and once for the lookup).
        cleaned = (preprocess_string(word) for word in sent.lower().split())
        return [onehot_dict[word] for word in cleaned if word in onehot_dict]

    final_list_train = [encode(sent) for sent in x_train]
    final_list_test = [encode(sent) for sent in x_test]

    encoded_train = [1 if label =='positive' else 0 for label in y_train]
    encoded_test = [1 if label =='positive' else 0 for label in y_test]
    return np.array(final_list_train, dtype = 'object'), np.array(encoded_train),np.array(final_list_test, dtype = 'object'), np.array(encoded_test),onehot_dict

In [None]:
# Encode the reviews as lists of vocabulary indices and the labels as 0/1.
# NOTE(review): this rebinds x_train/x_test from raw strings to encoded lists, so
# re-running this cell without first re-running the split cell fails
# (tokenize calls .lower() on every review).
x_train,y_train,x_test,y_test,vocab = tokenize(x_train,y_train,x_test,y_test)

In [None]:
# Distribution of encoded review lengths (number of in-vocabulary tokens per review).
rev_len = list(map(len, x_train))
pd.Series(rev_len).hist()
plt.show()
pd.Series(rev_len).describe()

In [None]:
def padding(sents, seq_len):
    """Left-pad or truncate each encoded review to exactly `seq_len` ids.

    Reviews shorter than `seq_len` are right-aligned with zeros in front of
    them; longer reviews keep only their first `seq_len` ids; empty reviews
    stay as an all-zero row.

    Returns:
        int array of shape (len(sents), seq_len).
    """
    features = np.zeros((len(sents), seq_len), dtype = int)
    for row, rev in enumerate(sents):
        if len(rev) == 0:
            continue
        kept = np.array(rev)[:seq_len]
        # Right-align the kept ids; the untouched leading cells remain 0.
        features[row, seq_len - len(kept):] = kept
    return features

In [None]:
# Very few reviews are longer than 30 tokens, so a fixed length of 50 covers
# almost all of them; anything longer is truncated to its first 50 tokens.
x_train_pad = padding(x_train, 50)
x_test_pad = padding(x_test, 50)

In [None]:
# Wrap the padded id matrices and the 0/1 labels as tensor datasets.
train_data = TensorDataset(torch.from_numpy(x_train_pad), torch.from_numpy(y_train))
test_data = TensorDataset(torch.from_numpy(x_test_pad), torch.from_numpy(y_test))

# batch size
batch_size = 50

# Both loaders shuffle. drop_last=True discards the final partial batch so every
# batch has exactly batch_size rows: the training loop creates its LSTM state
# with init_hidden(batch_size) and reuses it from batch to batch.
# NOTE(review): because of drop_last, fewer than len(dataset) samples are scored per epoch.
train_loader = DataLoader(train_data, shuffle=True, batch_size = batch_size, drop_last = True)
test_loader = DataLoader(test_data, shuffle= True, batch_size = batch_size, drop_last = True)

In [None]:
# Sanity check: print the shapes and contents of the first training batch only.
for i, batch in enumerate(train_loader):
    # batch[0] holds the padded token ids, batch[1] the labels.
    print(batch[0].shape)
    print(batch[1].shape)
    print(batch[0])
    print(batch[1])

    # Stop after the first batch.
    break

In [None]:
# Obtain one batch of training data and show its size, inputs and labels.
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter)

print(f"Size of Batch {sample_x.size()}")
print(f"Sample input {sample_x}")
print(f"Sample output {sample_y}")

In [None]:
class SentimentLSTM(nn.Module):
    """Embedding -> stacked LSTM -> dropout -> linear -> sigmoid classifier.

    Args:
        no_layers: number of stacked LSTM layers.
        vocab_size: number of rows in the embedding table.
        hidden_dim: size of the LSTM hidden state.
        embedding_dim: size of each token embedding.
        drop_prob: dropout probability applied to the LSTM output before the
            linear layer. (This argument used to be ignored in favour of a
            hard-coded 0.3.)
        output_dim: width of the linear layer's output. `forward` returns a
            single score per sample, so the default of 1 is the sensible
            value for binary sentiment. (It used to be read from a notebook
            global instead of being an argument.)
    """

    def __init__(self, no_layers, vocab_size, hidden_dim, embedding_dim, drop_prob = 0.5, output_dim = 1):
        super(SentimentLSTM, self).__init__()

        self.no_layers = no_layers
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size

        # embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # LSTM (batch_first: inputs are [B, seq_len, embedding_dim])
        self.lstm = nn.LSTM(input_size = embedding_dim, hidden_size = self.hidden_dim, num_layers = no_layers, batch_first=True)

        # dropout layer, honouring the constructor argument
        self.dropout = nn.Dropout(drop_prob)

        # linear and sigmoid layers
        self.fc = nn.Linear(self.hidden_dim, self.output_dim)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        """Score a batch of padded id sequences.

        Args:
            x: integer tensor of token ids, [B, seq_len].
            hidden: (hidden_state, cell_state) tuple as produced by
                `init_hidden` or by a previous call.

        Returns:
            (sig_out, hidden): sig_out is a [B] tensor of sigmoid scores taken
            from the final time step; hidden is the updated LSTM state.
        """
        batch_size = x.size(0)  # batch size -> B
        embeds = self.embedding(x)  # [B, seq_len, embedding_dim]
        lstm_out, hidden = self.lstm(embeds, hidden)  # [B, seq_len, hidden_dim]

        # Only the final time step feeds the prediction, so select it before the
        # dropout/linear/sigmoid head instead of scoring every step and then
        # discarding all but the last one.
        last_step = lstm_out[:, -1, :]
        out = self.fc(self.dropout(last_step))
        sig_out = self.sig(out)

        # One score per sample: the last output unit of the last time step.
        sig_out = sig_out.view(batch_size, -1)[:, -1]

        return sig_out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed (hidden_state, cell_state), each [no_layers, batch_size, hidden_dim]."""
        # new_zeros allocates on the same device and with the same dtype as the
        # model's weights, so the state also matches a model moved off the CPU.
        weight = next(self.parameters())
        shape = (self.no_layers, batch_size, self.hidden_dim)
        hidden = (weight.new_zeros(shape), weight.new_zeros(shape))
        return hidden

In [None]:
# Model hyperparameters.
no_layers = 2 # number of stacked LSTM layers
vocab_size = len(vocab) + 1 # +1 because index 0 is the padding symbol
embedding_dim = 64
output_dim = 1 # one output unit: a single sigmoid score per review
hidden_dim = 256

model = SentimentLSTM(no_layers, vocab_size, hidden_dim, embedding_dim, drop_prob = 0.5)

# Display the module structure.
model

In [None]:
# Loss and optimizer.

lr = 0.001
# BCELoss takes probabilities, which matches the sigmoid at the end of the model's forward().
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
# accuracy function
def accuracy(pred, label):
    """Return how many rounded predictions equal their labels, as a Python number."""
    rounded = pred.squeeze().round()
    matches = rounded == label.squeeze()
    return matches.sum().item()

In [None]:
# Train for a fixed number of epochs and evaluate on the test loader after each one.
clip = 5  # maximum gradient norm passed to clip_grad_norm_
epochs = 5
# np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
test_loss_min = np.inf
epoch_tr_loss, epoch_tst_loss = [], []
epoch_tr_acc, epoch_tst_acc = [], []

for epoch in range(epochs):
    train_loss = []
    train_acc = 0
    train_seen = 0  # number of samples actually scored this epoch
    h = model.init_hidden(batch_size)
    model.train()
    # processing each batch
    for x, y in train_loader:
        # Detach the carried hidden state so backprop stops at the batch boundary.
        h = tuple(each.detach() for each in h)
        model.zero_grad()
        output, h = model(x, h)

        # calculate loss
        loss = criterion(output.squeeze(), y.float())
        loss.backward()
        train_loss.append(loss.item())

        # accuracy: number of correct predictions in this batch
        train_acc += accuracy(output, y)
        train_seen += y.size(0)

        # clip_grad_norm_ rescales the gradients to guard against exploding gradients
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

    test_h = model.init_hidden(batch_size)
    test_loss = []
    test_acc = 0
    test_seen = 0
    model.eval()
    # Evaluation needs no gradients; no_grad avoids building the autograd graph.
    with torch.no_grad():
        for x, y in test_loader:
            output, test_h = model(x, test_h)

            loss = criterion(output.squeeze(), y.float())
            test_loss.append(loss.item())

            test_acc += accuracy(output, y)
            test_seen += y.size(0)

    epoch_train_loss = np.mean(train_loss) # average loss over the batches
    epoch_test_loss = np.mean(test_loss)

    epoch_tr_loss.append(epoch_train_loss)
    epoch_tst_loss.append(epoch_test_loss)

    # Divide by the number of samples actually scored. The loaders are built
    # with drop_last=True, so dividing by len(loader.dataset) counted dropped
    # samples as wrong and understated the accuracy.
    epoch_train_acc = train_acc / max(train_seen, 1)
    epoch_test_acc = test_acc / max(test_seen, 1)

    epoch_tr_acc.append(epoch_train_acc)
    epoch_tst_acc.append(epoch_test_acc)

    print(f'Epoch {epoch+1}')
    print(f'train_loss : {epoch_train_loss} test_loss : {epoch_test_loss}')
    print(f'train_accuracy : {epoch_train_acc*100} test_accuracy : {epoch_test_acc*100}')
    if epoch_test_loss <= test_loss_min:
        # NOTE(review): the checkpoint save is commented out, so nothing is
        # written even though the message below says "Saving model".
        # torch.save(model.state_dict(), '../working/state_dict.pt')
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(test_loss_min,epoch_test_loss))
        test_loss_min = epoch_test_loss
    print(25*'==')

In [None]:
# Training curves, one point per epoch: accuracy on the left, loss on the right.
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize = (20, 6))

acc_ax.plot(epoch_tr_acc, label='Train Acc')
acc_ax.plot(epoch_tst_acc, label='Test Acc')
acc_ax.set_title("Accuracy")
acc_ax.legend()
acc_ax.grid()

loss_ax.plot(epoch_tr_loss, label='Train loss')
loss_ax.plot(epoch_tst_loss, label='Test loss')
loss_ax.set_title("Loss")
loss_ax.legend()
loss_ax.grid()

plt.show()

In [None]:
def predict_sentiment(text, seq_len = 50):
    """Return the model's sigmoid score for `text` (above 0.5 is read as positive).

    The text is encoded the same way the training data was: lower-cased, split
    on whitespace, cleaned with `preprocess_string`, mapped through `vocab`
    (unknown words are dropped) and padded/truncated with `padding`.

    Args:
        text: raw review string.
        seq_len: padded sequence length. Defaults to 50, the length the
            training data is padded to.

    Returns:
        float score produced by the model.
    """
    # Lower-case first: the vocabulary was built from lower-cased text, so
    # capitalised words used to be dropped as out-of-vocabulary.
    cleaned = (preprocess_string(word) for word in text.lower().split())
    word_seq = np.array([vocab[word] for word in cleaned if word in vocab], dtype = int)
    word_seq = np.expand_dims(word_seq, axis = 0)
    # Pad to the training length; this was a stale 500 left over from an
    # earlier experiment, which fed the LSTM hundreds of extra padding steps.
    inputs = torch.from_numpy(padding(word_seq, seq_len))

    batch_size = 1
    # Inference only: make sure dropout is off and skip gradient tracking.
    model.eval()
    h = model.init_hidden(batch_size)
    with torch.no_grad():
        output, h = model(inputs, h)
    return output.item()

In [None]:
# Browse the negative reviews to pick examples for the prediction cells.
df.loc[df['Sentiment'] == 'negative']

In [None]:
# Compare the model's prediction with the recorded label for one review.
index = 44
review = df.loc[index, 'Text']
sentiment = df.loc[index, 'Sentiment']
print(
    "The statement is:",
    review,
    "=="*25,
    f"Original Sentiment is: {sentiment}",
    "=="*25,
    sep="\n",
)
pred = predict_sentiment(review)
if pred > 0.5:
    status = "positive"
else:
    status = "negative"
print(f"Predicted Sentiment is {status} with probability of {pred}.")

In [None]:
# Compare the model's prediction with the recorded label for one review.
index = 70
review = df.loc[index, 'Text']
sentiment = df.loc[index, 'Sentiment']
print(
    "The statement is:",
    review,
    "=="*25,
    f"Original Sentiment is: {sentiment}",
    "=="*25,
    sep="\n",
)
pred = predict_sentiment(review)
if pred > 0.5:
    status = "positive"
else:
    status = "negative"
print(f"Predicted Sentiment is {status} with probability of {pred}.")

In [None]:
# Compare the model's prediction with the recorded label for one review.
index = 62
review = df.loc[index, 'Text']
sentiment = df.loc[index, 'Sentiment']
print(
    "The statement is:",
    review,
    "=="*25,
    f"Original Sentiment is: {sentiment}",
    "=="*25,
    sep="\n",
)
pred = predict_sentiment(review)
if pred > 0.5:
    status = "positive"
else:
    status = "negative"
print(f"Predicted Sentiment is {status} with probability of {pred}.")

Test on user-generated data

In [None]:
# Score a hand-written review that is not part of the dataset.
review = "The product does seem to be different from what is show on the webite, it is also slightly expensive, would not buy again"
pred = predict_sentiment(review)
if pred > 0.5:
    status = "positive"
else:
    status = "negative"
print(f"Predicted Sentiment is {status} with probability of {pred}.")

19 March 2023

26% of data has negative sentiments
74% of data has positive sentiments

The data is slightly unbalanced, and the model reflects this: the probability it predicts for negative sentiment tends to be quite low. The model is good at classifying positive reviews but could do better on negative ones.

Things to do next:
- hyperparameter tuning
- tune the decision threshold used to label a review as positive or negative (currently 0.5)
- try other models such as XGBoost, which works well on unbalanced datasets
    


20 March 2023

- Reduced the length of the reviews by changing the padding length from 500 to 50, as most of the reviews are very short. This improved the model significantly: the training/testing loss and accuracy improved and there is less overfitting.

- The model's highest train accuracy is 88% and its highest test accuracy is 82%.




In [None]:
# Keras baseline: embeddings initialised from embedding_matrix and frozen
# (trainable=False), flattened, then a single sigmoid unit.
# NOTE(review): Sequential, Embedding, Flatten, Dense, embedding_matrix and maxlen
# are not imported or defined in the visible cells above, so this cell fails on a
# fresh kernel — confirm where they are supposed to come from.
# NOTE(review): this rebinds `model`, which earlier cells use for the PyTorch SentimentLSTM.
model = Sequential()
embedding_layer = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=maxlen , trainable=False)
model.add(embedding_layer)

model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))

In [None]:
# Binary cross-entropy loss with accuracy tracked under the key 'acc'.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])

# summary() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line after the table.
model.summary()

In [None]:
# Train for 6 epochs, holding out 20% of the training data for validation.
# NOTE(review): X_train (capital X) is not defined in the visible cells; the
# PyTorch section above uses lower-case x_train — confirm the intended input.
history = model.fit(X_train, y_train, batch_size=128, epochs=6, verbose=1, validation_split=0.2)

In [None]:
# Evaluate on the held-out test data.
# NOTE(review): X_test (capital X) is not defined in the visible cells — confirm the intended input.
score = model.evaluate(X_test, y_test, verbose=1)

In [None]:
# The model is compiled with one loss and metrics=['acc'], so score[0] is the
# test loss (labelled "Test Score" here) and score[1] is the test accuracy.
print("Test Score:", score[0])
print("Test Accuracy:", score[1])

In [None]:
# Accuracy and loss curves for the dense Keras model, one figure per metric.
# The accuracy figure used to be drawn without any plot calls, so it rendered
# empty; the keys follow metrics=['acc'] from the compile step.
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])

plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train','test'], loc='upper left')
plt.show()

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])

plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train','test'], loc='upper left')
plt.show()

In [None]:
# Keras LSTM variant: the same frozen embedding layer, followed by a 128-unit
# LSTM and a single sigmoid unit.
# NOTE(review): Sequential, Embedding, LSTM, Dense, embedding_matrix and maxlen are
# not imported or defined in the visible cells above — confirm where they come from.
model = Sequential()
embedding_layer = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=maxlen , trainable=False)
model.add(embedding_layer)
model.add(LSTM(128))

model.add(Dense(1, activation='sigmoid'))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])

In [None]:
# summary() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line after the table.
model.summary()

In [None]:
# Train for 6 epochs with a 20% validation split, then evaluate on the test data.
# NOTE(review): X_train/X_test (capital X) are not defined in the visible cells;
# the PyTorch section above uses lower-case x_train/x_test — confirm the intended inputs.
history = model.fit(X_train, y_train, batch_size=128, epochs=6, verbose=1, validation_split=0.2)

score = model.evaluate(X_test, y_test, verbose=1)

In [None]:
# The model is compiled with one loss and metrics=['acc'], so score[0] is the
# test loss (labelled "Test Score" here) and score[1] is the test accuracy.
print("Test Score:", score[0])
print("Test Accuracy:", score[1])

In [None]:
# Accuracy and loss curves for the Keras LSTM model, one figure per metric.
for train_key, val_key, title, y_label in (
    ('acc', 'val_acc', 'model accuracy', 'accuracy'),
    ('loss', 'val_loss', 'model loss', 'loss'),
):
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])

    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('epoch')
    plt.legend(['train','test'], loc='upper left')
    plt.show()