In [93]:
import pandas as pd
import spacy
import random
import matplotlib.pyplot as plt
import re
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from spacy.util import minibatch
from spacy.training import Example

random.seed = 42

In [None]:
# config
experiment_configs = [{
    "name": "spacy_xml_tags",
    "epochs": 7,
    "batch_size": 16
}]

# load data
df_train = pd.read_csv("output_train.csv").drop(columns=['A_raw_entities']).copy() # spacy tagged only
df_test  = pd.read_csv("output_test.csv").drop(columns=['A_raw_entities']).copy()

In [95]:
def add_offsets(text, entities): # adds start and end pos.
    if not isinstance(entities, list):
        return []
    # ------------------------------------------------
    used = [False] * len(text) # tracks characters already tagged
    results = []

    for ent in entities: 
        word  = ent.get("word")
        label = ent.get("entity", "").lower()
        if not word or not label: # skip empty
            continue
        pattern = re.escape(word)
        match   = re.search(pattern, text, re.IGNORECASE)
        if match:
            s, e = match.start(), match.end()
            if not any(used[s:e]):
                results.append({"start": s, "end": e, "entity": label})
                for i in range(s, e):
                    used[i] = True 
    return results

def merge_adjacent_entities(entities): # merge if same label
    if not entities: return []
    # ------------------------------------------------
    entities = sorted(entities, key=lambda x: x['start']) # sort so left to right
    merged = [entities[0]] # leftmost entity
    for curr in entities[1:]: # the rest
        last = merged[-1]
        if curr['entity'] == last['entity'] and curr['start'] <= last['end'] + 1:
                # same label & touching
            last['end'] = curr['end']
        else:
            merged.append(curr)
    return merged

def insert_xml_tags(text, entities): # put entity tags into text
    if not entities:
        return text
    # ------------------------------------------------
    # helpers
    ents = add_offsets(text, entities) # add start & end
    merged = merge_adjacent_entities(ents) # merge adjacent

    offset = 0
    # insert <entity tags>
    for ent in sorted(merged, key=lambda x: x['start']):
        tag_open  = f"<{ent['entity']}>"
        tag_close = f"</{ent['entity']}>"
        s = ent['start'] + offset
        e = ent['end']   + offset
        text = text[:s] + tag_open + text[s:e] + tag_close + text[e:]
        offset += len(tag_open) + len(tag_close)
    return text

# apply XML tagging
def tag_statement(row):
    return insert_xml_tags(row['statement'], row['B_raw_entities'])

tqdm.pandas() # progress bar
for df in (df_train, df_test):
    df['B_XML_statement'] = df.progress_apply(tag_statement, axis=1)

100%|██████████| 18369/18369 [00:00<00:00, 218654.70it/s]
100%|██████████| 2296/2296 [00:00<00:00, 208674.55it/s]


In [96]:
# prepare data for textcat
def prepare_spacy_data(df):
    return [ # prepares data for SpaCy training
        (text, {"cats": {"TRUE": label == 1, "FALSE": label == 0}})
        for text, label in zip(df['B_XML_statement'], df['label_binary'])
    ]
train_data = prepare_spacy_data(df_train)
test_data  = prepare_spacy_data(df_test)

# run experiments
all_results = []

for cfg in experiment_configs:
    nlp = spacy.blank("en") # empty Eng pipeline for SpaCy
    textcat = nlp.add_pipe("textcat", last=True) # text clf
    # register classes
    textcat.add_label("TRUE")
    textcat.add_label("FALSE")

    # initialize optimizer
    optimizer = nlp.initialize()

    losses_list, f1s = [], [] # losses and f1 scores

    for epoch in range(cfg['epochs']):
        random.shuffle(train_data) # shuffle training data
        batches = minibatch(train_data, size=cfg['batch_size']) # create mini batches
        epoch_loss = 0.0

        for batch in batches:
            texts, annotations = zip(*batch) # unzip into texts and annotations
            examples = [ # for spacy
                Example.from_dict(nlp.make_doc(txt), ann)
                for txt, ann in zip(texts, annotations)]
            losses = {} # feed into the model, train
            nlp.update(examples, sgd=optimizer, losses=losses)
            epoch_loss += losses.get("textcat", 0.0)

        # evaluate on test set
        preds, true_labels = [], [] # predictions and true labels
        for txt, ann in test_data:
            doc = nlp(txt)
            pred = doc.cats["TRUE"] > 0.5
            preds.append(int(pred))
            true_labels.append(int(ann["cats"]["TRUE"]))

        f1 = f1_score(true_labels, preds)
        losses_list.append(epoch_loss)
        f1s.append(f1)
        print(f"Epoch {epoch+1}/{cfg['epochs']} — Loss: {epoch_loss:.4f} — F1: {f1:.4f}")

    # plot metrics
    plt.figure()
    plt.plot(range(1, cfg['epochs']+1), losses_list, marker='o', label='Loss')
    plt.plot(range(1, cfg['epochs']+1), [f * 100 for f in f1s], marker='s', linestyle='--', label='F1')
    plt.xlabel("Epoch")
    plt.title("SpaCy + XML Training Curve")
    plt.legend()
    plt.savefig(f"{cfg['name']}_curve.png")
    plt.close()

    # final metrics
    acc  = accuracy_score(true_labels, preds)
    prec = precision_score(true_labels, preds)
    rec  = recall_score(true_labels, preds)

    all_results.append({
        "name": cfg["name"],
        "epochs": cfg["epochs"],
        "batch_size": cfg["batch_size"],
        "accuracy": round(acc, 2),
        "f1": round(f1, 2),
        "precision": round(prec, 2),
        "recall": round(rec, 2)
    })

# save results
pd.DataFrame(all_results).to_csv("spacy_with_xml_results.csv", index=False)


Epoch 1/7 — Loss: 244.4166 — F1: 0.5307
Epoch 2/7 — Loss: 205.6575 — F1: 0.6710
Epoch 3/7 — Loss: 164.7764 — F1: 0.5872
Epoch 4/7 — Loss: 123.7194 — F1: 0.6194
Epoch 5/7 — Loss: 89.8225 — F1: 0.6075
Epoch 6/7 — Loss: 70.6893 — F1: 0.6003
Epoch 7/7 — Loss: 57.8541 — F1: 0.5321
