In [None]:
import json
from torch.utils.data import Dataset

class Task1Dataset(Dataset):

    """
    Dataset provided for SemEval 2024 Task 1 Challenge.

    Each item is a tuple ``(sample_id, text, labels_id)`` where ``labels_id``
    holds the positions of the sample's labels inside ``self.class_list``.
    """

    def __init__(self, path, class_list):

        """
        path: path to the json file containing data samples
        class_list: path to the text file listing the classes for the task
                    (it is passed straight to ``read_classes``)
        """

        self.path = path
        self.class_list = read_classes(class_list)

        # Read explicitly as UTF-8 so non-ASCII text decodes the same way on
        # every platform instead of depending on the locale's default encoding.
        with open(self.path, 'r', encoding='utf-8') as file:
            self.data = json.load(file)

    def __getitem__(self, index):
        """Return ``(id, text, label indices)`` for the sample at ``index``.

        Raises ``ValueError`` (from ``list.index``) when a sample carries a
        label that is not present in ``self.class_list``.
        """

        sample = self.data[index]

        sample_id = sample['id']
        text = sample['text']
        labels = sample['labels']

        # Translate label names into their position in the class list.
        labels_id = [self.class_list.index(x) for x in labels]

        return sample_id, text, labels_id

    def __len__(self):
        """Number of samples loaded from the json file."""
        return len(self.data)



In [None]:
def read_classes(file_path):
    """Load class names from a UTF-8 text file, one name per line.

    Surrounding whitespace is removed from every line and lines that end up
    empty are skipped. The names are returned in file order.
    """
    with open(file_path, 'r', encoding='utf8') as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return [name for name in stripped_lines if name]


In [None]:
import pdb
import json
import logging.handlers
import argparse
import os
import numpy as np
from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer
from networkx import DiGraph, relabel_nodes, all_pairs_shortest_path_length
from sklearn_hierarchical_classification.constants import ROOT
from sklearn_hierarchical_classification.metrics import h_fbeta_score, h_recall_score, h_precision_score, \
    fill_ancestors, multi_labeled
import sys
sys.path.append('.')


# Keys every entry of a prediction file must contain.
KEYS = ['id','labels']

# Dedicated logger for the scorer; INFO and above is emitted.
logger = logging.getLogger("subtask_1_2a_scorer")
logger.setLevel(logging.INFO)

# Console handler writing timestamped records to stdout.
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)

# The handler was previously configured but never attached, so the formatter
# and the stdout stream were never used. Drop handlers left over from an
# earlier run of this cell first, so re-running does not duplicate output.
for _stale_handler in list(logger.handlers):
    logger.removeHandler(_stale_handler)
logger.addHandler(ch)

# Label hierarchy as (parent, child) pairs. The order is significant: nodes
# are inserted into the graph in the order they first appear here. Several
# techniques are listed under more than one parent (e.g. "Bandwagon" under
# both 'Justification' and 'Ethos'), so this is a DAG rather than a tree.
_HIERARCHY_EDGES = [
    (ROOT, "Logos"),
    ("Logos", "Repetition"),
    ("Logos", "Obfuscation, Intentional vagueness, Confusion"),
    ("Logos", "Reasoning"),
    ("Logos", "Justification"),
    ('Justification', "Slogans"),
    ('Justification', "Bandwagon"),
    ('Justification', "Appeal to authority"),
    ('Justification', "Flag-waving"),
    ('Justification', "Appeal to fear/prejudice"),
    ('Reasoning', "Simplification"),
    ('Simplification', "Causal Oversimplification"),
    ('Simplification', "Black-and-white Fallacy/Dictatorship"),
    ('Simplification', "Thought-terminating cliché"),
    ('Reasoning', "Distraction"),
    ('Distraction', "Misrepresentation of Someone's Position (Straw Man)"),
    ('Distraction', "Presenting Irrelevant Data (Red Herring)"),
    ('Distraction', "Whataboutism"),
    (ROOT, "Ethos"),
    ('Ethos', "Appeal to authority"),
    ('Ethos', "Glittering generalities (Virtue)"),
    ('Ethos', "Bandwagon"),
    ('Ethos', "Ad Hominem"),
    ('Ethos', "Transfer"),
    ('Ad Hominem', "Doubt"),
    ('Ad Hominem', "Name calling/Labeling"),
    ('Ad Hominem', "Smears"),
    ('Ad Hominem', "Reductio ad hitlerum"),
    ('Ad Hominem', "Whataboutism"),
    (ROOT, "Pathos"),
    ('Pathos', "Exaggeration/Minimisation"),
    ('Pathos', "Loaded Language"),
    ('Pathos', "Appeal to (Strong) Emotions"),
    ('Pathos', "Appeal to fear/prejudice"),
    ('Pathos', "Flag-waving"),
    ('Pathos', "Transfer"),
]

# Directed graph of the hierarchy, rooted at ROOT.
G = DiGraph()
G.add_edges_from(_HIERARCHY_EDGES)

def get_all_classes_from_graph(graph):
    """Return every node of ``graph`` except ROOT, in the graph's node order."""
    class_names = []
    for vertex in graph.nodes:
        if vertex != ROOT:
            class_names.append(vertex)
    return class_names

def _h_fbeta_score(y_true, y_pred, class_hierarchy, beta=1., root=ROOT):
    """Hierarchical F-beta: weighted harmonic mean of hierarchical P and R.

    Precision and recall come from ``_h_precision_score`` and
    ``_h_recall_score``. Returns 0 when both of them are 0.
    """
    precision = _h_precision_score(y_true, y_pred, class_hierarchy, root=root)
    recall = _h_recall_score(y_true, y_pred, class_hierarchy, root=root)

    # Guard against 0 / 0 in the formula below.
    if precision == 0 and recall == 0:
        return 0

    beta_squared = beta ** 2.
    return (1. + beta_squared) * precision * recall / (beta_squared * precision + recall)

def _fill_ancestors(y, graph, root, copy=True):
    """Mark, for every positive label, all of its ancestors as positive too.

    y: 2-D indicator array that is indexed as ``y[:, node]``, so the graph's
       non-root nodes must be valid column indices.
    graph: class hierarchy whose edges point from parent to child.
    root: root node of the hierarchy; it has no column in ``y`` and is never
          written.
    copy: when true (default) work on a copy and leave ``y`` untouched.

    Returns the filled array (``y`` itself when ``copy`` is false).
    """
    y_ = y.copy() if copy else y
    # On the reversed graph, the nodes reachable from a node are the node
    # itself plus its ancestors in the original hierarchy.
    paths = all_pairs_shortest_path_length(graph.reverse(copy=False))
    for target, distances in paths:
        if target == root:
            continue
        # Rows in which this node is set.
        ix_rows = np.where(y[:, target] > 0)[0]
        # Exclude the root that was passed in. Previously the module-level
        # ROOT constant was used here, which ignored the `root` argument.
        ancestors = [node for node in distances if node != root]
        y_[tuple(np.meshgrid(ix_rows, ancestors))] = 1
    # NOTE(review): kept from the original. If reverse(copy=False) returns a
    # view (as in recent networkx) this line has no effect; if it reverses in
    # place, it restores the original direction. Confirm for the pinned version.
    graph.reverse(copy=False)
    return y_

def _h_recall_score(y_true, y_pred, class_hierarchy, root=ROOT):
    """Hierarchical recall over ancestor-expanded label matrices.

    Both matrices are expanded with ``_fill_ancestors``; the score is the
    number of cells set in both divided by the number of cells set in the
    expanded gold matrix. Returns 0 when the gold matrix has no set cell.
    """
    expanded_true = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    expanded_pred = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    gold_total = np.count_nonzero(expanded_true)
    if gold_total == 0:
        return 0

    overlap = np.count_nonzero((expanded_true != 0) & (expanded_pred != 0))
    return overlap / gold_total

def _h_precision_score(y_true, y_pred, class_hierarchy, root=ROOT):
    """Hierarchical precision over ancestor-expanded label matrices.

    Both matrices are expanded with ``_fill_ancestors``; the score is the
    number of cells set in both divided by the number of cells set in the
    expanded prediction matrix. Returns 0 when nothing is predicted.
    """
    expanded_true = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    expanded_pred = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    predicted_total = np.count_nonzero(expanded_pred)
    if predicted_total == 0:
        return 0

    overlap = np.count_nonzero((expanded_true != 0) & (expanded_pred != 0))
    return overlap / predicted_total

def read_classes(file_path):
    """Read class names from a text file, one per line.

    Lines are stripped of surrounding whitespace and empty lines are skipped.
    The file is decoded as UTF-8 explicitly: class names contain non-ASCII
    characters (e.g. "Thought-terminating cliché"), and relying on the
    platform default encoding could make them differ from the graph labels.
    """
    CLASSES = []
    with open(file_path, encoding='utf-8') as f:
        for label in f:
            label = label.strip()
            if label:
                CLASSES.append(label)
    return CLASSES

def check_format(file_path):
    """Check that ``file_path`` is a well-formed prediction file.

    The file must exist, parse as JSON, and be a list of objects. Every
    object must contain all keys in KEYS, and every label must be a class of
    the hierarchy graph G.

    Returns True when the file is valid. Otherwise logs the first problem
    found and returns False.
    """
    if not os.path.exists(file_path):
        logger.error("File doesnt exists: {}".format(file_path))
        return False

    try:
        with open(file_path, encoding='utf-8') as p:
            submission = json.load(p)
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.error("File is not a valid json file: {}".format(file_path))
        return False

    if not isinstance(submission, list):
        logger.error("Submission must be a list of entries: {}".format(file_path))
        return False

    _classes = get_all_classes_from_graph(G)

    for i, obj in enumerate(submission):
        if not isinstance(obj, dict):
            logger.error("Entry is not an object in {}:{}".format(file_path, i))
            return False

        for key in KEYS:
            if key not in obj:
                logger.error("Missing entry in {}:{}".format(file_path, i))
                return False

        labels = obj['labels']
        if not isinstance(labels, list):
            logger.error("Labels must be a list in {}:{}".format(file_path, i))
            return False

        # Validate the labels of EVERY entry. This check used to sit outside
        # the loop, so only the last entry was inspected and an empty
        # submission crashed on the undefined loop variable.
        for label in labels:
            if label not in _classes:
                logger.error("Unknown Label in {}:{}".format(file_path, i))
                logger.error("Offending label: {}".format(label))
                return False

    return True

def _read_gold_and_pred(pred_fpath, gold_fpath):
    """
    Load the label lists of the prediction file and of the gold file.
    :param pred_fpath: a json file with predictions,
    :param gold_fpath: the original annotated gold file.
    :return: {id:pred_labels} dict; {id:gold_labels} dict
    :raises ValueError: when the two files do not cover the same set of ids.
    """

    def _labels_by_id(json_path):
        # One json list of objects -> {object id: object labels}.
        with open(json_path, encoding='utf-8') as handle:
            return {entry['id']: entry['labels'] for entry in json.load(handle)}

    # The gold file is read first, then the predictions.
    gold_labels = _labels_by_id(gold_fpath)
    pred_labels = _labels_by_id(pred_fpath)

    if set(gold_labels) != set(pred_labels):
        mismatch_msg = 'There are either missing or added examples to the prediction file. Make sure you only have the gold examples in the prediction file.'
        logger.error(mismatch_msg)
        raise ValueError(mismatch_msg)

    return pred_labels, gold_labels

def evaluate_h(pred_file, gold_file):
    """Score a prediction file against a gold file with hierarchical metrics.

    pred_file: path to the json file with predictions.
    gold_file: path to the json file with gold annotations.

    Returns a tuple ``(h_precision, h_recall, h_f1)``.

    Raises ValueError when the prediction file fails validation, or (from
    ``_read_gold_and_pred``) when the two files cover different ids.
    Previously a failed validation fell through and returned None, which made
    callers that unpack three values fail with an unrelated TypeError.
    """
    if not validate_files(pred_file, gold_file):
        raise ValueError('Bad format for pred file {}. Cannot score.'.format(pred_file))

    pred_labels, gold_labels = _read_gold_and_pred(pred_file, gold_file)

    # Align both label lists on the iteration order of the gold ids.
    gold = []
    pred = []
    for sample_id in gold_labels:
        gold.append(gold_labels[sample_id])
        pred.append(pred_labels[sample_id])

    # multi_labeled yields the matrices and graph that the scorers consume.
    with multi_labeled(gold, pred, G) as (gold_, pred_, graph_):
        h_precision = _h_precision_score(gold_, pred_, graph_)
        h_recall = _h_recall_score(gold_, pred_, graph_)
        h_f1 = _h_fbeta_score(gold_, pred_, graph_)
        return h_precision, h_recall, h_f1

def validate_files(pred_files, gold_files):
    """Return True when the prediction file passes ``check_format``.

    On failure an error is logged and False is returned. ``gold_files`` is
    accepted but not inspected.
    """
    format_ok = check_format(pred_files)
    if format_ok:
        return True
    logger.error('Bad format for pred file {}. Cannot score.'.format(pred_files))
    return False


In [None]:
import torch
#import task1_dataset
from transformers import BertTokenizer


class Collate:
    """Collate function turning dataset samples into model-ready tensors.

    Each sample is expected to be ``(id, text, label_indices)``. A call
    returns ``(ids, input_ids, attention_masks, multi_hot_labels)``.
    """

    def __init__(self, class_list, tokenizer=None):
        """
        class_list: path to the class-list file (passed to ``read_classes``)
        tokenizer: optional tokenizer exposing ``encode_plus``; when omitted
                   the original ``bert-base-cased`` BertTokenizer is loaded.
        """
        # NOTE(review): the default tokenizer is BERT's while the encoder in
        # this notebook is RoBERTa; the two use different vocabularies. Kept
        # as the default so existing checkpoints behave as before; pass a
        # matching tokenizer here once that is confirmed to be the intent.
        if tokenizer is None:
            tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
        self.tokenizer = tokenizer
        self.class_list = read_classes(class_list)

    def __call__(self, data):
        """Tokenize a batch and build its multi-hot label matrix."""

        ids, texts, labels = zip(*data)

        tokenized_text = []
        attention_masks = []

        for sent in texts:
            # Flatten line breaks, both the escaped two-character form "\n"
            # and real newline characters. Only the escaped form was handled
            # before.
            preprocessed_sent = sent.replace('\\n', ' ').replace('\n', ' ').strip()

            encoded_sent = self.tokenizer.encode_plus(
                text=preprocessed_sent,
                add_special_tokens=True,
                padding='max_length',
                truncation=True,
                return_attention_mask=True
            )

            tokenized_text.append(encoded_sent.get('input_ids'))
            attention_masks.append(encoded_sent.get('attention_mask'))

        text_input = torch.tensor(tokenized_text)
        attention_masks = torch.tensor(attention_masks)

        # One row per sample, one column per class.
        labels_output = torch.zeros(len(texts), len(self.class_list))

        # Multi-hot encoding. Samples without labels are skipped explicitly
        # instead of relying on indexing with an empty list.
        for row, label_ids in enumerate(labels):
            if len(label_ids) > 0:
                labels_output[row, list(label_ids)] = 1

        return ids, text_input, attention_masks, labels_output




In [None]:
import torch
import torch.nn as nn
from transformers import RobertaConfig, RobertaModel


class TextEncoderRoBERTa(nn.Module):
    """RoBERTa encoder followed by a linear multi-label classification head.

    The forward pass returns raw logits (no sigmoid), which is what
    ``nn.BCEWithLogitsLoss`` expects.
    """

    def __init__(self, num_classes=20, model_name='roberta-base', num_hidden_layers=10):
        """
        num_classes: size of the output layer (default 20, the former
                     hard-coded value)
        model_name: pretrained checkpoint name passed to ``from_pretrained``
        num_hidden_layers: value passed to the RoBERTa config
        """
        super(TextEncoderRoBERTa, self).__init__()

        self.bert_config = RobertaConfig.from_pretrained(model_name,
                                                 output_hidden_states=True,
                                                 num_hidden_layers=num_hidden_layers)

        self.bert_model = RobertaModel.from_pretrained(model_name, config=self.bert_config)

        self.classifier = nn.Linear(self.bert_config.hidden_size, num_classes)

    def forward(self, text_inputs, attention_mask):
        """Return one logit per class for every sequence in the batch."""

        outputs = self.bert_model(text_inputs, attention_mask=attention_mask)

        # First element of the encoder output, first position of every
        # sequence (presumably the sentence-start token of the last hidden
        # state - TODO confirm against the transformers version in use).
        sequence_output = outputs[0][:, 0, :]

        logits = self.classifier(sequence_output)

        return logits

In [None]:
import os
import json
import numpy as np
from tqdm import tqdm
from datetime import datetime

import torch
import torch.nn as nn
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import MultiStepLR

from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer

# import task1_dataset
# from task1_dataset import Task1Dataset
# from task1_collate import Collate
# from task1_bert_model import TextEncoderBERT

# import sys
# sys.path.append('scorer-baseline')
# import subtask_1_2a


def evaluate(pred, gold, CLASSES):
    """Compute flat (non-hierarchical) macro and micro F1.

    pred / gold: one collection of label names per sample.
    CLASSES: all label names; fixes the columns of the indicator matrices.

    Returns ``(macro_f1, micro_f1)``.
    """
    binarizer = MultiLabelBinarizer()
    binarizer.fit([CLASSES])

    gold_matrix = binarizer.transform(gold)
    pred_matrix = binarizer.transform(pred)

    scores = [
        f1_score(gold_matrix, pred_matrix, average=mode, zero_division=1)
        for mode in ("macro", "micro")
    ]

    return scores[0], scores[1]



def train():
    """Fine-tune the RoBERTa classifier, checkpoint the best epoch, then test.

    The best epoch is selected on the hierarchical F1 returned by
    ``validate``. Checkpoints and ``best.txt`` go to a timestamped directory
    under ``task1_checkpoints``; after training the model is evaluated on the
    dev file and the result is written to ``test_results.txt``.
    """

    task1_classlist = '/path_to/task1_class_list.txt'

    traindata_path = '/path_to/train.json'
    train_dataset = Task1Dataset(traindata_path, task1_classlist)

    valdata_path = '/path_to_subtask1/validation.json'
    val_dataset = Task1Dataset(valdata_path, task1_classlist)

    collate_fn = Collate(task1_classlist)

    train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=mp.cpu_count(), collate_fn=collate_fn)
    val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=mp.cpu_count(), collate_fn=collate_fn)

    model_name = 'roberta'
    # The class is named TextEncoderRoBERTa; the former spelling
    # `TextEncoderRoberta` raised NameError.
    model = TextEncoderRoBERTa()
    model = model.cuda()

    # The model outputs logits, so the loss applies the sigmoid itself.
    criterion = nn.BCEWithLogitsLoss()

    # Smaller learning rate for the encoder than for the classification head.
    optimizer = torch.optim.AdamW([
                              {"params": model.bert_model.parameters(), "lr": 0.00002},
                              {"params": model.classifier.parameters(), "lr": 0.0002}],lr=0.0002)

    scheduler = MultiStepLR(optimizer, milestones=[4, 8], gamma=0.8)

    # Timestamped directory for this run's checkpoints.
    cur_date_time = datetime.now()
    date_time_str = cur_date_time.strftime('%Y-%m-%d_%H-%M')
    run_dir = f"task1_checkpoints/run_{date_time_str}"
    os.makedirs(run_dir, exist_ok=True)

    num_epochs = 10
    best_h_f1 = 0
    best_epoch = 0

    for epoch in range(num_epochs):

        model.train()

        # Reset every epoch so the reported loss is the mean of THIS epoch.
        # The list used to accumulate across all epochs.
        epoch_losses = []
        progress_bar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False)

        for batch in progress_bar:

            ids, text_input, attention_masks, labels_output = batch

            text = text_input.cuda()
            labels = labels_output.cuda()
            attention_masks = attention_masks.cuda()

            optimizer.zero_grad()

            logits = model(text, attention_masks)
            loss = criterion(logits, labels.float())

            epoch_losses.append(loss.item())
            loss.backward()
            optimizer.step()

        mean_loss = torch.mean(torch.tensor(epoch_losses)).item()

        print ("=========================== Epoch: ", epoch+1," | Loss: ", mean_loss, "===========================\n")

        metrics = validate(val_dataloader, model, run_dir)
        print("Validation: ", metrics, "\n")

        # Keep the checkpoint with the best hierarchical F1 seen so far.
        if metrics['h_f1_score = '] > best_h_f1:

            best_h_f1 = metrics['h_f1_score = ']
            best_epoch = epoch

            torch.save({
                'state_dict': model.state_dict(),
                'optimizer_dict': optimizer.state_dict(),
                'epoch': best_epoch,
            }, run_dir +'/'+ model_name + '_' + f"{best_h_f1:.4f}" + '.pth')

            lines_to_write = ["=========================== Epoch: ", str(epoch+1), " | Loss: ", str(mean_loss), "===========================\n",
                              "Validation: ", str(metrics), "\n"]

            with open(run_dir +'/best.txt', 'w') as f:
                f.writelines(lines_to_write)

        scheduler.step()

    # testing
    # NOTE(review): this evaluates the weights of the LAST epoch, not the
    # best checkpoint saved above - confirm that is intended.

    devdata_path = '/path_to/dev_subtask1_en.json'

    dev_dataset = Task1Dataset(devdata_path, task1_classlist)

    dev_dataloader = DataLoader(dev_dataset, batch_size=8, shuffle=False, num_workers=mp.cpu_count(), collate_fn=collate_fn)

    test_run_dir = f"task1_evaluation/run_{date_time_str}"
    os.makedirs(test_run_dir, exist_ok=True)

    test_metrics = validate(dev_dataloader, model, test_run_dir)

    print("Test: ", test_metrics, "\n")

    test_lines_to_write = ["Test: ", str(test_metrics), "\n"]

    # As before, the summary goes to the checkpoint directory (run_dir), while
    # the prediction/gold dumps of the test pass land in test_run_dir.
    with open(run_dir +'/test_results.txt', 'w') as f:
        f.writelines(test_lines_to_write)




def validate(val_dataloader, model, run_dir, class_list_path='/path_to/task1_class_list.txt', threshold=0.5):
    """Run the model over a dataloader and compute flat and hierarchical scores.

    val_dataloader: iterable of batches ``(ids, input_ids, attention_masks,
                    multi_hot_labels)``.
    model: module returning one logit per class.
    run_dir: directory that receives ``preds.json`` and ``gold.json``.
    class_list_path: class-list file; column i of the label matrices
                     corresponds to line i of this file.
    threshold: probability above which a class counts as predicted.

    Returns a dict with the keys 'macroF1 = ', 'microF1 = ', 'h_precision = ',
    'h_recall = ' and 'h_f1_score = '.
    """

    model.eval()

    classes_list = read_classes(class_list_path)

    # Run on whatever device the model lives on. Previously the inputs were
    # only defined when CUDA was available, so CPU runs hit a NameError.
    try:
        device = next(model.parameters()).device
    except StopIteration:
        device = torch.device('cpu')

    predictions = []
    true_labels = []
    ids_list = []

    progress_bar = tqdm(val_dataloader, desc='Validation', leave=False)

    for batch in progress_bar:

        ids, text_input, attention_masks, labels_output = batch

        text = text_input.to(device)
        attention_masks = attention_masks.to(device)

        with torch.no_grad():
            logits = model(text, attention_masks)
            # The model returns logits: convert to probabilities before
            # thresholding. The raw logits used to be compared to 0.5.
            pred_classes = (torch.sigmoid(logits) > threshold).long()

        predictions.extend(pred_classes.cpu().numpy())
        true_labels.extend(labels_output.cpu().numpy())
        ids_list.extend(ids)

    # Convert indicator rows to label-name lists (in class-list order, so the
    # dumped files are deterministic).
    preds = [[classes_list[i] for i, flag in enumerate(sample) if flag == 1] for sample in predictions]
    gts = [[classes_list[i] for i, flag in enumerate(sample) if flag == 1] for sample in true_labels]

    pred_list = [{'id': sample_id, 'labels': labels} for sample_id, labels in zip(ids_list, preds)]
    gold_list = [{'id': sample_id, 'labels': labels} for sample_id, labels in zip(ids_list, gts)]

    # The hierarchical scorer works on files, so both sides are dumped first.
    preds_json = run_dir +'/preds.json'
    with open(preds_json, 'w') as f:
        json.dump(pred_list, f)

    gt_json = run_dir +'/gold.json'
    with open(gt_json, 'w') as f:
        json.dump(gold_list, f)

    macro_f1, micro_f1 = evaluate(preds, gts, classes_list)

    precision, recall, f1 = evaluate_h(preds_json, gt_json)

    metrics = {}
    metrics['macroF1 = '] = macro_f1
    metrics['microF1 = '] = micro_f1

    metrics['h_precision = '] = precision
    metrics['h_recall = '] = recall
    metrics['h_f1_score = '] = f1

    return metrics






# Entry point: start the training run when this code executes as the main module.
# NOTE(review): inside a notebook kernel __name__ is normally '__main__', so
# running this cell presumably launches the full training - confirm intended.
if __name__ == '__main__':
    train()

In [None]:
def evaluate(pred, gold, CLASSES):
    """Flat macro / micro F1 between predicted and gold label collections.

    The label names in ``CLASSES`` define the columns of the indicator
    matrices that both inputs are converted to before scoring.
    Returns ``(macro_f1, micro_f1)``.
    """
    encoder = MultiLabelBinarizer()
    encoder.fit([CLASSES])

    gold_indicator = encoder.transform(gold)
    pred_indicator = encoder.transform(pred)

    def _f1(averaging):
        # zero_division=1 is kept exactly as in the original scoring.
        return f1_score(gold_indicator, pred_indicator, average=averaging, zero_division=1)

    macro_f1 = _f1("macro")
    micro_f1 = _f1("micro")
    return macro_f1, micro_f1

In [None]:
import os
import json
import numpy as np
from tqdm import tqdm
from datetime import datetime

import torch
import torch.nn as nn
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import MultiStepLR

from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer

# import task1_dataset
# from task1_dataset import Task1Dataset
# from task1_collate import Collate
# from task1_bert_model import TextEncoderBERT

# import sys
# sys.path.append('scorer-baseline')
# import subtask_1_2a




def validate(val_dataloader, model, run_dir='', class_list_path='/path_to/task1_class_list.txt', threshold=0.5):
    """Run the model over a dataloader and compute flat and hierarchical scores.

    val_dataloader: iterable of batches ``(ids, input_ids, attention_masks,
                    multi_hot_labels)``.
    model: module returning one logit per class.
    run_dir: directory that receives ``preds.json`` and ``gold.json``. The
             default '' reproduces the former hard-coded paths '/preds.json'
             and '/gold.json' (filesystem root); pass a writable directory
             to put the files elsewhere.
    class_list_path: class-list file; column i of the label matrices
                     corresponds to line i of this file.
    threshold: probability above which a class counts as predicted.

    Returns a dict with the keys 'macroF1 = ', 'microF1 = ', 'h_precision = ',
    'h_recall = ' and 'h_f1_score = '.
    """

    model.eval()

    classes_list = read_classes(class_list_path)

    # Run on whatever device the model lives on. Previously the inputs were
    # only defined when CUDA was available, so CPU runs hit a NameError.
    try:
        device = next(model.parameters()).device
    except StopIteration:
        device = torch.device('cpu')

    predictions = []
    true_labels = []
    ids_list = []

    progress_bar = tqdm(val_dataloader, desc='Validation', leave=False)

    for batch in progress_bar:

        ids, text_input, attention_masks, labels_output = batch

        text = text_input.to(device)
        attention_masks = attention_masks.to(device)

        with torch.no_grad():
            logits = model(text, attention_masks)
            # The model returns logits: convert to probabilities before
            # thresholding. The raw logits used to be compared to 0.5.
            pred_classes = (torch.sigmoid(logits) > threshold).long()

        predictions.extend(pred_classes.cpu().numpy())
        true_labels.extend(labels_output.cpu().numpy())
        ids_list.extend(ids)

    # Convert indicator rows to label-name lists (in class-list order, so the
    # dumped files are deterministic).
    preds = [[classes_list[i] for i, flag in enumerate(sample) if flag == 1] for sample in predictions]
    gts = [[classes_list[i] for i, flag in enumerate(sample) if flag == 1] for sample in true_labels]

    pred_list = [{'id': sample_id, 'labels': labels} for sample_id, labels in zip(ids_list, preds)]
    gold_list = [{'id': sample_id, 'labels': labels} for sample_id, labels in zip(ids_list, gts)]

    # The hierarchical scorer works on files, so both sides are dumped first.
    preds_json = run_dir + '/preds.json'
    with open(preds_json, 'w') as f:
        json.dump(pred_list, f)

    gt_json = run_dir + '/gold.json'
    with open(gt_json, 'w') as f:
        json.dump(gold_list, f)

    macro_f1, micro_f1 = evaluate(preds, gts, classes_list)

    precision, recall, f1 = evaluate_h(preds_json, gt_json)

    metrics = {}
    metrics['macroF1 = '] = macro_f1
    metrics['microF1 = '] = micro_f1

    metrics['h_precision = '] = precision
    metrics['h_recall = '] = recall
    metrics['h_f1_score = '] = f1

    return metrics



In [None]:
# testing: evaluate a saved checkpoint on a held-out file

devdata_path = '/path_to/test_subtask1_ar.json'
task1_classlist = '/path_to/task1_class_list.txt'

dev_dataset = Task1Dataset(devdata_path, task1_classlist)

collate_fn = Collate(task1_classlist)

dev_dataloader = DataLoader(dev_dataset, batch_size=8, shuffle=False, num_workers=mp.cpu_count(), collate_fn=collate_fn)

model_name = 'roberta'
model = TextEncoderRoBERTa()

model_ckpt_path = '/path_to/Multilingual/roberta_0.5978.pth'


# Load the checkpoint.
# NOTE(security): torch.load unpickles the file; only load checkpoints from a
# trusted source.
checkpoint = torch.load(model_ckpt_path, map_location='cpu')

# train() saves a wrapper dict {'state_dict', 'optimizer_dict', 'epoch'}.
# Unwrap it: passing the wrapper itself to load_state_dict(strict=False)
# matches no parameter name and silently leaves the model untrained.
if isinstance(checkpoint, dict) and 'state_dict' in checkpoint:
    state_dict = checkpoint['state_dict']
else:
    state_dict = checkpoint

# Checkpoints saved from a DataParallel / DistributedDataParallel wrapper
# prefix every key with 'module.'; strip that prefix (and only the prefix).
new_state_dict = {
    (k[len('module.'):] if k.startswith('module.') else k): v
    for k, v in state_dict.items()
}

# Load the weights and report anything that did not line up.
load_result = model.load_state_dict(new_state_dict, strict=False)
if load_result.missing_keys:
    print("Missing keys when loading checkpoint: ", load_result.missing_keys)
if load_result.unexpected_keys:
    print("Unexpected keys when loading checkpoint: ", load_result.unexpected_keys)

expected_keys = model.state_dict().keys()
if expected_keys and len(load_result.missing_keys) == len(expected_keys):
    raise RuntimeError("No parameter of the checkpoint matched the model: {}".format(model_ckpt_path))

model = torch.nn.DataParallel(model)
model = model.cuda()

test_metrics = validate(dev_dataloader, model)

print(test_metrics)