In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from string import punctuation
import string
import copy
import torch
from torch._C import dtype
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

In [3]:
df = pd.read_csv('/content/drive/My Drive/review_head.csv')
print(df.columns)


Index(['Id', 'ProductId', 'UserId', 'ProfileName', 'HelpfulnessNumerator',
       'HelpfulnessDenominator', 'Score', 'Time', 'Summary', 'Text'],
      dtype='object')


In [4]:
# drop useless columns
df.drop(
    columns=[
        "Id",
        "ProductId",
        "UserId",
        "ProfileName",
        "HelpfulnessNumerator",
        "HelpfulnessDenominator",
        "Time",
        "Summary",
    ],
    inplace=True,
)

# make sentiment: 4,5 = positive, 1,2,3 = negative
st = []
count = 0
for s in list(df["Score"].values):
    if s == 5 or s == 4:
        st.append(1)
        count += 1
    else:
        st.append(0)
    pass
df["st"] = st
print("positive:", count)

# max_len of the reviews
max_len = 0
for t in list(df["Text"].values):
    temp = t.split()
    max_len = max(max_len, len(temp))
    pass
print("the max length is:", max_len)

# lower the case, remove the punctuations
res = []
for t in list(df["Text"].values):
    t = t.lower()

    for p in punctuation:
        t = t.replace(p, "")

    res.append(t)
    pass


print("review example:", res[0])
df["Text"] = res

# encoding
hist = {}
for t in list(df["Text"].values):
    temp = t.split()
    for word in temp:
        hist[word] = hist.get(word, 0) + 1
        pass
    pass

ind = 1
word2ind, ind2word = {}, {}

for k, v in sorted(hist.items(), key=lambda x: x[1], reverse=True):
    word2ind[k] = ind
    ind2word[ind] = k
    ind += 1
    pass


def encoder(df):
    X = []
    for t in list(df["Text"].values):
        temp = []
        words = t.split()
        for word in words:
            temp.append(word2ind[word])
            pass
        X.append(temp)
        pass
    return X

positive: 7616
the max length is: 1513
review example: i have bought several of the vitality canned dog food products and have found them all to be of good quality the product looks more like a stew than a processed meat and it smells better my labrador is finicky and she appreciates this product better than  most


In [5]:
# train, test, validation split
train, test = train_test_split(df, test_size=0.2, random_state=66)

print("train shape is: {}".format(train.shape))
print("test shape is: {}".format(test.shape))
print(
    "The train set contains {:.2f}% positive reviews".format(train["st"].mean() * 100)
)
print("The test set cintains {:.2f}% positive reviews".format(test["st"].mean() * 100))

train, validation = train_test_split(train, test_size=0.2, random_state=66)

# get X for every dataset
X_train = encoder(train)
X_val = encoder(validation)
X_test = encoder(test)

# padding and truncate
def padding_trun(max_feature, X):
    new_X = np.zeros((len(X), max_feature), dtype=int)
    for i, x in enumerate(X):
        if len(x) > max_feature:
            new_X[i, :] = np.array(x[:max_feature], dtype=int)
        elif len(x) < max_feature:
            temp = copy.deepcopy(x)
            while len(temp) != max_feature:
                temp.insert(0, 0)
                pass
            new_X[i, :] = np.array(temp, dtype=int)
        else:
            new_X[i, :] = np.array(x, dtype=int)
        pass
    return new_X


max_feature = 256
X_train = padding_trun(max_feature, X_train)
y_train = np.array(train["st"].values, dtype=int)

X_test = padding_trun(max_feature, X_test)
y_test = np.array(test["st"].values, dtype=int)

X_val = padding_trun(max_feature, X_val)
y_val = np.array(validation["st"].values, dtype=int)


# convert to tensor
train_data = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
valid_data = TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val))
test_data = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))

# build dataloaders
batch_size = 50
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, drop_last=True)
valid_loader = DataLoader(valid_data, shuffle=False, batch_size=batch_size, drop_last=True)
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size, drop_last=True)


train shape is: (8000, 3)
test shape is: (2000, 3)
The train set contains 75.94% positive reviews
The test set cintains 77.05% positive reviews


In [6]:
class LSTM(nn.Module):
    def __init__(
        self,
        word_voc_count,
        output_size,
        embedding_dim,
        hidden_dim,
        layer_num,
        drop_prob,
    ):
    # inputs -> embedding layer -> LSTM -> fully connected layer -> sigmoid -> predictions
        super().__init__()

        self.hidden_dim = hidden_dim
        self.layer_num = layer_num
        self.output_size = output_size

        # embedding layer
        self.embedding = nn.Embedding(word_voc_count, embedding_dim)

        # LSTM layer
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim, layer_num, dropout=drop_prob, batch_first=True
        )

        # dropout
        self.dropout = nn.Dropout(0.2)

        # fully connected layer using linear function
        self.fc = nn.Linear(hidden_dim, output_size)

        # sigmoid activation function
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, hidden):
        # For prediction
        batch_size = x.size(0)

        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)

        out = self.dropout(lstm_out)
        out = self.fc(out)
        sigmoid_out = self.sigmoid(out)

        sigmoid_out = sigmoid_out.view(batch_size, -1)
        sigmoid_out = sigmoid_out[:, -1]

        return sigmoid_out, hidden

    def init_hidden(self, batch_size):

        weight = next(self.parameters()).data

        if device_gpu:
            hidden = (
                weight.new(self.layer_num, batch_size, self.hidden_dim).zero_().cuda(),
                weight.new(self.layer_num, batch_size, self.hidden_dim).zero_().cuda(),
            )
        else:
            hidden = (
                weight.new(self.layer_num, batch_size, self.hidden_dim).zero_(),
                weight.new(self.layer_num, batch_size, self.hidden_dim).zero_(),
            )

        return hidden


word_voc_count = len(word2ind) + 1
output_size = 1
embedding_dim = 512
hidden_dim = 256
layer_num = 2
drop_prob = 0.35
model = LSTM(word_voc_count, output_size, embedding_dim, hidden_dim, layer_num, drop_prob)

print(model)

LSTM(
  (embedding): Embedding(24896, 512)
  (lstm): LSTM(512, 256, num_layers=2, batch_first=True, dropout=0.35)
  (dropout): Dropout(p=0.2, inplace=False)
  (fc): Linear(in_features=256, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [7]:
# loss and optimization functions
# GPU
device_gpu = True
learning_rate = 0.001

# binary cross entropy for the binary classification problems
criterion = nn.BCELoss()

# achieves acc of 82% which outperforms SGD with acc of 75%
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


epochs = 5

count = 0
print_epoch = 100
clip = 5

if device_gpu:
    model.cuda()

model.train()
for e in range(epochs):
    hid = model.init_hidden(batch_size)

    for predictors, lab_y in train_loader:
        count += 1

        if device_gpu:
            predictors, lab_y = predictors.cuda(), lab_y.cuda()

        hid = tuple([each.data for each in hid])

        model.zero_grad()

        predictors = predictors.type(torch.LongTensor)
        output, hid = model(predictors.cuda(), hid)

        loss = criterion(output.squeeze(), lab_y.float())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        if count % print_epoch == 0:
            val_hid = model.init_hidden(batch_size)
            val_losses = []
            model.eval()
            for predictors, lab_y in valid_loader:
                val_hid = tuple([each.data for each in val_hid])

                if device_gpu:
                    predictors, lab_y = predictors.cuda(), lab_y.cuda()

                predictors = predictors.type(torch.LongTensor)

                if (predictors.shape[0], predictors.shape[1]) != (batch_size, max_feature):
                    continue

                output, val_hid = model(predictors.cuda(), val_hid)
                val_loss = criterion(output.squeeze(), lab_y.float())

                val_losses.append(val_loss.item())

            model.train()
            print(
                "Epochs: {}, ".format(e + 1),
                "Steps: {}, ".format(count),
                "Loss: {:.6f}, ".format(loss.item()),
            )

Epochs: 1,  Steps: 100,  Loss: 0.551388, 
Epochs: 2,  Steps: 200,  Loss: 0.383713, 
Epochs: 3,  Steps: 300,  Loss: 0.168112, 
Epochs: 4,  Steps: 400,  Loss: 0.087152, 
Epochs: 4,  Steps: 500,  Loss: 0.031963, 
Epochs: 5,  Steps: 600,  Loss: 0.025938, 


In [10]:
loss_testing = list()
correct_count = 0
hid = model.init_hidden(batch_size)

res = []

model.eval()

for predictors, lab_y in test_loader:

    hid = tuple([each.data for each in hid])

    if device_gpu:
        predictors, lab_y = predictors.cuda(), lab_y.cuda()

    predictors = predictors.type(torch.LongTensor)
    output, hid = model(predictors.cuda(), hid)

    test_loss = criterion(output.squeeze(), lab_y.float())
    loss_testing.append(test_loss.item())

    pred = torch.round(output.squeeze())

    correct_tensor = pred.eq(lab_y.float().view_as(pred))

    if device_gpu:
        correct = np.squeeze(correct_tensor.cpu().numpy())
    else:
        correct = np.squeeze(correct_tensor.numpy())

    res += list(correct)
    correct_count += np.sum(correct)
    
acc_testing = correct_count / len(test_loader.dataset)

print("Loss for the test dataset:", np.mean(loss_testing))
print("Accuracy for the test dataset:", acc_testing)

Loss for the test dataset: 0.6717493239790201
Accuracy for the test dataset: 0.835
2000


In [14]:
# output file for cases analysis
test["res"] = res
test.to_csv('/content/drive/My Drive/testdata_real.csv')