<a href="https://colab.research.google.com/github/zeeshanahmad10809/covid_fake_news_classification/blob/main/Transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Sentiment Analysis (Machine Learning Techniques)

### Install & Import Dependencies

In [33]:
# Install the third-party packages imported below that may be missing from the runtime.
# NOTE(review): no versions are pinned, so a re-run may pick up different releases.
!pip install loguru
!pip install tqdm
!pip install transformers



In [34]:
import os
import re
import string
import random
from os.path import join
from os import path
from loguru import logger
from tqdm import tqdm
import time
from datetime import timedelta
import numpy as np
import pandas as pd
import nltk
from nltk.stem import WordNetLemmatizer
from math import ceil
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig
from transformers import (
    RobertaTokenizer,
    RobertaForSequenceClassification,
    RobertaConfig,
)
from transformers import (
    DistilBertTokenizer,
    DistilBertForSequenceClassification,
    DistilBertConfig,
)
from transformers import AlbertTokenizer, AlbertForSequenceClassification, AlbertConfig
from transformers import (
    XLMRobertaTokenizer,
    XLMRobertaForSequenceClassification,
    XLMRobertaConfig,
)
from transformers import (
    ElectraTokenizer,
    ElectraForSequenceClassification,
    ElectraConfig,
)
from transformers import BartTokenizer, BartForSequenceClassification, BartConfig
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
)

# Fetch the WordNet corpus; presumably required by the WordNetLemmatizer used in lemmatize().
nltk.download("wordnet")

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [35]:
# Seed every random number generator used in this notebook so runs are repeatable.
SEED_VALUE = 19
# NOTE: Python reads PYTHONHASHSEED at interpreter start-up, so this assignment only
# affects child processes started from here, not the hash seed of the running kernel.
os.environ["PYTHONHASHSEED"] = str(SEED_VALUE)
random.seed(SEED_VALUE)
np.random.seed(SEED_VALUE)
torch.manual_seed(SEED_VALUE)
torch.cuda.manual_seed_all(SEED_VALUE)
# Seeding alone does not make cuDNN deterministic: force deterministic kernels and
# disable the auto-tuner, which may otherwise choose different algorithms per run.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [36]:
# Run on the first CUDA device when one is visible to PyTorch; fall back to the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

### Utilities

In [38]:
def transformer_params(name):
    """Return the training hyper-parameters for a transformer.

    Every model currently gets the same settings; ``name`` is accepted but
    not consulted.

    Returns:
        dict: a new dictionary with the keys ``"batch_size"`` and
        ``"learning_rate"``.
    """
    hyper_params = dict(batch_size=32, learning_rate=1e-5)
    return hyper_params


def evaluation_metrics(Y_true, Y_pred, split="test"):
    """Compute classification metrics and key them by data split.

    Args:
        Y_true: Ground-truth labels.
        Y_pred: Predicted labels.
        split: Prefix prepended to every key of the result (default ``"test"``).

    Returns:
        dict: ``<split>_accuracy``, macro-averaged ``<split>_precision``,
        ``<split>_recall`` and ``<split>_f1_score``, plus
        ``<split>_confusion_matrix``.
    """
    prefix = split + "_"
    macro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1_score", f1_score),
    )

    results = {prefix + "accuracy": accuracy_score(Y_true, Y_pred)}
    for suffix, scorer in macro_scorers:
        results[prefix + suffix] = scorer(Y_true, Y_pred, average="macro")
    results[prefix + "confusion_matrix"] = confusion_matrix(Y_true, Y_pred)
    return results


def save_model(model, name, prev_name):
    """Save ``model`` as ``<cwd>/trained/<name>`` and drop an earlier checkpoint.

    Args:
        model: Object to serialise with ``torch.save``.
        name: File name of the new checkpoint, created inside ``trained``.
        prev_name: File name (inside ``trained``) of the checkpoint to delete
            before saving, or ``None`` when there is nothing to delete.
    """
    trained_dir = join(os.getcwd(), "trained")
    # exist_ok replaces the separate exists()/mkdir() pair and tolerates an existing directory.
    os.makedirs(trained_dir, exist_ok=True)

    # prev_name is None on the first save; joining a path with None raises TypeError.
    if prev_name is not None:
        prev_path = join(trained_dir, prev_name)
        if path.exists(prev_path):
            os.remove(prev_path)

    # Write into the directory prepared above so that a later call can find and
    # remove this checkpoint through prev_name.
    torch.save(model, join(trained_dir, name))

### Text Preprocessing

In [39]:
def remove_url(tweet):
    """Strip @mentions and URLs from ``tweet`` and collapse runs of whitespace.

    Three kinds of matches are replaced by a single space:
      * ``@`` followed by ASCII letters/digits,
      * a character that is not an ASCII letter, digit, space or tab when it
        is immediately followed by a space,
      * ``scheme://...`` up to the next whitespace.

    Returns:
        str: the remaining whitespace-separated tokens joined by single spaces.
    """
    # Raw string: the backslash sequences \w, \/ and \S are invalid escapes in a
    # normal string literal and trigger a warning on newer Python versions.
    # NOTE(review): the space before the second "|" is kept as found; it limits the
    # middle alternative to punctuation that is followed by a space — confirm intent.
    pattern = r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t]) |(\w+:\/\/\S+)"
    return " ".join(re.sub(pattern, " ", tweet).split())


def remove_punctuation(tweet):
    """Return ``tweet`` with every character of ``string.punctuation`` removed."""
    kept_chars = [char for char in tweet if char not in string.punctuation]
    return "".join(kept_chars)


def lower_case(tweet):
    """Lower-case ``tweet`` and trim leading/trailing whitespace."""
    lowered = tweet.lower()
    return lowered.strip()


def lemmatize(tweet):
    """Lemmatize each space-separated token of ``tweet`` with WordNet.

    The text is split on single spaces, each token is passed to
    ``WordNetLemmatizer.lemmatize``, and the results are joined with single
    spaces again.
    """
    wordnet = WordNetLemmatizer()
    lemmas = [wordnet.lemmatize(word) for word in tweet.split(" ")]
    return " ".join(lemmas)


def preprocess_tweet(tweet):
    """Run the full cleaning pipeline on one tweet.

    The steps are applied in order: ``remove_url``, ``remove_punctuation``,
    ``lower_case`` and ``lemmatize``; the output of each step feeds the next.
    """
    pipeline = (remove_url, remove_punctuation, lower_case, lemmatize)
    for step in pipeline:
        tweet = step(tweet)
    return tweet

### Dataset

In [37]:
# CSV files this notebook can train on. The names are relative paths and are
# passed unchanged to pd.read_csv by SentimentDataset.
DATASET1 = "COVID FakeNews Data.csv"
# SentimentDataset recodes the "Tweet label" column of this file to 0/1.
DATASET2 = "dataset-Non-extremist-Extremist.csv"

In [40]:
class SentimentDataset(object):
    """Map-style dataset of ``(text, label)`` pairs read from a CSV file.

    The file is loaded, the text column is cleaned with ``preprocess_tweet``
    and the rows are split 80/20 (stratified on the label) into train and
    test parts. ``mode`` selects which part ``len()`` and indexing expose.

    Args:
        dataset_name: Path of the CSV file. Column 0 is used as text and
            column 1 as label, except for the extremist dataset, whose
            columns are re-ordered first (see ``__init__``).
        mode: ``"train"`` or ``"test"``.
        random_state: Seed for the split. ``None`` (default) uses the
            notebook-level ``SEED_VALUE`` so that separately constructed
            train and test instances share the same partition.

    Raises:
        ValueError: if ``mode`` is neither ``"train"`` nor ``"test"``.
        FileNotFoundError: if the CSV file does not exist.
    """

    def __init__(self, dataset_name, mode, random_state=None):
        self.dataset_name = dataset_name
        if mode not in ("train", "test"):
            raise ValueError(f"mode must be 'train' or 'test', got {mode!r}")
        self.mode = mode

        try:
            data = pd.read_csv(self.dataset_name)
        except FileNotFoundError:
            logger.warning("Dataset File is missing!")
            # Propagate the error instead of os._exit(0), which would kill the
            # notebook kernel and report a success exit code.
            raise

        # Compare the file name only, so the dataset is also recognised when
        # it is given with a directory prefix.
        if path.basename(self.dataset_name) == "dataset-Non-extremist-Extremist.csv":
            data["Tweet label"] = data["Tweet label"].replace(
                {"Non-extremist": 0, "Extremist": 1}
            )
            # Put the last column first and the first column second, so that
            # column 0 / column 1 can be read as text / label below.
            # Presumably "Tweet label" is the first column of the file — TODO confirm.
            col_list = data.columns.to_list()
            data = data[[col_list[-1], col_list[0]]]

        # Work on separate Series rather than assigning into `data`, which may
        # be a column subset of the frame that was read.
        texts = data.iloc[:, 0].apply(preprocess_tweet)
        labels = data.iloc[:, 1]

        if random_state is None:
            random_state = SEED_VALUE
        # Without a fixed random_state every instance draws a different split,
        # so a "test" instance could contain rows another instance trains on.
        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, stratify=labels, test_size=0.2, random_state=random_state
        )
        self.X_train, self.X_test = list(X_train), list(X_test)
        self.y_train, self.y_test = list(y_train), list(y_test)

    def __len__(self):
        """Return the number of samples in the split selected by ``mode``."""
        if self.mode == "train":
            return len(self.X_train)
        return len(self.X_test)

    def __getitem__(self, idx):
        """Return ``(text, label)`` at ``idx`` from the split selected by ``mode``."""
        if torch.is_tensor(idx):
            idx = idx.tolist()
        if self.mode == "train":
            return self.X_train[idx], self.y_train[idx]
        return self.X_test[idx], self.y_test[idx]

### Model

In [41]:
def load_transformer(name):
    """Load a pretrained sequence-classification model and its tokenizer.

    Args:
        name: One of ``"bert-base"``, ``"bert-large"``, ``"roberta-base"``,
            ``"roberta-large"``, ``"distilbert"``, ``"albert-base-v2"``,
            ``"xlmroberta-base"``, ``"electra-small"`` or ``"bart-large"``.

    Returns:
        dict: ``{"model": <...ForSequenceClassification>, "tokenizer": <...Tokenizer>}``.

    Raises:
        ValueError: if ``name`` is not one of the supported names.
    """
    logger.info(f"Loading model {name}!")

    # name -> (checkpoint id, config class, model class, tokenizer class)
    registry = {
        "bert-base": (
            "bert-base-uncased",
            BertConfig,
            BertForSequenceClassification,
            BertTokenizer,
        ),
        "bert-large": (
            "bert-large-uncased",
            BertConfig,
            BertForSequenceClassification,
            BertTokenizer,
        ),
        "roberta-base": (
            "roberta-base",
            RobertaConfig,
            RobertaForSequenceClassification,
            RobertaTokenizer,
        ),
        "roberta-large": (
            "roberta-large",
            RobertaConfig,
            RobertaForSequenceClassification,
            RobertaTokenizer,
        ),
        "distilbert": (
            "distilbert-base-uncased",
            DistilBertConfig,
            DistilBertForSequenceClassification,
            DistilBertTokenizer,
        ),
        "albert-base-v2": (
            "albert-base-v2",
            AlbertConfig,
            AlbertForSequenceClassification,
            AlbertTokenizer,
        ),
        "xlmroberta-base": (
            "xlm-roberta-base",
            XLMRobertaConfig,
            XLMRobertaForSequenceClassification,
            XLMRobertaTokenizer,
        ),
        "electra-small": (
            "google/electra-small-discriminator",
            ElectraConfig,
            ElectraForSequenceClassification,
            ElectraTokenizer,
        ),
        "bart-large": (
            "facebook/bart-large",
            BartConfig,
            BartForSequenceClassification,
            BartTokenizer,
        ),
    }

    if name not in registry:
        raise ValueError(f"Unknown transformer name: {name!r}")
    checkpoint, config_cls, model_cls, tokenizer_cls = registry[name]

    # Set the label count while the config is created. The earlier trailing
    # `config.num_labels = 2` sat after the returns and could never run.
    config = config_cls.from_pretrained(checkpoint, num_labels=2)
    return {
        "model": model_cls.from_pretrained(checkpoint, config=config),
        "tokenizer": tokenizer_cls.from_pretrained(checkpoint),
    }

### Training & Evaluation

In [42]:
def train_step(model, inputs, labels, optimizer):
    """Run one optimisation step on a single batch.

    Args:
        model: Callable invoked as ``model(input_ids, attention_mask=..., labels=...)``;
            the first two items of its output are taken as ``(loss, logits)``.
        inputs: Mapping with the keys ``"input_ids"`` and ``"attention_mask"``.
        labels: Label tensor; a leading dimension is added before the call.
        optimizer: Optimizer whose gradients are cleared and which is stepped.

    Returns:
        tuple: ``(logits, loss)`` for the batch.
    """
    optimizer.zero_grad()

    batched_labels = labels.unsqueeze(0)
    token_ids = inputs["input_ids"]
    mask = inputs["attention_mask"]
    result = model(token_ids, attention_mask=mask, labels=batched_labels)
    loss, logits = result[:2]

    # Back-propagate and update the weights.
    loss.backward()
    optimizer.step()

    return logits, loss

In [43]:
def eval_step(model, inputs, labels):
    """Run a forward pass on one batch without updating the model.

    Args:
        model: Callable invoked as ``model(input_ids, attention_mask=..., labels=...)``;
            the first two items of its output are taken as ``(loss, logits)``.
        inputs: Mapping with the keys ``"input_ids"`` and ``"attention_mask"``.
        labels: Label tensor; a leading dimension is added before the call.

    Returns:
        tuple: ``(logits, loss)`` for the batch.
    """
    batched_labels = labels.unsqueeze(0)
    token_ids = inputs["input_ids"]
    mask = inputs["attention_mask"]
    result = model(token_ids, attention_mask=mask, labels=batched_labels)
    loss, logits = result[:2]
    return logits, loss

In [44]:
def train_epoch(model, tokenizer, train_dataset, optimizer, batch_size):
    """Train ``model`` for one pass over ``train_dataset``.

    Args:
        model: Model passed to ``train_step``; switched to training mode here.
        tokenizer: Callable that turns a list of texts into padded tensors.
        train_dataset: Dataset yielding ``(text, label)`` pairs.
        optimizer: Optimizer passed to ``train_step``.
        batch_size: Number of samples per batch.

    Returns:
        tuple: ``(accuracy, mean_loss)``, both averaged over the samples of
        the dataset.
    """
    train_loader = DataLoader(
        dataset=train_dataset, batch_size=batch_size, shuffle=True
    )
    n_samples = len(train_dataset)

    correct_count = 0
    total_loss = 0.0

    model.train()
    with tqdm(total=len(train_loader), desc="train", unit="batch") as pbar:
        for text, sentiment in train_loader:
            # truncation=True keeps over-long texts within the model's maximum
            # input length instead of failing inside the forward pass.
            encoded = tokenizer(
                list(text), padding=True, truncation=True, return_tensors="pt"
            ).to(device)
            sentiment = sentiment.to(device)

            logits, loss = train_step(model, encoded, sentiment, optimizer)

            preds = torch.argmax(logits, dim=1)
            correct_count += (preds == sentiment).sum().item()
            # `loss` is treated as the mean over the batch: weight it by the batch
            # size so that dividing by the sample count gives a per-sample mean
            # (summing batch means and dividing by n_samples under-reports the
            # loss by roughly a factor of batch_size).
            total_loss += loss.item() * sentiment.size(0)
            pbar.update(1)

    return correct_count / n_samples, total_loss / n_samples

In [45]:
def eval_epoch(model, tokenizer, eval_dataset, batch_size, split):
    """Evaluate ``model`` on ``eval_dataset`` without gradient tracking.

    Args:
        model: Model passed to ``eval_step``; switched to evaluation mode here.
        tokenizer: Callable that turns a list of texts into padded tensors.
        eval_dataset: Dataset yielding ``(text, label)`` pairs.
        batch_size: Number of samples per batch.
        split: Name of the split; used as progress-bar label and as key
            prefix for the metrics dictionary.

    Returns:
        tuple: ``(accuracy, mean_loss, metrics)`` where accuracy and loss are
        averaged over the samples and ``metrics`` is the dictionary produced
        by ``evaluation_metrics``.
    """
    # The metrics do not depend on the sample order, so shuffling is unnecessary.
    eval_loader = DataLoader(dataset=eval_dataset, batch_size=batch_size, shuffle=False)
    n_samples = len(eval_dataset)

    correct_count = 0
    total_loss = 0.0
    y_pred = list()
    y_true = list()

    model.eval()
    with torch.no_grad():
        with tqdm(total=len(eval_loader), desc=split, unit="batch") as pbar:
            for text, sentiment in eval_loader:
                # truncation=True keeps over-long texts within the model's
                # maximum input length instead of failing in the forward pass.
                encoded = tokenizer(
                    list(text), padding=True, truncation=True, return_tensors="pt"
                ).to(device)
                sentiment = sentiment.to(device)

                logits, loss = eval_step(model, encoded, sentiment)

                preds = torch.argmax(logits, dim=1)
                y_pred += preds.cpu().numpy().tolist()
                y_true += sentiment.cpu().numpy().tolist()

                correct_count += (preds == sentiment).sum().item()
                # `loss` is treated as the mean over the batch: weight it by the
                # batch size so the final division yields a per-sample mean.
                total_loss += loss.item() * sentiment.size(0)
                pbar.update(1)

    metrics_score = evaluation_metrics(y_true, y_pred, split=split)
    return (
        correct_count / n_samples,
        total_loss / n_samples,
        metrics_score,
    )

In [46]:
def train(name, dataset_name, epochs=25, patience=3, save=False):
    """Fine-tune a transformer on a dataset and log per-epoch test metrics.

    Args:
        name: Transformer name understood by ``load_transformer``.
        dataset_name: CSV file passed to ``SentimentDataset``.
        epochs: Maximum number of training epochs.
        patience: Number of consecutive epochs without a lower test loss
            after which training stops early.
        save: When True, the model is saved through ``save_model`` each time
            the test accuracy improves.

    Raises:
        ValueError: if ``name`` is not a supported transformer name.
    """
    import copy

    # load model and tokenizer..
    try:
        transformer_container = load_transformer(name)
    except ValueError:
        logger.error("Invalid transformer name!")
        # Re-raise rather than os._exit(0): terminating the process would kill
        # the notebook kernel and report a success exit code.
        raise
    model = transformer_container["model"]
    model = model.to(device)
    tokenizer = transformer_container["tokenizer"]

    # load batch_size and learning rate..
    params_container = transformer_params(name)
    batch_size = params_container["batch_size"]
    learning_rate = params_container["learning_rate"]

    # load train and test datasets..
    # Build the dataset once and derive the test view from the same object, so
    # both views share one split (two independent constructions may split the
    # rows differently and leak training rows into the test set).
    train_dataset = SentimentDataset(dataset_name=dataset_name, mode="train")
    test_dataset = copy.copy(train_dataset)
    test_dataset.mode = "test"

    # Initialize optimizer..
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Initialize training variables..
    best_acc = 0.0
    best_loss = np.inf
    stopping_step = 0
    best_model_name = None

    total_train_seconds = 0
    for epoch in range(epochs):

        start = time.time()
        train_acc, train_loss = train_epoch(
            model, tokenizer, train_dataset, optimizer, batch_size
        )
        end = time.time()
        total_train_seconds += end - start
        logger.info(
            f"epoch: {epoch+1}, transformer: {name}, train_loss: {train_loss:.4f}, train_acc: {train_acc*100:.2f}"
        )

        test_acc, test_loss, test_evaluation_metrics = eval_epoch(
            model, tokenizer, test_dataset, batch_size, "test"
        )
        logger.info(
            f"epoch: {epoch+1}, transformer: {name}, test_loss: {test_loss:.4f}, test_acc: {test_acc*100:.2f}"
        )
        logger.info(
            f"epoch: {epoch+1}, transformer: {name}, "
            f"test_precision: {test_evaluation_metrics['test_precision']*100:.2f}, "
            f"test_recall: {test_evaluation_metrics['test_recall']*100:.2f}, "
            f"test_f1_score: {test_evaluation_metrics['test_f1_score']*100:.2f}, "
            f"test_accuracy_score: {test_evaluation_metrics['test_accuracy']*100:.2f}"
        )
        logger.info(
            f"epoch: {epoch+1}, transformer: {name}, test_confusion_matrix: \n"
            f"{test_evaluation_metrics['test_confusion_matrix']}"
        )

        logger.info(
            f"Total training time elapsed: {timedelta(seconds=total_train_seconds)}"
        )
        logger.info(
            f"Mean time per train epoch: {timedelta(seconds=total_train_seconds/(epoch+1))}"
        )

        # save best model and delete previous ones...
        if save and test_acc > best_acc:
            best_acc = test_acc
            model_name = "{}_epoch{}_model.pickle".format(name, epoch)
            # On the first save there is no earlier checkpoint; pass the new
            # name as "previous" so save_model never receives None.
            previous_name = best_model_name if best_model_name is not None else model_name
            save_model(model, model_name, previous_name)
            # Remember the checkpoint so the next improvement can replace it.
            best_model_name = model_name

        # Early stopping on the test loss.
        # NOTE(review): the stopping criterion is computed on the test split, as
        # no separate validation split exists in this notebook.
        if test_loss < best_loss:
            best_loss = test_loss
            stopping_step = 0
        else:
            stopping_step += 1

        if stopping_step >= patience:
            logger.info("EarlyStopping!")
            # Leave the loop instead of os._exit(1), which would kill the kernel.
            break

In [47]:
# Dataset file handed to train() in the run below.
CURRENT_DATASET = DATASET1
# Upper bound on training epochs; train() also applies its own patience check.
EPOCHS = 5

In [48]:
# Fine-tune BERT (base, uncased) on the selected dataset; no checkpoint is written
# because save=False.
train(
    name="bert-base",
    dataset_name=CURRENT_DATASET,
    epochs=EPOCHS,
    patience=3,
    save=False,
)

2021-02-27 08:11:14.727 | INFO     | __main__:load_transformer:2 - Loading model bert-base!
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Som