<a href="https://colab.research.google.com/github/monuminu/ocr_extract/blob/master/LayoutLMv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers@git+https://github.com/monuminu/transformers.git
!pip install seqeval

In [None]:
# Quote the requirement: unquoted, the shell treats ">=5.1," as a stdout redirect.
!pip install "pyyaml>=5.1" bs4

In [None]:
import numpy as np
import os
import numpy as np
import pandas as pd
import torch
from transformers import LayoutLMv2Tokenizer, LayoutLMv2ForTokenClassification, LayoutLMv2Config
from torchvision.transforms import ToTensor
from torch.utils.data import Dataset, DataLoader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import warnings 
warnings.filterwarnings('ignore')
from PIL import Image



def normalize_box(box, width, height):
    """Rescale a pixel box to the 0-1000 coordinate range.

    Args:
        box: Indexable ``[x0, y0, x1, y1]`` in pixel units.
        width: Page width; anything ``int()`` accepts.
        height: Page height; anything ``int()`` accepts.

    Returns:
        A list of four ints: x values are divided by ``width`` and y values
        by ``height``, multiplied by 1000 and truncated with ``int``.
    """
    page_width = int(width)
    page_height = int(height)
    # Coordinates alternate x, y, x, y, so the divisors alternate too.
    divisors = (page_width, page_height, page_width, page_height)
    return [
        int(1000 * (box[position] / divisor))
        for position, divisor in enumerate(divisors)
    ]

def resize_and_align_bounding_box(bbox, original_image, target_size):
    """Map a box from the original image onto a square resized image.

    Args:
        bbox: Iterable of exactly four numbers ``(left, top, right, bottom)``
            in the original image's pixel coordinates.
        original_image: Object whose ``size`` attribute is ``(width, height)``.
        target_size: Side length of the resized square image.

    Returns:
        ``[left - 0.5, top - 0.5, right + 0.5, bottom + 0.5]`` where each
        coordinate was first scaled and rounded to the nearest integer.
    """
    image_width, image_height = original_image.size
    scale_x = target_size / image_width
    scale_y = target_size / image_height
    left, top, right, bottom = tuple(bbox)

    def to_pixel(coordinate, scale):
        # Scale, then snap to the nearest whole pixel.
        return int(np.round(coordinate * scale))

    # Grow the box by half a pixel on every side after snapping.
    return [
        to_pixel(left, scale_x) - 0.5,
        to_pixel(top, scale_y) - 0.5,
        to_pixel(right, scale_x) + 0.5,
        to_pixel(bottom, scale_y) + 0.5,
    ]

class InvoiceDataSet(Dataset):
    """Token-classification dataset pairing word-level annotations with page images.

    Each row of ``df`` must provide ``imageFilename``, ``imageWidth``,
    ``imageHeight``, ``words``, ``bbox`` and ``label``. Images are opened
    relative to the module-level ``base_path`` and label strings are mapped to
    ids through the module-level ``data_config.label2id``.
    """

    def __init__(self, df, tokenizer, max_length, target_size, train=True):
        """Store the configuration; nothing is loaded until an item is requested.

        Args:
            df: DataFrame with one row per page.
            tokenizer: Tokenizer exposing ``tokenize`` and ``__call__``.
            max_length: Length every returned sequence is padded/truncated to.
            target_size: Side length, in pixels, of the square resized image.
            train: Stored as ``self.train``; not read anywhere in this class.
        """
        self.df = df
        self.tokenizer = tokenizer
        self.max_seq_length = max_length
        self.target_size = target_size
        self.pad_token_box = [0, 0, 0, 0]
        self.train = train

    def __len__(self):
        """Return the number of rows (pages) in the backing DataFrame."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Build the model inputs for the page at positional index ``idx``.

        Returns:
            The tokenizer's encoding with every value converted to a tensor.
            On top of the tokenizer's own keys it carries ``bbox`` (0-1000
            normalised token boxes), ``labels`` (label ids, ``-100`` for
            special and padding tokens), ``resized_image`` and
            ``resized_and_aligned_bounding_boxes``.

        Raises:
            AssertionError: If words and boxes differ in count, or if any
                token-level sequence does not end up ``max_length`` long.
            KeyError: If a label is missing from ``data_config.label2id``.
        """
        item = self.df.iloc[idx, :].to_dict()
        original_image = Image.open(os.path.join(base_path, item["imageFilename"])).convert("RGB")
        # Resize to the square size expected by the visual backbone.
        resized_image = original_image.resize((self.target_size, self.target_size))

        # Word-level annotations: words, pixel boxes and labels.
        words = item["words"]
        unnormalized_word_boxes = item["bbox"]
        word_labels = item["label"]
        normalized_word_boxes = [
            normalize_box(bbox, item["imageWidth"], item["imageHeight"])
            for bbox in unnormalized_word_boxes
        ]
        assert len(words) == len(normalized_word_boxes)

        # Expand to token level: every sub-token inherits its word's boxes;
        # the first sub-token is tagged B-<label>, the rest I-<label>.
        token_boxes = []
        unnormalized_token_boxes = []
        token_labels = []
        for word, unnormalized_box, box, label in zip(
            words, unnormalized_word_boxes, normalized_word_boxes, word_labels
        ):
            n_tokens = len(self.tokenizer.tokenize(word))
            if n_tokens == 0:
                continue
            unnormalized_token_boxes.extend([unnormalized_box] * n_tokens)
            token_boxes.extend([box] * n_tokens)
            token_labels.append('B-' + label)
            token_labels.extend(['I-' + label] * (n_tokens - 1))

        # Leave room for the two special tokens added by the tokenizer.
        special_tokens_count = 2
        content_limit = self.max_seq_length - special_tokens_count
        token_boxes = token_boxes[:content_limit]
        unnormalized_token_boxes = unnormalized_token_boxes[:content_limit]
        token_labels = token_labels[:content_limit]

        # Boxes and (ignored) labels for the leading and trailing special tokens.
        token_boxes = [self.pad_token_box] + token_boxes + [[1000, 1000, 1000, 1000]]
        unnormalized_token_boxes = [self.pad_token_box] + unnormalized_token_boxes + [[1000, 1000, 1000, 1000]]
        token_labels = [-100] + token_labels + [-100]

        # Tokenize once, explicitly at the configured length; without
        # max_length the tokenizer falls back to its own default and the
        # length checks below fail for any other max_length.
        encoding = self.tokenizer(
            ' '.join(words),
            padding='max_length',
            truncation=True,
            max_length=self.max_seq_length,
        )
        # The attention mask marks real tokens with 1, so its sum is the
        # unpadded length; pad boxes and labels by the remainder.
        padding_length = self.max_seq_length - sum(encoding['attention_mask'])
        token_boxes += [self.pad_token_box] * padding_length
        unnormalized_token_boxes += [self.pad_token_box] * padding_length
        token_labels += [-100] * padding_length

        encoding['bbox'] = token_boxes
        # Convert label strings to ids; -100 entries are passed through.
        encoding['labels'] = [
            label if label == -100 else data_config.label2id[label]
            for label in token_labels
        ]

        for key in ('input_ids', 'attention_mask', 'token_type_ids', 'bbox', 'labels'):
            assert len(encoding[key]) == self.max_seq_length

        encoding['resized_image'] = ToTensor()(resized_image)
        # Rescale the pixel boxes so they line up with the resized image.
        encoding['resized_and_aligned_bounding_boxes'] = [
            resize_and_align_bounding_box(bbox, original_image, self.target_size)
            for bbox in unnormalized_token_boxes
        ]

        # Finally, convert everything to PyTorch tensors. The key list is
        # snapshotted so the mapping is not rebound while being iterated.
        for key in list(encoding.keys()):
            encoding[key] = torch.as_tensor(encoding[key])
        return encoding

In [None]:
!wget -O a.zip https://public.bn.files.1drv.com/y4mSquuayMrAT271sQVjs1QtBXPBuB0beDzG8_vny1gii4Gaui_vewF5ouw-wBF0tXswCRzAJtfPepvuUpQ4sR-wWIXcQmeKwuDYVhaDOSepxBh4pGhiNRZlkBV_Rkd3ZT6V4ZvCTDgVMq1P7YV6-0HJXVASSStayku2inAYr36FCipdGCNcWl9nmb5IKWwWvUV7d_nrJxpRiOKr45ZTPuIhBG8wnoDDqVRFPb_R98vS8o?access_token=EwAYA61DBAAUmcDj0azQ5tf1lkBfAvHLBzXl5ugAAdEIFwMFcTejmPWay44FcktbpeOdPyjg2EM2Av7T5s%2btU7wCA2oUhl8iBoVl7bbFjG3VHDRDFP2UXLUu2TDQ9IZDCTEIEHJCevSZHUuoGReXzdfRrH3P4Ouy1CW3DM4h1mrZvlkqpZNNSOyhGcrr4W3Ys%2bhSrYmEE93vC34537iJKVlTLOBaAVEdorY3zLR9mnbRhX1/akIadw12JswgPInfSJoUgrmV9DO9eSEF%2bHtiN%2bRjcbRyLwnm3nSv%2bLJdH4IVQs2N17mAqaYiacIZx3kQ1b43PK7W23GB/dfsWpoosL9%2byrP7DpJipOWAuFoPMl6idHPburHM5byHAKzglrsDZgAACCccOFX/04c86AFhflFZdqavTdCxtOuo2bDXq56Hk4EVlRmhBAdGQsS98CifFLwvnmHIWxGZht0Cz1hOSfIF67j2TrP/%2bSWc6LKLuw3MB38tBNqvgGf7VlNYeOJaarQYJ3LhUsjG994Hx4T9Ro%2bTjTaVbQqrTwqR8XoDwZwftgXXbxHBH9q%2bIpAWTnYOtiwZr7m5531plcdhcMUqSmxqC/vOLO9IlY/KkkvOtuLyO/HiPaBz1EUHkqhxA/EdKksmQX2MsJNB9yvfMqNM0fCrbhNRRYYpgFU2K7AlBDjthu3vv6IlabxC74WqsLxqET4DQUUm0CVx9TSAMZgsBuKqxZTnK77Rrd86Uajjo5rgLiKQ7K4DVrAV9TlH85/BORivWCoM99zHq1o/HxuThDnVseZ9FpRIhJ27hMOukIMgNHie240stssaqNjXffETyBhG7wxF8Zc/21nBu7CrQpXQWQ8oWpRYJ3yV/cGHVLy/rNuhod/Xn4a/fJjEjAtT00MxXrQFeYTAH6v4JIvyceUjlTZXgUTu/LhBKIy%2bXBLGkQy%2bkP1Xu2MVh3Xlfus6g8Q4na0nSABYbAQvUYYSpi9IHvd64pDi/sTSMXZZxlsKRVi/FIN7OrAcJ4Ec8nZTYGt6i1HIHRhIU%2b9aFjEfzgaJmaCsHTAC

In [None]:
!ls /kaggle/input/text-extraction-for-ocr/imageAndXML_Data

In [None]:
from bs4 import BeautifulSoup, element
import pandas as pd
import operator 

def get_get_bbox(bbox):
    """Parse a ``"x,y x,y ..."`` points string into ``[x1, y1, x2, y2]``.

    The string is split on commas. ``x1`` is the first field, ``y1`` and
    ``x2`` are the two space-separated halves of the second field, and ``y2``
    is the last field. Every value goes through ``float`` and is then
    truncated with ``int``.
    """
    def as_int(text):
        return int(float(text))

    fields = bbox.split(",")
    left = as_int(fields[0])
    # The second comma-separated field holds "<y of point 1> <x of point 2>".
    second_field = fields[1].split(" ")
    top = as_int(second_field[0])
    right = as_int(second_field[1])
    bottom = as_int(fields[-1])
    return [left, top, right, bottom]

def get_label_bbox(gt_xml_path):
    """Read the labelled regions from a ground-truth XML file.

    For every ``TextRegion`` element the attributes of all its child tags are
    merged into one dict; the ``points`` attribute is replaced by a parsed
    ``bbox`` entry.

    Args:
        gt_xml_path: Path of the XML file, read as UTF-8.

    Returns:
        The region dicts ordered by ``bbox[1]`` and then ``bbox[0]``.
    """
    with open(gt_xml_path, encoding="utf8") as handle:
        soup = BeautifulSoup(handle.read(), 'xml')

    regions = []
    for region in soup.find_all('TextRegion'):
        attributes = {}
        for child in region.contents:
            # Skip plain text nodes; only tags carry attributes.
            if not isinstance(child, element.Tag):
                continue
            attributes.update(child.attrs)
        attributes["bbox"] = get_get_bbox(attributes["points"])
        del attributes["points"]
        regions.append(attributes)

    regions.sort(key=lambda entry: [entry["bbox"][1], entry["bbox"][0]])
    return regions


def get_words_bbox(xml_file_path):
    """Read page attributes and per-word boxes from an OCR XML file.

    For every ``Word`` element the result holds the text of its ``Unicode``
    descendant under ``"text"``, the merged attributes of its child tags, and
    a parsed ``bbox`` entry that replaces the ``points`` attribute.

    Args:
        xml_file_path: Path of the XML file, read as UTF-8.

    Returns:
        ``(page_attrs, words)`` where ``page_attrs`` is the attribute dict of
        the first ``Page`` element and ``words`` is the list of word dicts
        ordered by ``bbox[1]`` and then ``bbox[0]``.

    Raises:
        IndexError: If the document has no ``Page`` element.
        KeyError: If a word has no child tag with a ``points`` attribute.
    """
    with open(xml_file_path, encoding="utf8") as f:
        xml_data = f.read()
    soup = BeautifulSoup(xml_data, 'xml')
    page_attrs = soup.find_all('Page')[0].attrs

    word_list = []
    for word in soup.find_all('Word'):
        # Look the text up once per word instead of once per child node.
        text = word.find("Unicode").get_text()
        word_dict = {"text": text}
        for content in word.contents:
            if isinstance(content, element.Tag):
                word_dict.update(content.attrs)
        # Keep the Unicode text authoritative even if a child tag happens to
        # carry an attribute that is itself named "text".
        word_dict["text"] = text
        word_dict["bbox"] = get_get_bbox(word_dict.pop("points"))
        word_list.append(word_dict)
    return page_attrs, sorted(word_list, key=lambda x: [x["bbox"][1], x["bbox"][0]])

def is_word_bbox_in_label_bbox(word_bbox, label_bbox):
    """Return True when the word box lies strictly inside the label box.

    Both arguments unpack to ``(x1, y1, x2, y2)``. A word box that touches an
    edge of the label box does not count as inside.
    """
    word_left, word_top, word_right, word_bottom = word_bbox
    label_left, label_top, label_right, label_bottom = label_bbox
    inside_horizontally = label_left < word_left and word_right < label_right
    inside_vertically = label_top < word_top and word_bottom < label_bottom
    return bool(inside_horizontally and inside_vertically)

def assign_lable_to_word(words_bbox_list, word_label_list):
    """Attach a label to every word whose box falls inside a label box.

    Args:
        words_bbox_list: Word records, each with a ``bbox`` entry.
        word_label_list: Label records, each with ``bbox`` and ``value``.

    Returns:
        A DataFrame with one row per (word, containing label) pair: the word's
        fields plus a ``label`` column taken from the label's ``value``. A
        word inside several label boxes yields several rows; a word inside
        none yields no row.
    """
    label_frame = pd.DataFrame(word_label_list)
    word_frame = pd.DataFrame(words_bbox_list)
    matched_rows = [
        {**word_row.to_dict(), "label": label_row["value"]}
        for _, word_row in word_frame.iterrows()
        for _, label_row in label_frame.iterrows()
        if is_word_bbox_in_label_bbox(word_row["bbox"], label_row["bbox"])
    ]
    return pd.DataFrame(matched_rows)


# Build one record per page image (page attributes plus labelled words) and
# persist the resulting table as a pickle.
if __name__ == "__main__":
    import glob
    lst_output = []
    for file in glob.glob("/kaggle/input/text-extraction-for-ocr/ImageAndXML_Data/*.tif"):
        # Derive both companion paths before the try block so the error
        # handler always names the page that actually failed.
        ocr_xml_file = file.replace(".tif", "_ocr.xml")
        label_xml_file = file.replace(".tif", "_gt.xml")
        try:
            page_attrs, words_bbox_list = get_words_bbox(ocr_xml_file)
            word_label_list = get_label_bbox(label_xml_file)
            df_word_lable = assign_lable_to_word(words_bbox_list, word_label_list)
            page_attrs.update({"words" : df_word_lable.text.tolist(), "bbox" : df_word_lable.bbox.tolist(), "label" : df_word_lable.label.tolist()})
            lst_output.append(page_attrs)
        except Exception as exc:
            # Best effort: report the page and the reason, then move on.
            # Catching Exception (not a bare except) lets Ctrl-C through.
            print(label_xml_file, exc)
    df = pd.DataFrame(lst_output)[["imageFilename","imageHeight", "imageWidth", "words", "bbox", "label"]]
    df.to_pickle("/kaggle/working/data.pkl")

In [None]:
# Directory holding the page images; InvoiceDataSet.__getitem__ reads this
# module-level name when it opens an image.
base_path = "/kaggle/input/text-extraction-for-ocr/ImageAndXML_Data"
# Reload the per-page table pickled by the XML-parsing cell.
# NOTE(review): unpickling runs arbitrary code; only load files produced by this notebook.
data = pd.read_pickle("/kaggle/working/data.pkl")
# Preview the first rows as the cell output.
data.head()

In [None]:
class data_config:
    """Label vocabulary derived from the module-level ``data`` table.

    Evaluated once, when the class body runs: every distinct value found in
    ``data.label`` is expanded into a ``B-`` and an ``I-`` variant.
    """
    # Distinct raw label values across all pages.
    labels = np.unique([tag for page_tags in data.label for tag in page_tags]).tolist()
    # Two tags per raw label, in the order B-<label>, I-<label>.
    labels = [prefix + tag for tag in np.unique(labels) for prefix in ("B-", "I-")]
    num_labels = len(labels)
    # Position in ``labels`` <-> tag string.
    id2label = dict(enumerate(labels))
    label2id = {tag: index for index, tag in enumerate(labels)}

In [None]:
# Checkpoint identifier passed to every from_pretrained call below.
model_path = 'microsoft/layoutlmv2-base-uncased'
# Load the checkpoint's config, overriding the label count and the
# label <-> id maps with the ones computed in data_config.
config = LayoutLMv2Config.from_pretrained(model_path, num_labels=data_config.num_labels, id2label = data_config.id2label, label2id = data_config.label2id)
tokenizer = LayoutLMv2Tokenizer.from_pretrained(model_path)
model = LayoutLMv2ForTokenClassification.from_pretrained(model_path, config = config)
# Move the model to the module-level device (CUDA when available, else CPU).
model.to(device)

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the pages for validation.
train, valid = train_test_split(data, test_size = 0.2)

train_dataset = InvoiceDataSet(df = train, tokenizer = tokenizer, max_length = 512, target_size = 224, train=True)
# Reshuffle every epoch so the model does not see the same batches in the
# same order each time; the validation loader below keeps a fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=5, shuffle=True)

valid_dataset = InvoiceDataSet(df = valid, tokenizer = tokenizer, max_length = 512, target_size = 224, train=False)
valid_dataloader = DataLoader(valid_dataset, batch_size=5)

In [None]:
from transformers import AdamW
from tqdm.notebook import tqdm
import numpy as np
from seqeval.metrics import (
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
import torch

def train_fn(train_dataloader, model, optimizer):
    """Run one optimisation pass over ``train_dataloader``.

    Args:
        train_dataloader: Iterable of batches exposing ``input_ids``,
            ``bbox``, ``attention_mask``, ``token_type_ids``, ``labels`` and
            ``resized_image`` tensors.
        model: Model whose forward pass returns an object with a ``loss``.
        optimizer: Optimizer stepped once per batch.

    Returns:
        None. The model parameters are updated in place.
    """
    # eval_fn leaves the model in eval mode; switch back so layers that
    # behave differently during training (e.g. dropout) are active again.
    model.train()
    progress = tqdm(train_dataloader, total=len(train_dataloader))
    for batch in progress:
        outputs = model(
            image=batch['resized_image'].to(device),
            input_ids=batch['input_ids'].to(device),
            bbox=batch['bbox'].to(device),
            attention_mask=batch['attention_mask'].to(device),
            token_type_ids=batch['token_type_ids'].to(device),
            labels=batch['labels'].to(device),
        )
        loss = outputs.loss
        # Clear gradients before backprop so nothing stale is accumulated.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
def eval_fn(eval_dataloader, model):
    """Evaluate ``model`` on ``eval_dataloader`` and compute sequence metrics.

    The model is switched to eval mode and is left in that mode on return.
    Positions whose label id is ``-100`` are excluded from the metrics; the
    remaining ids are mapped to tag strings through the module-level
    ``config.id2label``.

    Args:
        eval_dataloader: Iterable of batches with the same keys the model
            forward pass needs (see the call below).
        model: Model whose output exposes ``loss`` and ``logits``.

    Returns:
        Dict with ``loss`` (mean per-batch loss), ``precision``, ``recall``
        and ``f1``.

    Raises:
        ValueError: If ``eval_dataloader`` yields no batches.
    """
    model.eval()
    eval_loss = 0.0
    logits_batches = []
    label_batches = []
    progress = tqdm(eval_dataloader, total=len(eval_dataloader))
    for batch in progress:
        with torch.no_grad():
            labels = batch['labels'].to(device)
            outputs = model(
                image=batch['resized_image'].to(device),
                input_ids=batch['input_ids'].to(device),
                bbox=batch['bbox'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                token_type_ids=batch['token_type_ids'].to(device),
                labels=labels,
            )
        eval_loss += outputs.loss.item()
        # Collect per-batch arrays and join once at the end; appending to a
        # growing array would re-copy everything on every batch.
        logits_batches.append(outputs.logits.detach().cpu().numpy())
        label_batches.append(labels.detach().cpu().numpy())

    if not logits_batches:
        raise ValueError("eval_dataloader produced no batches; nothing to evaluate")

    eval_loss = eval_loss / len(logits_batches)
    preds = np.argmax(np.concatenate(logits_batches, axis=0), axis=2)
    out_label_ids = np.concatenate(label_batches, axis=0)

    # Keep only positions with a real label, one list per sequence.
    out_label_list = []
    preds_list = []
    for label_row, pred_row in zip(out_label_ids, preds):
        keep = label_row != -100
        out_label_list.append([config.id2label[int(i)] for i in label_row[keep]])
        preds_list.append([config.id2label[int(i)] for i in pred_row[keep]])

    results = {
        "loss": eval_loss,
        "precision": precision_score(out_label_list, preds_list),
        "recall": recall_score(out_label_list, preds_list),
        "f1": f1_score(out_label_list, preds_list),
    }
    return results

In [None]:
# File the best-scoring weights are written to.
MODEL_PATH ="/kaggle/working/pytorch_model.bin"
optimizer = AdamW(model.parameters(), lr=5e-5)
# NOTE(review): global_step is assigned here but never read in this cell.
global_step = 0
best_f1_score = 0
# Train for 5 epochs, evaluating on the validation loader after each one.
for epoch in range(5):
    train_fn(train_dataloader, model, optimizer)
    # eval_fn returns a dict of metrics; only its "f1" entry is used here.
    current_f1_score = eval_fn(valid_dataloader, model)
    # Checkpoint only on a strict improvement of the validation F1.
    if current_f1_score["f1"] > best_f1_score:
        torch.save(model.state_dict(), MODEL_PATH)
        best_f1_score = current_f1_score["f1"]
    print("best_f1_score :", best_f1_score)