# Import

In [None]:
!pip install datasets
!pip install transformers[torch]
!pip install optuna

In [None]:
import os
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, BertForSequenceClassification
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from transformers import Trainer, TrainingArguments
from transformers import pipeline
from datasets import concatenate_datasets, load_dataset
from huggingface_hub import hf_hub_download
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import datasets
from datasets import Dataset, Value, ClassLabel, Features
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from transformers import DataCollatorWithPadding
import optuna
from huggingface_hub import notebook_login
from sklearn.model_selection import StratifiedKFold

# lables value
# 2 = positive
# 1 = neutral
# 0 = negative

# Datasets

## Training&Validation

In [None]:
dataset = load_dataset("HFFErica/PublicDatasets", data_files={'train': 'dataset_tweet_train.csv', 'validation': 'dataset_tweet_validation.csv'})
dataset

dataset_tweet_train.csv:   0%|          | 0.00/1.81M [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 23832
    })
    validation: Dataset({
        features: ['text', 'label'],
        num_rows: 10215
    })
})

In [None]:
features = dataset["train"].features.copy()
features["label"] = ClassLabel(names=['negative', 'neutral', 'positive'])
def adjust_labels(batch):
    batch["label"] = [sentiment  for sentiment  in batch["label"]]
    return batch
label_dataset = dataset.map(adjust_labels, batched=True, features=features)
label_dataset['train']

Map:   0%|          | 0/23832 [00:00<?, ? examples/s]

Map:   0%|          | 0/10215 [00:00<?, ? examples/s]

Dataset({
    features: ['text', 'label'],
    num_rows: 23832
})

In [None]:
features = dataset["validation"].features.copy()
features["label"] = ClassLabel(names=['negative', 'neutral', 'positive'])
def adjust_labels(batch):
    batch["label"] = [sentiment  for sentiment  in batch["label"]]
    return batch
label_dataset = dataset.map(adjust_labels, batched=True, features=features)
label_dataset['validation']

Dataset({
    features: ['text', 'label'],
    num_rows: 10215
})

In [None]:
label_dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 23832
    })
    validation: Dataset({
        features: ['text', 'label'],
        num_rows: 10215
    })
})

## Test (Annotated reviews)

In [None]:
# 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
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
test_dataset = load_dataset("HFFErica/TBSA_sentiment_round2", data_files={'train': 'train_df_balanced.csv', 'test': 'test_df_imbalanced.csv'})
test_dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 1209
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 1963
    })
})

In [None]:
# Test dataset
features = test_dataset["test"].features.copy()
features["label"] = ClassLabel(names=['negative', 'neutral', 'positive'])
def adjust_labels(batch):
    batch["label"] = [sentiment  for sentiment  in batch["label"]]
    return batch
test_label_dataset = test_dataset.map(adjust_labels, batched=True, features=features)
test_label_dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 1209
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 1963
    })
})

In [None]:
print(label_dataset["train"].shape)
print(label_dataset["validation"].shape)
print(test_label_dataset["test"].shape)

(23833, 2)
(10215, 2)
(1963, 2)


In [None]:
train_dataset = label_dataset['train']
validation_dataset = label_dataset['validation']
test_dataset = test_label_dataset['test']

In [None]:
print(train_dataset)
print(validation_dataset)
print(test_dataset)

Dataset({
    features: ['text', 'label'],
    num_rows: 23832
})
Dataset({
    features: ['text', 'label'],
    num_rows: 10215
})
Dataset({
    features: ['text', 'label'],
    num_rows: 1963
})


In [None]:
train_dataset = train_dataset.map(lambda examples: {'labels': examples['label']}, batched=True)
validation_dataset = validation_dataset.map(lambda examples: {'labels': examples['label']}, batched=True)
test_dataset = test_dataset.map(lambda examples: {'labels': examples['label']}, batched=True)

Map:   0%|          | 0/23832 [00:00<?, ? examples/s]

Map:   0%|          | 0/10215 [00:00<?, ? examples/s]

# Model

In [None]:
model_id="cardiffnlp/twitter-roberta-base-sentiment"
model = AutoModelForSequenceClassification.from_pretrained(model_id,
                                                           num_labels=train_dataset.features["label"].num_classes,
                                                          ignore_mismatched_sizes=True)
tokenizer = AutoTokenizer.from_pretrained(model_id)
model.config

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


RobertaConfig {
  "_name_or_path": "cardiffnlp/twitter-roberta-base-sentiment-latest",
  "architectures": [
    "RobertaForSequenceClassification"
  ],
  "attention_probs_dropout_prob": 0.1,
  "bos_token_id": 0,
  "classifier_dropout": null,
  "eos_token_id": 2,
  "gradient_checkpointing": false,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "id2label": {
    "0": "negative",
    "1": "neutral",
    "2": "positive"
  },
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "label2id": {
    "negative": 0,
    "neutral": 1,
    "positive": 2
  },
  "layer_norm_eps": 1e-05,
  "max_position_embeddings": 514,
  "model_type": "roberta",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 1,
  "position_embedding_type": "absolute",
  "torch_dtype": "float32",
  "transformers_version": "4.44.2",
  "type_vocab_size": 1,
  "use_cache": true,
  "vocab_size": 50265
}

In [None]:
MAX_LENGTH = 256
train_dataset = train_dataset.map(lambda e: tokenizer(e['text'], truncation=True, padding='max_length', max_length=MAX_LENGTH), batched=True)
validation_dataset = validation_dataset.map(lambda e: tokenizer(e['text'], truncation=True, padding='max_length', max_length=MAX_LENGTH), batched=True)
test_dataset = test_dataset.map(lambda e: tokenizer(e['text'], truncation=True, padding='max_length', max_length=MAX_LENGTH), batched=True)

Map:   0%|          | 0/23832 [00:00<?, ? examples/s]

Map:   0%|          | 0/10215 [00:00<?, ? examples/s]

Map:   0%|          | 0/1963 [00:00<?, ? examples/s]

In [None]:
train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
validation_dataset.set_format(type='torch', columns=['input_ids',  'attention_mask', 'labels'])
test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

# Hyperparameters search

In [None]:
def model_init():
    return AutoModelForSequenceClassification.from_pretrained(model_id, return_dict=True)

In [None]:
from sklearn.metrics import f1_score
# Define a function to compute the F1 score
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    f1 = f1_score(labels, predictions, average='weighted')
    return {"f1": f1}

In [None]:
training_args = TrainingArguments("test",  eval_steps=500, disable_tqdm=True, do_eval=True, evaluation_strategy='steps')
trainer = Trainer(
    args=training_args,
    data_collator=DataCollatorWithPadding(tokenizer),
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    # eval_dataset=test_dataset,
    model_init=model_init,
    compute_metrics=compute_metrics,
)

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
def hp_space_optuna(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True),
        "num_train_epochs": trial.suggest_int("num_train_epochs", 2, 4),
        "per_device_train_batch_size": trial.suggest_categorical("per_device_train_batch_size", [16, 32, 64]),
    }

In [None]:
best_run = trainer.hyperparameter_search(direction="maximize", hp_space=hp_space_optuna, n_trials=10)
print(f"Best run: {best_run}")

# Trainer

In [None]:
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro')
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
training_args = TrainingArguments(
    output_dir='./Best_SA_results',          # output directory
    # learning_rate=5.1656126323445385e-05,
    # learning_rate=1.5465461778123127e-05,
    learning_rate=2.7395389817403357e-05,

    num_train_epochs=2,              # total number of training epochs
    per_device_train_batch_size=16,  # batch size per device during training
    per_device_eval_batch_size=16,   # batch size for evaluation
    logging_dir='./logs',            # directory for storing logs
    logging_steps=100,
    do_train=True,
    do_eval=True,
    no_cuda=False,
    load_best_model_at_end=True,
    eval_steps=100,
    evaluation_strategy="steps"
)


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    compute_metrics=compute_metrics
)

train_out = trainer.train()
train_out

# CM

In [None]:
def plot_confusion_matrix(y_preds, y_true, labels):
#     cm = confusion_matrix(y_true, y_preds)
    cm = confusion_matrix(y_true, y_preds, normalize="true")
    fig, ax = plt.subplots(figsize=(10, 10))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(cmap="Blues", values_format=".2f", ax=ax, colorbar=False)
    plt.title("confusion matrix")
    plt.show()

In [None]:
y_valid = np.array(test_dataset["label"])
preds_output = trainer.predict(test_dataset)
preds_output.metrics

In [None]:
y_preds = np.argmax(preds_output.predictions, axis=1)
labels = train_dataset.features["label"].names
plot_confusion_matrix(y_preds, y_valid, labels)

In [None]:
def plot_confusion_matrix(y_preds, y_true, labels):
    cm = confusion_matrix(y_true, y_preds)
    # cm = confusion_matrix(y_true, y_preds, normalize="true")
    fig, ax = plt.subplots(figsize=(10, 10))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(cmap="Blues", values_format=".2f", ax=ax, colorbar=False)
    plt.title("confusion matrix")
    plt.show()

In [None]:
y_valid = np.array(test_dataset["label"])
preds_output = trainer.predict(test_dataset)
preds_output.metrics

In [None]:
y_preds = np.argmax(preds_output.predictions, axis=1)
labels = train_dataset.features["label"].names
plot_confusion_matrix(y_preds, y_valid, labels)