# Disaster Classification Using Large Language Models (LLMs)
This project leverages three different popular LLMs such as BERT, RoBERTa and DistilBERT to classify different types of disasters from tweets. In addition, a majority voting based classification of disasters from tweets have been implemented in this project where we chose the best performing models after applying three different fine-tuning approaches such as - standard fine-tuning, LoRA and few-shot learning on all the three models. After that we choose the category of the disaster based on the majority voting of the three configurations.

### Objectives:
- To get familiar with existing pretrained LLMs.
- To get hands on experience with fine-tuning existing pretrained LLMs.
- To explore different fine-tuning techniques.
- To classify disaster categories from tweets.
- To leverage pretrained LLMs for improving accuracy of classification.

**Dataset link:** https://archive.ics.uci.edu/ml/datasets/Multimodal+Damage+Identification+for+Humanitarian+Computing

### Importing Required Libraries

In [1]:
import os
import re
import torch
import random
import pandas as pd
import numpy as np
from tqdm import tqdm
import seaborn as sns
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from peft import LoraConfig, get_peft_model, TaskType, PromptTuningConfig
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    Trainer, 
    TrainingArguments,
    BertConfig,
    RobertaConfig,
    DistilBertConfig
)
import warnings
warnings.filterwarnings('ignore')
from transformers import DataCollatorWithPadding, logging
logging.set_verbosity_error()  # Suppress initialization warnings
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, classification_report

In [2]:
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

In [3]:
# Setting the device
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

Using device: mps


### Load the Dataset

In [4]:
BASE_DIR = os.getcwd()
DATA_DIR = os.path.join(BASE_DIR, 'data', 'multimodal')

print(f"Data directory: {DATA_DIR}")
print(f"Disaster categories: {os.listdir(DATA_DIR)}")

Data directory: /Users/sahmed8/Desktop/llm-project/v2.0/data/multimodal
Disaster categories: ['human_damage', 'fires', '.DS_Store', 'damaged_nature', 'flood', 'non_damage', 'damaged_infrastructure']


### Create the Dataset
- Create the text dataset from different directories.
- The dataset used here is a multimodal dataset. However, we used only the text in this project.

In [5]:
# Initialize a list to store data
data = []

# Iterate through each disaster class folder
for class_name in os.listdir(DATA_DIR):
    class_folder = os.path.join(DATA_DIR, class_name)

    # Ensure it's a directory
    if os.path.isdir(class_folder):
        # Go one level deeper into the `text` subfolder
        text_folder = os.path.join(class_folder, 'text')

        if os.path.exists(text_folder) and os.path.isdir(text_folder):
            for file_name in os.listdir(text_folder):
                if file_name.endswith('.txt'):
                    file_path = os.path.join(text_folder, file_name)

                    # Read text content
                    with open(file_path, 'r', encoding='utf-8') as f:
                        text = f.read().strip()

                    # Append text with class label
                    data.append({'text': text, 'label': class_name})


# Print the total number of samples
print(f"Length of the dataset: {len(data)}")

Length of the dataset: 5831


### Statistical Information About the Dataset

In [6]:
# Creating a Pandas dataframe
df = pd.DataFrame(data)

# Display first 5 rows of the dataframe
display(df.head())

# Get the number of categories
num_classes = len(df['label'].unique())

Unnamed: 0,text,label
0,How they welcome syrian refugees in Macedonia ...,human_damage
1,When the coalition is no different than Assad ...,human_damage
2,Syrian children are the primary target of Syri...,human_damage
3,Syrian genocide continues........#assadcrimes ...,human_damage
4,This is #Yemen| Child BURIED under RUBBLE as 9...,human_damage


In [7]:
print(f"Number of tweets in each category:")
print(f"===================================")
df['label'].value_counts()

Number of tweets in each category:


label
non_damage                2957
damaged_infrastructure    1390
damaged_nature             514
flood                      384
fires                      346
human_damage               240
Name: count, dtype: int64

### Text Preprocessing
- We used beautiful soap to remove the HTML tags.
- Removed special characters except # symbols as the symbol in tweets contain information about certain events.
- Converted to lower case and removed the white spaces.

In [8]:
# Clean text
def clean_text(txt):
    txt = BeautifulSoup(txt, 'html.parser').get_text()
    txt = re.sub(r'https?://\S+', '', txt)
    txt = re.sub(r'[^#@A-Za-z0-9 ]+', ' ', txt).lower().strip()
    return txt

In [9]:
# Store the cleaned tweets in a new column of the dataframe
df['cleaned'] = df['text'].apply(clean_text)

In [10]:
# Data samples after cleaning
print("Data samples after cleaning:")
print(f"=====================================")

for i in range(10):
    print(f"Original Data: {df.text[i]}")
    print("--"*65)
    print(f"Cleaned Data: {df.cleaned[i]}")
    print(f"=="*65)

Data samples after cleaning:
Original Data: How they welcome syrian refugees in Macedonia .......shame on this world to see the suffering of Syrian and doing nothing to help them ..........#syria #syrians #syrie #assadcrimes #isiscrimes #refugees #syriangenocide #genocide #un #unitednations #syrianrefugees #syrianorphans
----------------------------------------------------------------------------------------------------------------------------------
Cleaned Data: how they welcome syrian refugees in macedonia  shame on this world to see the suffering of syrian and doing nothing to help them  #syria #syrians #syrie #assadcrimes #isiscrimes #refugees #syriangenocide #genocide #un #unitednations #syrianrefugees #syrianorphans
Original Data: When the coalition is no different than Assad and Russia in attacking civilians with White Phosphorus.  This descending hell fire has killed 14 civilians in #Raqqa,  right on the spot.

#Coalition_kills_civilians .
.
#everychildismychild #AssadHolocaust

In [11]:
# Encode labels as ints
pd.set_option('future.no_silent_downcasting', True)
df['enc_label'] = df['label'].replace({'non_damage':0, 
                                       'damaged_infrastructure':1, 
                                       'damaged_nature':2, 
                                       'fires':3, 
                                       'flood':4, 
                                       'human_damage':5})

# Display the updated df
display(df.head())

Unnamed: 0,text,label,cleaned,enc_label
0,How they welcome syrian refugees in Macedonia ...,human_damage,how they welcome syrian refugees in macedonia ...,5
1,When the coalition is no different than Assad ...,human_damage,when the coalition is no different than assad ...,5
2,Syrian children are the primary target of Syri...,human_damage,syrian children are the primary target of syri...,5
3,Syrian genocide continues........#assadcrimes ...,human_damage,syrian genocide continues #assadcrimes #isiscr...,5
4,This is #Yemen| Child BURIED under RUBBLE as 9...,human_damage,this is #yemen child buried under rubble as 9...,5


In [12]:
print(f"Number of tweets in each category:")
print(f"===================================")
df['enc_label'].value_counts()

Number of tweets in each category:


enc_label
0    2957
1    1390
2     514
4     384
3     346
5     240
Name: count, dtype: int64

### Dataset Splitting
- Split the dataset into training, validation and test sets.
- The ratio is Train (80%), Validation (10%) and Test (10%).

In [13]:
# Split data
train_texts, temp_texts, train_labels, temp_labels = train_test_split(
    df['cleaned'], df['enc_label'],
    test_size=0.2, stratify=df['enc_label'], random_state=42
)

val_texts, test_texts, val_labels, test_labels = train_test_split(
    temp_texts, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)

print(f"Train size: {len(train_texts)}, Val size: {len(val_texts)}, Test size: {len(test_texts)}")

Train size: 4664, Val size: 583, Test size: 584


In [14]:
# Number of tweets in each category after splitting
print(f"Training set:")
print(train_labels.value_counts())

print(f"Validation set:")
print(val_labels.value_counts())

print(f"Test set:")
print(test_labels.value_counts())

Training set:
enc_label
0    2365
1    1112
2     411
4     307
3     277
5     192
Name: count, dtype: int64
Validation set:
enc_label
0    296
1    139
2     51
4     38
3     35
5     24
Name: count, dtype: int64
Test set:
enc_label
0    296
1    139
2     52
4     39
3     34
5     24
Name: count, dtype: int64


### Dataset Preparation

In [15]:
# Dataset class
torch_device = device  # for Trainer

class DisasterDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels
        
    def __len__(self):
        return len(self.labels)
        
    def __getitem__(self, idx):
        item = {k:v[idx].to(torch_device) for k,v in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], device=torch_device)
        return item

In [16]:
# Prepare tokenized datasets
def prepare_datasets(model_name, max_length=150):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    def tok(txts):
        return tokenizer(txts, padding=True, truncation=True,
                         max_length=max_length, return_tensors='pt')
    train_enc = tok(train_texts.tolist())
    val_enc   = tok(val_texts.tolist())
    test_enc  = tok(test_texts.tolist())
    
    return (
        DisasterDataset(train_enc, train_labels.tolist()),
        DisasterDataset(val_enc,   val_labels.tolist()),
        DisasterDataset(test_enc,  test_labels.tolist()),
        tokenizer
    )

- Selects number of samples based on the parameter `SHOTS_PER_CLASS` for few-shot learning.
- We used the training dataset to select samples for few-shot learning.

In [17]:
# Helper for few-shot subsampling
def get_few_shot_dataset(full_dataset, shots_per_class):
    labels = full_dataset.labels
    indices_by_label = defaultdict(list)
    
    for idx, lab in enumerate(labels):
        indices_by_label[lab].append(idx)
        
    selected = []
    
    for lab, idxs in indices_by_label.items():
        k = min(shots_per_class, len(idxs))
        selected += random.sample(idxs, k)
        
    selected.sort()
    
    enc = {k: v[selected] for k, v in full_dataset.encodings.items()}
    labs = [labels[i] for i in selected]
    
    return DisasterDataset(enc, labs)

**Metics function to compute precision, recall and f1 score.**

In [18]:
# Metrics function
def compute_metrics(eval_pred):
    preds, labs = eval_pred
    pred_labels = np.argmax(preds, axis=1)
    
    precision, recall, f1, _ = precision_recall_fscore_support(
        labs, pred_labels, average='weighted', zero_division=0
    )
    
    acc = (pred_labels == labs).mean()
    
    return {'precision': precision, 'recall': recall, 'f1': f1, 'accuracy': acc}

**Hyperparameters to fine-tune the models.**

In [19]:
# Hyperparameters:
NUM_EPOCHS = 8

# few-shot: number of examples per class
SHOTS_PER_CLASS = 20  

### Standard Fine-Tuning
- Updated all layers to better represent the features of disaster classification.

In [20]:
# Standard Fine-Tuning
def run_standard_finetuning(model_name, tokenizer):
    print(f"\n=== Standard Fine-Tuning ({model_name}) ===")
    
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, 
        num_labels=num_classes
    )
    
    training_args = TrainingArguments(
        output_dir=f'./{model_name}_standard_ft',
        num_train_epochs=NUM_EPOCHS,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=32,
        eval_strategy="epoch",
        save_strategy="epoch",
        logging_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        dataloader_pin_memory=False,
        report_to="none",
        logging_strategy="epoch"
    )
    
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        data_collator=DataCollatorWithPadding(tokenizer) if tokenizer else None
    )
    
    trainer.train()
    results = trainer.evaluate(test_dataset)
    
    preds = trainer.predict(test_dataset)
    y_pred = np.argmax(preds.predictions, axis=1)
    y_true = preds.label_ids
    report = classification_report(y_true, y_pred, output_dict=True)
    
    return {
        'model': model,
        'metrics': results,
        'classification_report': report,
        'num_params': sum(p.numel() for p in model.parameters()),
        'trainable_params': sum(p.numel() for p in model.parameters() if p.requires_grad)
    }

### LoRA Fine-Tuning
- Fine-Tune the model using Low Rank Adapter approach.
- `r` in the `LoraConfig()` represents the rank of LoRA matrices. So, instead of using a full weight matrix $W \in R^{d_{out} \times d_{in}}$, LoRA decomposes it into smaller matrices: $W_{lora} = A . B, \quad A \in R^{d_{out} \times r}, \quad B \in R^{r \times d_{in}}$.
- We choose to use use `r=8` as it gives reasonable F1 score for the computing resources at hand.

In [21]:
# LoRA Fine-Tuning
def run_lora_finetuning(model_name, tokenizer):
    print(f"\n=== LoRA Fine-Tuning ({model_name}) ===")
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, 
        num_labels=num_classes
    )
    
    # LoRA configuration
    target_modules = {
        "bert": ["query", "value"],
        "roberta": ["query", "value"],
        "distilbert": ["q_lin", "v_lin"]
    }[model_name.split('-')[0]]
    
    peft_config = LoraConfig(
        task_type=TaskType.SEQ_CLS,
        inference_mode=False,
        r=8, 
        lora_alpha=16,
        lora_dropout=0.1,
        target_modules=target_modules
    )
    
    model = get_peft_model(model, peft_config)
    model.print_trainable_parameters()
    
    training_args = TrainingArguments(
        output_dir=f'./{model_name}_lora_ft',
        num_train_epochs=NUM_EPOCHS,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=32,
        eval_strategy="epoch",
        save_strategy="epoch",
        logging_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model='f1',
        dataloader_pin_memory=False,
        report_to="none",
        logging_strategy="epoch"
    )
    
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        data_collator=DataCollatorWithPadding(tokenizer) if tokenizer else None
    )
    
    trainer.train()
    results = trainer.evaluate(test_dataset)
    
    preds = trainer.predict(test_dataset)
    y_pred = np.argmax(preds.predictions, axis=1)
    y_true = preds.label_ids
    report = classification_report(y_true, y_pred, output_dict=True)
    
    return {
        'model': model,
        'metrics': results,
        'classification_report': report,
        'num_params': sum(p.numel() for p in model.parameters()),
        'trainable_params': sum(p.numel() for p in model.parameters() if p.requires_grad)
    }

### Prompt Tuning
- It trains the model using few-shot learning where the number of examples is chosen based on the value of `SHOTS_PER_CLASS`.

In [22]:
def run_prompt_tuning(model_name, tokenizer, train_ds, val_ds, test_ds, shots_per_class=None):
    print(f"\n=== Prompt Tuning ({model_name}) ===")
    
    # If few-shot requested, subsample
    if shots_per_class is not None:
        print(f"Using {shots_per_class} shots per class (total ~{shots_per_class * num_classes} samples)")
        train_ds = get_few_shot_dataset(train_ds, shots_per_class)
        
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, num_labels=num_classes
    )
    
    # Prompt tuning config
    peft_cfg = PromptTuningConfig(
        task_type=TaskType.SEQ_CLS,
        num_virtual_tokens=20,
        num_layers=model.config.num_hidden_layers,
        token_dim=model.config.hidden_size,
        num_attention_heads=model.config.num_attention_heads,
        inference_mode=False
    )
    
    model = get_peft_model(model, peft_cfg)
    model.print_trainable_parameters()
    
    args = TrainingArguments(
        output_dir=f'./{model_name}_prompt_tuning_fs',
        num_train_epochs=NUM_EPOCHS,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=32,
        eval_strategy="epoch",
        save_strategy="epoch",
        logging_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model='f1',
        dataloader_pin_memory=False,
        report_to="none",
        logging_strategy="epoch"
    )
    
    trainer = Trainer(
        model=model, args=args,
        train_dataset=train_ds, eval_dataset=val_ds,
        compute_metrics=compute_metrics,
        data_collator=DataCollatorWithPadding(tokenizer)
    )
    
    trainer.train()
    
    results = trainer.evaluate(test_ds)
    preds = trainer.predict(test_ds)
    y_pred = np.argmax(preds.predictions, axis=1)
    y_true = preds.label_ids
    
    report = classification_report(
        y_true, y_pred, output_dict=True
    )
    
    return {
        'model': model,
        'metrics': results, 
        'classification_report': report,
        'num_params': sum(p.numel() for p in model.parameters()),
        'trainable_params': sum(p.numel() for p in model.parameters() if p.requires_grad)
    }


### Model Training
- Train the each of the three models for different fine-tuning approaches.

In [23]:
MODELS = ["bert-base-uncased", "roberta-base", "distilbert-base-uncased"]

# Run experiments for all models
all_results = {}

for model_name in MODELS:
    # Prepare datasets for current model
    train_dataset, val_dataset, test_dataset, tokenizer = prepare_datasets(model_name, max_length=150)
    
    # Get model key (e.g., 'bert' from 'bert-base-uncased')
    model_key = model_name.split('-')[0]
    
    # Run all methods for current model
    model_results = {}
    
    print(f"\n===== Running Experiments for {model_name.upper()} =====")
    model_results['standard_ft'] = run_standard_finetuning(model_name, tokenizer)
    model_results['lora_ft'] = run_lora_finetuning(model_name, tokenizer)
    model_results['prompt_tuning'] = run_prompt_tuning(model_name, 
                                                       tokenizer, 
                                                       train_dataset, 
                                                       val_dataset, 
                                                       test_dataset,
                                                       shots_per_class=SHOTS_PER_CLASS
                                                      )
    
    all_results[model_key] = model_results


===== Running Experiments for BERT-BASE-UNCASED =====

=== Standard Fine-Tuning (bert-base-uncased) ===
{'loss': 0.5468, 'grad_norm': 2.456413507461548, 'learning_rate': 4.3771404109589045e-05, 'epoch': 1.0}
{'eval_loss': 0.28122493624687195, 'eval_precision': 0.9177878873888076, 'eval_recall': 0.9142367066895368, 'eval_f1': 0.9149854091529339, 'eval_accuracy': 0.9142367066895368, 'eval_runtime': 8.4483, 'eval_samples_per_second': 69.008, 'eval_steps_per_second': 2.249, 'epoch': 1.0}
{'loss': 0.2426, 'grad_norm': 0.45614320039749146, 'learning_rate': 3.752140410958904e-05, 'epoch': 2.0}
{'eval_loss': 0.34063026309013367, 'eval_precision': 0.9100120647702898, 'eval_recall': 0.9056603773584906, 'eval_f1': 0.9070894213768718, 'eval_accuracy': 0.9056603773584906, 'eval_runtime': 8.3722, 'eval_samples_per_second': 69.635, 'eval_steps_per_second': 2.269, 'epoch': 2.0}
{'loss': 0.1651, 'grad_norm': 6.342898368835449, 'learning_rate': 3.127140410958904e-05, 'epoch': 3.0}
{'eval_loss': 0.37090

### Performance Comparision of All Models
- Compare the performance of all the three models (BERT, RoBERTa and DistilBERT) based on the F1 score.
- Display the parameters for all the configurations.
- Plot the confusion matrices of each configurations.

In [24]:
# Comprehensive comparison tables
def create_comparison_table(all_results):
    table_data = []
    
    for model_key, methods in all_results.items():
        for method, res in methods.items():
            row = {
                'Model': model_key.capitalize(),
                'Method': method.replace('_', ' ').title(),
                'F1 Score': f"{res['metrics']['eval_f1']:.4f}",
                'Precision': f"{res['metrics']['eval_precision']:.4f}",
                'Recall': f"{res['metrics']['eval_recall']:.4f}",
                'Total Params': f"{res['num_params']:,}",
                'Trainable Params': f"{res.get('trainable_params', res['num_params']):,}"
            }
            table_data.append(row)
    
    return pd.DataFrame(table_data)


comparison_table = create_comparison_table(all_results)


print("\n=== Performance Comparison of All Models===")
print(comparison_table.to_markdown(index=False))


=== Performance Comparison of All Models===
| Model      | Method        |   F1 Score |   Precision |   Recall | Total Params   | Trainable Params   |
|:-----------|:--------------|-----------:|------------:|---------:|:---------------|:-------------------|
| Bert       | Standard Ft   |     0.8957 |      0.8996 |   0.8973 | 109,486,854    | 109,486,854        |
| Bert       | Lora Ft       |     0.8227 |      0.8295 |   0.839  | 109,786,380    | 299,526            |
| Bert       | Prompt Tuning |     0.1032 |      0.4    |   0.2346 | 109,502,214    | 15,360             |
| Roberta    | Standard Ft   |     0.8938 |      0.8964 |   0.8955 | 124,650,246    | 124,650,246        |
| Roberta    | Lora Ft       |     0.8991 |      0.9032 |   0.9007 | 125,540,364    | 890,118            |
| Roberta    | Prompt Tuning |     0.0878 |      0.0557 |   0.2072 | 124,665,606    | 15,360             |
| Distilbert | Standard Ft   |     0.9054 |      0.9064 |   0.9058 | 66,958,086     | 66,958,086   

In [25]:
# Class mapping with proper names
class_mapping = {
    0: 'non_damage',
    1: 'damaged_infrastructure', 
    2: 'damaged_nature',
    3: 'fires',
    4: 'flood',
    5: 'human_damage'
}

# Get the class names in correct order
class_names = [class_mapping[i] for i in sorted(class_mapping.keys())]

# Plot all the confusion matrices
results = []
for model_key, methods in all_results.items():
    for method, res in methods.items():
        trainer = Trainer(
            model=res['model'],
            args=TrainingArguments(
                output_dir='./temp',
                per_device_eval_batch_size=32,
                report_to="none",
                logging_strategy="epoch",
                dataloader_pin_memory=False
            ),
            compute_metrics=compute_metrics
        )
        preds = trainer.predict(test_dataset)
        results.append({
            'model': model_key,
            'method': method,
            'y_true': preds.label_ids,
            'y_pred': np.argmax(preds.predictions, axis=1)
        })


for result in results:
    plt.figure(figsize=(12, 10))
    cm = confusion_matrix(result['y_true'], result['y_pred'])
    
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names, 
                cbar=False)
    
    # Customize plot
    plt.xlabel('Predicted Disaster Type', fontsize=12)
    plt.ylabel('Actual Disaster Type', fontsize=12)
    plt.title(f"{result['model'].upper()} - {method.replace('_', ' ').title()}", 
              fontsize=14, pad=20)
    
    # Rotate labels for better readability
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    
    plt.tight_layout()
    os.makedirs('confusion_matrices', exist_ok=True)
    filename = f"{result['model']}_{result['method']}_cm.png"
    plt.savefig(f'confusion_matrices/{filename}', 
               bbox_inches='tight', dpi=700)
    print("Confusion matrices saved in 'confusion_matrices' directory")
    # plt.show()
    plt.close()

Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory
Confusion matrices saved in 'confusion_matrices' directory


In [26]:
# Function to plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, class_names, title):
    """
    Plot confusion matrix using seaborn heatmap
    
    Args:
        y_true: Array of true labels
        y_pred: Array of predicted labels
        class_names: List of class names
        title: Title for the plot
    """
    cm = confusion_matrix(y_true, y_pred)
    
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=class_names, 
                yticklabels=class_names)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title(title)
    filename = "majority_voting_ensemble_cm.png"
    plt.savefig(f'confusion_matrices/{filename}', bbox_inches='tight', dpi=700)
    print(f"Saved the confusion matrix with title - {title}")
    plt.close()

### Perform Majority Voting
- Take the predictions of the three best (based on weighted F1 score) models of each configurations (standard fine-tuning, LoRa and few-shot learning) for each sample of the test set and predict the class based on the predictions of the majority voting of the models.
- In case tie, choose the predictions of the best model (not applicable in this scenerio as we have odd number of models).
- Display the output of all configurations in a table.

In [27]:
# Majority voting using the best models of all three fine-tuning approaches
def majority_voting(all_results, test_dataset):
    # Get the best model of each type (standard, lora, prompt) across all architectures
    best_models = {
        'standard': None,
        'lora': None,
        'prompt': None
    }
    
    # Find best performing model for each method type
    best_f1 = {'standard': -1, 'lora': -1, 'prompt': -1}
    
    for model_key, methods in all_results.items():
        # Standard fine-tuning
        if methods['standard_ft']['metrics']['eval_f1'] > best_f1['standard']:
            best_f1['standard'] = methods['standard_ft']['metrics']['eval_f1']
            best_models['standard'] = methods['standard_ft']['model']
        
        # LoRA fine-tuning
        if methods['lora_ft']['metrics']['eval_f1'] > best_f1['lora']:
            best_f1['lora'] = methods['lora_ft']['metrics']['eval_f1']
            best_models['lora'] = methods['lora_ft']['model']
        
        # Prompt tuning
        if methods['prompt_tuning']['metrics']['eval_f1'] > best_f1['prompt']:
            best_f1['prompt'] = methods['prompt_tuning']['metrics']['eval_f1']
            best_models['prompt'] = methods['prompt_tuning']['model']
    
    print("\nBest models selected for majority voting based on F1 score:")
    for method, model in best_models.items():
        print(f"METHOD: {method} \tF1 score: {best_f1[method]:.4f}")
    
    # Get predictions from each best model
    all_preds = []
    for method, model in best_models.items():
        trainer = Trainer(model=model, args=TrainingArguments(dataloader_pin_memory=False))
        preds = trainer.predict(test_dataset)
        class_preds = np.argmax(preds.predictions, axis=1)
        all_preds.append(class_preds)
    
    # Perform majority voting
    final_preds = []
    for i in range(len(test_dataset)):
        votes = [pred[i] for pred in all_preds]
        
        # Count votes and select the most common
        vote_counts = Counter(votes)
        most_common = vote_counts.most_common(1)[0]
        
        # If tie, select the prediction from the best performing model (standard FT)
        if len(vote_counts) > 1 and vote_counts.most_common(2)[0][1] == vote_counts.most_common(2)[1][1]:
            final_preds.append(all_preds[0][i])  # standard FT breaks ties
        else:
            final_preds.append(most_common[0])
    
    return np.array(final_preds)

# Get true labels from test dataset
def get_true_labels(test_dataset):
    return np.array([item['labels'].item() for item in test_dataset])

# Run majority voting
y_pred_majority = majority_voting(all_results, test_dataset)
y_true = get_true_labels(test_dataset)

# Plot the confusion matrix for ensemble method
plot_confusion_matrix(y_true, y_pred_majority, class_names, "Majority Voting Ensemble")

# Add majority voting results to comparison table
mv_metrics = precision_recall_fscore_support(y_true, y_pred_majority, average='weighted')

# Add the result of the ensemble approach at the end of the table
comparison_table.loc[len(comparison_table)] = {
    'Model': 'Ensemble',
    'Method': 'Majority Voting',
    'F1 Score': f"{mv_metrics[2]:.4f}",
    'Precision': f"{mv_metrics[0]:.4f}",
    'Recall': f"{mv_metrics[1]:.4f}",
    'Total Params': 'N/A',
    'Trainable Params': 'N/A'
}

print("\n=== Updated Performance Comparison ===")
print(comparison_table.to_markdown(index=False))

# Save the table in a csv file
os.makedirs('results', exist_ok=True)
comparison_table.to_csv('results/full_comparison.csv', index=False)

print("\n=== All experiments completed ===")
print("Results saved in 'results' directory")


Best models selected for majority voting based on F1 score:
METHOD: standard 	F1 score: 0.9054
METHOD: lora 	F1 score: 0.8991
METHOD: prompt 	F1 score: 0.3410
Saved the confusion matrix with title - Majority Voting Ensemble

=== Updated Performance Comparison ===
| Model      | Method          |   F1 Score |   Precision |   Recall | Total Params   | Trainable Params   |
|:-----------|:----------------|-----------:|------------:|---------:|:---------------|:-------------------|
| Bert       | Standard Ft     |     0.8957 |      0.8996 |   0.8973 | 109,486,854    | 109,486,854        |
| Bert       | Lora Ft         |     0.8227 |      0.8295 |   0.839  | 109,786,380    | 299,526            |
| Bert       | Prompt Tuning   |     0.1032 |      0.4    |   0.2346 | 109,502,214    | 15,360             |
| Roberta    | Standard Ft     |     0.8938 |      0.8964 |   0.8955 | 124,650,246    | 124,650,246        |
| Roberta    | Lora Ft         |     0.8991 |      0.9032 |   0.9007 | 125,540,36

In [28]:
# Majority voting based on only standard and LoRA fine-tuning
def majority_voting_standard_lora(all_results, test_dataset):
    # Get the best model of each type (standard and lora only)
    best_models = {
        'standard': None,
        'lora': None
    }
    
    # Find best performing model for each method type
    best_f1 = {'standard': -1, 'lora': -1}
    
    for model_key, methods in all_results.items():
        # Standard fine-tuning
        if methods['standard_ft']['metrics']['eval_f1'] > best_f1['standard']:
            best_f1['standard'] = methods['standard_ft']['metrics']['eval_f1']
            best_models['standard'] = methods['standard_ft']['model']
        
        # LoRA fine-tuning only (skip prompt tuning)
        if methods['lora_ft']['metrics']['eval_f1'] > best_f1['lora']:
            best_f1['lora'] = methods['lora_ft']['metrics']['eval_f1']
            best_models['lora'] = methods['lora_ft']['model']
    
    print("\nBest models selected for majority voting (Standard + LoRA only):")
    print(f"Standard FT: F1 = {best_f1['standard']:.4f}")
    print(f"LoRA FT: F1 = {best_f1['lora']:.4f}")
    
    # Get predictions from each best model
    all_preds = []
    for method, model in best_models.items():
        trainer = Trainer(model=model, args=TrainingArguments(dataloader_pin_memory=False))
        preds = trainer.predict(test_dataset)
        class_preds = np.argmax(preds.predictions, axis=1)
        all_preds.append(class_preds)
    
    # Perform majority voting between two models
    final_preds = []
    for pred1, pred2 in zip(all_preds[0], all_preds[1]):
        if pred1 == pred2:  # Agreement
            final_preds.append(pred1)
        else:  # Disagreement - use the better performing model (standard FT)
            final_preds.append(all_preds[0][i])  # standard FT breaks ties
    
    return np.array(final_preds)

# Run majority voting with only standard and LoRA
y_pred_majority_std_lora = majority_voting_standard_lora(all_results, test_dataset)

# Evaluate performance
print("\n=== Majority Voting (Standard + LoRA) Performance ===")
print(classification_report(y_true, y_pred_majority_std_lora))

# Plot confusion matrix
plot_confusion_matrix(y_true, y_pred_majority_std_lora, class_names, "Majority Voting (Standard+LoRA)")

# Add to comparison table
mv_metrics = precision_recall_fscore_support(y_true, y_pred_majority_std_lora, average='weighted')
comparison_table.loc[len(comparison_table)] = {
    'Model': 'Ensemble',
    'Method': 'Majority Voting (Std+LoRA)',
    'F1 Score': f"{mv_metrics[2]:.4f}",
    'Precision': f"{mv_metrics[0]:.4f}",
    'Recall': f"{mv_metrics[1]:.4f}",
    'Total Params': 'N/A',
    'Trainable Params': 'N/A'
}

# Save updated results
comparison_table.to_csv('results/full_comparison.csv', index=False)
print("\n=== Final Performance Comparison ===")
print(comparison_table.to_markdown(index=False))


Best models selected for majority voting (Standard + LoRA only):
Standard FT: F1 = 0.9054
LoRA FT: F1 = 0.8991

=== Majority Voting (Standard + LoRA) Performance ===
              precision    recall  f1-score   support

           0       0.98      0.73      0.84       296
           1       0.38      0.98      0.55       139
           2       1.00      0.04      0.07        52
           3       0.50      0.03      0.06        34
           4       0.00      0.00      0.00        39
           5       1.00      0.08      0.15        24

    accuracy                           0.61       584
   macro avg       0.64      0.31      0.28       584
weighted avg       0.75      0.61      0.57       584

Saved the confusion matrix with title - Majority Voting (Standard+LoRA)

=== Final Performance Comparison ===
| Model      | Method                     |   F1 Score |   Precision |   Recall | Total Params   | Trainable Params   |
|:-----------|:---------------------------|-----------:|----

In [29]:
# Majority voting based on weighted F1 score
def weighted_voting(all_results, test_dataset):
    # Get the best model of each type (standard and lora only)
    best_models = {
        'standard': None,
        'lora': None
    }
    
    # Find best performing model for each method type with their F1 scores
    best_f1 = {'standard': -1, 'lora': -1}
    
    for model_key, methods in all_results.items():
        # Standard fine-tuning
        if methods['standard_ft']['metrics']['eval_f1'] > best_f1['standard']:
            best_f1['standard'] = methods['standard_ft']['metrics']['eval_f1']
            best_models['standard'] = methods['standard_ft']['model']
        
        # LoRA fine-tuning only
        if methods['lora_ft']['metrics']['eval_f1'] > best_f1['lora']:
            best_f1['lora'] = methods['lora_ft']['metrics']['eval_f1']
            best_models['lora'] = methods['lora_ft']['model']
    
    print("\nBest models selected for weighted voting:")
    print(f"Standard FT: F1 = {best_f1['standard']:.4f} (Weight: {best_f1['standard']:.2f})")
    print(f"LoRA FT: F1 = {best_f1['lora']:.4f} (Weight: {best_f1['lora']:.2f})")
    
    # Get predictions and probabilities from each best model
    all_preds = []
    all_probs = []
    for method, model in best_models.items():
        trainer = Trainer(model=model, args=TrainingArguments(dataloader_pin_memory=False))
        preds = trainer.predict(test_dataset)
        class_preds = np.argmax(preds.predictions, axis=1)
        class_probs = torch.softmax(torch.Tensor(preds.predictions), dim=1).numpy()
        all_preds.append(class_preds)
        all_probs.append(class_probs)
    
    # Calculate weights (normalized F1 scores)
    total_weight = best_f1['standard'] + best_f1['lora']
    weights = {
        'standard': best_f1['standard'] / total_weight,
        'lora': best_f1['lora'] / total_weight
    }
    
    # Perform weighted voting
    final_preds = []
    for i in range(len(test_dataset)):
        # Weighted probability aggregation
        weighted_probs = np.zeros_like(all_probs[0][i])
        for j, (probs, method) in enumerate(zip(all_probs, ['standard', 'lora'])):
            weighted_probs += probs[i] * weights[method]
        
        final_preds.append(np.argmax(weighted_probs))
    
    return np.array(final_preds)

# Run weighted voting
y_pred_weighted = weighted_voting(all_results, test_dataset)

# Evaluate performance
print("\n=== Weighted Voting Performance ===")
print(classification_report(y_true, y_pred_weighted))

# Plot confusion matrix
plot_confusion_matrix(y_true, y_pred_weighted, class_names, "Weighted Voting (Standard+LoRA)")

# Add to comparison table
wv_metrics = precision_recall_fscore_support(y_true, y_pred_weighted, average='weighted')
comparison_table.loc[len(comparison_table)] = {
    'Model': 'Ensemble',
    'Method': 'Weighted Voting',
    'F1 Score': f"{wv_metrics[2]:.4f}",
    'Precision': f"{wv_metrics[0]:.4f}",
    'Recall': f"{wv_metrics[1]:.4f}",
    'Total Params': 'N/A',
    'Trainable Params': 'N/A'
}

# Save updated results
comparison_table.to_csv('results/full_comparison.csv', index=False)
print("\n=== Final Performance Comparison ===")
print(comparison_table.to_markdown(index=False))


Best models selected for weighted voting:
Standard FT: F1 = 0.9054 (Weight: 0.91)
LoRA FT: F1 = 0.8991 (Weight: 0.90)

=== Weighted Voting Performance ===
              precision    recall  f1-score   support

           0       0.85      0.99      0.91       296
           1       0.85      0.79      0.82       139
           2       0.87      0.65      0.75        52
           3       0.92      0.65      0.76        34
           4       0.92      0.56      0.70        39
           5       0.75      0.62      0.68        24

    accuracy                           0.85       584
   macro avg       0.86      0.71      0.77       584
weighted avg       0.85      0.85      0.84       584

Saved the confusion matrix with title - Weighted Voting (Standard+LoRA)

=== Final Performance Comparison ===
| Model      | Method                     |   F1 Score |   Precision |   Recall | Total Params   | Trainable Params   |
|:-----------|:---------------------------|-----------:|------------:|-

In [30]:
def get_the_best_model(all_results, test_dataset):
    best_models = {
        'standard': None,
        'lora': None
    }
    
    # Find best performing model for each method type with their F1 scores
    best_f1 = {'standard': -1, 'lora': -1}
    
    for model_key, methods in all_results.items():
        # Standard fine-tuning
        if methods['standard_ft']['metrics']['eval_f1'] > best_f1['standard']:
            best_f1['standard'] = methods['standard_ft']['metrics']['eval_f1']
            best_models['standard'] = methods['standard_ft']['model']
        
        # LoRA fine-tuning only
        if methods['lora_ft']['metrics']['eval_f1'] > best_f1['lora']:
            best_f1['lora'] = methods['lora_ft']['metrics']['eval_f1']
            best_models['lora'] = methods['lora_ft']['model']
    
    print("\nBest models selected for weighted voting:")
    print(f"Standard FT: F1 = {best_f1['standard']:.4f} (Weight: {best_f1['standard']:.2f})")
    print(f"LoRA FT: F1 = {best_f1['lora']:.4f} (Weight: {best_f1['lora']:.2f})")

    return best_models

    
best_models = get_the_best_model(all_results, test_dataset)

best_standard_model = best_models.get('standard')
best_lora_model = best_models.get('lora')


Best models selected for weighted voting:
Standard FT: F1 = 0.9054 (Weight: 0.91)
LoRA FT: F1 = 0.8991 (Weight: 0.90)


In [31]:
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

def get_model_predictions(model, dataset):
    """Get prediction probabilities from a HF model"""
    trainer = Trainer(model=model)
    preds = trainer.predict(dataset)
    return torch.softmax(torch.Tensor(preds.predictions), dim=1).numpy()


# Get probabilities from both models
standard_probs = get_model_predictions(best_standard_model, train_dataset)
lora_probs = get_model_predictions(best_lora_model, train_dataset)

# Stack horizontally to create meta-features
meta_features = np.hstack([standard_probs, lora_probs])  # Shape: [n_samples, n_classes*2]

# Get true labels
y_true_train = np.array([item['labels'].item() for item in train_dataset])
y_true_test = np.array([item['labels'].item() for item in test_dataset])

# Initialize meta-learners
meta_learners = {
    "Logistic Regression": LogisticRegression(max_iter=1000),
    "SVM": SVC(kernel='linear', probability=True, random_state=42),
    "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42)
}

# Scale features 
scaler = StandardScaler()
meta_features_scaled = scaler.fit_transform(meta_features)

# Dictionary to store all predictions
stacked_predictions = {}

# Train and evaluate each meta-learner
for name, model in meta_learners.items():
    # Scale features for SVM only
    if name == "SVM":
        X_train = meta_features_scaled
    else:
        X_train = meta_features
        
    model.fit(X_train, y_true_train)
    
    # Prediction function for each meta-learner
    def predict_fn(test_dataset, model=model, scaler=scaler if name == "SVM" else None):
        standard_test_probs = get_model_predictions(best_standard_model, test_dataset)
        lora_test_probs = get_model_predictions(best_lora_model, test_dataset)
        test_meta_features = np.hstack([standard_test_probs, lora_test_probs])
        
        if scaler:
            test_meta_features = scaler.transform(test_meta_features)
            
        return model.predict(test_meta_features)
    
    # Store predictions
    stacked_predictions[name] = predict_fn(test_dataset)
    
    # Evaluate
    print(f"\n=== {name} Meta-Learner Performance ===")
    print(classification_report(y_true_test, stacked_predictions[name]))

# Add results to comparison table
for name, preds in stacked_predictions.items():
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true_test, preds, average='weighted'
    )
    
    comparison_table.loc[len(comparison_table)] = {
        'Model': 'Ensemble',
        'Method': f'Stacked ({name})',
        'F1 Score': f"{f1:.4f}",
        'Precision': f"{precision:.4f}",
        'Recall': f"{recall:.4f}",
        'Total Params': 'N/A',
        'Trainable Params': 'N/A'
    }

# Save updated results
comparison_table.to_csv('results/full_comparison.csv', index=False)

# Display final table
print("\n=== Final Performance Comparison ===")
print(comparison_table.to_markdown(index=False))


=== Logistic Regression Meta-Learner Performance ===
              precision    recall  f1-score   support

           0       0.97      0.99      0.98       296
           1       0.84      0.86      0.85       139
           2       0.84      0.79      0.81        52
           3       0.86      0.74      0.79        34
           4       0.86      0.77      0.81        39
           5       0.69      0.83      0.75        24

    accuracy                           0.90       584
   macro avg       0.84      0.83      0.83       584
weighted avg       0.90      0.90      0.90       584


=== SVM Meta-Learner Performance ===
              precision    recall  f1-score   support

           0       0.99      0.99      0.99       296
           1       0.82      0.86      0.84       139
           2       0.87      0.77      0.82        52
           3       0.87      0.79      0.83        34
           4       0.86      0.77      0.81        39
           5       0.69      0.83      0

### Plot the Misclassification Report
- Plots the rate of missclassification (0-1) scale of each class for the best models in each configuration (standard fine-tuning, LoRa, few-shot learning and majority voting) 

In [32]:
# Misclassification rate plot
def get_model_predictions(all_results, test_dataset):
    # Get the best model of each type
    best_models = {
        'Standard FT': None,
        'LoRA FT': None,
        'Prompt Tuning': None
    }
    
    best_f1 = {'Standard FT': -1, 'LoRA FT': -1, 'Prompt Tuning': -1}
    
    for model_key, methods in all_results.items():
        # Standard fine-tuning
        if methods['standard_ft']['metrics']['eval_f1'] > best_f1['Standard FT']:
            best_f1['Standard FT'] = methods['standard_ft']['metrics']['eval_f1']
            best_models['Standard FT'] = methods['standard_ft']['model']
        
        # LoRA fine-tuning
        if methods['lora_ft']['metrics']['eval_f1'] > best_f1['LoRA FT']:
            best_f1['LoRA FT'] = methods['lora_ft']['metrics']['eval_f1']
            best_models['LoRA FT'] = methods['lora_ft']['model']
        
        # Prompt tuning
        if methods['prompt_tuning']['metrics']['eval_f1'] > best_f1['Prompt Tuning']:
            best_f1['Prompt Tuning'] = methods['prompt_tuning']['metrics']['eval_f1']
            best_models['Prompt Tuning'] = methods['prompt_tuning']['model']
    
    # Get predictions from each model
    predictions = {}
    for model_name, model in best_models.items():
        trainer = Trainer(model=model, args=TrainingArguments(dataloader_pin_memory=False))
        preds = trainer.predict(test_dataset)
        predictions[model_name] = np.argmax(preds.predictions, axis=1)
    
    return predictions


# Get predictions from all models
model_predictions = get_model_predictions(all_results, test_dataset)
y_true = get_true_labels(test_dataset)

# Add majority voting predictions
model_predictions['Majority Voting'] = y_pred_majority

# Calculate misclassification rates per class
def calculate_misclassification_rates(y_true, y_pred, num_classes):
    cm = confusion_matrix(y_true, y_pred)
    misclassification_rates = []
    
    # for i in range(num_classes):
    #     correct = cm[i,i]
    #     total = sum(cm[i,:])
    #     misclassification_rates.append(1 - (correct / total))
    # return misclassification_rates
    for i in range(num_classes):
        total = cm[i, :].sum()
        if total == 0:
            # No samples of class i: define error rate as 0.0 (or np.nan if you prefer)
            misclassification_rates.append(0.0)
        else:
            correct = cm[i, i]
            misclassification_rates.append(1.0 - (correct / total))
    
    return misclassification_rates

num_classes = len(class_names)
misclassification_data = {}

for model_name, y_pred in model_predictions.items():
    misclassification_data[model_name] = calculate_misclassification_rates(y_true, y_pred, num_classes)

# Create the bar plot
plt.figure(figsize=(12, 10))
bar_width = 0.2
index = np.arange(num_classes)

for i, (model_name, rates) in enumerate(misclassification_data.items()):
    bars = plt.bar(index + i*bar_width, rates, bar_width, label=model_name)

    # Add the value at the top of each bar
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.3f}', ha='center', va='bottom')


plt.xlabel('Disaster Categories')
plt.ylabel('Misclassification Rate')
plt.title('Misclassification Rates by Disaster Category and Model')
plt.xticks(index + bar_width*1.5, class_names)
plt.legend()
plt.grid(True, axis='y', linestyle='--', alpha=0.7)
plt.ylim(0, max([max(rates) for rates in misclassification_data.values()]) * 1.1)  # Add 10% headroom
plt.tight_layout()

# Save the plot
plt.savefig('results/misclassification_rates.png', dpi=700, bbox_inches='tight')
# plt.show()
plt.close()