In [27]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from transformers import AutoTokenizer, AdamW, get_scheduler, AutoModelForSequenceClassification
import seaborn as sns
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import warnings

# Silence every Python warning for the rest of the session. Note that this also
# hides deprecation warnings emitted by the libraries imported above.
warnings.filterwarnings('ignore')

In [2]:
# Seed NumPy and PyTorch (CPU and current CUDA device) with the same value.
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Use the first GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

if device.type == 'cuda':
    # Ask cuDNN for deterministic kernels and turn off its autotuner.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

print(device)

In [3]:
# Locations of the train / test files, relative to the notebook's working directory.
# They are read as tab-separated tables by the loading cell.
train_dataset_path = "../input/semeval2018/train_task_a_full_analyzed.txt"
test_dataset_path = "../input/semeval2018/test_task_a_full_analyzed.txt"

In [5]:
# Read the tab-separated files referenced by the path variables.
train_df = pd.read_csv(train_dataset_path, sep="\t")
test_df = pd.read_csv(test_dataset_path, sep="\t")

# Hold out 20% of the training rows for validation. An explicit random_state
# pins the split so it does not depend on how much of NumPy's global RNG
# stream has been consumed by earlier or re-executed cells.
train_df, validation_df = train_test_split(train_df, test_size=0.2, random_state=42)

train_df.head()

In [6]:
# Number of training rows per label value.
value_counts = train_df["label"].value_counts()
# x / y are passed by keyword: seaborn >= 0.12 no longer accepts them positionally.
sns.barplot(x=value_counts.index, y=value_counts)

In [7]:
class SarcasticSentenceDataset(Dataset):
    """Map-style dataset that tokenizes one sentence per item on access.

    Args:
        sentences: Indexable sequence of raw text strings.
        labels: Indexable sequence of class labels, one per sentence.
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, padding="max_length",
            return_tensors="pt", max_length=max_len)``; its result must expose
            ``"input_ids"`` and ``"attention_mask"`` tensors.
        max_len: Length each encoded sentence is truncated / padded to.

    Raises:
        ValueError: If ``sentences`` and ``labels`` differ in length.
    """

    def __init__(self, sentences, labels, tokenizer, max_len=128):
        if len(sentences) != len(labels):
            raise ValueError("Sentences and labels should have the same number of elements.")

        self.sentences = sentences
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __getitem__(self, index: int):
        """Return the encoded sentence and its label as a dict of tensors."""
        # `padding="max_length"` replaces the deprecated `pad_to_max_length=True`.
        inputs = self.tokenizer(self.sentences[index],
                                truncation=True,
                                padding="max_length",
                                return_tensors="pt",
                                max_length=self.max_len)

        # Drop only the leading batch axis added by return_tensors="pt"; a bare
        # squeeze() would also remove the sequence axis when max_len == 1.
        return {
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "labels": torch.tensor(self.labels[index], dtype=torch.long)
        }

    def __len__(self):
        """Number of sentences in the dataset."""
        return len(self.sentences)

In [8]:
# Load the tokenizer published alongside the "vinai/bertweet-base" checkpoint.
# NOTE(review): `normalization=True` presumably enables the tokenizer's built-in
# tweet normalisation — confirm against the BERTweet documentation.
tokenizer = AutoTokenizer.from_pretrained("vinai/bertweet-base", normalization=True)

In [9]:
# Wrap each split in a SarcasticSentenceDataset, in train / validation / test
# order, using the "text" column as input and the "label" column as target.
train_dataset, validation_dataset, test_dataset = (
    SarcasticSentenceDataset(sentences=split_df["text"].tolist(),
                             labels=split_df["label"].tolist(),
                             tokenizer=tokenizer)
    for split_df in (train_df, validation_df, test_df)
)

In [10]:
# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=True)
validation_loader, test_loader = (
    DataLoader(dataset=held_out, batch_size=8)
    for held_out in (validation_dataset, test_dataset)
)

In [11]:
from tqdm.auto import tqdm
import copy


def train(model,
          train_loader,
          eval_loader,
          device,
          lr=5e-5,
          num_epochs=5,
          early_stopping_patience=3):
    """Fine-tune ``model`` and return it with its best-scoring weights loaded.

    After every epoch the model is scored on ``eval_loader``; the weights of the
    epoch with the highest F1 are kept and restored before returning.

    Args:
        model: Module whose forward pass accepts the batch dict as keyword
            arguments and returns an object exposing ``.loss``.
        train_loader: Iterable of dict batches (tensor values) used for training.
        eval_loader: Iterable of dict batches passed to ``evaluate``.
        device: Device every batch tensor is moved to.
        lr: Learning rate handed to ``setup_optimizer_and_scheduler``.
        num_epochs: Maximum number of passes over ``train_loader``.
        early_stopping_patience: Stop once this many consecutive epochs fail to
            improve the best F1. Expected to be a positive integer; ``None``
            disables early stopping. Defaults to 3.

    Returns:
        The same ``model`` object, with the best epoch's parameters loaded.
    """
    num_training_steps = len(train_loader) * num_epochs
    optimizer, lr_scheduler = setup_optimizer_and_scheduler(model,
                                                            lr,
                                                            0,
                                                            num_training_steps)

    best_f1 = 0
    best_epoch = -1
    # Deep copy: state_dict() tensors alias the live parameters, so a plain
    # reference would be overwritten by later optimizer steps.
    best_params = copy.deepcopy(model.state_dict())
    epochs_without_improvement = 0

    # The context manager closes the bar even when training stops early.
    with tqdm(total=num_training_steps) as progress_bar:
        for epoch in range(num_epochs):
            model.train()
            for batch in train_loader:
                batch = {k: v.to(device) for k, v in batch.items()}

                outputs = model(**batch)
                loss = outputs.loss
                loss.backward()

                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

                progress_bar.update(1)

            metrics = evaluate(model, eval_loader, device)
            print(f"validation accuracy: {metrics['accuracy']}\n"
                  f"validation precision: {metrics['precision']}\n"
                  f"validation recall: {metrics['recall']}\n"
                  f"validation f1: {metrics['f1']}\n")

            if metrics["f1"] > best_f1:
                best_f1 = metrics["f1"]
                best_epoch = epoch
                best_params = copy.deepcopy(model.state_dict())
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1

            print(f"patience: {epochs_without_improvement}\n")
            if (early_stopping_patience is not None
                    and epochs_without_improvement >= early_stopping_patience):
                break

    print(f"best epoch: {best_epoch}\n"
          f"best f1: {best_f1}\n")

    model.load_state_dict(best_params)
    return model


def setup_optimizer_and_scheduler(model, lr, num_warmup_steps, num_training_steps):
    """Create an AdamW optimizer over ``model.parameters()`` and its "linear" scheduler.

    Args:
        model: Module whose parameters are optimized.
        lr: Learning rate passed to AdamW.
        num_warmup_steps: Warm-up step count forwarded to ``get_scheduler``.
        num_training_steps: Total step count forwarded to ``get_scheduler``.

    Returns:
        Tuple ``(optimizer, scheduler)``.
    """
    adamw = AdamW(model.parameters(), lr=lr)
    schedule_kwargs = {
        "name": "linear",
        "optimizer": adamw,
        "num_warmup_steps": num_warmup_steps,
        "num_training_steps": num_training_steps,
    }
    return adamw, get_scheduler(**schedule_kwargs)


def evaluate(model, eval_loader, device):
    """Run ``model`` over ``eval_loader`` without gradients and score its predictions.

    Each batch is moved to ``device``; the predicted class is the index of the
    largest logit along dimension 1. Gold labels are read from ``batch["labels"]``.

    Returns:
        The dict produced by ``compute_metrics`` for all collected labels and
        predictions.
    """
    model.eval()
    gold, predicted = [], []

    with torch.no_grad():
        for raw_batch in eval_loader:
            on_device = {name: tensor.to(device) for name, tensor in raw_batch.items()}

            logits = model(**on_device).logits
            batch_preds = torch.max(logits, dim=1, keepdim=False).indices

            gold += on_device["labels"].cpu().numpy().tolist()
            predicted += batch_preds.cpu().numpy().tolist()

    return compute_metrics(gold, predicted)


def compute_metrics(y_true, y_pred):
    """Score predictions against gold labels.

    Returns:
        Dict with keys "accuracy", "precision", "recall", "f1" (the last three
        computed with ``average="binary"``) and "confusion_matrix".
    """
    prf_support = precision_recall_fscore_support(y_true, y_pred, average="binary")

    scores = {"accuracy": accuracy_score(y_true, y_pred)}
    # The fourth element (support) is not reported.
    scores.update(zip(("precision", "recall", "f1"), prf_support[:3]))
    scores["confusion_matrix"] = confusion_matrix(y_true, y_pred)
    return scores

In [12]:
# Load the "vinai/bertweet-base" checkpoint with a 2-label sequence-classification
# head and move it to the selected device.
model = AutoModelForSequenceClassification.from_pretrained("vinai/bertweet-base", num_labels=2).to(device)

In [13]:
# Fine-tune for at most 10 epochs at lr=1e-5, validating after every epoch.
model = train(model=model,
              train_loader=train_loader,
              eval_loader=validation_loader,
              device=device,
              lr=1e-5,
              num_epochs=10)

In [14]:
# Score the trained model on the test loader and report the headline numbers.
metrics = evaluate(model, test_loader, device)

test_report = (f"test accuracy: {metrics['accuracy']}\n"
               f"test precision: {metrics['precision']}\n"
               f"test recall: {metrics['recall']}\n"
               f"test f1: {metrics['f1']}\n")
print(test_report)

# Confusion matrix with integer cell annotations.
sns.heatmap(metrics["confusion_matrix"], annot=True, fmt="d", cmap='Blues')

In [17]:
import emoji
from transformers_interpret import SequenceClassificationExplainer


def get_mean_importance_and_prediction(model, tokenizer, df):
    """Explain every row of ``df`` and average the attribution of its emoji tokens.

    Args:
        model: Classification model handed to ``SequenceClassificationExplainer``.
        tokenizer: Tokenizer handed to ``SequenceClassificationExplainer``.
        df: DataFrame with a "text" column and an "emojis" column.
            NOTE(review): "emojis" is iterated character by character, so it is
            presumably a string of single-code-point emojis with no separators —
            confirm against the data file.

    Returns:
        Tuple ``(importance, predictions)``: two lists with one entry per row.
        ``importance`` holds the mean attribution of tokens equal to a
        demojized emoji of the row, or NaN when "emojis" is not a string or no
        token matched. ``predictions`` holds the explainer's predicted class
        index.
    """
    model.eval()
    cls_explainer = SequenceClassificationExplainer(model, tokenizer)

    importance = []
    predictions = []
    # Wrapping the iterator advances the bar for every row, including the rows
    # that leave the loop body early through `continue`.
    for _, row in tqdm(df.iterrows(), total=len(df)):
        attributions = cls_explainer(row["text"])
        predictions.append(cls_explainer.predicted_class_index)

        emojis = row["emojis"]
        if not isinstance(emojis, str):
            # Non-string value (e.g. NaN): no emoji attribution for this row.
            importance.append(np.nan)
            continue

        # Collect the score of every attributed token that equals one of the
        # row's demojized emojis. Repeated emojis contribute repeatedly.
        emoji_importance = [token_importance
                            for emoji_tag in (emoji.demojize(e) for e in emojis)
                            for (token, token_importance) in attributions
                            if emoji_tag == token]

        # Explicit NaN instead of relying on np.mean([]) and suppressed warnings.
        importance.append(np.mean(emoji_importance) if emoji_importance else np.nan)

    return importance, predictions

In [18]:
# Explain every test row; yields one importance value and one prediction per row.
importance, predictions = get_mean_importance_and_prediction(model, tokenizer, test_df)

In [22]:
# Attach the per-row results to the test frame (this mutates test_df in place).
test_df["emoji_importance"] = importance
test_df["predictions"] = predictions

In [32]:
# Write the augmented test frame as a tab-separated file, without the index column.
test_df.to_csv("test_task_a_full_postprocessed_bertweet.txt", sep="\t", index=False)

In [24]:
from transformers_interpret import SequenceClassificationExplainer

# Explain a single hand-picked sentence and render the attribution visualisation.
cls_explainer = SequenceClassificationExplainer(model, tokenizer)
word_attributions = cls_explainer("Perfect time to get really sick  😫😷")
cls_explainer.visualize()

In [39]:
# One regression plot per predicted class, both drawn on the same axes.
predicted_0 = test_df[test_df["predictions"] == 0]
predicted_1 = test_df[test_df["predictions"] == 1]

# data / x / y are passed by keyword: seaborn >= 0.12 no longer accepts x and y
# as positional arguments.
sns.regplot(data=predicted_0, x="emoji_sentiment_scores", y="emoji_importance")
sns.regplot(data=predicted_1, x="emoji_sentiment_scores", y="emoji_importance")

In [23]:
# Persist only the model's parameters (state dict) to the working directory.
torch.save(model.state_dict(), "bertweet_full.pth")
print("model params saved")