In [None]:
! pip3 install transformers

In [None]:
import torch
import transformers
import matplotlib.pyplot as plt

# Report the installed library versions, then select the compute device:
# CUDA when a GPU is visible to torch, otherwise the CPU.
print(torch.__version__, transformers.__version__, sep="\n")
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

In [None]:
import pandas as pd

# Load the cleaned training CSV (it has no header row) and print how often
# each value occurs in its first column, the one used as sentiment label.
train_df = pd.read_csv(
    "data/train_data_clean.csv",
    encoding="ISO-8859-1",
    header=None,
)
train_sentiment_df = train_df.iloc[:, 0]

res = train_sentiment_df.value_counts()
print("sentiment counts: \n{}".format(res))


In [None]:

from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split
import pandas as pd


class MyDataset(Dataset):
    """Sentiment dataset backed by a header-less CSV file.

    Every row provides a raw sentiment code (translated to a class index via
    ``sentiment_map``) and a text. A leading ``@username`` mention is removed
    from the text. Rows that cannot be turned into a usable sample (unknown
    sentiment code, missing text, or nothing left after cleaning) are skipped:
    ``__getitem__`` falls forward to the next usable row instead.

    Args:
        data_path: Path of the CSV file; read with ``header=None`` and
            ISO-8859-1 encoding.
        sentiment_column_index: Positional index of the sentiment column.
        text_column_index: Positional index of the text column.
    """

    def __init__(self, data_path="data/test_data.csv",
                 sentiment_column_index=0, text_column_index=5):
        super().__init__()
        self.df = pd.read_csv(data_path, header=None, encoding="ISO-8859-1")
        self.sentiment_column_index = sentiment_column_index
        self.text_column_index = text_column_index
        # Raw sentiment code in the CSV -> class index used by the model.
        self.sentiment_map = {
            0: 0,  # Negative sentiment
            2: 1,  # Neutral sentiment # absent in train dataset
            4: 2,  # Positive sentiment
        }
        # Row that __getitem__ will try next; updated on every lookup.
        self.idx = 0

    def __len__(self):
        """Return the number of rows in the underlying CSV."""
        return len(self.df)

    def generate_item(self, idx):
        """Build the sample stored at row ``idx``.

        Returns:
            ``(text, class_index)``; the text may be empty if nothing is left
            after cleaning. ``(None, None)`` when the row is out of range, has
            an unknown sentiment code, has no text, or consists only of an
            ``@username`` mention.
        """
        try:
            raw_text = self.df.iloc[idx, self.text_column_index]
            sentiment = self.sentiment_map[
                self.df.iloc[idx, self.sentiment_column_index]
            ]
        except (IndexError, KeyError, TypeError, ValueError):
            # Out-of-range row or a sentiment code missing from the map.
            return None, None

        # A missing cell would otherwise become the literal text "nan".
        if pd.isna(raw_text):
            return None, None

        # clean input_text by removing user_name at beginning of sentence
        input_text = str(raw_text).strip()
        if input_text.startswith("@"):
            first_space = input_text.find(" ")
            if first_space == -1:  # nothing else is there apart from username
                return None, None
            # Keep everything AFTER the mention (slice, not a single character).
            input_text = input_text[first_space:].strip()

        return input_text, sentiment

    def __getitem__(self, idx):
        """Return ``(text, class_index)`` for row ``idx`` or the next usable row.

        The search wraps around at the end of the file and visits every row at
        most once, so it always terminates.

        Raises:
            IndexError: ``idx`` is outside ``[-len(self), len(self))``.
            RuntimeError: the dataset contains no usable row at all.
        """
        n_rows = len(self.df)
        idx = int(idx)
        if not -n_rows <= idx < n_rows:
            raise IndexError(f"index {idx} is out of range for dataset of size {n_rows}")

        start = idx % n_rows  # normalises negative indices
        for offset in range(n_rows):
            row = (start + offset) % n_rows
            input_text, sentiment = self.generate_item(row)
            self.idx = row + 1
            # Skip rows whose text is empty/None or whose sentiment is unknown.
            if input_text and sentiment is not None:
                return str(input_text), sentiment

        raise RuntimeError("dataset does not contain any valid sample")


# Batch sizes used by the three data loaders.
train_batch_size = 128
test_batch_size = 1
val_batch_size = 64

train_dataset = MyDataset(
    data_path="data/train_data_clean.csv",
    sentiment_column_index=0, text_column_index=5
)
print("Entire train dataset length before creating val set: ", len(train_dataset))
# Hold out 1% of the training data as validation set; the seeded generator
# keeps the split identical across runs.
train_dataset, val_dataset = random_split(
    train_dataset, [0.99, 0.01],
    generator=torch.Generator().manual_seed(42),
)
print(f"Train data len: {len(train_dataset)}, Val data len: {len(val_dataset)}")

test_dataset = MyDataset(
    data_path="data/test_data.csv",
    sentiment_column_index=0, text_column_index=5,
)

# Only the training loader is shuffled. The evaluation loaders keep the
# dataset order so that metrics and the returned predictions are reproducible.
train_loader = DataLoader(train_dataset,
                          batch_size=train_batch_size, shuffle=True)
val_loader = DataLoader(val_dataset,
                        batch_size=val_batch_size, shuffle=False)
test_loader = DataLoader(test_dataset,
                         batch_size=test_batch_size, shuffle=False)

In [None]:
from sklearn.metrics import accuracy_score, classification_report
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sns
from tqdm import tqdm


def get_classification_report(actual_sentiment, pred_sentiment):
    """Return sklearn's classification report for true vs. predicted labels."""
    report = classification_report(y_true=actual_sentiment, y_pred=pred_sentiment)
    return report


def plot_data(actual_sentiment, pred_sentiment):
    """Display the confusion matrix of true vs. predicted labels (blue colormap)."""
    matrix = confusion_matrix(actual_sentiment, pred_sentiment)
    ConfusionMatrixDisplay(matrix).plot(cmap="Blues")
    plt.show()


def plot_losses(val_loss_logs, train_loss_logs, log_interval=50):
    """Plot training and validation loss curves against the training step.

    Args:
        val_loss_logs: Validation losses, one per ``log_interval`` training
            steps (the i-th entry is drawn at step ``(i + 1) * log_interval``).
        train_loss_logs: Training losses, one per training step.
        log_interval: Number of training steps between two validation losses.
    """
    val_steps = [(i + 1) * log_interval for i in range(len(val_loss_logs))]
    # Steps are 1-based so the k-th training loss lines up with the validation
    # loss recorded after k steps.
    train_steps = range(1, len(train_loss_logs) + 1)

    plt.plot(val_steps, val_loss_logs, label="validation")
    plt.plot(train_steps, train_loss_logs, label="train")
    plt.xlabel("train step")
    plt.ylabel("loss")
    # Without a legend the labels passed to plot() are never displayed.
    plt.legend()
    plt.show()


def soft_prob_assign_based_on_threshold(output, classification_threshold):
    """Turn log-probabilities into thresholded class scores with a neutral fallback.

    Args:
        output: Log-probabilities whose last dimension holds the 3 classes.
        classification_threshold: Probability a class must exceed to be
            considered confidently predicted.

    Returns:
        A tensor shaped like ``output``: 1.0 where the class probability
        exceeds the threshold and 0.0 elsewhere. Rows in which no class
        exceeds the threshold are replaced by ``[0.1, 0.8, 0.1]`` so that
        their argmax is class 1 (neutral).
    """
    probs = torch.exp(output)

    # 1.0 for every class whose probability clears the threshold, else 0.0.
    prob_more_than_thres = torch.where(probs > classification_threshold, 1.0, 0.0)

    # True for rows that have at least one confidently predicted class.
    confident = torch.any(prob_more_than_thres.bool(), dim=-1)

    # Build the fallback on the same device/dtype as the scores; a CPU tensor
    # cannot be mask-assigned into a CUDA tensor.
    neutral_fallback = torch.tensor(
        [0.1, 0.8, 0.1],
        dtype=prob_more_than_thres.dtype,
        device=prob_more_than_thres.device,
    )
    prob_more_than_thres[~confident] = neutral_fallback

    return prob_more_than_thres


def train(model, optimizer, scheduler, train_loader, val_loader,
          lr=0.00025, warmup_step=100,
          classification_threshold=0.7, log_interval=50):
    """Train ``model`` until validation loss or training accuracy plateaus.

    Every ``log_interval`` steps the model is evaluated on ``val_loader``.
    When the validation loss is lower than at the previous check, the model
    and optimizer state dicts are saved to ``model.pt`` / ``optim.pt``;
    training stops as soon as the improvement is smaller than 0.005.
    At the end of every epoch training also stops if the training accuracy
    gained less than 0.008 over the previous epoch.

    Args:
        model: Module called with a list of texts, returning log-probabilities.
        optimizer: Optimizer that updates the model parameters.
        scheduler: Learning-rate scheduler or ``None``. When given, the rate
            is warmed up linearly for ``warmup_step`` steps and the scheduler
            is stepped afterwards.
        train_loader: Loader yielding ``(texts, labels)`` training batches.
        val_loader: Loader used for the periodic validation passes.
        lr: Learning rate reached at the end of the warmup.
        warmup_step: Number of linear warmup steps.
        classification_threshold: Forwarded to ``eval``.
        log_interval: Number of training steps between validation passes.

    Returns:
        ``(train_loss_logs, val_loss_logs, last_train_loss)``;
        ``last_train_loss`` is NaN when no batch was processed.
    """
    model.train()
    model.to(device)
    prev_acc_score = -2.0  # below any real accuracy: the first epoch never counts as a plateau
    # Reference validation loss for the first check.
    # NOTE(review): presumably the best value of an earlier run (was 0.424) - confirm.
    prev_val_loss = 0.333
    val_loss_logs = []
    train_loss_logs = []
    loss = None
    train_step = 0
    val_plateau = False
    while True:
        pred_sentiment = []
        actual_sentiment = []
        for input_text, sentiment in tqdm(train_loader):
            optimizer.zero_grad()
            output = model(list(input_text))
            # Labels come from the loader on CPU; move them next to the output.
            sentiment = sentiment.to(output.device)

            loss = F.nll_loss(output, sentiment)

            # store labels to calculate the accuracy score of this epoch
            current_pred_sentiment = output.argmax(dim=-1)
            pred_sentiment += current_pred_sentiment.detach().cpu().tolist()
            actual_sentiment += sentiment.detach().cpu().tolist()
            train_loss_logs.append(float(loss))

            loss.backward()
            optimizer.step()

            train_step += 1
            if scheduler is not None:
                if train_step <= warmup_step:
                    # Linear warmup stage. The scheduler is not stepped here,
                    # otherwise it would overwrite the rate that was just set.
                    curr_lr = lr * train_step / warmup_step
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = curr_lr
                else:
                    scheduler.step()

            if train_step % log_interval == 0:
                # Negative log likelihood loss logging for val_dataset
                _, val_loss, _ = eval(model=model, test_loader=val_loader,
                                      classification_threshold=classification_threshold)
                # eval() switches the model to evaluation mode; re-enable
                # dropout for the remaining training steps.
                model.train()
                val_loss_logs.append(float(val_loss))
                print(f"train_step: {train_step} lr: {optimizer.param_groups[0]['lr']}, "
                      f"train_loss: {float(loss)}, val_loss: {float(val_loss)}")

                if val_loss < prev_val_loss:
                    # saving model
                    torch.save(model.state_dict(), "model.pt")
                    torch.save(optimizer.state_dict(), "optim.pt")

                if prev_val_loss - val_loss < 0.005:
                    # Validation loss no longer improves: stop training altogether.
                    val_plateau = True
                    break

                prev_val_loss = val_loss

        if val_plateau:
            print("Exiting training due to plateau in validation loss")
            break

        if not actual_sentiment:
            # Empty train loader: nothing to learn from, avoid looping forever.
            break

        acc_score = accuracy_score(actual_sentiment, pred_sentiment)
        if acc_score - prev_acc_score < 0.008:  # acc score does not increase
            print("Exiting training due to plateau in accuracy_score")
            break

        prev_acc_score = acc_score

    print("Loss logs ", train_loss_logs, val_loss_logs)
    plot_losses(val_loss_logs, train_loss_logs, log_interval=log_interval)

    last_loss = float(loss) if loss is not None else float("nan")
    return train_loss_logs, val_loss_logs, last_loss


def eval(model, test_loader, classification_threshold=0.7):
    """Evaluate ``model`` on ``test_loader`` and report classification metrics.

    Prints the classification report and shows the confusion matrix.

    Args:
        model: Module called with a list of texts, returning log-probabilities.
        test_loader: Loader yielding ``(texts, labels)`` batches.
        classification_threshold: When greater than 0, predictions go through
            ``soft_prob_assign_based_on_threshold`` before the argmax.

    Returns:
        ``(accuracy, mean_batch_nll_loss, predicted_labels)``.

    Raises:
        ValueError: ``test_loader`` yields no batch.
    """
    model.eval()
    model.to(device)
    loss_logs = []
    pred_sentiment = []
    actual_sentiment = []
    # No gradients are needed for evaluation; this saves memory and time.
    with torch.no_grad():
        for input_text, sentiment in tqdm(test_loader):
            output = model(list(input_text))
            # Labels come from the loader on CPU; move them next to the output.
            sentiment = sentiment.to(output.device)
            loss = F.nll_loss(output, sentiment)

            if classification_threshold > 0.0:
                output = soft_prob_assign_based_on_threshold(output, classification_threshold)
            current_pred_sentiment = output.argmax(dim=-1)

            pred_sentiment += current_pred_sentiment.cpu().tolist()
            actual_sentiment += sentiment.cpu().tolist()

            loss_logs.append(float(loss))

    if not loss_logs:
        raise ValueError("eval() received an empty data loader")

    acc_score = accuracy_score(actual_sentiment, pred_sentiment)
    print("Metrics: \n", get_classification_report(actual_sentiment, pred_sentiment))
    plot_data(actual_sentiment=actual_sentiment, pred_sentiment=pred_sentiment)

    return acc_score, sum(loss_logs) / len(loss_logs), pred_sentiment



def predict(input_sentence, classification_threshold=0.7):
    """Print and return the predicted sentiment of one or several sentences.

    Uses the module-level ``model``.

    Args:
        input_sentence: A single string or an iterable of strings.
        classification_threshold: When greater than 0, predictions go through
            ``soft_prob_assign_based_on_threshold`` before the argmax.

    Returns:
        List of sentiment names ("Negative", "Neutral" or "Positive"), one
        per input sentence.
    """
    model.eval()
    model.to(device)
    if isinstance(input_sentence, str):
        input_sentence = [input_sentence]

    # Inference only: no gradient tracking needed.
    with torch.no_grad():
        output = model(list(input_sentence))

    # Probability of the most likely class of every sentence (before thresholding).
    probs = torch.exp(output).gather(-1, output.argmax(dim=-1)[:, None])
    if classification_threshold > 0.0:
        output = soft_prob_assign_based_on_threshold(output, classification_threshold)

    pred_sentiment = output.argmax(dim=-1).cpu().tolist()

    sentiment_map = {
        0: "Negative",  # 0 class
        1: "Neutral",  # 1 class
        2: "Positive",  # 2 class
    }

    # Tensors are moved to the CPU before the numpy conversion (needed on CUDA).
    print("Probs: ", probs.squeeze().detach().cpu().numpy())
    print("Predicted: ", pred_sentiment)
    sentiment_list = [sentiment_map[s] for s in pred_sentiment]
    print("sentiments: ", sentiment_list)
    return sentiment_list



In [None]:

# models.py
from transformers import AutoTokenizer, BertConfig, BertModel
import torch.nn as nn
import torch.nn.functional as F

# Name of the pretrained checkpoint; both aliases refer to the same string.
bert_model_name = "bert-base-uncased"
model_name = bert_model_name


class SentimentModel(nn.Module):
    """Pretrained BERT encoder with a linear classification head.

    The final hidden state of the first ([CLS]) token goes through dropout and
    a linear layer; the result is returned as log-probabilities.

    Args:
        n_classes: Number of output classes.
        train_only_final_layer: When True the encoder weights are frozen and
            only the classification layer stays trainable.
    """

    def __init__(self, n_classes=3, train_only_final_layer=True):
        super().__init__()

        self.tokenizer = AutoTokenizer.from_pretrained(bert_model_name)

        pretrained_model = BertModel.from_pretrained(bert_model_name)
        self.model = pretrained_model

        model_hidden_size = pretrained_model.config.hidden_size

        self.drop = nn.Dropout(p=0.1)
        self.classification_layer = nn.Linear(model_hidden_size, n_classes)
        nn.init.xavier_normal_(self.classification_layer.weight)

        if train_only_final_layer:
            self.freeze_parameters()

    def freeze_parameters(self):
        """Disable gradient computation for every encoder parameter."""
        for p in self.model.parameters():
            p.requires_grad = False

    def forward(self, input_texts):
        """Return the class log-probabilities for a list of texts.

        Texts are tokenized with padding and truncated to 160 tokens.
        """
        tokenized_inputs = self.tokenizer(input_texts, return_tensors="pt",
                                          max_length=160, padding=True, truncation=True
                                          )

        # Send the inputs to wherever this module's weights live instead of
        # relying on a global device, so the model also works before (or
        # without) being moved to the GPU.
        module_device = self.classification_layer.weight.device
        tokenized_inputs = tokenized_inputs.to(module_device)

        # Only last_hidden_state is used, so the per-layer hidden states are
        # not requested.
        final_outputs = self.model(input_ids=tokenized_inputs.input_ids,
                                   attention_mask=tokenized_inputs.attention_mask,
                                   token_type_ids=tokenized_inputs.token_type_ids,
                                   )

        # Take the [CLS] token to predict the final layer output
        last_hidden_state = final_outputs.last_hidden_state
        if last_hidden_state.dim() == 3:
            cls_final_embedding = last_hidden_state[:, 0, :]  # B*n_tokens*hidden_size
        else:
            cls_final_embedding = last_hidden_state[0, :]

        classification_logits = self.classification_layer(self.drop(cls_final_embedding))
        return F.log_softmax(classification_logits, dim=-1)


# Fine-tune the whole network (encoder included) and place it on the selected
# device right away, so inference works even when train() is not run first.
model = SentimentModel(n_classes=3, train_only_final_layer=False).to(device)

In [None]:
# Global hyper-parameters: learning rate and whether an LR scheduler is wanted.
lr, use_scheduler = 0.00025, True

In [None]:

def get_optimizer_scheduler(model, lr, use_scheduler=False, warmup_step=100):
    """Create the Adam optimizer and, optionally, a cosine-annealing scheduler.

    Args:
        model: Module whose parameters are optimized.
        lr: Learning rate of the optimizer.
        use_scheduler: When True a ``CosineAnnealingLR`` scheduler is created.
        warmup_step: Passed to the scheduler as its ``T_max``.

    Returns:
        ``(optimizer, scheduler)``; ``scheduler`` is ``None`` when
        ``use_scheduler`` is False.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = None
    if use_scheduler:
        # The scheduler must wrap the optimizer created above (the previous
        # code referenced an undefined name and raised NameError).
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, warmup_step)
    return optimizer, scheduler


# Build the optimizer for the model.
# NOTE(review): the scheduler is explicitly disabled here; the global
# `use_scheduler` flag is not consulted - confirm this is intended.
optimizer, scheduler = get_optimizer_scheduler(
    model,
    lr,
    use_scheduler=False,
    warmup_step=100,
)


In [None]:

if __name__ == "__main__":

    # Restore the saved weights and optimizer state. map_location lets a
    # checkpoint written on a GPU machine load on a CPU-only one as well.
    # NOTE: torch.load unpickles its input - only load trusted checkpoints.
    model.load_state_dict(torch.load("model.pt", map_location=device))
    optimizer.load_state_dict(torch.load("optim.pt", map_location=device))

    # predict testing
    input_sentence = ["I am sad", "We are going to president's house", "I am happy"]
    predict(input_sentence, classification_threshold=0.7)

    # train(model, optimizer, scheduler, train_loader, val_loader,
    #       lr=0.0001, warmup_step=100,
    #       classification_threshold=0.7, log_interval=400)


    # Evaluate
    acc_score, nll_loss, pred_sentiment = eval(model, test_loader, classification_threshold=0.7)
    print("Eval acc_score: ", acc_score)

    # count params
    print("Trainable_params: ", sum(p.numel() for p in model.parameters() if p.requires_grad))
