# Variable Naming Convention
unique labels: `LABELS`

multi-hot (0/1 per label) token-level labels for a sample: `labels_tokens`

dictionaries that give the ranges covered by each label, at either word level or character level: `*_spans`



In [1]:
# !pip install -q jupyter-black
%load_ext jupyter_black
# !python -m spacy download xx_ent_wiki_sm

In [2]:
import re
from pathlib import Path
from statistics import mean

import jsonlines
import spacy
import torch
from spacy.lang.punctuation import TOKENIZER_INFIXES
from spacy.util import compile_infix_regex
from tqdm import tqdm
from transformers import BertTokenizer, BertModel

In [3]:
# Directory holding the task data files, relative to the notebook's working directory.
ROOT = Path("../data/")

# Technique labels. A label's position in this list is the row index of that
# label in the per-token label matrix built by `parse_text` and read back by
# `token_span_to_char_span`, so the order must not change between the two.
LABELS = [
    "Appeal_to_Values",
    "Loaded_Language",
    "Consequential_Oversimplification",
    "Causal_Oversimplification",
    "Questioning_the_Reputation",
    "Straw_Man",
    "Repetition",
    "Guilt_by_Association",
    "Appeal_to_Hypocrisy",
    "Conversation_Killer",
    "False_Dilemma-No_Choice",
    "Whataboutism",
    "Slogans",
    "Obfuscation-Vagueness-Confusion",
    "Name_Calling-Labeling",
    "Flag_Waving",
    "Doubt",
    "Appeal_to_Fear-Prejudice",
    "Exaggeration-Minimisation",
    "Red_Herring",
    "Appeal_to_Popularity",
    "Appeal_to_Authority",
    "Appeal_to_Time",
]

# Single character substituted for every space before tokenization, so that
# whitespace survives as an explicit token while character offsets stay unchanged.
WHITESPACE_PLACEHOLDER = "▁"

# Load Tokenizer and Spacy

In [4]:
# Hugging Face word-piece tokenizer; the whitespace placeholder is registered
# as an added special token.
tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")
tokenizer.add_tokens(WHITESPACE_PLACEHOLDER, special_tokens=True)

# spaCy pipeline, used for word segmentation. The extra infix rule makes the
# placeholder, ":" and "," split words wherever they occur inside one.
nlp = spacy.load("xx_ent_wiki_sm")
# Build the pattern from the constant instead of repeating the literal, so the
# segmentation rule cannot drift away from WHITESPACE_PLACEHOLDER.
infixes = TOKENIZER_INFIXES + [rf"[{re.escape(WHITESPACE_PLACEHOLDER)}:,]"]
infix_re = compile_infix_regex(infixes)
nlp.tokenizer.infix_finditer = infix_re.finditer

# Utils

## Text Parser

In [5]:
def _lower_preserving_length(text):
    """Lower-case `text` without changing its length, so character offsets stay valid."""
    lowered = text.lower()
    if len(lowered) == len(text):
        return lowered
    # Some code points (e.g. "İ") lower-case to more than one character, which
    # would shift every later annotation offset. Leave those characters as they are.
    return "".join(ch.lower() if len(ch.lower()) == 1 else ch for ch in text)


def parse_text(text, span_objs, tokenizer, labels, whitespace_placeholder, nlp):
    """
    Tokenize one sample and project its character-level annotations onto tokens.

    Args:
        text: raw sample text.
        span_objs: annotations; each is a mapping with "start" and "end"
            (character offsets into `text`) and "technique" (label name).
        tokenizer: object whose `tokenize(str)` returns the word pieces of a word.
        labels: ordered label names; fixes the row order of `labels_per_token`.
        whitespace_placeholder: string substituted for every space before
            segmentation. It must be exactly one character long, otherwise the
            annotation offsets no longer line up with the transformed text.
        nlp: callable that segments a string into words (a spaCy pipeline).

    Returns:
        tokens: word pieces of the whole text, (L,)
        labels_per_token: one 0/1 list per label, (C, L)
        labels_per_word: dict mapping word index -> list of label names
        missed: number of annotations whose offsets did not map onto word boundaries

    Annotations whose technique is not in `labels` are kept in `labels_per_word`
    but do not appear in `labels_per_token`.
    """
    missed = 0
    text = _lower_preserving_length(text).replace(" ", whitespace_placeholder)
    doc = nlp(text)[:]  # slicing to convert into a Span object
    # word index -> labels assigned to that word
    labels_per_word = {i: [] for i in range(len(doc))}

    spans = [(span["start"], span["end"], span["technique"]) for span in span_objs]

    for start, end, label in spans:
        span = doc.char_span(start, end)
        # char_span yields nothing usable when start/end do not fall on word
        # boundaries (mostly unsanitized text); such annotations are only counted.
        # NOTE(review): spaCy's char_span has an `alignment_mode` option that might
        # recover some of these — confirm it is available for Span in the installed version.
        if not span:
            missed += 1
            continue
        # span.start / span.end are word-level indices
        for i in range(span.start, span.end):
            labels_per_word[i].append(label)

    tokens = []
    labels_per_token = [[] for _ in labels]

    for i, word in enumerate(doc):
        tokens_per_word = tokenizer.tokenize(word.text)
        tokens.extend(tokens_per_word)

        word_labels = labels_per_word[i]
        for row, label in zip(labels_per_token, labels):
            # every word piece inherits the 0/1 flag of the word it came from
            row.extend([int(label in word_labels)] * len(tokens_per_word))

    return tokens, labels_per_token, labels_per_word, missed

## File Parser

In [6]:
def parse_jsonl(path, labels, tokenizer):
    """
    Run `parse_text` over every record of a JSON Lines file.

    Each record must provide "text" and "labels" (a list of span annotations).
    "id" is optional and is only used to identify a record that fails to parse.
    The spaCy pipeline and the whitespace placeholder are taken from the
    notebook-level names `nlp` and `WHITESPACE_PLACEHOLDER`.

    Args:
        path: path of the .jsonl file.
        labels: ordered label names, forwarded to `parse_text`.
        tokenizer: word-piece tokenizer, forwarded to `parse_text`.

    Returns:
        batch_tokens: token list per record, (N, L)
        batch_labels: per-label 0/1 rows per record, (N, C, L)
        missed: annotations that could not be aligned to word boundaries
        total: all annotations seen

    Raises:
        Any exception raised while parsing a record, with its type and
        traceback unchanged.
    """
    total = 0
    missed = 0
    batch_tokens = []  # (N, L)
    batch_labels = []  # (N, C, L)

    with jsonlines.open(path) as reader:
        for obj in tqdm(reader):
            span_objs = obj["labels"]
            total += len(span_objs)
            try:
                tokens, label_rows, _, n_missed = parse_text(
                    obj["text"],
                    span_objs,
                    tokenizer,
                    labels,
                    WHITESPACE_PLACEHOLDER,
                    nlp,
                )
            except Exception as exc:
                # Say which record failed without altering the exception itself.
                # `add_note` only exists on Python 3.11+, hence the guard.
                if hasattr(exc, "add_note"):
                    exc.add_note(f"while parsing record id={obj.get('id')!r}")
                raise

            batch_tokens.append(tokens)
            batch_labels.append(label_rows)
            missed += n_missed

    return batch_tokens, batch_labels, missed, total

In [7]:
def agg_sum(array):
    """Return the running totals of `array`: element k is the sum of items 0..k."""

    def _running_totals():
        total = 0
        for value in array:
            total += value
            yield total

    return list(_running_totals())


agg_sum([1, 2, 3, 2])

[1, 3, 6, 8]

In [8]:
def find_consecutive_trues(flags):
    """
    Locate every maximal run of truthy values in `flags`.

    Args:
        flags: A sized sequence of truthy/falsy values.

    Returns:
        A list of `(first, last)` index pairs, one per run, in order of
        appearance. Both indices are inclusive.
    """
    runs = []
    run_start = None
    for position, flag in enumerate(flags):
        inside_run = run_start is not None
        if flag:
            if not inside_run:
                run_start = position
            continue
        if inside_run:
            # the run ended on the previous element
            runs.append((run_start, position - 1))
            run_start = None
    if run_start is not None:
        # the last run reaches the end of the sequence
        runs.append((run_start, len(flags) - 1))
    return runs


# Example usage
flags = [1, 1, 0, 0, 1, 1, 1]
ranges = find_consecutive_trues(flags)
print(ranges)  # Output: [(0, 1), (4, 6)]

[(0, 1), (4, 6)]


In [9]:
def token_span_to_char_span(tokens, labels_tokens, labels, whitespace_placeholder):
    """
    Convert the token-level label rows of one sample back into character-level spans.

    Args:
        tokens: word pieces of the sample, (L,)
        labels_tokens: 0/1 flags per label and token, (C, L)
        labels: label names, (C,); `labels[i]` names the row `labels_tokens[i]`
        whitespace_placeholder: string that stands for a space inside `tokens`

    Returns:
        decoded_text: the text rebuilt from `tokens`
        span_objs: list of dicts with "technique", "start", "end" (character
            offsets into `decoded_text`, end exclusive) and "text"

    NOTE(review): offsets are derived from token lengths, so a token whose text
    differs in length from the source it replaced (presumably e.g. an
    unknown-token marker) would shift all later offsets — confirm against the tokenizer.
    """
    # e.g. {"technique": "Exaggeration-Minimisation", "start": 13, "end": 32, "text": "ن السعوديه تاوي اره"}
    span_objs = []

    # Normalise each token on its own. "##" is a continuation marker only as a
    # prefix; stripping it after joining would also delete a literal "##" formed
    # by two adjacent "#" tokens and desynchronise text and offsets.
    pieces = []
    for token in tokens:
        piece = token[2:] if token.startswith("##") else token
        pieces.append(piece.replace(whitespace_placeholder, " "))

    decoded_text = "".join(pieces)
    # char_boundaries[k] is the character offset just past token k
    char_boundaries = agg_sum([len(piece) for piece in pieces])

    for i, label in enumerate(labels):
        for first_token, last_token in find_consecutive_trues(labels_tokens[i]):
            # a span starts where the previous token ends (or at 0 for the first token)
            char_start_idx = char_boundaries[first_token - 1] if first_token > 0 else 0
            char_stop_idx = char_boundaries[last_token]

            span_objs.append(
                {
                    "technique": label,
                    "start": char_start_idx,
                    "end": char_stop_idx,
                    "text": decoded_text[char_start_idx:char_stop_idx],
                }
            )

    return decoded_text, span_objs

## Test The Utils

In [10]:
# Sample record with three annotations, two of which overlap (characters
# 166-182 lie inside 134-182); used below to exercise the helpers.
obj = {
    "id": "AFP_458-eurl_02_004",
    "text": "كان بطل فقرة بروباغندا في الحلقة الأولى هو صلاح قوش ، المعادل الموضوعي لعُمر سليمان الرئيس الأسبق للمخابرات المصرية. وكما عمر سليمان، نُسجت حول قوش وأنشطته العديد من الأساطير الغامضة.",
    "labels": [
        {"technique": "Name_Calling-Labeling", "text": "بطل", "start": 4, "end": 7},
        {
            "technique": "Obfuscation-Vagueness-Confusion",
            "text": "نُسجت حول قوش وأنشطته العديد من الأساطير الغامضة",
            "start": 134,
            "end": 182,
        },
        {
            "technique": "Loaded_Language",
            "text": "الأساطير الغامضة",
            "start": 166,
            "end": 182,
        },
    ],
    "type": "paragraph",
}

In [11]:
# obj = {
#     "text": "Newt Gingrich: The truth about Trump, Putin, and Obama\n\nPresident Trump.",
#     "labels": [
#         {
#             "technique": "Name_Calling-Labeling",
#             "text": "Gingrich:",
#             "start": 5,
#             "end": 14,
#         },
#         {
#             "technique": "Obfuscation-Vagueness-Confusion",
#             "text": "The truth about Trump",
#             "start": 15,
#             "end": 36,
#         },
#         {
#             "technique": "Loaded_Language",
#             "text": "الأساطير الغامضة",
#             "start": 166,
#             "end": 182,
#         },
#     ],
# }

In [12]:
# NOTE: `parse_text` returns (tokens, labels_per_token, labels_per_word, missed).
# The names bound here are shifted by one: `labels_tokens` receives the per-token
# 0/1 rows, and `labels_per_token` actually receives the per-word dict.
tokens, labels_tokens, labels_per_token, _ = parse_text(
    obj["text"],
    obj["labels"],
    tokenizer,
    LABELS,
    WHITESPACE_PLACEHOLDER,
    nlp,
)

In [13]:
print("Tokens/Word Pieces")
print(tokens)

# (number of labels, number of tokens)
print(len(labels_tokens), len(labels_tokens[0]))
print()

# Despite its name, `labels_per_token` is bound to the third value returned by
# `parse_text`, i.e. the dict of labels keyed by word index.
print("Labels assigned to each word")
print(labels_per_token)

print("\nOriginal Char-level Spans")
print(obj["labels"])

Tokens/Word Pieces
['كان', '▁', 'ب', '##طل', '▁', 'ف', '##قر', '##ة', '▁', 'بر', '##وب', '##اغ', '##ندا', '▁', 'في', '▁', 'ال', '##حل', '##قة', '▁', 'الأولى', '▁', 'هو', '▁', 'صلاح', '▁', 'ق', '##وش', '▁', '،', '▁', 'ال', '##مع', '##ادل', '▁', 'ال', '##مو', '##ض', '##وعي', '▁', 'ل', '##ع', '##ُ', '##مر', '▁', 'سليمان', '▁', 'الرئيس', '▁', 'ال', '##أس', '##بق', '▁', 'ل', '##لم', '##خ', '##اب', '##رات', '▁', 'المصرية', '.', '▁', 'و', '##كم', '##ا', '▁', 'عمر', '▁', 'سليمان', '،', '▁', 'ن', '##ُ', '##س', '##ج', '##ت', '▁', 'حول', '▁', 'ق', '##وش', '▁', 'وأن', '##ش', '##ط', '##ته', '▁', 'العديد', '▁', 'من', '▁', 'ال', '##أس', '##اطي', '##ر', '▁', 'ال', '##غا', '##م', '##ضة', '.']
23 101

Labels assigned to each tokens
{0: [], 1: [], 2: ['Name_Calling-Labeling'], 3: [], 4: [], 5: [], 6: [], 7: [], 8: [], 9: [], 10: [], 11: [], 12: [], 13: [], 14: [], 15: [], 16: [], 17: [], 18: [], 19: [], 20: [], 21: [], 22: [], 23: [], 24: [], 25: [], 26: [], 27: [], 28: [], 29: [], 30: [], 31: [], 32: []

In [14]:
# Round trip: rebuild the text and the character-level spans from the token-level labels.
decoded_text, span_objs = token_span_to_char_span(
    tokens, labels_tokens, LABELS, WHITESPACE_PLACEHOLDER
)

In [15]:
# Print the reconstructed text above the original one for a visual comparison.
print(decoded_text, obj["text"], sep="\n")

كان بطل فقرة بروباغندا في الحلقة الأولى هو صلاح قوش ، المعادل الموضوعي لعُمر سليمان الرئيس الأسبق للمخابرات المصرية. وكما عمر سليمان، نُسجت حول قوش وأنشطته العديد من الأساطير الغامضة.
كان بطل فقرة بروباغندا في الحلقة الأولى هو صلاح قوش ، المعادل الموضوعي لعُمر سليمان الرئيس الأسبق للمخابرات المصرية. وكما عمر سليمان، نُسجت حول قوش وأنشطته العديد من الأساطير الغامضة.


In [16]:
# Recovered spans; compare with obj["labels"]. They come out ordered by LABELS, not by input order.
span_objs

[{'technique': 'Loaded_Language',
  'start': 166,
  'end': 182,
  'text': 'الأساطير الغامضة'},
 {'technique': 'Obfuscation-Vagueness-Confusion',
  'start': 134,
  'end': 182,
  'text': 'نُسجت حول قوش وأنشطته العديد من الأساطير الغامضة'},
 {'technique': 'Name_Calling-Labeling', 'start': 4, 'end': 7, 'text': 'بطل'}]

# Load The Training Files

In [18]:
# List the data directory to confirm the expected files are present.
# NOTE(review): late import; `ROOT` is already a pathlib.Path, so `ROOT.iterdir()`
# would give the same listing without needing `os` here.
import os

os.listdir(ROOT)

['license_cc_by-nc-sa_4.0.txt',
 'araieval24_task1_dev.jsonl',
 'araieval24_task1_train.jsonl']

In [19]:
# Tokenize the training split. `missed` counts annotations that could not be
# aligned to word boundaries; `total` counts all annotations.
train_tokens, train_labels, missed, total = parse_jsonl(
    ROOT / "araieval24_task1_train.jsonl", LABELS, tokenizer
)

6997it [00:56, 124.16it/s]


In [20]:
# samples, label matrices, unaligned annotations, all annotations
# NOTE(review): in the recorded run 3914 of 15765 annotations (~25%) were
# dropped as unaligned — worth investigating before training on these labels.
print(len(train_tokens), len(train_labels), missed, total)

6997 6997 3914 15765


In [21]:
# Same processing for the dev split.
dev_tokens, dev_labels, dev_missed, dev_total = parse_jsonl(
    ROOT / "araieval24_task1_dev.jsonl", LABELS, tokenizer
)

# samples, label matrices, unaligned annotations, all annotations
# NOTE(review): in the recorded run 497 of 2064 annotations (~24%) were unaligned.
print(len(dev_tokens), len(dev_labels), dev_missed, dev_total)

921it [00:08, 113.22it/s]

921 921 497 2064





# BertTokenizer w/ Offset Mapping

In [23]:
# Exploration: the "fast" tokenizer variant, used below for its per-token character offsets.
# NOTE(review): import placed mid-notebook; consider moving it to the imports cell.
from transformers import BertTokenizerFast

fast_tokenizer = BertTokenizerFast.from_pretrained("bert-base-multilingual-cased")

In [30]:
# return_offsets_mapping=True adds an `offset_mapping` entry holding one
# (start, end) character-offset pair per token.
encoding = fast_tokenizer("i eat. risce!", return_offsets_mapping=True)
encoding

{'input_ids': [101, 177, 69110, 119, 29956, 24176, 106, 102], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1], 'offset_mapping': [(0, 0), (0, 1), (2, 5), (5, 6), (7, 9), (9, 12), (12, 13), (0, 0)]}

In [31]:
# Word pieces of the encoded text, including the [CLS]/[SEP] markers.
encoding.tokens()

['[CLS]', 'i', 'eat', '.', 'ri', '##sce', '!', '[SEP]']

In [34]:
# Index of the source word for every token; None for [CLS]/[SEP].
encoding.word_ids()

[None, 0, 1, 2, 3, 3, 4, None]

In [38]:
# (start, end) character offsets of every token; (0, 0) for [CLS]/[SEP].
encoding.offset_mapping

[(0, 0), (0, 1), (2, 5), (5, 6), (7, 9), (9, 12), (12, 13), (0, 0)]

In [None]:
encoding.