TODO:
- If a label has a poor confidence, can we swap to predict the next label in the hiearchy instead?

# Calculations for Hierarchy

In [1]:
from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer
import networkx
from networkx import DiGraph, relabel_nodes, all_pairs_shortest_path_length
from sklearn_hierarchical_classification.constants import ROOT
from sklearn_hierarchical_classification.metrics import h_fbeta_score, h_recall_score, h_precision_score, fill_ancestors, multi_labeled

In [2]:
G = DiGraph()
G.add_edge(ROOT, "Logos")
G.add_edge("Logos", "Repetition")
G.add_edge("Logos", "Obfuscation, Intentional vagueness, Confusion")
G.add_edge("Logos", "Reasoning")
G.add_edge("Logos", "Justification")
G.add_edge('Justification', "Slogans")
G.add_edge('Justification', "Bandwagon")
G.add_edge('Justification', "Appeal to authority")
G.add_edge('Justification', "Flag-waving")
G.add_edge('Justification', "Appeal to fear/prejudice")
G.add_edge('Reasoning', "Simplification")
G.add_edge('Simplification', "Causal Oversimplification")
G.add_edge('Simplification', "Black-and-white Fallacy/Dictatorship")
G.add_edge('Simplification', "Thought-terminating cliché")
G.add_edge('Reasoning', "Distraction")
G.add_edge('Distraction', "Misrepresentation of Someone's Position (Straw Man)")
G.add_edge('Distraction', "Presenting Irrelevant Data (Red Herring)")
G.add_edge('Distraction', "Whataboutism")
G.add_edge(ROOT, "Ethos")
G.add_edge('Ethos', "Appeal to authority")
G.add_edge('Ethos', "Glittering generalities (Virtue)")
G.add_edge('Ethos', "Bandwagon")
G.add_edge('Ethos', "Ad Hominem")
G.add_edge('Ethos', "Transfer")
G.add_edge('Ad Hominem', "Doubt")
G.add_edge('Ad Hominem', "Name calling/Labeling")
G.add_edge('Ad Hominem', "Smears")
G.add_edge('Ad Hominem', "Reductio ad hitlerum")
G.add_edge('Ad Hominem', "Whataboutism")
G.add_edge(ROOT, "Pathos")
G.add_edge('Pathos', "Exaggeration/Minimisation")
G.add_edge('Pathos', "Loaded Language")
G.add_edge('Pathos', "Appeal to (Strong) Emotions")
G.add_edge('Pathos', "Appeal to fear/prejudice")
G.add_edge('Pathos', "Flag-waving")
G.add_edge('Pathos', "Transfer")

In [12]:
def get_all_classes_from_graph(graph):
    return [
        node
        for node in graph.nodes
        if node != ROOT
        ]

def _h_fbeta_score(y_true, y_pred, class_hierarchy, beta=1., root=ROOT):
    hP = _h_precision_score(y_true, y_pred, class_hierarchy, root=root)
    hR = _h_recall_score(y_true, y_pred, class_hierarchy, root=root)
    return (1. + beta ** 2.) * hP * hR / (beta ** 2. * hP + hR)
    
def _fill_ancestors(y, graph, root, copy=True):
    y_ = y.copy() if copy else y
    paths = all_pairs_shortest_path_length(graph.reverse(copy=False))
    for target, distances in paths:
        if target == root:
            continue
        ix_rows = np.where(y[:, target] > 0)[0]
        ancestors = list(filter(lambda x: x != ROOT,distances.keys()))
        y_[tuple(np.meshgrid(ix_rows, ancestors))] = 1
    graph.reverse(copy=False)
    return y_
    
def _h_recall_score(y_true, y_pred, class_hierarchy, root=ROOT):
    y_true_ = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    y_pred_ = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    ix = np.where((y_true_ != 0) & (y_pred_ != 0))

    true_positives = len(ix[0])
    all_positives = np.count_nonzero(y_true_)

    return true_positives / all_positives

def _h_precision_score(y_true, y_pred, class_hierarchy, root=ROOT):
    y_true_ = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    y_pred_ = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    ix = np.where((y_true_ != 0) & (y_pred_ != 0))

    true_positives = len(ix[0])
    all_results = np.count_nonzero(y_pred_)

    return true_positives / all_results

def evaluate_h(gold, pred):
    with multi_labeled(gold, pred, G) as (gold_, pred_, graph_):
        return  _h_precision_score(gold_, pred_,graph_), _h_recall_score(gold_, pred_,graph_), _h_fbeta_score(gold_, pred_,graph_)

# Data

In [5]:
import json
import numpy as np

json_file_path = r'X:\PhD\SemEval Task4\Data\annotations\data\subtask1\train.json'
json_file_path_2 = r'X:\PhD\SemEval Task4\Data\annotations\data\subtask1\validation.json'

# Swap the file opening and data loading statements

with open(json_file_path, 'r', encoding='utf-8') as file:
    data1 = json.load(file)

with open(json_file_path_2, 'r', encoding='utf-8') as file2:
    data2 = json.load(file2)

data = [{**d1, **d2} for d1, d2 in zip(data1, data2)]

labels = [sample.get("labels", []) for sample in data]

# lists to get all labels
all_labels = [label for sublist in labels for label in sublist]

num_unique_labels = len(set(all_labels))
print(f"Number of Unique Labels: {num_unique_labels}")
print("Label Names:", set(all_labels))

Number of Unique Labels: 20
Label Names: {'Smears', 'Appeal to fear/prejudice', 'Name calling/Labeling', 'Bandwagon', 'Reductio ad hitlerum', 'Loaded Language', 'Causal Oversimplification', 'Slogans', 'Doubt', 'Exaggeration/Minimisation', 'Glittering generalities (Virtue)', 'Whataboutism', 'Repetition', 'Appeal to authority', 'Thought-terminating cliché', 'Presenting Irrelevant Data (Red Herring)', 'Black-and-white Fallacy/Dictatorship', 'Flag-waving', "Misrepresentation of Someone's Position (Straw Man)", 'Obfuscation, Intentional vagueness, Confusion'}


# Models

In [50]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from transformers import BertTokenizer, TFBertModel
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, GlobalAveragePooling1D
import tensorflow as tf
import keras
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import precision_recall_fscore_support, classification_report

class MemeClassification:
    def __init__(self):
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.bert_model = TFBertModel.from_pretrained('bert-base-uncased')
        self.model = None

    def load_data(self, json_file_path, json_file_path_2):
        with open(json_file_path, 'r', encoding='utf-8') as file:
            data1 = json.load(file)

        with open(json_file_path_2, 'r', encoding='utf-8') as file2:
            data2 = json.load(file2)
        data = [{**d1, **d2} for d1, d2 in zip(data1, data2)]
        
        return data

    def preprocess_data(self, data):
        texts = [sample["text"] for sample in data]
        labels = [sample.get("labels", []) for sample in data]
        
        #checks if labels are empty and assign a default label 
        default_label = ['None']  
        labels = [label if label else default_label for label in labels]

        #converting labels to one hot encoded
        if any(isinstance(label, list) for label in labels):
            self.label_binarizer = MultiLabelBinarizer()
            y = self.label_binarizer.fit_transform(labels)
        else:
            y = np.array(labels)
        return texts, y

    def tokenize_and_pad(self, texts):
        input_ids = []
        attention_masks = []

        for text in texts:
            encoded_text = self.tokenizer(text, max_length=128, truncation=True, padding='max_length', return_tensors='tf')
            input_ids.append(encoded_text['input_ids'])
            attention_masks.append(encoded_text['attention_mask'])

        input_ids = np.concatenate(input_ids, axis=0)
        attention_masks = np.concatenate(attention_masks, axis=0)

        return input_ids, attention_masks

    def build_model(self, num_classes=21):
        input_ids_input = Input(shape=(128,), dtype=tf.int32, name="input_ids")
        attention_masks_input = Input(shape=(128,), dtype=tf.int32, name="attention_masks")

        bert_output = self.bert_model(input_ids_input, attention_mask=attention_masks_input)[0]
        pooled_output = GlobalAveragePooling1D()(bert_output)
        drop = tf.keras.layers.Dropout(0.4)(pooled_output)
        output_layer = Dense(num_classes, activation='sigmoid')(drop)

        opt = tf.keras.optimizers.Adam(lr=1e-5,
                                    decay=0.01,
                                    beta_1=0.9,
                                    beta_2=0.999,
                                    epsilon=1e-07,)

        self.model = Model(inputs=[input_ids_input, attention_masks_input], outputs=output_layer)
        self.model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

    def train_model(self, json_file_path, json_file_path2, num_classes=21, epochs=3, batch_size=8, test_size=0.3, random_state=42):
        data = self.load_data(json_file_path, json_file_path_2)
        texts, y = self.preprocess_data(data)
        input_ids, attention_masks = self.tokenize_and_pad(texts)

        # Split the data
        X_train, X_temp, y_train, y_temp, attention_masks_train, attention_masks_temp = train_test_split(
            input_ids, y, attention_masks, test_size=test_size, random_state=random_state, shuffle=True
        )

        X_val, X_test, y_val, y_test, attention_masks_val, attention_masks_test = train_test_split(
            X_temp, y_temp, attention_masks_temp, test_size=0.1, random_state=random_state, shuffle=True
        )
        
        print("Shapes:")
        print("X_train:", X_train.shape)
        print("y_train:", y_train.shape)
        print("X_val:", X_val.shape)
        print("y_val:", y_val.shape)
        
        # Build and compile the model
        self.build_model(num_classes)
        
        # Train the model
        history = self.model.fit(
            [X_train, attention_masks_train],
            y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=([X_val, attention_masks_val], y_val),
        )
        
        return history, X_test, attention_masks_test, y_test
    
    def plot_training_history(self, history):
        plt.figure(figsize=(10, 4))
        
        #plot accuracy
        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Model accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend(['Train', 'Validation'], loc='upper left')

        #plot loss 
        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend(['Train', 'Validation'], loc='upper left')
        
        plt.tight_layout()
        plt.show()      

    def evaluate_with_hierarchy(self, X_test, attention_masks_test, y_test):
        print("X_test:", X_test.shape)
        print("y_test:", y_test.shape)

        predictions = self.model.predict([X_test, attention_masks_test])

        total_precision = 0.0
        total_recall = 0.0
        total_samples = len(y_test)

        predicted_labels_all = []
        true_labels_all = []

        for i in range(total_samples):
            predicted_probabilities = predictions[i]

            #decoding predicted labels
            
            predicted_labels = [self.label_binarizer.classes_[j] for j in range(len(self.label_binarizer.classes_)) 
                                 if predicted_probabilities[j] > 0.5]
            gold_labels = [self.label_binarizer.classes_[j] for j in range(len(self.label_binarizer.classes_)) if
                           y_test[i][j] == 1]

       

            predicted_labels_all.append(predicted_labels)
            true_labels_all.append(gold_labels)
    
        true_labels_all_binary = self.label_binarizer.transform(true_labels_all)
        predicted_labels_all_binary = self.label_binarizer.transform(predicted_labels_all)
        
        precision, recall, f1 = evaluate_h(true_labels_all, predicted_labels_all)
        print("Classification Report:")
        print(classification_report(true_labels_all_binary, predicted_labels_all_binary, target_names=self.label_binarizer.classes_))
        print(f'Average Precision: {precision}, Average Recall: {recall}, Average F1: {f1}')

        return precision, recall, f1




In [51]:
texts, y = meme_classifier.preprocess_data(data)
print("Processed Texts:", texts[:1]) 
print("Processed Labels:", y[:1])

Processed Texts: ['Critical Thinking Essentials\\n\\Are my biases affecting how I examine the issue?\\\\n\\n\\Am I using information that can be verified with reliable data?\\\\n\\n\\Am I basing my position on what I KNOW to be the truth, or what I WANT to be the truth?\\\\n\\n\\I might be wrong.\\ (A little humility goes a long way.)\\n']
Processed Labels: [[0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0]]


In [52]:
# Tokenize and pad
input_ids, attention_masks = meme_classifier.tokenize_and_pad(texts)
print("Tokenized Input IDs:", input_ids.shape)  
print("Attention Masks:", attention_masks.shape)
# Display a few examples
num_examples = 3
for i in range(num_examples): 
    print(f"Example {i + 1}:")
    print("Tokenized Input IDs:", input_ids[i])
    print("Attention Masks:", attention_masks[i])
    print("\n")

Tokenized Input IDs: (500, 128)
Attention Masks: (500, 128)
Example 1:
Tokenized Input IDs: [  101  4187  3241  6827  2015  1032  1050  1032  2024  2026 13827  2229
 12473  2129  1045 11628  1996  3277  1029  1032  1032  1050  1032  1050
  1032  2572  1045  2478  2592  2008  2064  2022 20119  2007 10539  2951
  1029  1032  1032  1050  1032  1050  1032  2572  1045  6403  2290  2026
  2597  2006  2054  1045  2113  2000  2022  1996  3606  1010  2030  2054
  1045  2215  2000  2022  1996  3606  1029  1032  1032  1050  1032  1050
  1032  1045  2453  2022  3308  1012  1032  1006  1037  2210 14910 15148
  3632  1037  2146  2126  1012  1007  1032  1050   102     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0]
Attention Masks: [1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1

In [53]:
meme_classifier = MemeClassification()
history, X_test, attention_masks_test, y_test = meme_classifier.train_model(json_file_path, json_file_path_2)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

Shapes:
X_train: (350, 128)
y_train: (350, 21)
X_val: (135, 128)
y_val: (135, 21)
Epoch 1/3


  super().__init__(name, **kwargs)


Epoch 2/3
Epoch 3/3


In [55]:
precision, recall, f1 = meme_classifier.evaluate_with_hierarchy(X_test, attention_masks_test, y_test)

X_test: (15, 128)
y_test: (15, 21)
Classification Report:
                                                     precision    recall  f1-score   support

                                Appeal to authority       0.13      1.00      0.24         2
                           Appeal to fear/prejudice       0.00      0.00      0.00         0
                                          Bandwagon       0.50      0.50      0.50         2
               Black-and-white Fallacy/Dictatorship       0.07      1.00      0.12         1
                          Causal Oversimplification       0.00      0.00      0.00         0
                                              Doubt       0.00      0.00      0.00         0
                          Exaggeration/Minimisation       0.13      1.00      0.24         2
                                        Flag-waving       0.13      1.00      0.24         2
                   Glittering generalities (Virtue)       0.07      1.00      0.12         1
           

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


# F1 Hierarchy Outside the Model

In [56]:
from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer
import networkx
from networkx import DiGraph, relabel_nodes, all_pairs_shortest_path_length
from sklearn_hierarchical_classification.constants import ROOT
from sklearn_hierarchical_classification.metrics import h_fbeta_score, h_recall_score, h_precision_score, fill_ancestors, multi_labeled

In [57]:
# label tree in graph format

# there is no 'none' label in this tree

G = DiGraph()
G.add_edge(ROOT, "Logos")
G.add_edge("Logos", "Repetition")
G.add_edge("Logos", "Obfuscation, Intentional vagueness, Confusion")
G.add_edge("Logos", "Reasoning")
G.add_edge("Logos", "Justification")
G.add_edge('Justification', "Slogans")
G.add_edge('Justification', "Bandwagon")
G.add_edge('Justification', "Appeal to authority")
G.add_edge('Justification', "Flag-waving")
G.add_edge('Justification', "Appeal to fear/prejudice")
G.add_edge('Reasoning', "Simplification")
G.add_edge('Simplification', "Causal Oversimplification")
G.add_edge('Simplification', "Black-and-white Fallacy/Dictatorship")
G.add_edge('Simplification', "Thought-terminating cliché")
G.add_edge('Reasoning', "Distraction")
G.add_edge('Distraction', "Misrepresentation of Someone's Position (Straw Man)")
G.add_edge('Distraction', "Presenting Irrelevant Data (Red Herring)")
G.add_edge('Distraction', "Whataboutism")
G.add_edge(ROOT, "Ethos")
G.add_edge('Ethos', "Appeal to authority")
G.add_edge('Ethos', "Glittering generalities (Virtue)")
G.add_edge('Ethos', "Bandwagon")
G.add_edge('Ethos', "Ad Hominem")
G.add_edge('Ethos', "Transfer")
G.add_edge('Ad Hominem', "Doubt")
G.add_edge('Ad Hominem', "Name calling/Labeling")
G.add_edge('Ad Hominem', "Smears")
G.add_edge('Ad Hominem', "Reductio ad hitlerum")
G.add_edge('Ad Hominem', "Whataboutism")
G.add_edge(ROOT, "Pathos")
G.add_edge('Pathos', "Exaggeration/Minimisation")
G.add_edge('Pathos', "Loaded Language")
G.add_edge('Pathos', "Appeal to (Strong) Emotions")
G.add_edge('Pathos', "Appeal to fear/prejudice")
G.add_edge('Pathos', "Flag-waving")

In [None]:
# functions to calculate score

def get_all_classes_from_graph(graph):
    return [
        node
        for node in graph.nodes
        if node != ROOT
        ]

def _h_fbeta_score(y_true, y_pred, class_hierarchy, beta=1., root=ROOT):
    hP = _h_precision_score(y_true, y_pred, class_hierarchy, root=root)
    hR = _h_recall_score(y_true, y_pred, class_hierarchy, root=root)
    return (1. + beta ** 2.) * hP * hR / (beta ** 2. * hP + hR)
    
def _fill_ancestors(y, graph, root, copy=True):
    y_ = y.copy() if copy else y
    paths = all_pairs_shortest_path_length(graph.reverse(copy=False))
    for target, distances in paths:
        if target == root:
            continue
        ix_rows = np.where(y[:, target] > 0)[0]
        ancestors = list(filter(lambda x: x != ROOT,distances.keys()))
        y_[tuple(np.meshgrid(ix_rows, ancestors))] = 1
    graph.reverse(copy=False)
    return y_
    
def _h_recall_score(y_true, y_pred, class_hierarchy, root=ROOT):
    y_true_ = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    y_pred_ = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    ix = np.where((y_true_ != 0) & (y_pred_ != 0))

    true_positives = len(ix[0])
    all_positives = np.count_nonzero(y_true_)

    return true_positives / all_positives

def _h_precision_score(y_true, y_pred, class_hierarchy, root=ROOT):
    y_true_ = _fill_ancestors(y_true, graph=class_hierarchy, root=root)
    y_pred_ = _fill_ancestors(y_pred, graph=class_hierarchy, root=root)

    ix = np.where((y_true_ != 0) & (y_pred_ != 0))

    true_positives = len(ix[0])
    all_results = np.count_nonzero(y_pred_)

    return true_positives / all_results

In [58]:
# evaluation function

def evaluate_h_test(gold_labels, pred_labels):
    """ Modified from the scoring baselines, labels should be a list of labels per samples e.g.:
    gold = [['Smears'], ['Smears, 'Flag-waving]] """
    
    with multi_labeled(gold, pred, G) as (gold_, pred_, graph_):
        return  _h_precision_score(gold_, pred_,graph_), _h_recall_score(gold_, pred_,graph_), _h_fbeta_score(gold_, pred_,graph_)

In [59]:
# example:
# precision, recall, f1
gold = [['Loaded Language', 'Smears'], ['Whataboutism']]
pred = [['Loaded Language', 'Smears'], ['Flag-waving']]
evaluate_h_test(gold, pred)

(0.6666666666666666, 0.5454545454545454, 0.6)

In [60]:
# example with half reward:
# precision, recall, f1
gold = [['Loaded Language', 'Smears'], ['Pathos']]
pred = [['Loaded Language', 'Smears'], ['Flag-waving']]
evaluate_h_test(gold, pred)

(0.6666666666666666, 1.0, 0.8)

In [63]:
# example of partial reward, predicting label of the same ancestor but not moving up to the top label:
# precision, recall, f1

gold = [['Loaded Language', 'Smears'], ['Whataboutism']]
pred = [['Loaded Language', 'Smears'], ['Smears']]
evaluate_h_test(gold, pred)

(0.875, 0.6363636363636364, 0.7368421052631579)