In [1]:
import json
from itertools import chain

from transformers import AutoTokenizer
import numpy as np
from pathlib import Path
from torch.utils.data import Dataset

# Checkpoint name handed to AutoTokenizer.from_pretrained by TokenizedDataset.
TRAINING_MODEL_PATH = "microsoft/deberta-v3-base"
# Presumably the max_length given to the dataset/tokenizer -- confirm against the caller.
TRAINING_MAX_LENGTH = 1024
# Default `stride` forwarded to the tokenizer call inside tokenize().
STRIDE = 384
# Location of the competition training file; only used here to derive the label set.
TRAIN_JSON_PATH = Path("/kaggle/input/pii-detection-removal-from-educational-data/train.json")

# Read the training file through a context manager so the handle is closed
# deterministically, and decode it explicitly as UTF-8 rather than relying on
# the platform's default encoding.
with TRAIN_JSON_PATH.open("r", encoding="utf-8") as train_file:
    data = json.load(train_file)

# Sorted so the label -> id assignment is stable from run to run.
all_labels = sorted(set(chain.from_iterable(x["labels"] for x in data)))
label2id = {l: i for i, l in enumerate(all_labels)}
id2label = {v: k for k, v in label2id.items()}

# The raw documents were only needed to collect the label vocabulary.
del data

## Tokenization

In [2]:



def tokenize(example, tokenizer, label2id, max_length, stride=None):
    """Tokenize one essay and project its word-level labels onto tokenizer tokens.

    The text is rebuilt from ``example["tokens"]`` and
    ``example["trailing_whitespace"]``. In parallel, a per-character label list
    is built from ``example["provided_labels"]`` (inserted spaces are labelled
    ``"O"``). Every token returned by the tokenizer then receives the label of
    its first character, skipping one leading whitespace character if present.

    Args:
        example: Mapping with the keys ``"tokens"``, ``"provided_labels"`` and
            ``"trailing_whitespace"``; the three sequences are zipped together.
        tokenizer: Callable invoked as ``tokenizer(text,
            return_offsets_mapping=True, max_length=..., stride=...,
            truncation=True)``. Its result must be a mapping that also exposes
            ``offset_mapping`` and ``input_ids`` as attributes.
        label2id: Mapping from label string to integer id.
        max_length: Forwarded to the tokenizer.
        stride: Forwarded to the tokenizer. ``None`` (the default) falls back
            to the module-level ``STRIDE`` constant.

    Returns:
        A dict holding every entry of the tokenizer output plus ``"labels"``
        (one label id per token) and ``"length"`` (number of input ids).
    """
    if stride is None:
        stride = STRIDE

    # Rebuild the text and a character-aligned label list in one pass.
    pieces = []
    char_labels = []
    for token, label, has_space in zip(
        example["tokens"], example["provided_labels"], example["trailing_whitespace"]
    ):
        pieces.append(token)
        char_labels.extend([label] * len(token))

        if has_space:
            pieces.append(" ")
            char_labels.append("O")

    text = "".join(pieces)

    # NOTE(review): return_overflowing_tokens is not requested, so `stride`
    # presumably has no effect on the returned encoding -- confirm against the
    # tokenizer documentation.
    tokenized = tokenizer(
        text,
        return_offsets_mapping=True,
        max_length=max_length,
        stride=stride,
        truncation=True,
    )

    last_char = len(text) - 1
    token_labels = []

    for start_idx, end_idx in tokenized.offset_mapping:
        # A (0, 0) offset pair is treated as a special token and labelled "O".
        if start_idx == 0 and end_idx == 0:
            token_labels.append(label2id["O"])
            continue

        # The token starts with whitespace: use the label of the next
        # character. Only step forward while a next character exists, so a
        # whitespace token at the very end of the text cannot index past the
        # label list.
        if text[start_idx].isspace() and start_idx < last_char:
            start_idx += 1

        token_labels.append(label2id[char_labels[start_idx]])

    return {**tokenized, "labels": token_labels, "length": len(tokenized.input_ids)}

## Paragraph Augmentation

In [3]:
def convert_tokens_to_text(tokens, whitespaces):
    """Concatenate tokens into one string.

    Each token is followed by a single space when its paired entry in
    ``whitespaces`` is truthy. The two sequences are zipped, so any surplus
    entries in the longer one are ignored.
    """
    return "".join(
        token + " " if followed_by_space else token
        for token, followed_by_space in zip(tokens, whitespaces)
    )

def partial_essay(essay):
    """Return a variant of ``essay`` with a random subset of its middle paragraphs.

    A paragraph boundary is a ``"\n\n"`` token that directly follows a ``"."``
    token; the ``"\n\n"`` token opens the next paragraph. The first and last
    paragraphs are always kept, and each middle paragraph is kept when a draw
    from ``np.random.random()`` is below 0.5.

    The input is returned unchanged (same object) when it has an
    ``"augmented"`` key or splits into fewer than three paragraphs. Otherwise
    a new dict is returned with the keys ``"full_text"``, ``"document"``,
    ``"tokens"``, ``"trailing_whitespace"`` and ``"labels"``.
    """
    if 'augmented' in essay:
        return essay

    tokens = essay["tokens"]
    spaces = essay["trailing_whitespace"]
    tags = essay["labels"]

    # Positions of every "\n\n" token that is immediately preceded by ".".
    cut_points = [
        pos
        for pos in range(1, len(tokens))
        if tokens[pos - 1] == "." and tokens[pos] == "\n\n"
    ]

    paragraphs = []
    for lo, hi in zip([0] + cut_points, cut_points + [len(tokens)]):
        paragraphs.append(
            {
                "full_text": convert_tokens_to_text(tokens[lo:hi], spaces[lo:hi]),
                "document": essay["document"],
                "tokens": tokens[lo:hi],
                "trailing_whitespace": spaces[lo:hi],
                "labels": tags[lo:hi],
            }
        )

    # With fewer than three paragraphs there is no middle paragraph to drop.
    if len(paragraphs) < 3:
        return essay

    # One random draw per middle paragraph, taken in document order.
    kept = [paragraphs[0]]
    for paragraph in paragraphs[1:-1]:
        if np.random.random() < 0.5:
            kept.append(paragraph)
    kept.append(paragraphs[-1])

    return {
        "full_text": "".join(paragraph["full_text"] for paragraph in kept),
        "document": essay["document"],
        "tokens": [tok for paragraph in kept for tok in paragraph["tokens"]],
        "trailing_whitespace": [ws for paragraph in kept for ws in paragraph["trailing_whitespace"]],
        "labels": [tag for paragraph in kept for tag in paragraph["labels"]],
    }

## Customized Dataset

In [4]:
class TokenizedDataset(Dataset):
    """Dataset that serves essays from ``data`` followed by ``fake_data``.

    Items are tokenized when they are fetched. With ``oversampling`` enabled,
    entries of ``data`` are repeated according to ``_essay_weight``; entries
    of ``fake_data`` are never repeated. With ``paragraph_augmentation``
    enabled, each fetched essay first goes through ``partial_essay``.
    """

    def __init__(self, data, fake_data, label2id, max_length, paragraph_augmentation, oversampling):
        """Store the configuration, load the tokenizer and build the repeat index if requested."""
        self.data = data
        self.fake_data = fake_data
        self.tokenizer = AutoTokenizer.from_pretrained(TRAINING_MODEL_PATH)
        self.label2id = label2id
        self.max_length = max_length
        self.paragraph_augmentation = paragraph_augmentation
        self.oversampling = oversampling
        if oversampling:
            self._oversample_pii_essays()

    def _real_count(self):
        """Number of index slots that map into ``data`` (after any oversampling)."""
        return len(self.ds_repeat_indices) if self.oversampling else len(self.data)

    def __len__(self):
        """Total number of items: the ``data`` slots plus every ``fake_data`` entry."""
        return self._real_count() + len(self.fake_data)

    def __getitem__(self, idx):
        """Fetch essay ``idx``, optionally augment it, and add the tokenizer output to it."""
        essay = self.get_raw_essay(idx)
        if self.paragraph_augmentation:
            essay = partial_essay(essay)
        self._tokenize(essay)
        return essay

    def get_raw_essay(self, idx):
        """Return a shallow copy of the untokenized essay stored at ``idx``.

        Indices below the ``data`` slot count resolve into ``data`` (through
        the repeat index when oversampling); the rest resolve into
        ``fake_data``.
        """
        boundary = self._real_count()
        if idx >= boundary:
            return self.fake_data[idx - boundary].copy()
        source_idx = self.ds_repeat_indices[idx] if self.oversampling else idx
        return self.data[source_idx].copy()

    def _essay_weight(self, essay):
        """Repeat count for an essay: 1 without PII, 4 with student names only, 8 otherwise."""
        pii_tags = set(essay["labels"]) - {'O'}
        if not pii_tags:
            return 1
        if pii_tags <= {'I-NAME_STUDENT', 'B-NAME_STUDENT'}:
            return 4
        return 8

    def _oversample_pii_essays(self):
        """Compute per-essay weights and the expanded list of indices into ``data``."""
        print("Oversampling essays with PIIs")
        self.ds_weights = list(map(self._essay_weight, self.data))
        repeated = []
        for position, weight in enumerate(self.ds_weights):
            # Each position appears `weight` times, in order.
            repeated.extend([position] * weight)
        self.ds_repeat_indices = repeated

    def _tokenize(self, essay):
        """Tokenize ``essay`` in place.

        The original word-level labels are moved to ``"provided_labels"``
        before the output of ``tokenize`` (which carries its own ``"labels"``)
        is merged into the essay.
        """
        essay["provided_labels"] = essay.pop("labels")
        essay.update(
            tokenize(
                essay,
                tokenizer=self.tokenizer,
                label2id=self.label2id,
                max_length=self.max_length,
            )
        )


# Example usage (the tokenizer is created inside the class, so it is not passed in):
# ds = TokenizedDataset(data, data, label2id, TRAINING_MAX_LENGTH, paragraph_augmentation=True, oversampling=True)