# Common imports


In [1]:
import torch

DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
#DEVICE = torch.device("cpu")

# Text Transformations

# Dataset

In [2]:
import pandas as pd
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchtext.transforms as T
#from torch.hub import load_state_dict_from_url
import torch.nn as nn

class Custom_IMDB(Dataset):
    def __init__(self, data_split_type, data_path='../data/IMDB Dataset.csv'):
        
        self.data = self.get_data(data_split_type, data_path)

        padding_idx = 1
        bos_idx = 0
        eos_idx = 2
        max_seq_len = 512

        VOCAB_FILE = "https://huggingface.co/bert-base-uncased/resolve/main/vocab.txt"

        self.text_transform = T.Sequential(
            T.BERTTokenizer(vocab_path=VOCAB_FILE),
            T.StrToIntTransform(),
            T.Truncate(max_seq_len),
            # T.AddToken(token=bos_idx, begin=True),
            # T.AddToken(token=eos_idx, begin=False),
            T.ToTensor(),
            T.PadTransform(max_seq_len, pad_value = 0),
        )

    def label_to_index(self, index):
        label = self.data['sentiment'][index]
        label_to_index = {"positive" : [1] , "negative" : [0]}

        return torch.tensor(label_to_index[label]).float()
    
    def __len__(self):
        return len(self.data.index)

    def __getitem__(self, index):
        review_text = self.data['review'][index]
        transformed_text = self.text_transform(review_text).float()

        label = self.label_to_index(index)

        return {'text': transformed_text, 'label': label, 'length' : transformed_text.size(dim=0)}
    
    def get_data(self, data_split_type, data_path):
        data = pd.read_csv(data_path)
        self.train_slice = int(len(data)*0.7)
        self.val_slice = self.train_slice + int(len(data)*0.2)
        #self.test_slice = val_slice + int(len(data)*0.1)

        if data_split_type == 'train':
            return data[:self.train_slice].reset_index()
        elif data_split_type == 'val':
            return data[self.train_slice:self.val_slice].reset_index()
        elif data_split_type == 'test':
            return data[self.val_slice:].reset_index()
        else:
            print('Error!!')

    # def get_vocab(self):

    #     # VOCAB_FILE = "https://huggingface.co/bert-base-uncased/resolve/main/vocab.txt"

    #     # #all_lines = [line for line in data['review']]


# Data Preparations

In [3]:

batch_size = 10

train_datapipe = Custom_IMDB(data_split_type="train")
val_datapipe = Custom_IMDB(data_split_type="val")
test_datapipe = Custom_IMDB(data_split_type="test")


train_dataloader = DataLoader(train_datapipe, batch_size=batch_size, collate_fn=None, shuffle=True)

val_dataloader = DataLoader(val_datapipe, batch_size=batch_size, collate_fn=None)

test_dataloader = DataLoader(test_datapipe, batch_size=batch_size, collate_fn=None)

100%|██████████| 232k/232k [00:00<00:00, 825kB/s] 
100%|██████████| 232k/232k [00:00<00:00, 788kB/s] 
100%|██████████| 232k/232k [00:00<00:00, 773kB/s] 


# Model Defenition

In [4]:
class Custom_RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(Custom_RNN, self).__init__()

        # Defining some parameters
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        #Defining the layers
        # RNN Layer
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=False)   
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_size)
        # Activation Layer
        self.activate = nn.Sigmoid()
    
    def forward(self, x):
        
        batch_size = x.size(0)

        x = x.unsqueeze(0)

        # Initializing hidden state for first input using method defined below
        hidden = self.init_hidden(batch_size).to(DEVICE)

        # Passing in the input and hidden state into the model and obtaining outputs
        out, hidden = self.rnn(x, hidden)
        
        # Reshaping the outputs such that it can be fit into the fully connected layer
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)
        out = self.activate(out)
        
        return out#, hidden
    
    def init_hidden(self, batch_size):
        # This method generates the first hidden state of zeros which we'll use in the forward pass
        # We'll send the tensor holding the hidden state to the device we specified earlier as well
        hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim)
        return hidden

# Model Preparation 

In [5]:

model = Custom_RNN(input_size=512, output_size=1, hidden_dim=256, n_layers=2)
model.to(DEVICE)

Custom_RNN(
  (rnn): RNN(512, 256, num_layers=2)
  (fc): Linear(in_features=256, out_features=1, bias=True)
  (activate): Sigmoid()
)

# Training methods

In [6]:
import torchtext.functional as F
from torch.optim import Adam

learning_rate = 0.01
optim = Adam(model.parameters(), lr=learning_rate)
criteria = nn.BCELoss()


def train_step(input, target):
    output = model(input)
    loss = criteria(output, target)
    optim.zero_grad()
    loss.backward()
    optim.step()


def eval_step(input, target):
    output = model(input)
    loss = criteria(output, target).item()
    counter = 0
    correct_predictions = 0
    for predict_idx in range(target.size(0)):
        if output[predict_idx] >= 0.5:
            if target[predict_idx] == torch.tensor([1.0]).to(DEVICE):
                correct_predictions += 1
                counter += 1
            else :
                counter += 1
        elif output[predict_idx] <= 0.5:
            if target[predict_idx] == torch.tensor([0.0]).to(DEVICE):
                correct_predictions += 1
                counter += 1
            else :
                counter += 1
        else:
            print('Error')
    return float(loss), correct_predictions, counter


def evaluate():
    model.eval()
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0
    counter = 0
    with torch.no_grad():
        for batch in val_dataloader:
            input = batch['text'].to(DEVICE)
            target = batch['label'].to(DEVICE)
            loss, predictions, ret_count = eval_step(input, target)
            total_loss += loss
            correct_predictions += predictions
            total_predictions += len(target)
            counter += ret_count

    return total_loss / counter, correct_predictions / total_predictions

def test():
    model.eval()
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0
    counter = 0
    with torch.no_grad():
        for batch in test_dataloader:
            input = batch['text'].to(DEVICE)
            target = batch['label'].to(DEVICE)
            loss, predictions = eval_step(input, target)
            total_loss += loss
            correct_predictions += predictions
            total_predictions += len(target)
            counter += 1

    return total_loss / counter, correct_predictions / total_predictions

# Train

In [8]:
from torch.utils.tensorboard import SummaryWriter

num_epochs = 10

writer = SummaryWriter()

for e in range(num_epochs):
    
    model.train()

    for index, batch in enumerate(train_dataloader):
        print('index ', index, end=' \r')
        input = batch['text'].to(DEVICE)
        target = batch['label'].to(DEVICE)
        train_step(input, target)

    loss, accuracy = evaluate()
    print(f"Epoch = [{e}], loss = [{loss}], accuracy = [{accuracy}]")
    writer.add_scalar("Validate Epoch x loss", e, loss)
    writer.add_scalar("Validate Epoch x accuracy", e, accuracy)

Epoch = [0], loss = [0.06992299737334251], accuracy = [0.496]
Epoch = [1], loss = [0.07123513069152831], accuracy = [0.496]
Epoch = [2], loss = [0.08749255921244621], accuracy = [0.504]
Epoch = [3], loss = [0.07386309220194817], accuracy = [0.496]
index  1770 

KeyboardInterrupt: 

# Test

In [10]:
loss, accuracy = test()
print(f"Epoch = [{e}], loss = [{loss}], accuracy = [{accuracy}]")
writer.add_scalar("Test loss", loss)
writer.add_scalar("Test accuracy", accuracy)
writer.close()

Epoch = [2], loss = [0.7003979082107544], accuracy = [5.06]
