In [2]:
import pandas as pd
from datasets import load_dataset, DatasetDict, Dataset
import logging
import time
import os
import json
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from tqdm import tqdm
from transformers import (
    RobertaTokenizer,
    RobertaForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
    DataCollatorWithPadding
)
from scipy.special import softmax
import numpy as np
import torch

: 

In [3]:
# datasets_path = 'please use the dataset path here - LLM dataset'

# Load the train / validation / test splits from CSV.
# NOTE(review): LLMtrainpath, LLMvalpath and LLMtestpath are not assigned anywhere in the
# visible notebook; they must be defined (paths to the three CSV files) before this cell runs.
train = pd.read_csv(LLMtrainpath)
val = pd.read_csv(LLMvalpath)
test = pd.read_csv(LLMtestpath)

In [4]:
# Stance classes, listed in the order of their integer ids.
_STANCE_LABELS = ('Pro', 'Against', 'Neutral', 'Not-about')

# Forward (name -> id) and reverse (id -> name) lookups built from the same tuple.
label2id = {name: idx for idx, name in enumerate(_STANCE_LABELS)}
id2label = dict(enumerate(_STANCE_LABELS))

In [None]:
def no_maj(df, col_name):
    """Return the rows of ``df`` whose ``col_name`` value is not 'No Majority'.

    Args:
        df: Input DataFrame; it is not modified.
        col_name: Name of the column holding the majority label.

    Returns:
        An independent copy of the filtered rows. The original index is kept
        (not reset). The shape of the result is printed as a side effect.
    """
    # Copy explicitly: callers add new columns to the result, and assigning to a
    # bare .loc slice raises SettingWithCopyWarning.
    filtered = df.loc[df[col_name] != 'No Majority'].copy()
    print(filtered.shape)
    return filtered

# Drop the 'No Majority' rows from every split; each call prints the filtered shape.
train = no_maj(train, 'majority_llm_noninst')
test = no_maj(test, 'majority_llm_noninst')
val = no_maj(val, 'majority_llm_noninst')   

In [6]:
# Reuse the label2id mapping defined above instead of repeating the literal, so the
# integer ids used for training cannot drift from the ones used for the model config.
label_encoding = dict(label2id)

train['labels'] = train['majority_llm_noninst'].map(label_encoding)
val['labels'] = val['majority_llm_noninst'].map(label_encoding)
test['labels'] = test['majority_llm_noninst'].map(label_encoding)

# .map() yields NaN for any value missing from the mapping; fail here with a clear
# message rather than later inside training with float labels.
for _split_name, _frame in (('train', train), ('val', val), ('test', test)):
    assert _frame['labels'].notna().all(), f"unmapped majority label in the {_split_name} split"

In [7]:
# Keep only the text and the integer label. Take explicit copies: new columns are
# assigned to `test` later on, which would otherwise raise SettingWithCopyWarning.
model_columns = ['Input', 'labels']
train = train[model_columns].copy()
val = val[model_columns].copy()
test = test[model_columns].copy()

In [8]:
# Convert each pandas split into a `datasets.Dataset` (same order as before: train, test, val).
train_, test_, val_ = (Dataset.from_pandas(frame) for frame in (train, test, val))

# Bundle the three splits under their split names.
dataset = DatasetDict({'train': train_, 'test': test_, 'val': val_})

In [9]:
# NOTE(review): save_dir is not referenced again in the visible notebook — confirm it is still needed.
save_dir = '../output/llm/'
model_name = 'FacebookAI/roberta-large' #google-bert/bert-large-uncased'
# File-system friendly variant of the hub id: every '/' becomes '-'.
model_name_filename = '-'.join(model_name.split('/'))

In [None]:
# Load the tokenizer for the checkpoint configured in `model_name`, so that changing
# the configured model also changes the tokenizer.
tokenizer = RobertaTokenizer.from_pretrained(model_name)

In [11]:
def tokenize_func(examples):
    """Tokenize a batch for training.

    Reads the text from ``examples['Input']``, pads/truncates every sequence to
    512 tokens, and copies ``examples['labels']`` into a ``'label'`` entry of
    the returned encoding.
    """
    encoded = tokenizer(
        examples['Input'],
        padding='max_length',
        truncation=True,
        max_length=512,
    )
    encoded['label'] = examples['labels']
    return encoded

In [None]:
# Tokenize the three splits in batches (order: train, val, test).
train_tokenized, val_tokenized, test_tokenized = (
    split.map(tokenize_func, batched=True) for split in (train_, val_, test_)
)

In [None]:
# Display the tokenized training split to inspect its columns and size.
train_tokenized

In [14]:
# Expose only the model inputs and the label, as torch tensors, on every split.
# set_format works in place, so the three datasets themselves are updated.
for _tokenized_split in (train_tokenized, val_tokenized, test_tokenized):
    _tokenized_split.set_format('torch', columns = ['input_ids', 'attention_mask', 'label'])

In [None]:
# Derive the class count from the label mapping instead of hard-coding it, so the two
# cannot disagree; the value is displayed as the cell output.
num_labels = len(label2id)
num_labels

In [None]:
# Load the checkpoint configured in `model_name` with a 4-way classification head.
# Passing id2label/label2id stores the stance names in the model config, so a saved
# checkpoint reports 'Pro'/'Against'/... instead of generic LABEL_n names.
model = RobertaForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(label2id),
    id2label=id2label,
    label2id=label2id,
)

In [17]:
# Collator handed to the Trainer below; it batches examples using this tokenizer's padding rules.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [18]:
# Directory for training checkpoints and, below, the saved best model.
# NOTE(review): this is relative to the current directory ('./output/llm/'), whereas
# save_dir defined earlier is '../output/llm/' — confirm which location is intended.
output_dir = f'./output/llm/baseline_{model_name_filename}'

In [19]:
def compute_metrics(eval_pred):
    """Compute evaluation metrics for the Trainer.

    Args:
        eval_pred: Pair ``(logits, labels)``. ``logits`` is indexed as a 2-D
            array (examples x classes) and ``labels`` is used as an integer
            index array of class ids.

    Returns:
        dict with ``'accuracy'``, weighted ``'f1'`` and the mean
        ``'cross_entropy'`` of the softmax probabilities.
    """
    logits, labels = eval_pred
    predicted_ids = np.argmax(logits, axis=-1)

    # Mean cross-entropy: one-hot rows select the log-probability of the true class;
    # the 1e-9 offset keeps log() finite when a probability is 0.
    class_probs = softmax(logits, axis=-1)
    one_hot_labels = np.eye(class_probs.shape[1])[labels]
    cross_entropy = -np.sum(one_hot_labels * np.log(class_probs + 1e-9)) / len(labels)

    return {
        'accuracy': accuracy_score(labels, predicted_ids),
        'f1': f1_score(labels, predicted_ids, average='weighted'),
        'cross_entropy': cross_entropy,
    }


In [20]:
training_args = TrainingArguments(
    # Where checkpoints and logs are written.
    output_dir=output_dir,
    logging_dir="./logs",
    # Optimisation schedule.
    num_train_epochs=6,
    learning_rate=5e-5,
    weight_decay=0.01,
    warmup_steps=500,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    # Log every 10 steps; evaluate and checkpoint once per epoch.
    logging_strategy="steps",
    logging_steps=10,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    # Select the best checkpoint by the 'f1' value from compute_metrics and reload it
    # when training ends.
    metric_for_best_model="f1",
    load_best_model_at_end=True,
)


In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tokenized,
    # Evaluate on the held-out validation split. Evaluating on the training data would
    # let early stopping and best-checkpoint selection reward overfitting, and would
    # leave val_tokenized unused.
    eval_dataset=val_tokenized,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # Stop when the monitored metric has not improved for 3 consecutive evaluations.
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

In [None]:
# Run fine-tuning with the arguments and callbacks configured above.
trainer.train()

In [None]:
# Called without arguments, so this evaluates on the eval_dataset the Trainer was built with.
eval_results = trainer.evaluate()
print(eval_results)

In [24]:
# Sub-directory of output_dir where the final model and tokenizer are saved.
best_model_dir = f'{output_dir}/best_model'

In [None]:
# Persist the in-memory model and its tokenizer side by side so both can be reloaded
# from best_model_dir (the calibration module below is constructed from this path).
model.save_pretrained(best_model_dir)
tokenizer.save_pretrained(best_model_dir)

In [64]:
# Optional: uncomment to reload the saved tokenizer/model from best_model_dir instead of
# reusing the objects already in memory (e.g. after a kernel restart).
# tokenizer = RobertaTokenizer.from_pretrained(best_model_dir)
# model = RobertaForSequenceClassification.from_pretrained(best_model_dir)

In [None]:
# Use the GPU when one is available, otherwise fall back to CPU, then switch the model
# to evaluation mode for inference.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

In [None]:
# NOTE(review): these two statements repeat the end of the previous cell; presumably kept
# for re-running after the optional reload above — confirm, otherwise this cell is redundant.
model.to(device)
model.eval()

In [28]:
def predictions(text):
    """Classify a single text with the fine-tuned model.

    The text is tokenized (padded/truncated to 512 tokens), moved to ``device``
    and run through ``model`` without gradient tracking.

    Returns:
        ``(probabilities, predicted_class)``: the softmax probabilities of the
        first (only) row as a Python list, and the index of the largest one.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation = True, padding = 'max_length', max_length = 512).to(device)
    with torch.no_grad():
        class_probs = torch.nn.functional.softmax(model(**encoded).logits, dim=-1)
    probabilities = class_probs.tolist()[0]
    return probabilities, np.argmax(probabilities)

In [29]:
# Uncalibrated inference over the test split, one text at a time, in row order.
softmax_prob = []
softmax_pred = []

for text in test['Input']:
    probs, preds = predictions(text)
    softmax_prob.append(probs)
    softmax_pred.append(preds)

In [30]:
# Attach the per-row probabilities and predicted class ids to the test frame
# (lists are assigned positionally, in the order the rows were iterated).
test['softmax_prob'] = softmax_prob
test['softmax_preds'] = softmax_pred

In [None]:
y_true = test['labels']
y_pred = test['softmax_preds']

# Compute every metric up front ('macro' averaging for the multiclass scores) ...
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')
conf_matrix = confusion_matrix(y_true, y_pred)
class_report = classification_report(y_true, y_pred)

# ... then report them: scalar scores as percentages, followed by the tables.
for metric_label, metric_value in (
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("F1 Score:", f1),
):
    print(metric_label, metric_value*100)

print("Confusion Matrix:")
print(conf_matrix)

print("Classification Report:")
print(class_report)

## Temperature scaling

In [32]:
from temperature_scaling_roberta import TemperatureScalingCalibrationModule

In [None]:
# Display the DatasetDict to check which columns each split carries before re-tokenizing.
dataset

In [34]:
# Columns removed from the dataset when it is tokenized for calibration.
# NOTE(review): '__index_level_0__' is presumably the preserved pandas index added by
# Dataset.from_pandas after row filtering — confirm against the `dataset` output above.
columns = ['Input', '__index_level_0__']

In [None]:
def tokenize_fn(example):
    """Tokenize a batch for the calibration step.

    Tokenizes ``example['Input']`` with max-length padding and truncation (no
    explicit ``max_length`` is passed), then stores the numerical majority
    label from ``example['labels']`` under ``'label'`` in the returned encoding.
    """
    encoded = tokenizer(
        example['Input'],
        padding='max_length',
        truncation=True,
    )
    encoded['label'] = example['labels']
    return encoded

# Tokenize each split on its own so that only the helper columns actually present in
# that split are removed. Passing a fixed remove_columns list to DatasetDict.map raises
# when a split lacks one of them (e.g. a split without '__index_level_0__').
tokenized_dict = DatasetDict({
    split_name: split.map(
        tokenize_fn,
        batched=True,
        remove_columns=[name for name in columns if name in split.column_names],
    )
    for split_name, split in dataset.items()
})

In [None]:
# Build the calibration wrapper from the saved model directory and fit it on the
# validation split for 6 epochs.
# NOTE(review): TemperatureScalingCalibrationModule comes from a local module not shown
# here; its exact fitting behaviour should be checked there.
calibration_module = TemperatureScalingCalibrationModule(best_model_dir, tokenizer).to(device)
calibration_module.fit(tokenized_dict['val'], n_epochs = 6)

In [None]:
# Display the temperature attribute of the fitted calibration module.
calibration_module.temperature

In [38]:
from torch.utils.data import DataLoader

from transformers import (

    DataCollatorWithPadding
)
import numpy as np

In [39]:
def predict(model, examples, round_digits: int = 5):
    """Score one collated batch with ``model``.

    Args:
        model: Callable taking ``(input_ids, attention_mask)`` and returning a
            tensor of per-class scores.
        examples: Batch mapping with ``'input_ids'``, ``'attention_mask'`` and
            ``'labels'`` tensors.
        round_digits: Number of decimals the scores are rounded to.

    Returns:
        ``(batch_scores, batch_labels, predicted_labels)``: rounded scores as
        nested lists, the true labels as a list, and the argmax of each
        rounded score row.
    """
    # token_type_ids are intentionally not passed to the model.
    model_inputs = (examples['input_ids'].to(device), examples['attention_mask'].to(device))
    batch_labels = examples['labels'].detach().cpu().numpy().tolist()

    model.eval()
    with torch.no_grad():
        raw_scores = model(*model_inputs)

    rounded_scores = np.round(raw_scores.detach().cpu().numpy(), round_digits)
    batch_scores = rounded_scores.tolist()
    predicted_labels = [np.argmax(row) for row in batch_scores]
    return batch_scores, batch_labels, predicted_labels


def predict_data_loader(model, data_loader: DataLoader) -> pd.DataFrame:
    """Run ``predict`` over every batch of ``data_loader`` and collect the results.

    Returns:
        DataFrame with one row per example and the columns ``'scores'``,
        ``'original_labels'`` and ``'pred_labels'``, in loader order.
    """
    scores, true_labels, pred_labels = [], [], []

    for batch in data_loader:
        batch_scores, batch_labels, batch_pred_labels = predict(model, batch)
        scores.extend(batch_scores)
        true_labels.extend(batch_labels)
        pred_labels.extend(batch_pred_labels)

    return pd.DataFrame({'scores': scores, 'original_labels': true_labels, 'pred_labels': pred_labels})

In [None]:
# Batch the tokenized test split (128 examples per batch) and score it with the
# calibration module, timing the whole pass.
data_collator = DataCollatorWithPadding(tokenizer, padding=True)
data_loader = DataLoader(tokenized_dict['test'], collate_fn=data_collator, batch_size=128)
start = time.time()
df_calibrated_predictions = predict_data_loader(calibration_module, data_loader)
end = time.time()

# Wall-clock seconds for the prediction pass, then a quick look at the result frame.
print('elapsed: ', end - start)
print(df_calibrated_predictions.shape)
df_calibrated_predictions.head()

In [None]:
y_true = df_calibrated_predictions['original_labels']
y_pred = df_calibrated_predictions['pred_labels']

# Compute every metric up front ('macro' averaging for the multiclass scores) ...
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')
conf_matrix = confusion_matrix(y_true, y_pred)
class_report = classification_report(y_true, y_pred)

# ... then report them: scalar scores as percentages, followed by the tables.
for metric_label, metric_value in (
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("F1 Score:", f1),
):
    print(metric_label, metric_value*100)

print("Confusion Matrix:")
print(conf_matrix)

print("Classification Report:")
print(class_report)

In [None]:
# Preview the test frame, which now also carries the uncalibrated softmax columns.
test.head()

In [None]:
# Preview the calibrated predictions for a side-by-side look with the frame above.
df_calibrated_predictions.head()

In [None]:
# Re-read the test data from CSV and apply the same 'No Majority' filter as above.
# NOTE(review): the file name is hard-coded here rather than taken from LLMtestpath;
# the rows must line up one-to-one with `test` for the columns added below — confirm.
test_df = pd.read_csv('df_test_llm_soft.csv')

test_df = no_maj(test_df, 'majority_llm_noninst')

In [45]:
# Collect every output column first, then attach them to test_df in a fixed order.
# All values are plain lists, so they are assigned positionally.
_result_columns = {
    'labels': test_df['labels'].tolist(),
    'uncalib_scores': softmax_prob,
    'uncalib_preds': softmax_pred,
    'calib_scores': df_calibrated_predictions['scores'].tolist(),
    'calib_preds': df_calibrated_predictions['pred_labels'].tolist(),
}
for _column_name, _column_values in _result_columns.items():
    test_df[_column_name] = _column_values

In [46]:
# Write the combined uncalibrated/calibrated results to CSV, without the index column.
test_df.to_csv('results_baseline_roberta_llm.csv', index= False)