# BERT-Based Models Predictions

In [1]:
import torch
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import AutoModel, AutoTokenizer, AutoModelForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold, KFold
from transformers import get_linear_schedule_with_warmup

import pandas as pd
import numpy as np

from tabulate import tabulate
from tqdm import trange
import random
from sklearn.metrics import f1_score
import random


In [2]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Mon Apr 15 17:20:46 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.86.10              Driver Version: 535.86.10    CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla V100-PCIE-16GB           On  | 00000000:2F:00.0 Off |                    0 |
| N/A   34C    P0              35W / 250W |   4706MiB / 16384MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

## Define Base Model

In [3]:
base_model = "bert-base-uncased"

In [4]:
from sklearn.model_selection import train_test_split

def adjust_domain_label(labels, test_domain):
    return torch.tensor([l if l < test_domain else l - 1 for l in labels.tolist()])

add_lyrics_training = True  # True to use the synthetic lyrics in training (keep true)
just_lyrics = False  # Change this to True if you want to train just on the synthetic lyrics

count_synthetic_in_model_weights = True  # This option involves the counting of the synthetic labels in computing class weights.

num_labels = 2
num_domains = 4 # Change this based on the domain number used in training

print(f"Number of Domains: {num_domains}")

epochs = 10
batch_size = 16
epoch_count = 0
rw = 0.1
iw = 0.01
dw = 0.1
# Initialize the model with dynamic alpha for gradient reversal
initial_alpha = 0.1
alpha_growth_rate = 0.1  # How much to increase alpha after each epoch

if just_lyrics:
    add_lyrics_training = True

if just_lyrics:
    suffix = "_with_just_synthetic_lyrics"
    print(suffix)
elif add_lyrics_training:
    suffix = "_with_ssocial_media_and_synthetic_lyrics"
    print(suffix)
else:
    suffix = "_with_social_media"
    print(suffix)


Number of Domains: 4
_with_ssocial_media_and_synthetic_lyrics


## Adversarial Module:

In [5]:
from torch.autograd import Function
import torch.nn as nn

class GradientReversal(Function):
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.save_for_backward(x, alpha)
        return x

    @staticmethod
    def backward(ctx, grad_output):
        grad_input = None
        _, alpha = ctx.saved_tensors
        if ctx.needs_input_grad[0]:
            grad_input = - alpha*grad_output
        return grad_input, None
revgrad = GradientReversal.apply

class GradientReversal(nn.Module):
    def __init__(self, alpha):
        super().__init__()
        self.alpha = torch.tensor(alpha, requires_grad=False)

    def forward(self, x):
        return revgrad(x, self.alpha)

class AdversarialBERT(nn.Module):
    def __init__(self, bert_model, moral_label=2, domain_label=3, class_weight=[0,0],
                 domain_weight=1, identity_weight=1, reconstruction_weight=1, moral_weight=1,
                 alpha=1.0,
                 freeze_bert=False): # class_weight[0] = -1 deactivates the baalancing tentatives

        super(AdversarialBERT, self).__init__()
        self.bert = bert_model
        bert_dim = 768
        self.invariant_trans = nn.Linear(768, 768)
        print(' self.invariant_trans ',  self.invariant_trans)
        if identity_weight+reconstruction_weight+domain_weight==0:
            self.moral_classification = nn.Linear(768, moral_label)
        else:
            self.moral_classification = nn.Sequential(nn.Linear(768,768),
                                                      nn.ReLU(),
                                                      nn.Linear(768, moral_label))

        self.domain_classification = nn.Sequential(GradientReversal(alpha),
                                                   nn.Linear(768,768),
                                                   nn.ReLU(),
                                                   nn.Dropout(0.3),
                                                   nn.Linear(768, domain_label))

        # Dynamically adjustable alpha for gradient reversal
        self.alpha = alpha
        self.domain_weight = domain_weight

        if moral_label>2:
                self.loss_fn_moral = nn.BCEWithLogitsLoss() #nn.CrossEntropyLoss()

        else:
            if class_weight[0]>0:
                weights = torch.tensor(class_weight).float()
            else:
                weights = torch.tensor([1.0 for _ in range(moral_label)]).float()
            if moral_label>2:
                self.loss_fn_moral = nn.BCEWithLogitsLoss(pos_weight=weights) #BCEWithLogitsLoss

            else:
                self.loss_fn_moral = nn.CrossEntropyLoss(weight=weights)

        self.loss_fn_domain = nn.CrossEntropyLoss()
        self.reconstruction_feed = nn.Linear(768, 768)
        self.loss_reconstruction = nn.MSELoss()
        self.weight_identity = identity_weight
        self.reconstruction_weight = reconstruction_weight
        self.moral_weight = moral_weight
        self.identity = torch.eye(768).to(device)
        self.freeze=freeze_bert

    def update_alpha(self, new_alpha):
        # Method to update alpha for the gradient reversal layer
        self.domain_classification[0].alpha = new_alpha

    def update_model_params(self, domain_weight, moral_weight, reconstruction_weight, identity_weight):
        # Update the relevant parameters
        self.domain_weight = domain_weight
        self.moral_weight = moral_weight
        self.reconstruction_weight = reconstruction_weight
        self.identity_weight = identity_weight

    def forward(self, b_input_ids, b_token_type_ids, b_input_mask, b_labels, b_domain_labels, original_bert_embeddings=None, test=False):
        # Forward pass
        if self.freeze:
            with torch.no_grad():
                pooled_output = self.bert(b_input_ids,
                                    token_type_ids = b_token_type_ids, #it was None
                                    attention_mask = b_input_mask).last_hidden_state[:,0,:]

        else:
            pooled_output = self.bert(b_input_ids,
                                token_type_ids = b_token_type_ids, #it was None
                                attention_mask = b_input_mask).last_hidden_state[:,0,:]


            pooled_output = self.invariant_trans(pooled_output)


        logits = self.moral_classification(pooled_output)

        loss_moral = self.loss_fn_moral(logits, b_labels)

        if test:
            return loss_moral, logits
        if self.domain_weight>0:
            loss_domain = self.loss_fn_domain(self.domain_classification(pooled_output), b_domain_labels)
        else:
            loss_domain=0
        if original_bert_embeddings is not None:
            loss_reconstruction = self.loss_reconstruction(self.reconstruction_feed(pooled_output), original_bert_embeddings)*self.reconstruction_weight
        else:
            loss_reconstruction=0
        if self.weight_identity>0:
            loss_identity = torch.norm(self.invariant_trans.weight-self.identity)*self.weight_identity
        else:
            loss_identity=0
        total_loss = loss_moral*self.moral_weight+loss_reconstruction+loss_identity+self.domain_weight*loss_domain
        return  total_loss

In [6]:
from transformers import AutoTokenizer, AutoModel

base_model = "bert-base-uncased"

test_lyrics = pd.read_csv("../Lyrics_Data/MFT_human_annotated_lyrics.csv") 

test_lyrics =  test_lyrics.iloc[:,-12:]

test_lyrics.rename({k:k.split("_")[-1] for k in test_lyrics.columns[2:]}, inplace=True, axis=1)

test_lyrics.rename({"lyrics":"cleaned_text"}, inplace=True, axis=1)

test_lyrics["domain"] = 3

test_lyrics["subdomain"] = "song_lyrics"

input_files = test_lyrics["cleaned_text"].values

tokenizer = AutoTokenizer.from_pretrained(base_model)
bert_model = AutoModel.from_pretrained(base_model)

# Original Input and attention masks without the augmented dictionary terms:
original_input_id = []
original_attention_masks = []
original_token_type_id = []

# Input id and attention masks with the dictionary terms:
input_id = []
attention_masks = []
token_type_id = []

def preprocessing(input_text, tokenizer):
    '''
    Returns <class transformers.tokenization_utils_base.BatchEncoding> with the following fields:
    - input_ids: list of token ids
    - token_type_ids: list of token type ids
    - attention_mask: list of indices (0,1) specifying which tokens should considered by the model (return_attention_mask = True).
    '''
    return tokenizer.encode_plus(
                        input_text,
                        add_special_tokens = True,
                        max_length = 300,
                        padding = 'max_length',
                        return_attention_mask = True,
                        return_token_type_ids = True,  # Add this line
                        return_tensors = 'pt',
                        truncation=True
                   )

for sample in input_files:
    # Original Input
    original_encoding_dict = preprocessing(sample, tokenizer)
    original_input_id.append(original_encoding_dict['input_ids'])
    original_attention_masks.append(original_encoding_dict['attention_mask'])


    # Calculate token type ids
    original_token_type = torch.zeros_like(original_encoding_dict['input_ids'])
    original_token_type[original_encoding_dict['input_ids'] != 0] = 0

    original_token_type_id.append(original_token_type)

original_input_id = torch.cat(original_input_id, dim=0)
original_attention_masks = torch.cat(original_attention_masks, dim=0)
original_token_type_id = torch.cat(original_token_type_id, dim = 0)

labels = test_lyrics.iloc[:, -12:-2].values
labels2 = torch.tensor([3 for _ in range(len(input_files))])  # add a new axis at index 1

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


### Models Evaluation with the adjusted Thresholds:

In [87]:
import re
import os
from sklearn.metrics import multilabel_confusion_matrix as mcm, classification_report
from scipy.special import softmax
import torch

possible_labels = ["care", "harm", "fairness", "cheating", "loyalty", "betrayal",
                   "authority", "subversion", "purity", "degradation"]
predictions = []

for lab_idx, lab in enumerate(possible_labels):
 best_f1 = 0

 for th in np.arange(0.05,1,0.05):
    
  new_labels = []
  for ex in labels:
    if  ex[lab_idx]:
        new_labels.append(1)
    else:
        new_labels.append(0)

  val_set = TensorDataset(original_input_id,
                          original_token_type_id,
                          original_attention_masks,
                          torch.tensor(new_labels),
                          labels2)

  validation_dataloader = DataLoader(
              val_set,
              batch_size = batch_size
          )

  checkpoint_folder = f"../Models/moralBERT_sm_sl/" # change the path 
  model_checkpoint = f"{checkpoint_folder}model_bert_{lab}_{suffix}.bin"
  
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  
  # Initialize the base model outside of the training loop.
  model = AdversarialBERT(bert_model, moral_label=num_labels,
                          domain_label=num_domains, domain_weight=dw, moral_weight=1,
                          reconstruction_weight=rw, identity_weight=iw,
                          alpha=0, class_weight=[0, 0],
                          freeze_bert=False).to(device)

  model.load_state_dict(torch.load(model_checkpoint))

  ex_id = 0
    

  model.eval()
  val_loss, nb_val_examples, nb_val_steps = 0, 0, 0
  y_true, y_pred = [], []

  for batch in validation_dataloader:
      batch = tuple(t.to(device) for t in batch)
      b_input_ids, b_token_type_ids, b_input_mask, b_labels, b_domain_labels = batch

      with torch.no_grad():
          loss, logits = model(b_input_ids, b_token_type_ids, b_input_mask, b_labels, b_domain_labels, test=True)

          val_loss += loss.item()
          logits = logits.detach().cpu().numpy()
          nb_val_examples += b_input_ids.size(0)
          nb_val_steps += 1
          label_ids = b_labels.to('cpu').numpy()
          predicted_labels = [l[1]>th for l in softmax(logits, axis=1)]
          y_true.extend(label_ids)
          y_pred.extend(predicted_labels)

  f1 = f1_score(y_true, y_pred, average="binary")
  if f1>best_f1:
    best_f1 = f1
    best_y = y_pred.copy()
    best_th = th


 if not lab_idx:
    for l, g in zip(best_y, y_true):
        predictions.append({"pred_"+lab:l, "true_"+lab:g,"id":ex_id})
        ex_id += 1
 else:
    for l, g in zip(best_y, y_true):
      predictions[ex_id]["pred_"+lab] = l
      predictions[ex_id]["true_"+lab] = g
      ex_id += 1

 print('Evaluation')

 # Single-LABEL CLASSIFICATION REPORT
 #################################
 print(f"best threshold: {best_th}")
 target_names = [f"Non-{lab}", lab]
 report = classification_report(y_true, best_y, target_names=target_names)
 f1 = f1_score(y_true, best_y, average="binary")
 print("\nClassification Report:")
 print(report)
 #################################

pred_df = pd.DataFrame(predictions)


 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invari

 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
Evaluation
best threshold: 0.1

Classification Report:
              precision    recall  f1-score   support

 Non-loyalty       0.84      0.99      0.91       163
     loyalty       0.75      0.16      0.27        37

    accuracy                           0.83       200
   macro avg       0.79      0.57      0.59       200
weighted avg       0.82      0.83      0.79       200

 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.inva

 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invariant_trans  Linear(in_features=768, out_features=768, bias=True)
 self.invari

### Bootstraping (Sampling Technique):

In [89]:
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
from sklearn.utils import resample
import numpy as np
import pandas as pd

possible_labels = ["care", "harm", "fairness", "cheating", "loyalty", "betrayal",
                   "authority", "subversion", "purity", "degradation"]



n_bootstrap_iters = 1000  # Number of bootstrap iterations
bootstrap_results = {label: {metric: [] for metric in ["F1 (Binary)", "F1 (Weighted)", "Precision (Binary)", "Precision (Weighted)", "Recall (Binary)", "Recall (Weighted)", "Accuracy"]} for label in possible_labels}

# Bootstrap loop
for _ in range(n_bootstrap_iters):
    for lab in possible_labels:
        # Resample with replacement
        sample_indices = resample(np.arange(len(test_lyrics)), replace=True)
        true = test_lyrics.loc[sample_indices, lab].values
        candidate = pred_df.loc[sample_indices, f"pred_{lab}"].values
        
        # Compute metrics for bootstrap sample
        bootstrap_results[lab]["F1 (Binary)"].append(f1_score(true, candidate, average="binary", zero_division=0))
        bootstrap_results[lab]["F1 (Weighted)"].append(f1_score(true, candidate, average="weighted", zero_division=0))
        bootstrap_results[lab]["Precision (Binary)"].append(precision_score(true, candidate, average="binary", zero_division=0))
        bootstrap_results[lab]["Precision (Weighted)"].append(precision_score(true, candidate, average="weighted", zero_division=0))
        bootstrap_results[lab]["Recall (Binary)"].append(recall_score(true, candidate, average="binary", zero_division=0))
        bootstrap_results[lab]["Recall (Weighted)"].append(recall_score(true, candidate, average="weighted", zero_division=0))
        bootstrap_results[lab]["Accuracy"].append(accuracy_score(true, candidate))

# Calculate standard deviations from bootstrap results
std_devs = {label: {metric: np.std(values) for metric, values in metrics.items()} for label, metrics in bootstrap_results.items()}

# Calculate original metrics and include standard deviations
final_results = []
for lab in possible_labels:
    result = {"Moral Value": lab}
    true = test_lyrics[lab].values
    candidate = pred_df[f"pred_{lab}"].values
    
    # Original metrics
    result["F1 Score (Binary)"] = f"{f1_score(true, candidate, average='binary', zero_division=0):.2f} ± {std_devs[lab]['F1 (Binary)']:.2f}"
    result["F1 Score (Weighted)"] = f"{f1_score(true, candidate, average='weighted', zero_division=0):.2f} ± {std_devs[lab]['F1 (Weighted)']:.2f}"
    result["Precision Score (Binary)"] = f"{precision_score(true, candidate, average='binary', zero_division=0):.2f} ± {std_devs[lab]['Precision (Binary)']:.2f}"
    result["Precision Score (Weighted)"] = f"{precision_score(true, candidate, average='weighted', zero_division=0):.2f} ± {std_devs[lab]['Precision (Weighted)']:.2f}"
    result["Recall Score (Binary)"] = f"{recall_score(true, candidate, average='binary', zero_division=0):.2f} ± {std_devs[lab]['Recall (Binary)']:.2f}"
    result["Recall Score (Weighted)"] = f"{recall_score(true, candidate, average='weighted', zero_division=0):.2f} ± {std_devs[lab]['Recall (Weighted)']:.2f}"
    result["Accuracy"] = f"{accuracy_score(true, candidate):.2f} ± {std_devs[lab]['Accuracy']:.2f}"
    
    final_results.append(result)

results_df = pd.DataFrame(final_results)

In [91]:
print("Results of the MoralBERT SL")
results_df

Results of the MoralBERT SL


Unnamed: 0,Moral Value,F1 Score (Binary),F1 Score (Weighted),Precision Score (Binary),Precision Score (Weighted),Recall Score (Binary),Recall Score (Weighted),Accuracy
0,care,0.75 ± 0.04,0.83 ± 0.03,0.64 ± 0.05,0.86 ± 0.02,0.92 ± 0.04,0.82 ± 0.03,0.82 ± 0.03
1,harm,0.69 ± 0.04,0.70 ± 0.03,0.54 ± 0.05,0.81 ± 0.02,0.94 ± 0.03,0.70 ± 0.03,0.70 ± 0.03
2,fairness,0.38 ± 0.06,0.74 ± 0.03,0.29 ± 0.06,0.80 ± 0.03,0.56 ± 0.09,0.70 ± 0.03,0.70 ± 0.03
3,cheating,0.32 ± 0.06,0.69 ± 0.03,0.20 ± 0.04,0.88 ± 0.03,0.82 ± 0.09,0.62 ± 0.03,0.62 ± 0.03
4,loyalty,0.27 ± 0.09,0.79 ± 0.03,0.75 ± 0.16,0.82 ± 0.04,0.16 ± 0.06,0.83 ± 0.03,0.83 ± 0.03
5,betrayal,0.37 ± 0.08,0.84 ± 0.02,0.27 ± 0.07,0.88 ± 0.03,0.58 ± 0.11,0.81 ± 0.03,0.81 ± 0.03
6,authority,0.39 ± 0.09,0.84 ± 0.03,0.53 ± 0.13,0.84 ± 0.03,0.31 ± 0.09,0.86 ± 0.02,0.86 ± 0.02
7,subversion,0.43 ± 0.06,0.71 ± 0.03,0.29 ± 0.05,0.86 ± 0.03,0.83 ± 0.07,0.67 ± 0.03,0.67 ± 0.03
8,purity,0.63 ± 0.08,0.90 ± 0.02,0.67 ± 0.09,0.90 ± 0.02,0.59 ± 0.09,0.91 ± 0.02,0.91 ± 0.02
9,degradation,0.32 ± 0.10,0.86 ± 0.03,0.35 ± 0.12,0.86 ± 0.03,0.29 ± 0.10,0.87 ± 0.02,0.87 ± 0.02
