In [None]:
#!pip install -U datasets optuna

In [None]:
import os
# Turn off Weights & Biases reporting for the Trainer runs below.
os.environ["WANDB_DISABLED"] = "true"
# Default to GPU index 7, but keep any CUDA_VISIBLE_DEVICES value that was
# already set before the kernel started, so the device can be chosen from
# outside the notebook (e.g. on machines that have no GPU with that index).
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "7")

In [None]:
from datasets import load_dataset
from transformers import set_seed, AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding
import torch
import numpy as np
from sklearn.metrics import recall_score, precision_score, f1_score
import optuna
import pandas as pd

In [None]:
# Single seed for the whole notebook: applied globally through transformers'
# `set_seed` here and passed again as `seed=` to every TrainingArguments below.
seed = 42
set_seed(seed)

In [None]:
# Load the Russian ("rus") configuration of the BRIGHTER emotion dataset,
# one Dataset per split, in the order train / dev / test.
train, val, test = (
    load_dataset("brighter-dataset/BRIGHTER-emotion-categories", "rus", split=split_name)
    for split_name in ("train", "dev", "test")
)

In [None]:
# Dataset columns holding the per-emotion labels. Their order here fixes the
# position of each emotion in the `labels` vectors built by `create_labels`
# and in the per-class report at the end of the notebook.
emotion_cols = ['anger', 'fear', 'joy', 'disgust', 'sadness', 'surprise']

In [None]:
def create_labels(examples):
    """Add a multi-label `labels` column to a batch of examples.

    For every row of the batch (the row count is taken from the `text`
    column) the values of the `emotion_cols` columns are gathered, in that
    order, and converted to floats.

    Args:
        examples: Batch mapping column name -> list of values, as passed by
            `Dataset.map(..., batched=True)`.

    Returns:
        The same `examples` object, with `examples['labels']` set to a list
        of per-row float vectors.
    """
    row_count = len(examples['text'])
    examples['labels'] = [
        [float(examples[emotion][row]) for emotion in emotion_cols]
        for row in range(row_count)
    ]
    return examples

# Attach the `labels` column to every split (train, dev, test) in turn.
train, val, test = (
    split.map(create_labels, batched=True) for split in (train, val, test)
)

In [None]:
# Hugging Face checkpoint identifier. The same name is used to load the
# tokenizer here and the sequence-classification models further down.
model_name = "DeepPavlov/rubert-base-cased-conversational"
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
def tokenize_function(examples):
    """Tokenize the `text` column of a batch with the notebook tokenizer.

    Sequences are truncated to at most 512 tokens and left unpadded
    (`padding=False`).

    Args:
        examples: Batch mapping column name -> list of values; only
            `examples['text']` is read.

    Returns:
        Whatever the tokenizer returns for the batch of texts.
    """
    texts = examples['text']
    encoded = tokenizer(texts, truncation=True, padding=False, max_length=512)
    return encoded

# Tokenize the training and validation splits (the test split is tokenized
# later, right before the final evaluation).
train_tokenized, val_tokenized = (
    split.map(tokenize_function, batched=True) for split in (train, val)
)

In [None]:
# Expose only the listed columns, as torch tensors, on both tokenized splits.
torch_columns = ("input_ids", "attention_mask", "labels")
for tokenized_split in (train_tokenized, val_tokenized):
    tokenized_split.set_format("torch", columns=list(torch_columns))

In [None]:
def compute_metrics(eval_pred):
    """Compute micro/macro recall, precision and F1 for multi-label output.

    Args:
        eval_pred: Pair `(predictions, labels)`; `predictions` are raw
            logits, which are passed through a sigmoid and binarized at 0.5.

    Returns:
        Dict with keys `{micro,macro}_{recall,precision,f1}`. Every score is
        computed with `zero_division=0`.
    """
    logits, labels = eval_pred
    probabilities = torch.sigmoid(torch.tensor(logits)).numpy()
    y_pred = probabilities > 0.5

    # (key suffix, scorer) pairs, in the order the keys are emitted.
    scorers = (
        ('recall', recall_score),
        ('precision', precision_score),
        ('f1', f1_score),
    )
    return {
        f'{average}_{metric_name}': scorer(labels, y_pred, average=average, zero_division=0)
        for average in ('micro', 'macro')
        for metric_name, scorer in scorers
    }

In [None]:
# Collator built on the shared tokenizer and handed to the Trainer instances
# below. Tokenization above uses padding=False, so padding is left to this
# collator.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
def objective(trial):
    """Optuna objective: fine-tune one model with sampled hyperparameters.

    A fresh classification model is loaded from `model_name` for every trial,
    trained on `train_tokenized` and evaluated on `val_tokenized`.

    Args:
        trial: Optuna trial used to sample the learning rate, batch size,
            weight decay, number of warmup steps and number of epochs.

    Returns:
        The `eval_macro_f1` value reported by `trainer.evaluate()` after
        training has finished; the study maximizes it.
    """
    import gc

    learning_rate = trial.suggest_float("learning_rate", 1e-5, 5e-4, log=True)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    weight_decay = trial.suggest_float("weight_decay", 0.0, 0.3)
    warmup_steps = trial.suggest_int("warmup_steps", 0, 500)
    num_epochs = trial.suggest_int("num_train_epochs", 2, 7)

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=len(emotion_cols),
        problem_type="multi_label_classification"
    )

    training_args = TrainingArguments(
        output_dir=f'./results/trial_{trial.number}',
        num_train_epochs=num_epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        logging_steps=50,
        eval_strategy="epoch",
        metric_for_best_model="eval_macro_f1",
        logging_dir=f'./logs/trial_{trial.number}',
        # No checkpoints are written for trial runs.
        save_strategy="no",
        # The string "none" switches every logging integration off. Passing
        # the Python value None does not: transformers 4.x treats it as
        # "use all installed integrations".
        report_to="none",
        seed=seed,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_tokenized,
        eval_dataset=val_tokenized,
        compute_metrics=compute_metrics,
        data_collator=data_collator,
    )

    trainer.train()

    eval_results = trainer.evaluate()
    macro_f1 = eval_results["eval_macro_f1"]

    # Drop this trial's model and trainer before the next trial loads a new
    # model, so memory (GPU memory in particular) is not held across trials.
    del trainer, model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return macro_f1

In [None]:
# Seed the sampler with the notebook seed so the sequence of suggested
# hyperparameters is repeatable; an unseeded sampler proposes different
# trials on every run even though model training itself is seeded.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=seed),
)
study.optimize(objective, n_trials=20)

print("\nBest hyperparameters:")
print(study.best_params)
print(f"Best macro F1: {study.best_value:.4f}")

In [None]:
# Retrain a fresh model with the best hyperparameters found by the study.
best_params = study.best_params
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(emotion_cols),
    problem_type="multi_label_classification"
)

training_args = TrainingArguments(
    output_dir='./best_model',
    num_train_epochs=best_params["num_train_epochs"],
    per_device_train_batch_size=best_params["batch_size"],
    per_device_eval_batch_size=best_params["batch_size"],
    learning_rate=best_params["learning_rate"],
    weight_decay=best_params["weight_decay"],
    warmup_steps=best_params["warmup_steps"],
    logging_steps=50,
    eval_strategy="epoch",
    save_strategy="no",
    logging_dir='./logs/best_model',
    # Keep the reporting setup identical to the tuning runs: the string
    # "none" disables all logging integrations.
    report_to="none",
    seed=seed,
)

# `DataCollatorWithPadding` is already imported in the import cell and
# `data_collator` is already built on the same tokenizer earlier in the
# notebook, so both are reused here instead of being re-created.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized,
    eval_dataset=val_tokenized,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

trainer.train()

In [None]:
def find_best_threshold(model, val_dataset, thresholds=np.arange(0.1, 0.9, 0.05)):
    """Pick the probability cut-off that maximizes macro F1 on a dataset.

    Args:
        model: Model whose predictions are thresholded.
        val_dataset: Dataset to predict on; it must carry labels, which are
            read back from the prediction output.
        thresholds: Iterable of candidate thresholds, tried in order.

    Returns:
        The first threshold that reaches the highest macro F1. If no
        candidate scores above 0, the default 0.5 is returned.
    """
    model.eval()

    # Predictions must come from `model` itself. The notebook-level `trainer`
    # is reused when it wraps this very model; otherwise a sibling Trainer
    # with the same arguments and collator is built around `model`, so a
    # different model is never silently scored through the global trainer.
    if trainer.model is model:
        predictor = trainer
    else:
        predictor = Trainer(
            model=model,
            args=trainer.args,
            data_collator=trainer.data_collator,
        )

    predictions = predictor.predict(val_dataset)
    probs = torch.sigmoid(torch.tensor(predictions.predictions)).numpy()
    true_labels = predictions.label_ids

    best_threshold = 0.5
    best_f1 = 0
    print("Threshold optimization:")
    for threshold in thresholds:
        y_pred = probs > threshold
        f1_macro = f1_score(true_labels, y_pred, average='macro', zero_division=0)
        print(f"Threshold {threshold:.2f}: Macro F1 = {f1_macro:.4f}")
        # Strict comparison: on ties the earliest threshold is kept.
        if f1_macro > best_f1:
            best_f1 = f1_macro
            best_threshold = threshold
    print(f"\nBest threshold: {best_threshold:.2f} (Macro F1: {best_f1:.4f})")
    return best_threshold

In [None]:
# Tune the decision threshold on the validation split; the value is used
# below to binarize the test-set probabilities.
best_threshold = find_best_threshold(model, val_tokenized)

In [None]:
# Tokenize the test split with the same function as train/dev and expose the
# model inputs and labels as torch tensors.
test_tokenized = test.map(tokenize_function, batched=True)
test_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# Run the trained model on the test split, then turn logits into
# probabilities and binarize them with the threshold tuned on the dev split.
test_predictions = trainer.predict(test_tokenized)
true_test_labels = test_predictions.label_ids
test_logits = torch.tensor(test_predictions.predictions)
test_probs = torch.sigmoid(test_logits).numpy()
test_pred_labels = test_probs > best_threshold

In [None]:
# Scorers in the order their results are printed on each report line.
report_scorers = (recall_score, precision_score, f1_score)

# Aggregate scores: one line per averaging mode.
for average in ('micro', 'macro'):
    recall, precision, f1 = (
        scorer(true_test_labels, test_pred_labels, average=average, zero_division=0)
        for scorer in report_scorers
    )
    print(f'{average.upper()} recall: {round(recall, 4)}, precision: {round(precision, 4)}, f1: {round(f1, 4)}')

# Per-class scores: average=None yields one value per label column.
print("\nPer-class Results:")
class_recall, class_precision, class_f1 = (
    scorer(true_test_labels, test_pred_labels, average=None, zero_division=0)
    for scorer in report_scorers
)

for emotion, cls_recall, cls_precision, cls_f1 in zip(emotion_cols, class_recall, class_precision, class_f1):
    print(f'{emotion.upper()}: recall: {round(cls_recall, 4)}, precision: {round(cls_precision, 4)}, f1: {round(cls_f1, 4)}')

# How often each emotion occurs in the gold labels versus the predictions.
print("\nClass distribution in test set:")
total = len(true_test_labels)
for column, emotion in enumerate(emotion_cols):
    true_count = int(true_test_labels[:, column].sum())
    pred_count = int(test_pred_labels[:, column].sum())
    true_share = true_count / total
    pred_share = pred_count / total
    print(f'{emotion.upper()}: true: {true_count}/{total} ({true_share:.1%}), predicted: {pred_count}/{total} ({pred_share:.1%})')