# Saved NER tag loader

In [45]:
from pathlib import Path
import json
from functools import partial

In [161]:
def load_ner_data_pytorch(file: Path):
    """
    Load saved NER tagging results from a JSON file into a PyTorch dataset.

    The JSON document must provide the three keys read below:
    ``"texts"`` (mapping of index -> raw text), ``"labels"`` (mapping of
    index -> list of label records) and ``"options"`` (list of tag names).
    Each label record is either marked with a ``"skipped"`` key or carries
    an ``"index"`` (pointing into ``"texts"``) and a ``"tags"`` entry.

    Args:
        file: path of the JSON file to read.

    Returns:
        A ``NERDataset`` (a ``torch.utils.data.Dataset`` subclass) whose
        items are ``(text, tags)`` pairs.

    Raises:
        ModuleNotFoundError: if pytorch is not installed.
    """
    # Local import: the train/valid split below draws from the global
    # random generator, and nothing at file level imports ``random``.
    import random

    try:
        from torch.utils.data.dataset import Dataset
    except ModuleNotFoundError as err:
        raise ModuleNotFoundError(
            "Please install pytorch first https://pytorch.org/") from err

    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(file, "r", encoding="utf-8") as f:
        json_data = json.load(f)

    class NERDataset(Dataset):
        """
        NER Dataset
        return text, a list of tags
        """

        def __init__(self, data):
            self.data = data
            # Flatten the per-index label lists, dropping records that were
            # marked as skipped.
            self.labels = [
                label
                for label_list in data['labels'].values()
                for label in label_list
                if "skipped" not in label
            ]
            self.texts = data['texts']
            self.options = data['options']
            # Class 0 is reserved for "O" (no tag); options follow in order.
            classes = ["O"] + self.options
            self.i2c = dict(enumerate(classes))
            self.c2i = {name: i for i, name in enumerate(classes)}

        def __repr__(self):
            options = self.data["options"]
            return f"NERDataset, {options}\n{len(self)}rows"

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            label = self.labels[idx]
            # Text keys are strings (JSON object keys), label indices may
            # not be, hence the str() conversion.
            text = self.texts[str(label["index"])]
            return text, label['tags']

        def split_train_valid(self, valid_ratio: float = .2):
            """
            Randomly split the dataset into a train and a valid dataset.

            Every text is independently sent to the valid side with
            probability ``valid_ratio``; label lists follow the text that
            shares their index. All other keys of the underlying data are
            shared by both halves.

            Returns:
                A ``(train_dataset, valid_dataset)`` pair.
            """
            train_texts, val_texts = {}, {}
            for index, text in self.texts.items():
                if random.random() > valid_ratio:
                    train_texts[index] = text
                else:
                    val_texts[index] = text

            # Set for O(1) membership tests while routing the labels.
            train_index = {str(index) for index in train_texts}

            train_labels, val_labels = {}, {}
            for index, label_list in self.data["labels"].items():
                if index in train_index:
                    train_labels[index] = label_list
                else:
                    val_labels[index] = label_list

            train_data = dict(
                self.data, texts=train_texts, labels=train_labels)
            val_data = dict(
                self.data, texts=val_texts, labels=val_labels)
            return self.__class__(train_data), self.__class__(val_data)

    return NERDataset(json_data)


def load_ner_data_pytorch_huggingface(file, tokenizer, valid_ratio=None):
    """
    Load saved NER tags and wrap them for use with a huggingface tokenizer.

    Args:
        file: path of the JSON file, forwarded to ``load_ner_data_pytorch``.
        tokenizer: a callable huggingface tokenizer. It is called with
            ``return_offsets_mapping=True``.
            NOTE(review): offset mappings presumably require a "fast"
            tokenizer; confirm against the tokenizer in use.
        valid_ratio: when ``None`` (default) the whole file becomes a single
            dataset; otherwise it is passed to ``split_train_valid`` and a
            train/valid pair is produced.

    Returns:
        One ``NERDatasetHF`` when ``valid_ratio`` is ``None``, else a
        ``(train, valid)`` tuple of ``NERDatasetHF``.

    Raises:
        ModuleNotFoundError: if pytorch is not installed.
    """
    # ``Dataset`` and ``torch`` are not imported at file level, so bring
    # them into scope here.
    try:
        import torch
        from torch.utils.data.dataset import Dataset
    except ModuleNotFoundError as err:
        raise ModuleNotFoundError(
            "Please install pytorch first https://pytorch.org/") from err

    ner_ds = load_ner_data_pytorch(file)

    class NERDatasetHF(Dataset):
        """
        Dataset of raw ``(text, tags)`` pairs; tokenization and label
        alignment are performed per batch by ``collate``.
        """

        def __init__(self, ner_ds, tokenizer):
            self.ner_ds = ner_ds
            self.options = ner_ds.data["options"]

            self.tokenizer = tokenizer
            # padding=True so texts of different lengths can be stacked
            # into one tensor per batch.
            self.tokenizing = partial(
                self.tokenizer,
                return_offsets_mapping=True,
                padding=True,
                return_tensors="pt")

        def __len__(self): return len(self.ner_ds)

        def collate(self, batch):
            """
            Tokenize a batch of ``(text, tags)`` pairs and build per-token
            class ids.

            Intended as the ``collate_fn`` of a ``DataLoader`` over this
            dataset (assumption based on the method name and signature).

            Returns:
                A dict holding every tokenizer output except
                ``"offset_mapping"``, plus ``"labels"``: a tensor shaped
                like ``input_ids`` with the class id of each token
                (0 meaning "O").
            """
            Xs, Ys = zip(*batch)
            Xs = list(Xs)
            Ys = list(Ys)

            tked = self.tokenizing(Xs)
            input_ids = tked['input_ids']
            offset_mapping = tked['offset_mapping']

            labels = torch.zeros_like(input_ids)
            c2i = self.ner_ds.c2i
            for row, tags in enumerate(Ys):
                token_start = offset_mapping[row, :, 0]
                token_end = offset_mapping[row, :, 1]
                # Zero-width spans (special/padding tokens are reported as
                # (0, 0)) must never be tagged.
                real_token = token_end > token_start
                for tag in tags:
                    tag_start = tag['offset']
                    tag_end = tag_start + len(tag['text'])
                    inside = (
                        real_token
                        & (token_start >= tag_start)
                        & (token_end <= tag_end)
                    )
                    labels[row, inside] = c2i[tag['label']]

            encoded = {
                key: value for key, value in tked.items()
                if key != "offset_mapping"
            }
            encoded["labels"] = labels
            return encoded

        def __getitem__(self, idx):
            text, label = self.ner_ds[idx]
            return text, label

    if valid_ratio is None:
        return NERDatasetHF(ner_ds, tokenizer)

    train_ds, valid_ds = ner_ds.split_train_valid(valid_ratio)
    return NERDatasetHF(train_ds, tokenizer), NERDatasetHF(valid_ds, tokenizer)

In [162]:
# Load the sample annotation file, then randomly split it into train and
# validation datasets using the default valid_ratio.
ner_ds = load_ner_data_pytorch("ner_result_sample.json")
train_ds, val_ds = ner_ds.split_train_valid()

In [163]:
train_ds

NERDataset, ['school', 'company']
2rows

In [164]:
val_ds

NERDataset, ['school', 'company']
1rows

In [165]:
from transformers import AutoTokenizer

In [166]:
# Request the fast tokenizer implementation explicitly.
# NOTE(review): offset mappings are presumably only offered by fast
# tokenizers; confirm before changing use_fast.
tk = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

In [184]:
# Take the first training sample: its raw text and its list of tags.
text, label = train_ds[0]



In [185]:
def tag_to_offset(tag):
    """
    Turn one tag record into its character span and label.

    Reads the 'offset', 'text' and 'label' keys of ``tag`` and returns
    ``([start, end], label)`` where ``end`` is ``start`` plus the length
    of the tagged text.
    """
    start = tag['offset']
    end = start + len(tag['text'])
    span = [start, end]
    return span, tag['label']

In [186]:
# Convert every tag of the sample into ([start, end], label) form.
tags_pos = [tag_to_offset(tag) for tag in label]
tags_pos

[([122, 144], 'school'), ([346, 354], 'company')]

In [187]:
import torch

In [191]:
# Tokenize first: the label tensor is shaped after the input ids, so `x`
# has to exist before `y` is created.
tked = tk([text,], return_offsets_mapping = True, return_tensors="pt")
om = tked['offset_mapping']
x = tked['input_ids']

y = torch.zeros_like(x)

# The last axis of the offset mapping holds (start, end) of each token;
# index it with `...` so the batch axis is kept and the result lines up
# with `y`.
token_start = om[..., 0]
token_end = om[..., 1]
# Zero-width spans (special tokens are reported as (0, 0)) must not be
# tagged, even when a tag starts at character 0.
real_token = token_end > token_start

for tag_pos, tag_label in tags_pos:
    tag_start, tag_end = tag_pos
    # A token belongs to the tag when its span lies fully inside the tag span.
    tag_mask = (tag_start <= token_start) & (token_end <= tag_end) & real_token
    y += tag_mask * train_ds.c2i[tag_label]

In [192]:
y

tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0]])