In [None]:
# Colab-only: mount Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the notebook's dependencies quietly (-q).
# NOTE(review): accelerate and torch are pinned to exact versions here, while
# transformers, peft and bitsandbytes are upgraded to the newest release (-U).
# Confirm that the resolved versions are mutually compatible before relying on
# a fresh run.
!pip install -q accelerate==0.20.3 torch==2.2.1
!pip install -q -U transformers peft bitsandbytes
# torch is listed a second time here without a version; datasets is unpinned.
!pip install -q torch datasets

In [None]:
import numpy as np
import pandas as pd
import os,torch
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    RobertaTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer
)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# RoBERTa

In [None]:
# Load the RoBERTa tokenizer and a sequence-classification model with 8 outputs
# configured for multi-label classification.
# NOTE(review): the "Mistral & Llama 2" cell further down rebinds `tokenizer`
# and `model`; presumably only one of the two loading cells is meant to be run
# per session — confirm.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
model= AutoModelForSequenceClassification.from_pretrained("FacebookAI/roberta-base", num_labels=8,
                                                          problem_type="multi_label_classification")

# Keep the model's padding id in sync with the tokenizer's.
model.config.pad_token_id = tokenizer.pad_token_id

model=model.to(device)




# Mistral & Llama 2

In [None]:
# Quantization config: load the weights in 4-bit NF4 with double quantization
# enabled and bfloat16 as the compute dtype.

quantization_config = BitsAndBytesConfig(
    load_in_4bit = True,
    bnb_4bit_quant_type = 'nf4',
    bnb_4bit_use_double_quant = True,
    bnb_4bit_compute_dtype = torch.bfloat16
)

In [None]:
# LoRA config

# Names of the submodules that should receive LoRA adapters.
# NOTE(review): 'lm_head' is listed although the models in this notebook are
# loaded through AutoModelForSequenceClassification — confirm that a module
# with that name exists on those models.
target_modules = ['q_proj','k_proj','v_proj','o_proj','gate_proj','down_proj','up_proj','lm_head']

# Adapter hyperparameters for a sequence-classification ('SEQ_CLS') task;
# bias terms are not adapted (bias="none").
lora_config = LoraConfig(
    r=128,
    lora_alpha= 256,
    lora_dropout=0.1,
    bias="none",
    target_modules = target_modules,
    task_type = 'SEQ_CLS',
)

In [None]:
# Checkpoint ids for the two decoder models; only `mistral` is used below,
# `llama` is defined but not referenced in this cell.
mistral='mistralai/Mistral-7B-v0.1'
llama='NousResearch/Llama-2-7b-hf'

# Load the tokenizer and the 4-bit quantized, 8-label multi-label classifier.
# This rebinds `tokenizer` and `model` if the RoBERTa cell was run before.
# NOTE(review): torch_dtype is float16 here while quantization_config sets
# bnb_4bit_compute_dtype to bfloat16 — confirm the mix is intended.
tokenizer = AutoTokenizer.from_pretrained(mistral)
model = AutoModelForSequenceClassification.from_pretrained(mistral,
                                                           quantization_config=quantization_config,
                                                           num_labels=8,
                                                           torch_dtype=torch.float16,
                                                           problem_type="multi_label_classification")



# Training-time model settings: turn off `use_cache` and enable gradient
# checkpointing before the model is wrapped.
model.config.use_cache = False
model.config.pretraining_tp = 1
model.gradient_checkpointing_enable()

# Order matters: prepare the quantized model for k-bit training first, then
# wrap it with the LoRA adapters described by `lora_config`.
model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, lora_config)

# Fall back to the EOS token for padding when the tokenizer defines no pad token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Keep the model's padding id in sync with the tokenizer's.
model.config.pad_token_id = tokenizer.pad_token_id

model=model.to(device)

# Trainable Parameters

In [None]:
def print_number_of_trainable_model_parameters(model):
    """Build a three-line summary of a model's parameter counts.

    Args:
        model: Any object exposing ``named_parameters()`` whose parameters
            provide ``numel()`` and ``requires_grad`` (e.g. a ``torch.nn.Module``).

    Returns:
        str: The number of trainable parameters, the total number of
        parameters, and the trainable share as a percentage with two decimals.

    Raises:
        ZeroDivisionError: If the model has no parameters at all.
    """
    # Collect (size, is_trainable) once so both totals come from the same pass.
    sizes = [(tensor.numel(), tensor.requires_grad) for _, tensor in model.named_parameters()]
    all_model_params = sum(count for count, _ in sizes)
    trainable_model_params = sum(count for count, is_trainable in sizes if is_trainable)
    trainable_share = 100 * trainable_model_params / all_model_params
    return (
        f"Trainable model parameters: {trainable_model_params}\n"
        f"All model parameters: {all_model_params}\n"
        f"Percentage of trainable model parameters: {trainable_share:.2f}%"
    )

# Report the parameter counts of whichever model is currently bound to `model`.
print(print_number_of_trainable_model_parameters(model))

# Importing data & one-hot encoding the labels

In [None]:
# Placeholder: `path` must be set to the labelled CSV before this cell is run.
path='' #path to the labelled dataset
df= pd.read_csv(path)

In [None]:
import json

# Maps every category name used in the annotations to its position in the
# multi-hot label vector.
category_to_index = {'not_categorisable': 0,
          'conflict_and_crisis': 1,
          'migration_flow': 2,
          'host_country_security': 3,
          'host_country_politics': 4,
          'refugee_rights_and_advocacy': 5,
          'host_country_resources': 6,
          'host_country_symbolic_discourse': 7}

num_labels = len(category_to_index)

def convert_labels(row):
    """Turn one row's 'refugee_sentiment' annotation into a multi-hot list.

    The annotation may be:
      * a JSON object with a 'choices' entry (a list of names or a single name),
      * a JSON list of names,
      * a JSON string or a bare string holding a single name,
      * a missing / non-string value (e.g. NaN), which yields an all-zero vector.

    Names that are not keys of ``category_to_index`` are ignored.

    Args:
        row: Mapping-like object (dict or pandas Series) with a
            'refugee_sentiment' entry.

    Returns:
        list[int]: ``num_labels`` zeros/ones, with a 1 at the index of every
        recognised category.
    """
    label_array = [0] * num_labels
    entry = row['refugee_sentiment']

    # Only strings can be JSON-decoded; anything else (NaN, None, numbers) is
    # passed through and filtered out below instead of crashing json.loads.
    if isinstance(entry, str):
        try:
            parsed_entry = json.loads(entry)
        except json.JSONDecodeError:
            # Not JSON: treat the raw text as a single category name.
            parsed_entry = entry
    else:
        parsed_entry = entry

    # Membership is tested on dicts only, so a decoded string or list is never
    # mistaken for an object carrying a 'choices' key.
    if isinstance(parsed_entry, dict) and 'choices' in parsed_entry:
        categories = parsed_entry['choices']
    else:
        categories = parsed_entry

    # Normalise to a list; a lone string must not be iterated character by character.
    if not isinstance(categories, (list, tuple, set)):
        categories = [categories]

    # Multi-hot encode; non-string items (NaN, dicts, nested lists) are skipped.
    for category in categories:
        if isinstance(category, str) and category in category_to_index:
            label_array[category_to_index[category]] = 1

    return label_array


# Add the multi-hot label vector for every row.
df['one_hot'] = df.apply(convert_labels, axis=1)

# Keep only the columns listed below. This rebinds `df`, so re-running the cell
# after this point fails because 'refugee_sentiment' is no longer present.
desired_columns = ['Translation', 'URL', 'Date', 'Language', 'Sentiment', 'one_hot']
df = df[desired_columns]

# Splitting the labelled data & creating the datasets

In [None]:
from torch.utils.data import Dataset
from datasets import Dataset, DatasetDict

class CustomHFDataset(torch.utils.data.Dataset):
    """Map-style PyTorch dataset that tokenizes one record on access.

    The base class is spelled out as ``torch.utils.data.Dataset`` on purpose.
    This cell imports ``Dataset`` from both ``torch.utils.data`` and
    ``datasets``, and the later import wins, so the bare name ``Dataset``
    refers to the Hugging Face class. This class never calls the base
    initializer, so inheriting from the Hugging Face class would leave that
    class's own state unset.

    Args:
        dataset: Indexable collection of records (supports ``len()`` and
            integer indexing); each record is indexable by column name.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Length every text is padded / truncated to.
        text_column: Name of the column holding the text.
        one_hot_column: Name of the column holding the multi-hot label list.
        device: Optional device; defaults to CUDA when available, else CPU.
            It is stored on the instance but not used when building items.
    """

    def __init__(self, dataset, tokenizer, max_len, text_column, one_hot_column, device=None):
        self.tokenizer = tokenizer
        self.dataset = dataset
        self.text_column = text_column
        self.one_hot_column = one_hot_column
        self.max_len = max_len
        self.device = device if device is not None else torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def __len__(self):
        """Return the number of records in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Tokenize the record at ``index`` and return its tensors.

        Returns:
            dict: ``input_ids`` and ``attention_mask`` flattened to 1-D,
            ``token_type_ids`` (an empty tensor when the tokenizer does not
            return them) and ``labels`` as a float tensor.
        """
        record = self.dataset[index]
        # Collapse every run of whitespace into a single space.
        text = " ".join(str(record[self.text_column]).split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors="pt"
        )

        labels = record[self.one_hot_column]

        return {
            # The tokenizer is asked for "pt" tensors; flatten() drops the leading batch axis.
            "input_ids": inputs["input_ids"].flatten(),
            "attention_mask": inputs["attention_mask"].flatten(),
            "token_type_ids": inputs.get("token_type_ids", torch.tensor([])).flatten(),
            "labels": torch.FloatTensor(labels)
        }


In [None]:
train_size = 0.8  # fraction of all rows sampled for training
val_test_size = 0.5  # fraction of the held-out rows sampled for validation; the rest is the test set
MAX_LEN = 512  # max_len handed to CustomHFDataset (texts are padded / truncated to it)

# Sample the training rows (fixed seed); every row not sampled is held out in `temp`.
train_temp = df.sample(frac=train_size, random_state=42)
temp = df.drop(train_temp.index)

# Split the held-out rows into validation and test sets (same fixed seed).
val = temp.sample(frac=val_test_size, random_state=42)
test = temp.drop(val.index)

# Reset the indices and wrap each split in a Hugging Face Dataset.
dataset_dict = DatasetDict({
    'train': Dataset.from_pandas(train_temp.reset_index(drop=True)),
    'validation': Dataset.from_pandas(val.reset_index(drop=True)),
    'test': Dataset.from_pandas(test.reset_index(drop=True))
})

custom_datasets = {}

# Wrap every split in CustomHFDataset so items are tokenized with the tokenizer
# that is currently bound to `tokenizer`.
for phase in ['train', 'validation', 'test']:
    dataset = dataset_dict[phase]
    custom_datasets[phase] = CustomHFDataset(
        dataset,
        tokenizer,
        max_len=MAX_LEN,
        text_column='Translation',
        one_hot_column='one_hot'
    )

# DatasetDict is used here as a mapping from split name to CustomHFDataset.
encoded_dict = DatasetDict(custom_datasets)

# Multi-label classification adjustments

In [None]:
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from transformers import EvalPrediction
import torch.nn.functional as F

def multi_label_metrics(predictions, labels, threshold=0.5):
    """Compute micro-averaged F1 and ROC AUC for multi-label logits.

    Args:
        predictions: Raw logits, array-like of shape (n_samples, n_labels).
        labels: Ground-truth multi-hot matrix of the same shape.
        threshold: Probability at or above which a label counts as predicted
            (used for F1 only).

    Returns:
        dict: ``{'f1': ..., 'roc_auc': ...}``.
    """
    # Logits -> independent per-label probabilities.
    probs = torch.sigmoid(torch.Tensor(predictions)).numpy()
    y_true = np.asarray(labels)

    # Hard decisions are needed for F1 only.
    y_pred = (probs >= threshold).astype(float)
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro')

    # ROC AUC is a ranking metric and must be fed the continuous scores.
    # Passing thresholded 0/1 predictions reduces the curve to one operating point.
    roc_auc = roc_auc_score(y_true, probs, average='micro')

    metrics = {'f1': f1_micro_average,
               'roc_auc': roc_auc}

    return metrics

def compute_metrics(p: EvalPrediction):
    """Adapter between the Trainer's evaluation output and ``multi_label_metrics``.

    Args:
        p: Evaluation result carrying ``predictions`` and ``label_ids``. When
            ``predictions`` is a tuple, only its first element is scored.

    Returns:
        Whatever ``multi_label_metrics`` returns for those predictions and labels.
    """
    if isinstance(p.predictions, tuple):
        logits = p.predictions[0]
    else:
        logits = p.predictions
    return multi_label_metrics(predictions=logits, labels=p.label_ids)

In [None]:
def custom_data_collator(features):
    """Stack per-example tensors into one batch dictionary.

    Args:
        features: Non-empty list of dicts, each holding ``input_ids``,
            ``attention_mask`` and ``labels``. Any other keys are ignored.

    Returns:
        dict: ``input_ids`` and ``attention_mask`` stacked along a new first
        axis, plus ``labels`` stacked the same way when the first example's
        labels are a tensor.
    """
    def _stacked(key):
        # One tensor per example -> a single tensor with a leading batch axis.
        return torch.stack([example[key] for example in features])

    batch = {
        'input_ids': _stacked('input_ids'),
        'attention_mask': _stacked('attention_mask'),
    }

    # Labels are included only when they are already tensors.
    if isinstance(features[0]['labels'], torch.Tensor):
        batch['labels'] = _stacked('labels')

    return batch

In [None]:
def custom_loss_function(outputs, labels, penalty_factor=1.0):
    """Per-label BCE loss plus a penalty for contradictory predictions.

    An example is penalised when the label at index 0 is predicted
    (probability > 0.5) together with at least one other label.

    The loss value is the mean over the batch of
    ``mean-over-labels BCE + penalty_factor * violation``, where ``violation``
    is 1.0 for penalised examples and 0.0 otherwise. A hard 0/1 indicator
    carries no gradient, so on its own the penalty could never influence
    training. It is therefore combined with a differentiable surrogate using a
    straight-through construction: the forward value stays the hard indicator,
    and the backward pass uses the surrogate's gradient.

    Args:
        outputs: Logits of shape (batch, n_labels).
        labels: Float multi-hot targets of the same shape.
        penalty_factor: Weight of the penalty term.

    Returns:
        Scalar tensor with the batch-mean loss.
    """
    bce_loss = F.binary_cross_entropy_with_logits(outputs, labels, reduction='none')

    probs = torch.sigmoid(outputs)
    first_prob = probs[:, 0]
    other_probs = probs[:, 1:]

    # Hard indicator: label 0 and at least one other label both exceed 0.5.
    violation = ((first_prob > 0.5).unsqueeze(-1) & (other_probs > 0.5)).any(dim=1).float()

    if other_probs.shape[1] > 0:
        # Differentiable stand-in, active on violating rows only: probability of
        # label 0 times the strongest competing label.
        surrogate = violation * first_prob * other_probs.max(dim=1).values
        # Straight-through: (surrogate - surrogate.detach()) is exactly 0 in the
        # forward pass but passes the surrogate's gradient in the backward pass.
        penalties = violation + (surrogate - surrogate.detach())
    else:
        # With a single label there is nothing to contradict.
        penalties = violation

    enhanced_loss = bce_loss.mean(dim=1) + penalties * penalty_factor

    return enhanced_loss.mean()

class CustomTrainer(Trainer):
    """Trainer that scores batches with ``custom_loss_function``."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model on a batch and compute the custom loss.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch dictionary; its ``"labels"`` entry is removed here and
                the remaining entries are passed to the model as keyword arguments.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                the loss alone.
            **kwargs: Extra keyword arguments supplied by the base Trainer
                (newer transformers releases pass ``num_items_in_batch``).
                They are accepted so the override keeps working across
                versions, and are otherwise ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss = custom_loss_function(logits, labels.float())
        return (loss, outputs) if return_outputs else loss

# Training

In [None]:
epochs=10
batch_size = 16
metric_name = "f1"  # metric used to pick the best checkpoint


# Derive the schedule from the actual training split rather than a hard-coded
# example count, so the warm-up length follows the data.
train_examples = len(encoded_dict['train'])
batches_per_epoch = -(-train_examples // batch_size)  # ceiling division: a final partial batch still counts as a step
total_training_steps = batches_per_epoch * epochs
warmup_steps = total_training_steps * 0.01  # warm up over the first 1% of steps

In [None]:
# Training configuration: evaluate and checkpoint once per epoch, keep at most
# two checkpoints, and reload the checkpoint with the best `metric_name` when
# training finishes. `output_dir` and `logging_dir` are placeholders that must
# be filled in before running.
# NOTE(review): transformers is installed with -U at the top of the notebook;
# confirm that every keyword used here (e.g. `evaluation_strategy`) is still
# accepted by the installed release.
args = TrainingArguments(output_dir='', #path to output dir
                          logging_dir='' , #path to logs
                          remove_unused_columns=False ,
                          num_train_epochs=epochs,
                          load_best_model_at_end=True,
                          evaluation_strategy = "epoch",
                          save_strategy="epoch",
                          logging_steps=10,
                          learning_rate=2e-5,
                          metric_for_best_model=metric_name,
                          per_device_train_batch_size= batch_size,
                          per_device_eval_batch_size= batch_size,
                          warmup_steps=int(warmup_steps),
                          save_total_limit=2,
                          weight_decay=0.001,
                          max_grad_norm=1.0,

                          #For QLORA training:
                          #label_names=['labels'],
                          #fp16=False,
                          #bf16=False
                          )

# Train on the 'train' split and evaluate on 'validation', batching with the
# custom collator and scoring with `compute_metrics`.
trainer = CustomTrainer(model=model,
                        args=args,
                        train_dataset=encoded_dict['train'],
                        eval_dataset=encoded_dict['validation'],
                        data_collator=custom_data_collator,
                        tokenizer = tokenizer,
                        compute_metrics = compute_metrics,
                        )


In [None]:
# Fine-tune, then evaluate once more on the validation split and show the metrics.
trainer.train()
eval_result = trainer.evaluate()
print(eval_result)

# Save the model and tokenizer
# Placeholder: `save_path` must be set to a target directory before running.
save_path = "" #path to model

model.save_pretrained(save_path)
tokenizer.save_pretrained(save_path)

# Evaluate on Test set


In [None]:
from peft import AutoPeftModelForSequenceClassification #For PEFT

# Placeholder: point `save_path` at the directory of the model to evaluate.
save_path = "" #set to current model

# Exactly one of the two loaders below should be active, matching the kind of
# model that was saved.
#RoBERTA
#model = AutoModelForSequenceClassification.from_pretrained(save_path, num_labels=8, problem_type="multi_label_classification")

#Mistral, LLama
# NOTE(review): no quantization_config is passed here, unlike when the model was
# loaded for training — confirm the reloaded model fits in memory.
model = AutoPeftModelForSequenceClassification.from_pretrained(save_path, num_labels=8, problem_type="multi_label_classification")

tokenizer = AutoTokenizer.from_pretrained(save_path)

# Move to the selected device and switch to inference mode.
model=model.to(device)
model.eval()


# Same padding fallback as at training time.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

model.config.pad_token_id = tokenizer.pad_token_id

Predictions on test set:

In [None]:
from torch.utils.data import DataLoader

# Batch the held-out test split in its stored order (shuffle=False), so row i of
# the collected arrays lines up with item i of encoded_dict['test'].
test_dataloader = DataLoader(encoded_dict['test'], batch_size=4, shuffle=False)

all_probabilities = []
all_predictions = []

with torch.no_grad():
    for batch in test_dataloader:
        # Feed only the model inputs: labels are not needed for prediction and
        # token_type_ids is dropped from the batch.
        inputs = {k: v.to(model.device) for k, v in batch.items() if k != 'labels' and k != 'token_type_ids'}
        outputs = model(**inputs)
        logits = outputs.logits
        # Independent per-label probabilities; a label is predicted above 0.5.
        probabilities = torch.sigmoid(logits).cpu().numpy()
        predicted_labels = (probabilities > 0.5).astype(float)

        # Store predictions and probabilities
        all_probabilities.extend(probabilities)
        all_predictions.extend(predicted_labels)


# One row per test example, one column per label.
all_probabilities = np.array(all_probabilities)
all_predictions = np.array(all_predictions)


Metric calculations:

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score, recall_score, roc_auc_score

#True labels: read the 'labels' tensor of every item in the test split
y_test = np.array([entry['labels'].numpy() for entry in encoded_dict['test']])

#F1 Micro Average (computed on the thresholded predictions)
f1_micro = f1_score(y_test, all_predictions, average='micro')
print(f"F1 Micro Average: {f1_micro}")

#F1 Macro Average (computed on the thresholded predictions)
f1_macro = f1_score(y_test, all_predictions, average='macro')
print(f"F1 Macro Average: {f1_macro}")

#ROC AUC Micro (computed on the probabilities)
roc_auc_micro = roc_auc_score(y_test, all_probabilities, average='micro')
print(f"ROC AUC Micro Average: {roc_auc_micro}")

#ROC AUC Macro (computed on the probabilities)
roc_auc_macro = roc_auc_score(y_test, all_probabilities, average='macro')
print(f"ROC AUC Macro Average: {roc_auc_macro}")

#Hamming Score: fraction of individual (example, label) decisions that match the truth
hamming_score = np.mean(y_test == all_predictions)
print(f"Hamming Score: {hamming_score}")

Plotting ROC AUC graphs:

In [None]:
import numpy as np
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from itertools import cycle

n_classes = 8

# Micro-average: pool every (example, label) pair into one binary problem.
y_test_flat = y_test.ravel()
all_probabilities_flat = all_probabilities.ravel()

fpr, tpr, _ = roc_curve(y_test_flat, all_probabilities_flat)
roc_auc = auc(fpr, tpr)

# Interpolation needs strictly increasing x values, so keep one point per distinct FPR.
unique_fpr, indices = np.unique(fpr, return_index=True)
unique_tpr = tpr[indices]
smooth_fpr = np.linspace(0, 1, 300)
smooth_tpr = interp1d(unique_fpr, unique_tpr, kind='quadratic', fill_value="extrapolate")(smooth_fpr)
# A quadratic fit can overshoot; a true-positive rate must stay within [0, 1].
smooth_tpr = np.clip(smooth_tpr, 0.0, 1.0)

fpr, tpr = smooth_fpr, smooth_tpr


# Compare the area under the smoothed curve with the directly computed AUC.
roc_auc_direct = roc_auc_score(y_test_flat, all_probabilities_flat, average='micro')
roc_auc_plot = auc(fpr, tpr)

print(f"ROC AUC from Plot Data: {roc_auc_plot}")
print(f"ROC AUC Direct Calculation: {roc_auc_direct}")


plt.figure(figsize=(7, 7))
# Label both lines so the legend call below has entries to show.
plt.plot(fpr, tpr,
         color='darkblue', linestyle='-', linewidth=3,
         label=f'Micro-average ROC (AUC = {roc_auc_direct:.3f})')

plt.plot([0, 1], [0, 1], 'k--', lw=2, label='Chance')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
# NOTE(review): the title names Llama 2 7B while the loading cell above uses the
# `mistral` checkpoint — update the title to match the model actually evaluated.
plt.title('Micro-Average ROC Curve with Smoothing: Llama 2 7B')
plt.legend(loc="lower right")
plt.show()

Per-class F1-scores & Accuracies:

In [None]:
def per_class_metrics(predictions, probabilities, labels):
    """Compute F1 and accuracy separately for every label column.

    Args:
        predictions: 2-D array of hard 0/1 predictions (examples x labels).
        probabilities: Accepted for signature compatibility; not used.
        labels: 2-D array of ground-truth values with the same layout.

    Returns:
        dict: Maps ``"Class_<k>"`` (k counted from 1) to
        ``{"F1": ..., "Accuracy": ...}`` for that column.
    """
    return {
        f"Class_{column + 1}": {
            "F1": f1_score(labels[:, column], predictions[:, column]),
            "Accuracy": accuracy_score(labels[:, column], predictions[:, column]),
        }
        for column in range(labels.shape[1])
    }

# Score every label column of the test predictions and print one line per label.
class_metrics = per_class_metrics(all_predictions, all_probabilities, y_test)

for class_id, metrics in class_metrics.items():
    print(f"{class_id} - F1: {metrics['F1']}, Accuracy: {metrics['Accuracy']}")


Confusion Matrices for each label:

In [None]:
import numpy as np
from sklearn.metrics import multilabel_confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Display names for the eight labels, listed in the same order as the indices
# in `category_to_index`.
label_names = [
    'Not Categorisable',
    'Conflict & Crisis',
    'Migration Flow',
    'Host Country Security',
    'Host Country Politics',
    'Refugee Rights & Advocacy',
    'Host Country Resources',
    'Host Country Symbolic Discourse'
]

# One confusion matrix per label, computed on the thresholded test predictions.
confusion_matrices = multilabel_confusion_matrix(y_test, all_predictions)


def plot_confusion_matrix(cm, label_name, class_names=['Negative', 'Positive']):
    """Draw one confusion matrix as an annotated heatmap and show it.

    Args:
        cm: Confusion matrix of integer counts to display.
        label_name: Name of the label, inserted into the figure title.
        class_names: Tick labels used on both axes.
    """
    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap=plt.cm.Blues,
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.figure(figsize=(5, 5))
    sns.heatmap(cm, **heatmap_options)
    plt.title(f'Confusion Matrix for {label_name}')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()


# Plot the matrices in order, titling each with the label name at the same position.
for i, cm in enumerate(confusion_matrices):
    plot_confusion_matrix(cm, label_names[i])
