<a href="https://colab.research.google.com/github/kamranr123/kamranr123.github.io/blob/master/fine_tune_multilingual_bert_on_emotions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
# Install dependencies
!pip install -q transformers datasets accelerate

In [None]:
!wget -P /content https://github.com/nazaninsbr/Persian-Emotion-Detection/raw/refs/heads/main/dataset.csv

In [1]:
import os
import pandas as pd
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
import torch

# Disable W&B
os.environ["WANDB_DISABLED"] = "true"


In [2]:
import pandas as pd
import numpy as np
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split

# Load dataset
df = pd.read_csv("/content/dataset.csv")

# Emotion label columns (vote counts 0-5)
label_cols = ["Anger", "Fear", "Happiness", "Hatred", "Sadness", "Wonder"]

# Binarize emotions: 1 if >=3 votes (majority), else 0
df[label_cols] = df[label_cols].apply(lambda row: [1 if x >= 3 else 0 for x in row], axis=1, result_type='expand')

# Add Neutral: 1 if no emotions have majority (sum of binarized emotions == 0)
df["Neutral"] = (df[label_cols].sum(axis=1) == 0).astype(int)

# Final labels
final_labels = label_cols + ["Neutral"]
df["labels"] = df[final_labels].apply(lambda row: [float(x) for x in row], axis=1)

# Keep only text + labels
df = df[["text", "labels"]]

# Train/validation split
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Create Hugging Face datasets
train_ds = Dataset.from_pandas(train_df.reset_index(drop=True))
val_ds = Dataset.from_pandas(val_df.reset_index(drop=True))
dataset = DatasetDict({"train": train_ds, "validation": val_ds})

# Verify
train_labels = np.array(dataset["train"]["labels"])
val_labels = np.array(dataset["validation"]["labels"])
print("Unique training label values:", np.unique(train_labels))
print("Unique validation label values:", np.unique(val_labels))
print("Training label frequencies:", np.sum(train_labels, axis=0) / train_labels.shape[0])
print("Validation label frequencies:", np.sum(val_labels, axis=0) / val_labels.shape[0])

Unique training label values: [0. 1.]
Unique validation label values: [0. 1.]
Training label frequencies: [0.05570833 0.02308333 0.023625   0.042125   0.05891667 0.033625
 0.796125  ]
Validation label frequencies: [0.04916667 0.02266667 0.02083333 0.04083333 0.05933333 0.02983333
 0.80483333]


In [None]:
import numpy as np
from datasets import Dataset

# Check current label values
train_labels = np.array(dataset["train"]["labels"])
val_labels = np.array(dataset["validation"]["labels"])
print("Unique training label values (before):", np.unique(train_labels))
print("Unique validation label values (before):", np.unique(val_labels))

# Binarize labels with a threshold (e.g., 0.5)
thresholds = [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.05]  # Lower threshold for Label 6
def binarize_labels(example):
    labels = np.array(example["labels"])
    example["labels"] = (labels > np.array(thresholds)).astype(int)
    return example

dataset["train"] = dataset["train"].map(binarize_labels)
dataset["validation"] = dataset["validation"].map(binarize_labels)

# Verify binarization
train_labels = np.array(dataset["train"]["labels"])
val_labels = np.array(dataset["validation"]["labels"])
print("Unique training label values (after):", np.unique(train_labels))
print("Unique validation label values (after):", np.unique(val_labels))
print("Training label frequencies:", np.sum(train_labels, axis=0) / train_labels.shape[0])
print("Validation label frequencies:", np.sum(val_labels, axis=0) / val_labels.shape[0])

In [None]:
import nlpaug.augmenter.word as naw
aug = naw.SynonymAug(aug_p=0.3)
def augment_data(example):
    example["text"] = aug.augment(example["text"])[0]
    return example
dataset["train"] = dataset["train"].map(augment_data)

In [None]:
for i in range(10):
  print(dataset["train"][i]['labels'])

In [None]:
# model_name = "google-bert/bert-base-multilingual-cased"
model_name = "HooshvareLab/bert-fa-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize(batch):
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=256)

dataset = dataset.map(tokenize, batched=True)
# dataset = dataset.remove_columns(["text"])


In [10]:
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=7,
    problem_type="multi_label_classification"
)
model.config.hidden_dropout_prob = 0.3
model.config.attention_probs_dropout_prob = 0.3

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at HooshvareLab/bert-fa-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [14]:
import numpy as np
from sklearn.metrics import f1_score, accuracy_score, hamming_loss

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    probs = torch.sigmoid(torch.tensor(logits)).numpy()
    labels = labels.astype(int)

    thresholds = np.arange(0.1, 0.6, 0.1)
    best_preds = np.zeros_like(labels)
    best_thresholds = np.ones(labels.shape[1]) * 0.5
    for i in range(labels.shape[1]):
        best_f1 = 0
        for t in thresholds:
            preds_i = (probs[:, i] > t).astype(int)
            f1_i = f1_score(labels[:, i], preds_i, average='binary')
            if f1_i > best_f1:
                best_f1 = f1_i
                best_thresholds[i] = t
                best_preds[:, i] = preds_i

    f1_macro = f1_score(labels, best_preds, average='macro')
    acc = accuracy_score(labels, best_preds)
    hamming = hamming_loss(labels, best_preds)
    per_label_f1 = f1_score(labels, best_preds, average=None)

    print(f"Best thresholds: {best_thresholds}")
    print(f"Positive predictions: {np.sum(best_preds, axis=0)}")
    print(f"Positive true labels: {np.sum(labels, axis=0)}")
    print(f"Average logits: {np.mean(logits, axis=0)}")
    print(f"Average probabilities: {np.mean(probs, axis=0)}")

    metrics = {"accuracy": acc, "f1_macro": f1_macro, "hamming_loss": hamming}
    for i, f1_score_label in enumerate(per_label_f1):
        metrics[f"f1_label_{i}"] = f1_score_label
    return metrics

In [5]:
import shutil
shutil.rmtree('/content/bert-persian-emotions')

In [12]:
from transformers import TrainingArguments, DataCollatorWithPadding

args = TrainingArguments(
    output_dir="/content/bert-persian-emotions",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    gradient_accumulation_steps=8,
    per_device_eval_batch_size=8,
    num_train_epochs=10,
    weight_decay=0.1,
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
    lr_scheduler_type="cosine",
    warmup_steps=500,
    max_grad_norm=1.0,
    fp16=True,
    report_to="none",
    optim="adamw_torch_fused",
)

In [None]:
from transformers import Trainer
import torch.nn as nn

# Focal loss
class FocalLoss(nn.Module):
    def __init__(self, alpha=1.0, gamma=3.0, label_smoothing=0.1):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.label_smoothing = label_smoothing
    def forward(self, logits, targets):
        targets = targets * (1 - self.label_smoothing) + 0.5 * self.label_smoothing
        bce_loss = nn.BCEWithLogitsLoss(reduction='none')(logits, targets)
        pt = torch.exp(-bce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * bce_loss
        return focal_loss.mean()

class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.pop("labels").float()
        outputs = model(**inputs)
        logits = outputs.logits
        loss_fct = FocalLoss(alpha=1.0, gamma=3.0)
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss

trainer = WeightedTrainer(
    model=model,
    args=args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics  # your F1/multi-label metrics
)


trainer.train()

  trainer = WeightedTrainer(


Epoch,Training Loss,Validation Loss


In [32]:
print("Unique label values:", np.unique(small_dataset["validation"]["labels"]))

Unique label values: [0.         0.07692308 0.08333333 0.08333333 0.09090909 0.1
 0.1        0.1        0.11111111 0.11111111 0.11111111 0.125
 0.125      0.14285714 0.14285714 0.15384615 0.16666667 0.16666667
 0.18181818 0.2        0.2        0.2        0.22222222 0.22222222
 0.22222222 0.23076923 0.25       0.25       0.27272727 0.28571429
 0.28571429 0.3        0.3        0.3        0.30769231 0.33333333
 0.33333333 0.33333333 0.36363636 0.375      0.375      0.4
 0.42857143 0.42857143 0.44444444 0.44444444 0.44444444 0.5
 0.5        0.57142857 0.57142857 0.6        0.66666667 0.75
 1.        ]


In [None]:
texts = [
    "من امروز خیلی خوشحالم",      # Happy
    "احساس می‌کنم ناراحت و خسته‌ام", # Sad
    "از تاریکی می‌ترسم",           # Fear
    "قدم زدن زیر بارون شاید بهترین مسکن درد هاست..." # Neutral
]

inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=128)
outputs = model(**inputs)
probs = torch.sigmoid(outputs.logits).detach().numpy()

for text, p in zip(texts, probs):
    labels_pred = [final_labels[i] for i, v in enumerate(p) if v > 0.5]
    print(text, "->", labels_pred)


In [None]:
import numpy as np
from sklearn.metrics import f1_score, classification_report

# collect true labels and preds on validation set
preds_logits = trainer.predict(dataset["validation"]).predictions  # raw logits
probs = 1 / (1 + np.exp(-preds_logits))
preds = (probs > 0.5).astype(int)

# binarize references (soft labels → 0/1)
refs = np.stack([ex["labels"] for ex in dataset["validation"]])
refs_bin = (refs >= 0.5).astype(int)

# overall per-label counts
pos_counts = refs_bin.sum(axis=0)
neg_counts = refs_bin.shape[0] - pos_counts
print("pos counts per label:", pos_counts)
print("neg counts per label:", neg_counts)

# per-label f1
for i, name in enumerate(final_labels):
    print(name, "F1:", f1_score(refs_bin[:, i], preds[:, i], zero_division=0))

# full classification report
print(classification_report(refs_bin, preds, zero_division=0))


In [None]:
from sklearn.metrics import f1_score
# macro F1:
macro = f1_score(refs, preds, average="macro", zero_division=0)
micro = f1_score(refs, preds, average="micro", zero_division=0)
print("macro, micro:", macro, micro)


In [4]:
import numpy as np

# Convert Hugging Face dataset into numpy array of labels
all_labels = np.stack(dataset["train"]["labels"])  # shape: (num_samples, num_labels)

# Count positives and negatives per label
pos_counts = all_labels.sum(axis=0)
neg_counts = all_labels.shape[0] - pos_counts

print("pos counts per label:", pos_counts)
print("neg counts per label:", neg_counts)

# If you want mapping to label names:
for name, pos, neg in zip(final_labels, pos_counts, neg_counts):
    print(f"{name:10s} | pos: {int(pos):5d} | neg: {int(neg):5d}")

val_labels = np.array(small_dataset["validation"]["labels"])
print("Validation label frequencies:", np.sum(val_labels, axis=0) / val_labels.shape[0])

pos counts per label: [4435.79776857 3377.90284005 3411.00137803 4083.14986549 4608.76150663
 3826.38664122  257.        ]
neg counts per label: [19564.20223143 20622.09715995 20588.99862197 19916.85013451
 19391.23849337 20173.61335878 23743.        ]
Anger      | pos:  4435 | neg: 19564
Fear       | pos:  3377 | neg: 20622
Happiness  | pos:  3411 | neg: 20588
Hatred     | pos:  4083 | neg: 19916
Sadness    | pos:  4608 | neg: 19391
Wonder     | pos:  3826 | neg: 20173
Neutral    | pos:   257 | neg: 23743
Validation label frequencies: [0.16867392 0.14521033 0.14398886 0.1701612  0.18877742 0.16365703
 0.01953125]
