In [6]:
import pandas as pd
import re
from sklearn.model_selection import train_test_split # type: ignore
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.metrics import roc_auc_score, roc_curve

import torch
from transformers import BertweetTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
from transformers import DataCollatorWithPadding, EarlyStoppingCallback
from datasets import Dataset

import matplotlib.pyplot as plt
import seaborn as sns

ImportError: cannot import name 'PreTrainedModel' from 'transformers' (/Applications/anaconda3/envs/IFN580/lib/python3.12/site-packages/transformers/__init__.py)

In [None]:
# Load the labelled text dataset and summarise its columns, dtypes and non-null counts.
df = pd.read_csv("datasets/hydrogen.csv")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   label   5000 non-null   int64 
 1   text    5000 non-null   object
dtypes: int64(1), object(1)
memory usage: 78.3+ KB


## Analyse

In [None]:
# List the distinct values that appear in the label column.
df["label"].unique()

array([0, 1])

In [None]:
# Peek at a single text sample (row position 88).
df["text"].iloc[88]

'discover dentists everything dental all in one place'

In [None]:
# Show the first five rows whose label is 0.
df[df["label"] == 0].head(5)

Unnamed: 0,label,text
0,0,theres hydrogen and helium then lithium beryll...
1,0,theres hydrogen and helium then lithium beryll...
2,0,got called the square in a brony aerospace mee...
3,0,in a hydrogen war ravaged society the nubile y...
4,0,i am made of flesh lotsa carbon and hydrogen a...


In [None]:
# Count rows per label to check the class balance.
df['label'].value_counts()

label
1    2736
0    2264
Name: count, dtype: int64

In [None]:
# Seed reused by every split in this notebook so results are repeatable.
random_state = 10

# Raw texts and their labels as plain arrays.
X, y = df['text'].values, df['label'].values

# Hold out 30% of the rows, keeping the label proportions equal in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    stratify=y,
    random_state=random_state,
)

print("Training set size:", len(X_train))
print("Testing set size:", len(X_test))

NameError: name 'train_test_split' is not defined

## Tokenizing data

In [None]:
# Wrap each split in a DataFrame with "text" and "label" columns, then convert
# the frames to Hugging Face Dataset objects.
train_df, test_df = (
    pd.DataFrame({"text": texts, "label": labels})
    for texts, labels in ((X_train, y_train), (X_test, y_test))
)
train_ds, test_ds = (Dataset.from_pandas(frame) for frame in (train_df, test_df))

print("Train dataset:", train_ds)
print("Test dataset:", test_ds)

In [None]:
# Checkpoint name shared by the tokenizer and the classification models loaded below.
model_name = "vinai/bertweet-base"
tokenizer = BertweetTokenizer.from_pretrained(model_name)

In [None]:
def tokenize(batch):
    """Tokenize the "text" field of a batch with the notebook-level `tokenizer`.

    Truncation and padding are both enabled; whatever the tokenizer returns is
    handed back unchanged so `Dataset.map` can add it to the dataset.
    """
    texts = batch['text']
    return tokenizer(texts, truncation=True, padding=True)

# Apply the tokenizer to both splits in batched mode (train first, then test).
train_ds, test_ds = (
    split.map(tokenize, batched=True) for split in (train_ds, test_ds)
)

In [None]:
# Display the tokenized training dataset summary.
train_ds

## Loading the Model

In [None]:
# Drop any model left over from an earlier run of this cell before loading a fresh one.
if "model" in globals():
    del model

# Sequence classifier initialised from the BERTweet checkpoint, with one output
# per distinct label in the dataset.
model = RobertaForSequenceClassification.from_pretrained(
    model_name,
    problem_type="single_label_classification",
    num_labels=df["label"].nunique(),
)

## Training the Model

In [None]:
def compute_metrics(pred):
    """Turn a prediction object into accuracy / precision / recall / F1.

    `pred.label_ids` holds the reference labels and `pred.predictions` the
    per-class scores; the predicted class is the argmax over the last axis.
    Precision, recall and F1 are computed for the positive class (label 1).

    Returns a dict with the keys "accuracy", "precision", "recall" and "f1".
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)

    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_hat, average="binary", pos_label=1
    )

    return dict(
        accuracy=accuracy_score(y_true, y_hat),
        precision=precision,
        recall=recall,
        f1=f1,
    )

In [None]:
# Configuration for the first BERTweet run: up to 50 epochs, evaluated and
# checkpointed once per epoch, with the best checkpoint restored when training ends.
training_args = TrainingArguments(
    # where checkpoints and logs are written
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=10,
    # optimisation settings
    num_train_epochs=50,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    # evaluate and save on the same per-epoch schedule
    eval_strategy="epoch",
    save_strategy="epoch",
    # the "loss" metric decides which checkpoint counts as best
    load_best_model_at_end=True,
    metric_for_best_model="loss",
)

In [None]:
# Fine-tune the BERTweet classifier, stopping once the tracked metric has not
# improved for 3 consecutive evaluations.
# NOTE(review): test_ds is used both for early stopping / checkpoint selection and
# for the reported test metrics — consider a separate validation split.
model.train()

trainer = Trainer(
    model = model,
    args = training_args,
    train_dataset = train_ds,
    eval_dataset = test_ds,
    data_collator = DataCollatorWithPadding(tokenizer),
    # Without compute_metrics, evaluate() does not report eval_accuracy,
    # eval_precision, eval_recall or eval_f1, and display_evaluation would fail
    # with a KeyError when reading them.
    compute_metrics = compute_metrics,
    callbacks = [EarlyStoppingCallback(early_stopping_patience = 3)]
)

trainer.train()

## Model Evaluation

In [None]:
# Switch to inference mode, then score the fine-tuned model on both splits
# (training split first, test split second).
model.eval()

train_results, test_results = (
    trainer.evaluate(split) for split in (train_ds, test_ds)
)

In [None]:
def display_evaluation(setname, results):
    """Print accuracy, precision, recall and F1 for one dataset split.

    Parameters
    ----------
    setname : str
        Label used as the prefix of every printed line (e.g. "Training").
    results : mapping
        Must contain "eval_accuracy", "eval_precision", "eval_recall" and
        "eval_f1"; each value is rounded to three decimals before printing.
    """
    metric_rows = (
        ("Accuracy", "eval_accuracy"),
        ("Precision", "eval_precision"),
        ("Recall", "eval_recall"),
        ("F1 Score", "eval_f1"),
    )
    for heading, key in metric_rows:
        print(f"{setname} Set {heading}:", round(results[key], 3))

# Report the metrics of the first run for both splits.
display_evaluation("Training", train_results)
display_evaluation("Testing", test_results)

In [None]:
# Configuration for experiment 1: a short 3-epoch run with learning rate 1e-5.
training_args_1 = TrainingArguments(
    # where checkpoints and logs are written
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=10,
    # optimisation settings
    num_train_epochs=3,
    learning_rate=1e-5,
    weight_decay=0.01,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    # evaluate and save once per epoch
    eval_strategy="epoch",
    save_strategy="epoch",
)

In [None]:
# Experiment 1: fine-tune a freshly loaded BERTweet classifier with training_args_1.
model_1 = RobertaForSequenceClassification.from_pretrained(
    model_name, num_labels = df["label"].nunique(),
    problem_type="single_label_classification")

model_1.train()

trainer_1 = Trainer(
    # Train the fresh model_1, not the `model` that an earlier cell already fine-tuned.
    model = model_1,
    args = training_args_1,
    train_dataset = train_ds,
    eval_dataset = test_ds,
    data_collator = DataCollatorWithPadding(tokenizer),
    compute_metrics = compute_metrics
)

# Run this experiment's trainer (the earlier `trainer` belongs to a different run).
trainer_1.train()

In [None]:
# Score experiment 1 on both splits and print the metrics.
model_1.eval()

train_results_1, test_results_1 = (
    trainer_1.evaluate(split) for split in (train_ds, test_ds)
)

display_evaluation("Training", train_results_1)
display_evaluation("Testing", test_results_1)

In [None]:
# Configuration for experiment 2: a short 3-epoch run with learning rate 5e-5.
training_args_2 = TrainingArguments(
    # where checkpoints and logs are written
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=10,
    # optimisation settings
    num_train_epochs=3,
    learning_rate=5e-5,
    weight_decay=0.01,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    # evaluate and save once per epoch
    eval_strategy="epoch",
    save_strategy="epoch",
)

In [None]:
# Experiment 2: fine-tune a freshly loaded BERTweet classifier with training_args_2.
model_2 = RobertaForSequenceClassification.from_pretrained(
    model_name, num_labels = df["label"].nunique(),
    problem_type="single_label_classification")

model_2.train()

trainer_2 = Trainer(
    # Train the fresh model_2, not the `model` that an earlier cell already fine-tuned.
    model = model_2,
    args = training_args_2,
    train_dataset = train_ds,
    eval_dataset = test_ds,
    data_collator = DataCollatorWithPadding(tokenizer),
    compute_metrics = compute_metrics
)

trainer_2.train()

In [None]:
# Score experiment 2 on both splits and print the metrics.
model_2.eval()

train_results_2, test_results_2 = (
    trainer_2.evaluate(split) for split in (train_ds, test_ds)
)

display_evaluation("Training", train_results_2)
display_evaluation("Testing", test_results_2)

## Early Stopping

In [None]:
# Early-stopping run: up to 50 epochs, evaluated and checkpointed every epoch, with
# the best checkpoint (tracked on the "loss" metric) restored when training ends.
# This rebinds the notebook-level `training_args` and `trainer` names.
training_args = TrainingArguments(
    output_dir = "./results",
    num_train_epochs = 50,
    per_device_train_batch_size = 32,
    per_device_eval_batch_size = 64,
    eval_strategy = "epoch",
    save_strategy = "epoch",
    learning_rate = 2e-5,
    weight_decay = 0.01,
    logging_dir = "./logs",
    logging_steps = 10,

    metric_for_best_model = "loss",
    load_best_model_at_end = True
)

# NOTE(review): `model` was already handed to a Trainer and trained earlier in the
# notebook, so this presumably continues from those weights rather than from the
# pretrained checkpoint — confirm that is intended.
# NOTE(review): test_ds drives early stopping here and is also used for the reported
# test metrics — consider a separate validation split.
trainer = Trainer(
    model = model,
    args = training_args,
    train_dataset = train_ds,
    eval_dataset = test_ds,
    data_collator = DataCollatorWithPadding(tokenizer),
    compute_metrics = compute_metrics,
    # stop after 3 evaluations without improvement of the tracked metric
    callbacks = [EarlyStoppingCallback(early_stopping_patience = 3)],
)

trainer.train()

## BERT base (uncased)

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline

# NOTE(review): `pipe` (a fill-mask pipeline) is not referenced again in the visible
# notebook — confirm whether it is still needed.
pipe = pipeline("fill-mask", model="google-bert/bert-base-uncased")

# Load model directly
from transformers import BertTokenizer, BertForSequenceClassification

# Tokenizer and sequence-classification model for the BERT base (uncased) comparison.
# NOTE(review): unlike the BERTweet models above, num_labels is not passed here, so
# the library default applies — confirm it matches the dataset's label count.
tokenizer_bertbased = BertTokenizer.from_pretrained("google-bert/bert-base-uncased")
model_bertbased = BertForSequenceClassification.from_pretrained("google-bert/bert-base-uncased")

In [None]:
# Build separate datasets for the BERT base (uncased) model from the same split.
train_df_bert = pd.DataFrame({"text": X_train, "label": y_train})
test_df_bert = pd.DataFrame({"text": X_test, "label": y_test})
train_ds_bert = Dataset.from_pandas(train_df_bert)
test_ds_bert = Dataset.from_pandas(test_df_bert)


def tokenize_bert(batch):
    """Tokenize the "text" field of a batch with the BERT base (uncased) tokenizer.

    The existing `tokenize` helper uses the BERTweet tokenizer, whose token ids
    belong to a different vocabulary than the one the BERT model was trained
    with, so the BERT datasets need their own tokenization step.
    """
    return tokenizer_bertbased(batch['text'], truncation=True, padding = True)


train_ds_bert = train_ds_bert.map(tokenize_bert, batched=True)
test_ds_bert = test_ds_bert.map(tokenize_bert, batched=True)

In [None]:
# Configuration for the BERT base (uncased) run: 3 epochs at learning rate 2e-5,
# evaluated and checkpointed once per epoch.
training_args_bert = TrainingArguments(
    output_dir = "./results",
    num_train_epochs = 3,
    per_device_train_batch_size = 32,
    per_device_eval_batch_size = 64,
    eval_strategy = "epoch",
    save_strategy = "epoch",
    learning_rate = 2e-5,
    weight_decay = 0.01,
    logging_dir = "./logs",
    logging_steps = 10
)

model_bertbased.train()

trainer_bert = Trainer(
    model = model_bertbased,
    args = training_args_bert,
    train_dataset = train_ds_bert,
    eval_dataset = test_ds_bert,
    data_collator = DataCollatorWithPadding(tokenizer_bertbased),
    compute_metrics = compute_metrics
)

trainer_bert.train()

model_bertbased.eval()

# Evaluate with the BERT trainer. trainer_2 wraps a BERTweet model, so using it here
# would score the wrong model on these datasets. Results get their own names so the
# metrics of experiment 2 are not overwritten.
train_results_bert = trainer_bert.evaluate(train_ds_bert)
test_results_bert = trainer_bert.evaluate(test_ds_bert)

display_evaluation("Training", train_results_bert)
display_evaluation("Testing", test_results_bert)

In [None]:
# Grid search over learning rate, training batch size and weight decay for the
# BERT base (uncased) classifier. Every combination is trained for up to 20 epochs
# with early stopping and then scored on both splits.
batch_sizes = [8, 16]

learning_rate = [1e-5, 2e-5, 3e-5]

weight_decay = [0.1, 0.01, 0.001]

# One record per configuration so the runs can be compared after the loop.
grid_results = []

for lr in learning_rate:
    # Iterate over batch_sizes (the list defined above); `batch_size` was never defined.
    for size in batch_sizes:
        for weight in weight_decay:
            # Start every configuration from the pretrained checkpoint; otherwise each
            # run would continue from the weights fine-tuned by the previous one and
            # the configurations could not be compared. The BERTweet `model` is left
            # untouched because later cells still use it.
            model_bertbased = BertForSequenceClassification.from_pretrained("google-bert/bert-base-uncased")

            training_args_bert = TrainingArguments(
                output_dir = "./results",
                num_train_epochs = 20,
                per_device_train_batch_size = size,
                per_device_eval_batch_size = 64,
                eval_strategy = "epoch",
                save_strategy = "epoch",
                learning_rate = lr,
                weight_decay = weight,
                logging_dir = "./logs",
                logging_steps = 10,

                metric_for_best_model = "loss",
                load_best_model_at_end = True
            )

            model_bertbased.train()
            trainer_bert = Trainer(
                model = model_bertbased,
                args = training_args_bert,
                train_dataset = train_ds_bert,
                eval_dataset = test_ds_bert,
                processing_class = tokenizer_bertbased,
                data_collator = DataCollatorWithPadding(tokenizer_bertbased),
                compute_metrics = compute_metrics,

                callbacks = [EarlyStoppingCallback(early_stopping_patience = 3)]
            )

            trainer_bert.train()

            model_bertbased.eval()

            # Score with the trainer that was just fitted (not trainer_2, which wraps
            # a different model).
            train_results_bert = trainer_bert.evaluate(train_ds_bert)
            test_results_bert = trainer_bert.evaluate(test_ds_bert)

            # Label the output so each block of metrics can be tied to its configuration.
            print(f"learning_rate={lr}, batch_size={size}, weight_decay={weight}")
            display_evaluation("Training", train_results_bert)
            display_evaluation("Testing", test_results_bert)

            grid_results.append({
                "learning_rate": lr,
                "batch_size": size,
                "weight_decay": weight,
                "train": train_results_bert,
                "test": test_results_bert,
            })
    

## Examining Attention Weights

In [None]:
def compute_attention_matrix(tokenizer, model, text):
    """Run `text` through `model` and summarise its attention as one matrix.

    The text is tokenized into tensors placed on the model's device, the model is
    called with `output_attentions=True` under `torch.no_grad()`, and the returned
    attention tensors are stacked and averaged over the first two remaining axes
    once axis 1 has been squeezed out.

    Returns
    -------
    tuple
        (averaged attention tensor, predicted class index as an int,
        token strings for the first encoded sequence).
    """
    encoded = tokenizer(text, return_tensors="pt").to(model.device)

    with torch.no_grad():
        output = model(**encoded, output_attentions=True)

    # Stack the per-layer tensors on a new leading axis and squeeze axis 1
    # (presumably the batch axis of size 1 — TODO confirm).
    stacked = torch.stack(output.attentions).cpu().squeeze(1)

    # Average over the leading axis twice (presumably layers, then heads).
    averaged = stacked.mean(dim=0).mean(dim=0)

    predicted_class = output.logits.cpu().argmax(-1).item()
    token_strings = tokenizer.convert_ids_to_tokens(encoded["input_ids"][0])

    return (averaged, predicted_class, token_strings)

In [None]:
def plot_attention(attentions, tokens, title):
    """Draw `attentions` as a heatmap with `tokens` labelling both axes.

    A new 10x8 figure is created, titled with `title`, rendered with the
    'binary' colour map and a colour bar, and shown immediately.
    """
    plt.figure(figsize=(10, 8))
    plt.title(title)

    heatmap_options = dict(
        xticklabels=tokens,
        yticklabels=tokens,
        cmap='binary',
        cbar=True,
    )
    sns.heatmap(attentions, **heatmap_options)

    plt.show()

In [None]:
def display_attention_matrix(tokenizer, model, text):
    """Plot the averaged attention matrix for `text` with the predicted class in the title.

    Class 1 is shown as "Relevant"; any other predicted class as "Not relevant".
    """
    attention, pred_class, tokens = compute_attention_matrix(tokenizer, model, text)

    if pred_class == 1:
        pred_label = "Relevant"
    else:
        pred_label = "Not relevant"

    heading = text + f"\nPredicted class: {pred_label}"
    plot_attention(attention, tokens, heading)

In [None]:
# Visualise the averaged attention for the first label-0 text, using the BERTweet
# tokenizer and model.
display_attention_matrix(tokenizer, model, df[df["label"] == 0].iloc[0]["text"])

## Computing the ROC Curve

In [None]:
# Model outputs for the test split; the scores are converted to probabilities below.
pred = trainer.predict(test_ds)
# NOTE(review): small_trainer and small_test_ds are not defined anywhere in the
# visible notebook — this line needs the cells that create them (or should be removed).
pred_small = small_trainer.predict(small_test_ds)

In [None]:
# Convert the raw per-class scores to probabilities. The softmax axis is given
# explicitly (last axis = classes): leaving `dim` out relies on a deprecated
# implicit choice and emits a warning.
pred_probs = torch.nn.functional.softmax(torch.Tensor(pred.predictions), dim = -1).numpy()
pred_probs_small = torch.nn.functional.softmax(torch.Tensor(pred_small.predictions), dim = -1).numpy()

In [None]:
# Area under the ROC curve for each model, scored on the class-1 probability.
# NOTE(review): y_small_test is not defined anywhere in the visible notebook — confirm
# where the "small" split comes from.
roc_index_nn = roc_auc_score(y_test, pred_probs[:, 1])
roc_index_small = roc_auc_score(y_small_test, pred_probs_small[:, 1])

# False/true positive rates (and the thresholds that produce them) for both models.
fpr_nn, tpr_nn, bertweet_thresholds_nn = roc_curve(y_test, pred_probs[:, 1])
fpr_nn_small, tpr_nn_small, bertweet_thresholds_nn_small = roc_curve(y_small_test, pred_probs_small[:, 1])

# Plot both curves against the diagonal reference line; each legend entry carries the AUC.
plt.plot(fpr_nn, tpr_nn, label = "BERTweet Model: {:.3f}".format(roc_index_nn), color = 'red', lw = 0.5)
plt.plot(fpr_nn_small, tpr_nn_small, label = "BERTweet Small Model: {:.3f}".format(roc_index_small), color = 'green', lw = 0.5)
plt.plot([0, 1], [0, 1], color = 'navy', lw = 0.5, linestyle = '--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.0])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
# NOTE(review): the title mentions sentiment, while display_attention_matrix labels
# the classes "Relevant" / "Not relevant" — confirm the wording.
plt.title("Receiver operating characteristic for positive sentiment")
plt.legend(loc = "lower right")
plt.show()

## Logistic Regression Model

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score

# Split settings (same seed and test fraction as the transformer experiments).
random_state = 10
test_set_size = 0.3

# TF-IDF feature table without its tweet_id column.
tfidf = pd.read_csv("datasets/tfidf_features.csv").drop(columns = ['tweet_id'])

# Targets come from the main dataframe; features are the one-hot-expanded TF-IDF table.
# NOTE(review): assumes the rows of the TF-IDF file line up with the rows of df — confirm.
y_lr = df['label'].values
X_lr = pd.get_dummies(tfidf)

X_train_lr, X_test_lr, y_train_lr, y_test_lr = train_test_split(
    X_lr.values,
    y_lr,
    test_size = test_set_size,
    stratify = y_lr,
    random_state = random_state,
)

# Fit the scaler on the training features only, then reuse it for the test features.
scaler = StandardScaler()
X_train_lr = scaler.fit_transform(X_train_lr, y_train_lr)
X_test_lr = scaler.transform(X_test_lr)

In [None]:
from sklearn.linear_model import LogisticRegression

# Fit a logistic regression baseline on the scaled training features.
lr_model = LogisticRegression(random_state = random_state)

lr_model.fit(X_train_lr, y_train_lr)

In [None]:
from sklearn.metrics import classification_report

# Predicted labels for the held-out features.
y_pred_lr = lr_model.predict(X_test_lr)

# Accuracy on the training and the test split.
print("Train accuracy:", lr_model.score(X_train_lr, y_train_lr))
print("Test accuracy:", lr_model.score(X_test_lr, y_test_lr))

# Per-class precision / recall / F1 on the test split.
print(classification_report(y_test_lr, y_pred_lr))

In [None]:
# Class probabilities of the logistic regression model on its own test split.
pred_probs_lr = lr_model.predict_proba(X_test_lr)

# Score against y_test_lr, the labels produced by the same split as X_test_lr;
# y_test belongs to the separate text split used for the transformer models.
roc_index_lr = roc_auc_score(y_test_lr, pred_probs_lr[:, 1])

fpr_lr, tpr_lr, thresholds_lr = roc_curve(y_test_lr, pred_probs_lr[:, 1])

print("ROC index on test for lr model:", roc_index_lr)

In [None]:
# Compare the BERTweet and logistic regression ROC curves against the diagonal
# reference line; each legend entry carries the model's AUC.
plt.plot(fpr_nn, tpr_nn, label = "BERTweet Model: {:.3f}".format(roc_index_nn), color = 'red', lw = 0.5)
plt.plot(fpr_lr, tpr_lr, label = f'Logistic Regression model {roc_index_lr:.3f}', color = 'green', lw = 0.5)
plt.plot([0, 1], [0, 1], color = 'navy', lw = 0.5, linestyle = '--')
plt.xlim([0.0,1.0])
plt.ylim([0.0,1.0])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
# NOTE(review): the title mentions sentiment, while display_attention_matrix labels
# the classes "Relevant" / "Not relevant" — confirm the wording.
plt.title("Receiver operating characteristic for positive sentiment")
plt.legend(loc = "lower right")
plt.show()