In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import arrow
import random
import pandas as pd
import numpy as np
from datetime import datetime
from datagen.csvgen.base import randgen, randrec
from datagen.csvgen.ner import converter
from tqdm import tqdm
from pandas.core.common import flatten
from collections import OrderedDict
from datagen.config import data_config

from datagen.imgen.content import ner_utils
from datagen.imgen.ops import boxes_ops


import json
import chardet

import torch.nn as nn

In [3]:
from transformers import BertTokenizer, AutoModel
tokenizer = BertTokenizer.from_pretrained("indobenchmark/indobert-base-p2")

In [4]:
#label config

def labels_map(label_name, label_type):
    labels=[]
    for kn,vn in label_name.items():
        for kt,vt in label_type.items():
            if kt!='delimiter':
                for bil in "BILU":
                    name = f'{bil}-{vt}_{vn}'
                    labels.append(name)
    labels.append("O")
    return labels


label_name = {
    'provinsi': 'PROV',
    'kabupaten': 'KAB',
    'nik': 'NIK',
    'nama': 'NAMA',
    'ttl': 'TTL',
    'gender': 'GDR',
    'goldar': 'GLD',
    'alamat': 'ADR',
    'rtrw': 'RTW',
    'kelurahan': 'KLH',
    'kecamatan': 'KCM',
    'agama': 'RLG',
    'perkawinan': 'KWN',
    'pekerjaan': 'KRJ',
    'kewarganegaraan': 'WRG',
    'berlaku': 'BLK',
    'sign_place': 'SGP',
    'sign_date': 'SGD'
}

label_type = {
    'field': "FLD",
    'value': "VAL",
    "delimiter": "O"
}

LABELS = labels_map(label_name, label_type)
LABEL2INDEX = dict((label,idx) for idx, label in enumerate(LABELS))
INDEX2LABEL = dict((idx, label) for idx, label in enumerate(LABELS))
NUM_LABELS = len(LABELS)

In [97]:
def get_formatted_annotation_objects(anno, tokenizer, max_seq_length=512):
    objects = anno['objects']
    objects = prepare_objects_data(objects, tokenizer)
    
    tokens, labels, label_ids, boxes = [],[],[],[]
    
    tokens.append(cls_token)
    boxes.append(cls_token_box)
    label_ids.append(pad_token_label_id)
    
    for obj in objects:
        tokens.append(obj['token'])
        labels.append(obj['label'])

        lab = LABEL2INDEX[obj['label']]
        label_ids.append(lab)

        pts = obj['points']
        pts = np.array(pts)
        pts = boxes_ops.order_points(np.array(pts))
        pts = list(boxes_ops.to_xyminmax(pts))
        boxes.append(pts)

    tokens.append(sep_token)
    boxes.append(sep_token_box)
    label_ids.append(pad_token_label_id)

    input_ids = tokenizer.encode(tokens, add_special_tokens=False)
    input_masks = [1] * len(input_ids)
    segment_ids = [0] * len(input_ids)
    
    padding_data_ouput = padding_data(
        input_ids, input_masks, 
        segment_ids, label_ids, boxes,
        max_seq_length=max_seq_length
    )
    input_ids, input_masks, segment_ids, label_ids, boxes = padding_data_ouput
    
    return input_ids, input_masks, segment_ids, label_ids, boxes


def padding_data(input_ids, input_masks, segment_ids,
                label_ids, boxes, max_seq_length=512, 
                pad_on_left=False):
    
    padding_length = max_seq_length - len(input_ids)

    if not pad_on_left:
        input_ids += [pad_token] * padding_length
        input_masks += [0] * padding_length
        segment_ids += [0] * padding_length
        label_ids += [pad_token_label_id] * padding_length
        boxes += [pad_token_box] * padding_length
    else:
        input_ids = [pad_token] * padding_length + input_ids
        input_masks = [0] * padding_length + input_masks
        segment_ids = [0] * padding_length + segment_ids
        label_ids = [pad_token_label_id] * padding_length + label_ids
        boxes = [pad_token_box] * padding_length + boxes
        
    return input_ids, input_masks, segment_ids, label_ids, boxes


def prepare_objects_data(objects, tokenizer):
    duplicated_objects = tokenize_duplicate_dict(objects, tokenizer)
    formatted_objects = reformat_label_oriented(duplicated_objects)
    bilou_objects = inject_bilou_to_objects(formatted_objects)
    objects = revert_to_list_format(bilou_objects)
    
    return objects

def reformat_label_oriented(objects):
    data = OrderedDict({k:{'field':[], 'delimiter':[], 'value':[]} for k,v in label_name.items()})

    for idx, obj in enumerate(objects):
        cname_curr = obj['classname']
        scname_curr = obj['subclass']
        data[cname_curr][scname_curr].append(obj)

    return data

# datas['objects']
def bilou_prefixer(text_list, label=None):
    out = []
    text_len = len(text_list)
    if text_len==1:
        bl = "U"
        if label!=None: bl =  bl + "-" + label
        out.append(bl)
    elif text_len>1:
        for idx, text in enumerate(text_list):
            if idx==0: 
                bl = "B"
                if label!=None: bl = bl + "-" + label
                out.append(bl)
            elif idx < text_len - 1: 
                bl = "I"
                if label!=None: bl = bl + "-" + label
                out.append(bl)
            else: 
                bl = "L"
                if label!=None: bl =  bl + "-" + label
                out.append(bl)
    return out


def tokenize_inside_dict(data_dict, tokenizer):
    for idx in range(len(data_dict)):
        text = data_dict[idx]['text']
        data_dict[idx]['text'] = tokenizer.tokenize(text)
    return data_dict

def tokenize_duplicate_dict(objects, tokenizer):
    new_objects = []
    for idx, obj in enumerate(objects):
        curr_text = objects[idx]['text']

        token = tokenizer.tokenize(curr_text)
        if len(token) > 1:
            for tok in token:
                new_obj = objects[idx].copy()
                new_obj['token'] = tok
                new_objects.append(new_obj)
        else:
            if len(token)==0:
                obj['token'] = ''
            else:
                obj['token'] = token[0]
            new_objects.append(obj)

    return new_objects


def inject_bilou_to_label(data_dict):
    # create bilou prefix to dictionary data
    texts = []
    for idx in range(len(data_dict)):
        texts.append(data_dict[idx]['token'])
    bil_prefix = bilou_prefixer(texts)

    #inject bilou prefix into label inside data_dict
    for idx, (bil, fld) in enumerate(zip(bil_prefix, data_dict)):
        if fld['label'] != "O":
            label = bil+'-'+fld['label']
            data_dict[idx]['label'] = label
    
    return data_dict


def inject_bilou_to_objects(objects):
    for idx, (key,val) in enumerate(objects.items()):
        field = val['field']
        delim = val['delimiter']
        value = val['value']

        if len(field)>0:
            objects[key]['field'] = inject_bilou_to_label(field)

        if len(delim)>0:
            objects[key]['delimiter'] = inject_bilou_to_label(delim)

        if len(value)>0:
            objects[key]['value'] = inject_bilou_to_label(value)
    
    return objects


def revert_to_list_format(dnew):
    data_list = []
    for k,v in dnew.items():
        field = dnew[k]['field']
        delim = dnew[k]['delimiter']
        value = dnew[k]['value']
        if len(delim)>0:
            line_list = field+delim+value
        else:
            line_list = field+value

        data_list += line_list
    return data_list

In [None]:
path = '../results/combined/1606064001/7917_json.json'

In [None]:
with open(path) as f:
    datas = json.load(f)

In [370]:
objects = prepdatas['objects']

In [386]:
cls_token_at_end=False
cls_token="[CLS]"
sep_token="[SEP]"
sep_token_extra=False

cls_token_segment_id=1
pad_token_segment_id=0
sequence_a_segment_id=0

pad_on_left=False
pad_token=0

cls_token_box=[0, 0, 0, 0]
sep_token_box=[1500, 1500, 1500, 1500]
pad_token_box=[0, 0, 0, 0]

pad_token_label_id = nn.CrossEntropyLoss().ignore_index

max_seq_length = 512

In [387]:
tokenizer.cls_token_id, tokenizer.sep_token_id, tokenizer.pad_token_id

(2, 3, 0)

In [422]:
input_ids, input_masks, segment_ids, label_ids, boxes = build_data(objects, tokenizer)

In [420]:
idx = 17
word = tokenizer.ids_to_tokens[input_ids[idx]]
label = INDEX2LABEL[label_ids[idx]]
word, label, boxes[idx]

(':', 'O', [346.0, 458.0, 364.0, 464.0])

In [98]:
import os
import cv2 as cv
from pathlib import Path
import matplotlib.pyplot as plt

from torch.utils.data import dataset
from sklearn.model_selection import train_test_split


class IDCardDataset(dataset.Dataset):
    def __init__(self, root, tokenizer, labels=None, mode='train', 
                 test_size=0.2, max_seq_length=512):
        self.root = Path(root)
        self.tokenizer = tokenizer
        self.labels = labels
        self.mode = mode
        self.test_size = test_size
        self.max_seq_length = max_seq_length
        self._build_files()
        
    def _build_files(self):
        names = self._get_names("*_json.json")
        data = self._get_filtered_files(names)
        dframe, train, test = self._split_dataset(data)
        self.data_frame = dframe
        self.train_frame = train
        self.test_frame = test
        
        if self.mode=="train":
            self.frame = self.train_frame
        else:
            self.frame = self.test_frame
        
    def _split_dataset(self, data):
        dframe = pd.DataFrame(data)
        train, test = train_test_split(dframe, 
                                       test_size=self.test_size, 
                                       random_state=1261)
        train = train.reset_index(drop=True)
        test = test.reset_index(drop=True)
        return dframe, train, test
        
    def _get_filtered_files(self, names):
        data = {'name':[], 'image':[], 'mask':[],'anno':[]}
        
        jfiles = self._glob_filter("*_json.json")
        ifiles = self._glob_filter("*_image.jpg")
        mfiles = self._glob_filter("*_mask.jpg")
        
        for name in names:
            for (jfile, ifile, mfile)  in zip(jfiles, ifiles, mfiles):
                
                jfpn = jfile.name.split("_")[0]
                ifpn = ifile.name.split("_")[0]
                mfpn = mfile.name.split("_")[0]
                
                if name == jfpn and name == ifpn and name == mfpn:
                    data['name'].append(name)
                    data['image'].append(ifile)
                    data['mask'].append(mfile)
                    data['anno'].append(jfile)
                    
        return data

    def _get_names(self, path_pattern):
        names = []
        files = self._glob_filter(path_pattern)
        for file in files:
            names.append(file.name.split("_")[0])
        return names
    
    def _glob_filter(self, pattern):
        return sorted(list(self.root.glob(pattern)))
    
    def _load_anno(self, path):
        path = str(path)
        with open(path) as f:
            data_dict = json.load(f)
        return data_dict
    
    def _load_image(self, path):
        path = str(path)
        img = cv.imread(path, cv.IMREAD_UNCHANGED)
        return img
        
    def __len__(self):
        return len(self.frame)
    
    def __getitem__(self, idx):
        record = self.frame.iloc[idx]
        anno = self._load_anno(record['anno'])
        img = self._load_image(record['image'])
        mask = self._load_image(record['mask'])
        
        anno_objects = get_formatted_annotation_objects(
            anno, self.tokenizer, self.max_seq_length
        )
        
        input_ids, input_masks, segment_ids, label_ids, boxes = anno_objects
        data = (
            input_ids, input_masks, 
            segment_ids, label_ids, boxes,
            img, mask
        )
        
        return data
        
    

path = '../results/combined/1606064001/'
data = IDCardDataset(root=path, tokenizer=tokenizer)
# len(data[0])
data[0]

NameError: name 'cls_token' is not defined

In [76]:
import os
print(os.getcwd())

/home/nunenuh/study/code/repo/idcard_datagen/notebook
