## 1- Importing Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoModel, AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, ClassLabel
import torch
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import os
os.environ["WANDB_DISABLED"] = "true"
import warnings
warnings.filterwarnings('ignore')

## 2- Loading Datasets

In [None]:
# The three split files share one layout: header-less, ';'-separated rows that
# are read into a 'text' column and a 'label' column.
csv_reader_options = {'sep': ';', 'names': ['text', 'label']}

# Each call is given a single file, and the loaded rows are picked up from the
# 'train' key of the returned object regardless of which split the file holds.
train = load_dataset(
    'csv',
    data_files='/kaggle/input/emotions-dataset-for-nlp/train.txt',
    **csv_reader_options,
)['train']
val = load_dataset(
    'csv',
    data_files='/kaggle/input/emotions-dataset-for-nlp/val.txt',
    **csv_reader_options,
)['train']
test = load_dataset(
    'csv',
    data_files='/kaggle/input/emotions-dataset-for-nlp/test.txt',
    **csv_reader_options,
)['train']

In [None]:
# Temporarily switch the train split's output format to pandas so that slicing
# it yields a DataFrame for exploration (undone later by train.reset_format()).
train.set_format(type="pandas")
df = train[:]  # full-range slice -> every row of the split
df.head()

In [None]:
# Horizontal bar chart of the number of examples per emotion label, with the
# counts sorted in ascending order.
label_counts = df['label'].value_counts(ascending=True)
label_counts.plot.barh()
plt.title('Class Distribution')
plt.show()

In [None]:
# Number of whitespace-separated words in every text, stored as a new column.
words_per_text = df['text'].str.split().map(len)
df['tweets_len'] = words_per_text

# One box per label showing how the word counts are distributed.
df.boxplot(column="tweets_len", by='label', grid=False, color='blue')
plt.suptitle("")  # blank out the figure-level title
plt.xlabel("")    # and the x-axis caption
plt.show()

In [None]:
# Undo the earlier set_format(type="pandas") so `train` is back to its default
# output format before it is tokenized.
train.reset_format()

## 3- Loading DistilBERT tokenizer and tokenizing datasets

In [None]:
# Checkpoint identifier; the same name is reused for the tokenizer here and for
# the model loads further down.
ckpt = 'distilbert-base-uncased'
tokenizer = AutoTokenizer.from_pretrained(ckpt)

In [None]:
# Summarise the tokenizer: vocabulary size, maximum input length and the
# column names it reports as model inputs (one item per output line).
print(
    f"Vocab size -> {tokenizer.vocab_size}\n"
    f"Max input length -> {tokenizer.model_max_length}\n"
    f"Expected Input Column names -> {tokenizer.model_input_names}"
)

In [None]:
def tokenize(batch):
    """Tokenize the ``'text'`` entry of ``batch`` with the notebook's tokenizer.

    The tokenizer is called with padding and truncation both switched on, and
    its result is returned unchanged.
    """
    texts = batch['text']
    encoded = tokenizer(texts, padding=True, truncation=True)
    return encoded

In [None]:
# Apply tokenize() to the train split in batched mode; batch_size=None is
# passed so the split is not cut into smaller batches for the call.
train_tokenized = train.map(tokenize, batched=True, batch_size=None)

In [None]:
# Display the tokenized train split (notebook cell output).
train_tokenized

In [None]:
# Inspect the first tokenized training example.
print(train_tokenized[0])

In [None]:
# Tokenize the validation split the same way as the train split and show its
# first example.
val_tokenized = val.map(tokenize, batched=True, batch_size=None)
print(val_tokenized[0])

In [None]:
# Tokenize the held-out test split. It must be built from `test` (not `val`):
# everything derived from `test_tokenized` later on is reported as test-set
# performance, so it has to come from the test file.
test_tokenized = test.map(tokenize, batched=True, batch_size=None)
print(test_tokenized[0])

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Load the checkpoint with AutoModel and move it to the chosen device.
model = AutoModel.from_pretrained(ckpt)
model = model.to(device)

## 4- Getting the last hidden state of the first ([CLS]) token (features for Logistic Regression)

In [None]:
def extract_last_hidden_states(batch):
    """Return the model's last hidden state at position 0 for every row of ``batch``.

    Only the entries of ``batch`` whose key appears in
    ``tokenizer.model_input_names`` are moved to ``device`` and passed to the
    model; the forward pass runs under ``torch.no_grad()``.

    Returns:
        A dict with the single key ``'hidden_state'`` holding the position-0
        slice of ``last_hidden_state`` as a NumPy array on the CPU.
    """
    model_inputs = {}
    for name, tensor in batch.items():
        if name in tokenizer.model_input_names:
            model_inputs[name] = tensor.to(device)

    with torch.no_grad():
        outputs = model(**model_inputs)

    # Index 0 along the second axis: the first token of every sequence.
    first_token_state = outputs.last_hidden_state[:, 0]
    return {'hidden_state': first_token_state.cpu().numpy()}

In [None]:
# Return the listed columns in 'torch' format so extract_last_hidden_states()
# can call .to(device) on the model inputs.
train_tokenized.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])

In [None]:
# Add a 'hidden_state' column to the train split by running the model batch by batch.
train_hidden_states = train_tokenized.map(extract_last_hidden_states, batched=True)

In [None]:
# Display the train split with its extracted features (notebook cell output).
train_hidden_states

In [None]:
# Same feature extraction for the validation split: switch to 'torch' format,
# run the model, then display the result.
val_tokenized.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
val_hidden_states = val_tokenized.map(extract_last_hidden_states, batched=True)
val_hidden_states

In [None]:
# Same feature extraction for `test_tokenized`: switch to 'torch' format,
# run the model, then display the result.
test_tokenized.set_format('torch', columns=['input_ids', 'attention_mask', 'label'])
test_hidden_states = test_tokenized.map(extract_last_hidden_states, batched=True)
test_hidden_states

In [None]:
# Training features and targets for the classical classifier.
X_train = np.array(train_hidden_states["hidden_state"])
y_train = np.array(train_hidden_states["label"])

# Validation features and targets.
X_valid = np.array(val_hidden_states["hidden_state"])
y_valid = np.array(val_hidden_states["label"])

# Show both feature-matrix shapes (notebook cell output).
(X_train.shape, X_valid.shape)

In [None]:
# Fit a logistic-regression classifier on the extracted hidden states; the
# iteration cap is raised to 3000.
lr_clf = LogisticRegression(max_iter=3000)
lr_clf.fit(X_train, y_train)

# Score on the validation features (notebook cell output).
lr_clf.score(X_valid, y_valid)

In [None]:
# Weighted-average F1 of the logistic-regression predictions on the validation set.
valid_f1 = f1_score(y_valid, lr_clf.predict(X_valid), average='weighted')
print(valid_f1)

- This score is not too bad, given that the classes are imbalanced.

In [None]:
# Distinct label values of the train DataFrame, in the order unique() returns
# them (order of first appearance, not sorted). Reused below as class names
# and as heatmap tick labels.
labels = list(df.label.unique())

In [None]:
# Row-normalised confusion matrix for the logistic-regression baseline.
# `labels=labels` pins the row/column order of the matrix to the same list used
# for the tick labels; without it sklearn orders the classes by sorted value,
# which does not match `labels` and would mislabel the axes.
valid_preds = lr_clf.predict(X_valid)
cm = confusion_matrix(y_valid, valid_preds, labels=labels, normalize='true')
sns.heatmap(cm, annot=True, cmap='Blues', xticklabels=labels, yticklabels=labels, fmt='.2f')
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.title("Normalized Confusion Matrix")
plt.show()

## 5- Fine-tuning DistilBERT

In [None]:
# Labels are turned into integers with HF datasets' own ClassLabel feature,
# built from the label names collected earlier.
class_labels = ClassLabel(names=[*labels])


def encode_labels(example):
    """Replace ``example['label']`` with its integer encoding.

    The conversion is done by ``class_labels.str2int``. ``example`` is modified
    in place and also returned.
    """
    encoded = class_labels.str2int(example['label'])
    example['label'] = encoded
    return example

In [None]:
# Spot-check: integer encoding of the label of the training example at index 4.
class_labels.str2int(train_tokenized['label'][4])

In [None]:
# Integer-encode the train labels in batched mode, then display the result.
train_encoded = train_tokenized.map(encode_labels, batched=True)
train_encoded

In [None]:
# Integer-encode the validation labels in batched mode, then display the result.
val_encoded = val_tokenized.map(encode_labels, batched=True)
val_encoded

In [None]:
# Load the checkpoint with a sequence-classification head. The head size comes
# from the label vocabulary instead of a hard-coded 6, and the id <-> name
# mapping is stored in the model config so the checkpoint saved at the end of
# the notebook carries the emotion names rather than generic LABEL_<n> ids.
model = AutoModelForSequenceClassification.from_pretrained(
    ckpt,
    num_labels=class_labels.num_classes,
    id2label=dict(enumerate(class_labels.names)),
    label2id={name: idx for idx, name in enumerate(class_labels.names)},
).to(device)

In [None]:
def compute_metrics(preds):
    """Compute accuracy and weighted F1 from a prediction object.

    Args:
        preds: Object exposing ``label_ids`` (true labels) and ``predictions``
            (scores whose last axis is arg-maxed to get the predicted class).

    Returns:
        A dict with the keys ``'accuracy'`` and ``'f1-score'``.
    """
    y_true = preds.label_ids
    y_pred = preds.predictions.argmax(-1)
    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'f1-score': f1_score(y_true, y_pred, average='weighted'),
    }

In [None]:
# Hyper-parameters for fine-tuning.
batch_size = 64
# Number of whole batches in the train split; only needed if the
# `logging_steps` argument below is re-enabled.
logging_steps = len(train_tokenized) // batch_size
model_name = f"{ckpt}-finetuned-emotion"
training_args = TrainingArguments(output_dir=model_name,
                                num_train_epochs=5,
                                learning_rate=2e-5,
                                per_device_train_batch_size=batch_size,
                                per_device_eval_batch_size=batch_size,
                                weight_decay=0.01,
                                eval_strategy="epoch",
                                # logging_steps=logging_steps,
                                push_to_hub=False,
                                # log_level="error",
                                # The string "none" is what switches off every
                                # reporting integration; Python None does not
                                # (it falls back to the library default).
                                report_to="none")

In [None]:
# Wire the model, data splits and metric function into a Trainer and run the
# fine-tuning loop; the validation split serves as the evaluation set.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_encoded,
    eval_dataset=val_encoded,
    compute_metrics=compute_metrics,
)
trainer.train()

## 6- Evaluation and Testing

In [None]:
# Evaluate with the trainer's configured eval dataset (the validation split
# passed as eval_dataset when the Trainer was built).
results = trainer.evaluate()

In [None]:
# Report evaluation accuracy (as a percentage) and evaluation loss.
print(f"Evaluation Accuracy -> {results['eval_accuracy']*100:.2f}%")
print(f"Evaluation Loss -> {results['eval_loss']:.5f}")

In [None]:
# Run the trainer's predict() on the validation split; the `.predictions`
# field of the result is arg-maxed in the next cell.
val_predictions = trainer.predict(val_encoded)


In [None]:
# Predicted class per validation row = index of the largest score along axis 1.
val_scores = val_predictions.predictions
y_preds = np.argmax(val_scores, axis=1)

# Row-normalised confusion matrix of the fine-tuned model on the validation split.
val_true = val_encoded['label']
cm = confusion_matrix(val_true, y_preds, normalize='true')
sns.heatmap(
    cm,
    annot=True,
    fmt='.2f',
    cmap='Blues',
    xticklabels=labels,
    yticklabels=labels,
)
plt.title("Normalized Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

In [None]:
# Integer-encode the labels of `test_tokenized`.
test_encoded = test_tokenized.map(encode_labels, batched=True)

# Predict with the trainer and take the arg-max score per row as the class.
test_predictions = trainer.predict(test_encoded)
test_scores = test_predictions.predictions
y_test_preds = np.argmax(test_scores, axis=1)

# Accuracy and weighted F1 against the encoded labels.
test_true = test_encoded['label']
test_accuracy = accuracy_score(test_true, y_test_preds)
test_f1 = f1_score(test_true, y_test_preds, average='weighted')

print(f"Test Set Accuracy -> {test_accuracy * 100:.2f}%")
print(f"Test Set F1-score -> {test_f1:.4f}")

# Row-normalised confusion matrix for the same predictions.
cm = confusion_matrix(test_encoded['label'], y_test_preds, normalize='true')
sns.heatmap(
    cm,
    annot=True,
    fmt='.2f',
    cmap='Blues',
    xticklabels=labels,
    yticklabels=labels,
)
plt.title("Test Set Normalized Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()


In [None]:
# NOTE(review): this cell repeats the encode -> predict -> confusion-matrix
# steps of the preceding test-set cell, and it rebinds `y_preds` (previously
# the validation predictions) to the predictions on `test_encoded`.
test_encoded = test_tokenized.map(encode_labels, batched=True)
test_preds_output = trainer.predict(test_encoded)
y_preds = np.argmax(test_preds_output.predictions, axis=1)
cm = confusion_matrix(test_encoded['label'], y_preds, normalize='true')
sns.heatmap(cm, annot=True, cmap='Blues', xticklabels=labels, yticklabels=labels, fmt='.2f')
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.title("Normalized Confusion Matrix")
plt.show()

In [None]:
def predict(texts):
    """Predict an emotion label for each input text with the notebook's model.

    Args:
        texts: A single string, or a sequence of strings.

    Returns:
        A list with one label name per input text, in input order, decoded
        with ``class_labels.int2str``. An empty input yields an empty list.

    Note:
        Switches ``model`` to evaluation mode (``model.eval()``) so dropout
        does not make the predictions non-deterministic.
    """
    # Accept a bare string as a one-element batch.
    texts = [texts] if isinstance(texts, str) else list(texts)
    if not texts:
        return []

    model.eval()
    # Tokenize as one padded batch and move the tensors to the model's device.
    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        logits = model(**inputs).logits
    # Highest-scoring class index per text, then back to label names.
    predictions = torch.argmax(logits, dim=-1).tolist()
    return [class_labels.int2str(pred) for pred in predictions]

# Example usage
test_texts = ['I am so happy', 'The man felt lonely', 'The guests felt satisfied']
predicted_emotions = predict(test_texts)
print(predicted_emotions)


In [None]:
# Save model and tokenizer to Kaggle's working directory.
# Both are written to the same folder via their save_pretrained() methods.
save_directory = "/kaggle/working/emotion_extraction_vitavoice"
model.save_pretrained(save_directory)
tokenizer.save_pretrained(save_directory)