In [2]:
import json
import os
import re
import bisect
from pathlib import Path
import torch
import numpy as np
import pandas as pd
from datasets import Dataset
from spacy.lang.en import English
from transformers.models.deberta_v2 import DebertaV2ForTokenClassification, DebertaV2TokenizerFast
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.trainer import Trainer
from transformers.training_args import TrainingArguments
from transformers.data.data_collator import DataCollatorForTokenClassification

2025-04-17 19:01:42.251774: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1744916502.450313      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1744916502.506118      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Config & Parameters

In [3]:
# Maximum number of sub-tokens per document; passed to the tokenizer as `max_length`
# together with `truncation=True`, so longer inputs are cut.
INFERENCE_MAX_LENGTH = 1024
# Post-processing threshold on the "O" probability: where P("O") is below this value the
# best non-"O" class is used instead of the plain argmax.
CONF_THRESH = 0.6  # threshold for "O" class
# URL_THRESH = 0.1  # threshold for URL
# NOTE(review): AMP is not referenced in any of the visible cells.
AMP = True
# Checkpoint directory from which both the tokenizer and the model weights are loaded.
MODEL_PATH = '/kaggle/input/model-weights-ner/output/fold_1/checkpoint-1212'
# NOTE(review): DATA_DIR is not referenced in any of the visible cells; the test texts are
# read from a different, hard-coded path.
DATA_DIR = '/kaggle/input/pii-detection-removal-from-educational-data/'

# Utils

In [4]:
# spaCy English pipeline object.
# NOTE(review): `nlp` is not referenced in any of the visible cells.
nlp = English()

def find_span(target: list[str], document: list[str]) -> list[list[int]]:
    """Locate every non-overlapping occurrence of ``target`` inside ``document``.

    Args:
        target: Token sequence to search for.
        document: Token sequence to search in.

    Returns:
        One list of consecutive ``document`` indices per occurrence, in
        left-to-right order. Empty when ``target`` is empty or never occurs.
    """
    width = len(target)
    if width == 0:
        # Nothing to match (indexing into an empty target would raise IndexError).
        return []

    spans = []
    start = 0
    last_start = len(document) - width
    while start <= last_start:
        if all(document[start + k] == target[k] for k in range(width)):
            spans.append(list(range(start, start + width)))
            # Resume after the match so reported occurrences never overlap.
            start += width
        else:
            # Advance by a single position: a real match may begin inside a
            # failed partial match (e.g. ["a", "b"] in ["a", "a", "b"]).
            start += 1

    return spans

# Tokenizer

In [5]:
class CustomTokenizer:
    """Callable that tokenizes one example and records a character-to-word map.

    The words in ``example["tokens"]`` are concatenated without any separator
    and the resulting string is handed to the wrapped tokenizer.
    """

    def __init__(self, tokenizer: PreTrainedTokenizerBase, max_length: int) -> None:
        # Wrapped tokenizer and the truncation length forwarded to it.
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __call__(self, example: dict) -> dict:
        """Tokenize ``example["tokens"]``.

        Returns:
            The wrapped tokenizer's output plus ``"token_map"``: a list with
            one entry per character of the concatenated text holding the
            index of the word that character came from.
        """
        words = example["tokens"]
        joined_text = "".join(words)

        # One entry per character, each pointing back at its source word.
        char_to_word = [word_idx for word_idx, word in enumerate(words) for _ in word]

        encoded = self.tokenizer(
            joined_text,
            return_offsets_mapping=True,
            truncation=True,
            max_length=self.max_length,
        )

        return {**encoded, "token_map": char_to_word}

In [6]:
# Read the raw test texts; every '\n'-separated line is treated as one document.
# Note that a trailing newline in the file yields a final empty-string entry.
text = Path("/kaggle/input/workshop-task-acl/SOMD2025-PhaseII/test_texts.txt").read_text(encoding="utf-8")

text_list = text.split('\n')

In [7]:
# One row per input line: the raw text, its single-space split into words,
# and a running integer document id.
df = pd.DataFrame({'train_text': text_list})
df = df.assign(
    train_text_list=df['train_text'].str.split(' '),
    document=np.arange(len(df)),
)

In [8]:
# Label inventory; a label's position in this list is its class id.
# There is no 'I-Abbreviation' entry in this list.
all_labels = [
    'B-Extension', 'I-Extension',
    'B-Application', 'I-Application',
    'B-Abbreviation',
    'B-Citation', 'I-Citation',
    'B-SoftwareCoreference', 'I-SoftwareCoreference',
    'B-URL', 'I-URL',
    'B-AlternativeName', 'I-AlternativeName',
    'B-OperatingSystem', 'I-OperatingSystem',
    'B-Developer', 'I-Developer',
    'O',
    'B-License', 'I-License',
    'B-PlugIn', 'I-PlugIn',
    'B-Release', 'I-Release',
    'B-ProgrammingEnvironment', 'I-ProgrammingEnvironment',
    'B-Version', 'I-Version',
]

# Forward and reverse lookup tables between class ids and label strings.
id2label = dict(enumerate(all_labels))
label2id = {label: idx for idx, label in id2label.items()}

# Every label except the outside tag.
target = [label for label in all_labels if label != "O"]

In [9]:
# Wrap the frame's columns in a `datasets.Dataset`; document ids are stored as strings.
ds = Dataset.from_dict(
    {
        "full_text": df['train_text'].tolist(),
        "tokens": df['train_text_list'].tolist(),
        "document": list(map(str, df['document'].tolist())),
    }
)

# Tokenize every example with the checkpoint's own tokenizer, in parallel worker processes.
tokenizer = DebertaV2TokenizerFast.from_pretrained(MODEL_PATH)
tokenize_example = CustomTokenizer(tokenizer=tokenizer, max_length=INFERENCE_MAX_LENGTH)
ds = ds.map(tokenize_example, num_proc=os.cpu_count())

Map (num_proc=4):   0%|          | 0/220 [00:00<?, ? examples/s]

# Instantiate the Trainer

In [10]:
# Load the fine-tuned token-classification checkpoint from MODEL_PATH.
model = DebertaV2ForTokenClassification.from_pretrained(MODEL_PATH)
# Collator that turns the tokenized examples into batches for the Trainer.
collator = DataCollatorForTokenClassification(tokenizer)
# "." is the output directory; evaluation batch size is 1 and experiment-tracker
# reporting is switched off.
args = TrainingArguments(".", per_device_eval_batch_size=1, report_to="none")
# NOTE(review): the captured output of this cell echoes the `trainer = Trainer(` line, which
# looks like a warning raised by this call (possibly about the `tokenizer=` argument) —
# confirm against the installed transformers version.
trainer = Trainer(
    model=model, args=args, data_collator=collator, tokenizer=tokenizer,
)

  trainer = Trainer(


# Prediction

In [11]:
# Run the model over every tokenized document and keep only the raw scores.
prediction_output = trainer.predict(ds)
predictions = prediction_output.predictions  # (n_sample, len, n_labels)

# Post-processing

In [12]:
# Per-sub-token class probabilities (softmax over the last of the three axes).
pred_softmax = torch.from_numpy(predictions).softmax(dim=2).numpy()

# Use the label tables stored with the checkpoint from here on.
id2label = model.config.id2label
o_index = model.config.label2id["O"]

# Plain argmax over all classes.
preds = np.argmax(predictions, axis=-1)

# Best class when "O" is taken out of the running.
non_o_probs = pred_softmax.copy()
non_o_probs[..., o_index] = 0
preds_without_o = np.argmax(non_o_probs, axis=-1)

# Where P("O") falls below the threshold, take the best non-"O" class;
# everywhere else keep the plain argmax.
o_preds = pred_softmax[..., o_index]
preds_final = np.where(o_preds < CONF_THRESH, preds_without_o, preds)

In [13]:
# Collapse sub-token predictions to one record per (document, word).
# `processed` collects the records; `pairs` remembers which (document, word index)
# keys already have one, so only the first sub-token that reaches a word sets its label.
processed =[]
pairs = set()

# Iterate over document
for p, token_map, offsets, tokens, doc in zip(
    preds_final, ds["token_map"], ds["offset_mapping"], ds["tokens"], ds["document"]
):
    # Iterate over sequence; zip stops at the shorter of the prediction row and the offsets.
    for token_pred, (start_idx, end_idx) in zip(p, offsets):
        label_pred = id2label[token_pred]

        # An offset of (0, 0) carries no text — presumably a special token; skip it.
        if start_idx + end_idx == 0:
            continue

        # NOTE(review): CustomTokenizer in this notebook never writes -1 into token_map,
        # so this branch looks unreachable here — confirm before removing.
        if token_map[start_idx] == -1:
            start_idx += 1

        # Step past characters that belong to whitespace-only words (e.g. "\n\n").
        while start_idx < len(token_map) and tokens[token_map[start_idx]].isspace():
            start_idx += 1

        # Ran off the end of the character map: stop processing this document.
        if start_idx >= len(token_map): 
            break

        # Index of the word that owns the character at start_idx.
        token_id = token_map[start_idx]
        pair = (doc, token_id)

        # No label filtering is applied here: every label, including "O", is recorded.

        # A label was already recorded for this word; later sub-tokens are ignored.
        if pair in pairs:
            continue
            
        processed.append(
            {"document": doc, "token": token_id, "label": label_pred, "token_str": tokens[token_id]}
        )
        pairs.add(pair)

## 🤝 Submission hand-in

In [14]:
# One row per (document, word) prediction, plus a running row id.
df_pred = pd.DataFrame(processed)
df_pred["row_id"] = list(range(len(df_pred)))

# Per document, in order of first appearance: the space-joined labels (`l`) and the
# space-joined word indices those labels belong to (`token_map`).
# A single groupby pass replaces re-filtering the whole frame twice per document;
# sort=False keeps first-appearance order and rows keep their order inside each group.
l = []
token_map = []
for _, doc_rows in df_pred.groupby('document', sort=False):
    l.append(' '.join(doc_rows['label'].values))
    token_map.append(' '.join(doc_rows['token'].values.astype(str)))

In [15]:
def correct_bio_tagging(tags):
    """Turn a ``B-X`` tag into ``I-X`` when the preceding tag has the same type ``X``.

    The preceding tag is read from the input sequence, not from the corrected
    output. Tags are split on the first hyphen only, so entity types that
    themselves contain a hyphen are compared in full.

    Args:
        tags: Sequence of BIO tag strings such as ``"B-URL"``, ``"I-URL"``, ``"O"``.

    Returns:
        The corrected tags joined by single spaces (an empty string for no tags).
    """
    def _split(tag):
        # ("B", "URL") for "B-URL"; ("O", None) for a tag without a hyphen.
        prefix, sep, entity = tag.partition('-')
        return (prefix, entity) if sep else (prefix, None)

    corrected_tags = list(tags)

    for i in range(1, len(tags)):
        _, prev_entity = _split(tags[i - 1])
        curr_prefix, curr_entity = _split(tags[i])

        # Require a real entity type so a bare "B" is never rewritten to "I-None".
        if curr_prefix == 'B' and curr_entity is not None and prev_entity == curr_entity:
            corrected_tags[i] = f'I-{curr_entity}'

    return ' '.join(corrected_tags)

In [23]:
def align_labels_to_tokens(tokens, labels):
    """Return a copy of ``labels`` whose length equals ``len(tokens)``.

    Pads with 'O' at the end when there are too few labels and drops the
    surplus when there are too many. The input list is never modified.

    Args:
        tokens: Sequence of tokens; only its length is used.
        labels: Sequence of label strings.

    Returns:
        A new list of exactly ``len(tokens)`` labels.
    """
    token_len = len(tokens)

    # Slicing copies and truncates in one step; the caller's list stays untouched.
    aligned = list(labels[:token_len])
    aligned.extend(['O'] * (token_len - len(aligned)))

    return aligned

In [46]:
# Build the final label line for every document that has predictions.
#
# `l` and `token_map` hold, per document, the space-joined labels and the word indices
# they belong to. Both were filled in order of first appearance of the document id in
# `df_pred`, so `unique()` recovers the matching ids.
doc_ids = df_pred['document'].unique()
assert len(doc_ids) == len(l) == len(token_map)

# Look words up by document id instead of by list position, so a document without any
# prediction cannot shift the pairing for the documents after it.
tokens_by_doc = dict(zip(df['document'].astype(str), df['train_text_list']))

corrected_bio_tagging = []
for doc_id, doc_labels, doc_token_ids in zip(doc_ids, l, token_map):
    tokens = tokens_by_doc[doc_id]
    fixed_labels = correct_bio_tagging(doc_labels.split(' ')).split(' ')

    # Start from all-"O" and write each label at its own word index: words that received
    # no prediction stay "O" in place instead of shifting later labels to the left.
    aligned = ['O'] * len(tokens)
    for token_id, label in zip(doc_token_ids.split(' '), fixed_labels):
        aligned[int(token_id)] = label

    labels_text = ' '.join(aligned)
    # Sanity check against the word count of the document itself.
    assert len(tokens) == len(labels_text.split(' '))
    corrected_bio_tagging.append(labels_text)

In [48]:
# Write one line of space-separated labels per document.
# The encoding is explicit to match the UTF-8 read of the input texts, and the generator
# variable stays local so the module-level `text` (raw input) is not overwritten.
with open("predictions.entities.txt", "w", encoding="utf-8") as f:
    f.writelines(f"{labels_line}\n" for labels_line in corrected_bio_tagging)