In [None]:
!pip install -q -U bitsandbytes
!pip install -q -U git+https://github.com/huggingface/transformers.git 
!pip install -q -U git+https://github.com/huggingface/peft.git
!pip install -q -U git+https://github.com/huggingface/accelerate.git
!pip install -q datasets
!pip install evaluate

In [1]:
from datasets import load_dataset, DatasetDict, Dataset, concatenate_datasets

from transformers import (
    AutoTokenizer,
    AutoConfig, 
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer)

from peft import PeftModel, PeftConfig, get_peft_model, LoraConfig
import evaluate
import torch
import numpy as np
import pandas as pd
from collections import Counter
import random
from transformers import EarlyStoppingCallback
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Pull the GoEmotions corpus from the Hugging Face Hub
dataset = load_dataset('google-research-datasets/go_emotions')

# Merge the train / validation / test splits into one dataset
dataset = concatenate_datasets([dataset[split] for split in ('train', 'validation', 'test')])

# Keep only the rows annotated with exactly one label ...
dataset = dataset.filter(lambda row: len(row["labels"]) == 1)

# ... and unwrap that one-element list into a plain label id
dataset = dataset.map(lambda row: {"labels": row["labels"][0]})

# Tally how many rows carry each label
label_counts = Counter(dataset['labels'])

# Labels with more than 800 rows, most frequent first
labels_more_than_800 = [label for label, count in label_counts.most_common() if count > 800]

print(labels_more_than_800)


# Fixed seed so the same labels are drawn on every run
random.seed(11)

# Draw the label subset for each num_labels exactly once, so every num_samples
# variant of a given num_labels is built from the same classes
label_choices = {}
for num_labels in [2, 4, 8, 16]:
    # The 2-label case draws only from the 9 most frequent labels
    # (presumably because it needs the most rows per label -- confirm)
    candidates = labels_more_than_800[:9] if num_labels == 2 else labels_more_than_800
    label_choices[num_labels] = random.sample(candidates, num_labels)

# Function to create balanced dataset with provided labels
def create_balanced_dataset(num_labels, num_samples, dataset, selected_labels):
    """Build class-balanced train/validation/test splits for the given labels.

    For every label, the rows carrying that label are taken in dataset order.
    With ``n = int(num_samples / num_labels)``:

    * rows ``[0, n)`` go to the train split,
    * rows ``[n, int(1.1 * n))`` go to the validation split,
    * rows ``[int(1.1 * n), int(1.2 * n))`` go to the test split,

    so the three splits never share a row.

    Args:
        num_labels: Number of classes, used to derive the rows per class.
        num_samples: Total number of training rows across all classes.
        dataset: Dataset whose ``"labels"`` column holds one label id per row.
        selected_labels: Label ids to include.

    Returns:
        DatasetDict with the keys ``"train"``, ``"validation"`` and ``"test"``.

    Raises:
        ValueError: If ``selected_labels`` is empty, or if a label has fewer
            than ``int(1.2 * n)`` rows and therefore cannot fill all splits.
    """
    if not selected_labels:
        raise ValueError("selected_labels must contain at least one label")

    # Split boundaries are identical for every label, so compute them once
    n = int(num_samples / num_labels)
    val_end = int(1.1 * n)
    test_end = int(1.2 * n)

    train_parts, val_parts, test_parts = [], [], []

    for label in selected_labels:
        single_label_dataset = dataset.filter(lambda dic: dic["labels"] == label)

        # Fail early with context instead of an opaque index error from select()
        available = len(single_label_dataset)
        if available < test_end:
            raise ValueError(
                f"Label {label} has only {available} rows but {test_end} are required "
                f"for num_samples={num_samples} and num_labels={num_labels}"
            )

        # Sample for train/val/test
        train_parts.append(single_label_dataset.select(range(n)))
        val_parts.append(single_label_dataset.select(range(n, val_end)))
        test_parts.append(single_label_dataset.select(range(val_end, test_end)))

    # Combine once per split rather than re-concatenating on every iteration
    return DatasetDict({
        "train": concatenate_datasets(train_parts),
        "validation": concatenate_datasets(val_parts),
        "test": concatenate_datasets(test_parts)
    })

# Build one balanced dataset per (num_labels, num_samples) pair, reusing the preselected labels
balanced_datasets = {}
for num_labels in [2, 4, 8, 16]:
    selected_labels = label_choices[num_labels]

    # Re-index the original label ids to 0...num_labels-1 by their position in selected_labels
    label_mapping = dict(zip(selected_labels, range(len(selected_labels))))

    for num_samples in [800, 1600, 2400]:
        print(f"Creating balanced dataset for {num_labels} labels and {num_samples} samples...")
        ds = create_balanced_dataset(num_labels, num_samples, dataset, selected_labels)

        print('Label mapping:', label_mapping)
        ds = ds.map(lambda example: {"labels": label_mapping[example["labels"]]})
        balanced_datasets[f"{num_labels}_labels_{num_samples}"] = ds

In [None]:
# Load the four evaluation metrics used by compute_metrics
accuracy, precision, recall, f1 = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)

# Metric callback handed to the Trainer
def compute_metrics(p):
    """Turn raw model outputs into accuracy / precision / recall / F1.

    Args:
        p: Pair ``(predictions, labels)``; the predicted class is the argmax
            of ``predictions`` along axis 1.

    Returns:
        Dict with the keys ``"accuracy"``, ``"precision"``, ``"recall"`` and
        ``"f1"``; the last three use weighted averaging.
    """
    scores, references = p
    predicted_classes = np.argmax(scores, axis=1)
    shared = {"predictions": predicted_classes, "references": references}

    results = {"accuracy": accuracy.compute(**shared)["accuracy"]}
    for name, metric in (("precision", precision), ("recall", recall), ("f1", f1)):
        results[name] = metric.compute(average="weighted", **shared)[name]
    return results

In [None]:
def fine_tuned_model(model_base, dataset, num_labels, num_samples, seed=42):
    """Fine-tune a 4-bit quantized sequence classifier with LoRA and evaluate it.

    Args:
        model_base: Model id or path passed to ``from_pretrained`` for both the
            model and the tokenizer.
        dataset: Mapping from ``"{num_labels}_labels_{num_samples}"`` to a
            dataset with ``"train"``, ``"validation"`` and ``"test"`` splits
            that contain a ``"text"`` column.
        num_labels: Number of target classes; also selects the dataset entry.
        num_samples: Training-set size; also selects the dataset entry.
        seed: Seed applied before the model is built and forwarded to
            ``TrainingArguments`` so repeated runs are comparable.

    Returns:
        One-row ``pandas.DataFrame`` with the metrics of the test split plus
        the columns ``"Dataset"`` and ``"Model"``.

    Side effects:
        Writes checkpoints below ``./results`` and the per-step training /
        validation log to ``results_{num_labels}_{num_samples}.csv``.
    """
    import gc

    from transformers import set_seed

    # Seed before model creation: newly created weights are initialised before
    # the Trainer is constructed
    set_seed(seed)

    # Quantization settings
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,  # 4-bit quantization
        bnb_4bit_compute_dtype=torch.bfloat16,  # same dtype as the bf16 training below
        bnb_4bit_use_double_quant=True  # Enable double quantization for memory efficiency
    )

    # generate classification model from model_checkpoint
    model = AutoModelForSequenceClassification.from_pretrained(
        model_base,
        num_labels=num_labels,
        quantization_config=bnb_config,
        device_map={"": 0}
    )

    # create tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_base)

    # add pad token if none exists
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        model.resize_token_embeddings(len(tokenizer))

    # Over-long inputs lose tokens from the left; set once instead of per batch
    tokenizer.truncation_side = "left"

    def tokenize_function(examples):
        """Tokenize a batch of texts, truncated to 512 tokens and left unpadded."""
        # Padding is left to the data collator, so plain lists are returned here
        return tokenizer(examples["text"], truncation=True, max_length=512)

    # Use the dataset mapping that was passed in rather than a notebook global
    tokenized_dataset = dataset[f"{num_labels}_labels_{num_samples}"].map(tokenize_function, batched=True)

    # Pads every batch dynamically to the length of its longest example
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    # LoRA adapters on the q_proj / k_proj / v_proj modules
    peft_config = LoraConfig(
        task_type="SEQ_CLS",
        r=4,
        lora_alpha=32,
        lora_dropout=0.05,
        target_modules=["q_proj", "k_proj", "v_proj"]
    )

    model = get_peft_model(model, peft_config)

    training_args = TrainingArguments(
        output_dir="./results",
        eval_strategy="steps",
        eval_steps=50,
        save_strategy="steps",
        save_steps=50,
        logging_steps=50,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=4,
        gradient_accumulation_steps=1,
        learning_rate=1e-4,
        num_train_epochs=5,
        weight_decay=0.01,
        warmup_steps=100,
        lr_scheduler_type="cosine",
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        fp16=False,
        bf16=True,
        optim="adamw_bnb_8bit",
        report_to="none",
        seed=seed,
    )

    # Tell the model which token id is padding
    model.config.pad_token_id = tokenizer.pad_token_id

    # Stop once the tracked metric has not improved for 20 consecutive evaluations
    early_stopping = EarlyStoppingCallback(
        early_stopping_patience=20
    )

    # create trainer object
    # NOTE(review): recent transformers releases deprecate `tokenizer=` in favour
    # of `processing_class=` -- confirm against the installed version
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        callbacks=[early_stopping]
    )

    # train model
    trainer.train()

    log_history = trainer.state.log_history

    # Separate logs by training and evaluation
    train_logs = [log for log in log_history if 'loss' in log and 'step' in log]
    eval_logs = [log for log in log_history if 'eval_loss' in log]

    # One row per logged step: training columns joined with evaluation columns
    df_merged = (
        pd.merge(pd.DataFrame(train_logs), pd.DataFrame(eval_logs), on='step', how='outer')
        .sort_values("step")
        .reset_index(drop=True)
        .rename(columns={
            "loss": "Training Loss",
            "eval_loss": "Validation Loss",
            "eval_accuracy": "Accuracy",
            "eval_precision": "Precision",
            "eval_recall": "Recall",
            "eval_f1": "F1"
        })
    )

    # Save the per-step log of this run
    df_merged.to_csv(f"results_{num_labels}_{num_samples}.csv")

    # Final evaluation on the held-out test split
    test_results = trainer.evaluate(eval_dataset=tokenized_dataset["test"])
    df_test_results = pd.DataFrame([test_results])
    df_test_results["Dataset"] = f"{num_labels}_labels_{num_samples}_samples"
    df_test_results["Model"] = model_base

    # Release the model before the next configuration is trained in the same kernel
    del trainer, model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return df_test_results

In [None]:
model_base = "Qwen/Qwen2.5-1.5B"

# Train and evaluate every (num_labels, num_samples) configuration, collecting
# the one-row test summary that each run returns. The prepared datasets are
# passed directly so the raw `dataset` variable is not rebound to another type.
all_results = []
for num_labels in [2, 4, 8, 16]:
    for num_samples in [800, 1600, 2400]:

        df = fine_tuned_model(model_base, balanced_datasets, num_labels, num_samples)
        all_results.append(df)

# Stack the per-run summaries into a single table
final_results = pd.concat(all_results, ignore_index=True)

In [None]:
# Write the combined test metrics of all runs to final_results.csv
final_results.to_csv('final_results.csv')