In [55]:
import os
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import pickle
import json
from PIL import ImageEnhance
import PIL.Image as Img
import re
from IPython import display
display.clear_output()

import ultralytics
import os
ultralytics.checks()

from ultralytics import YOLO
from IPython.display import display, Image
import time
import psutil
import torch
from transformers import TrOCRProcessor, default_data_collator, VisionEncoderDecoderModel



Ultralytics 8.3.56  Python-3.11.7 torch-2.5.1+cpu CPU (Intel Core(TM) Ultra 5 125H)
Setup complete  (18 CPUs, 15.6 GB RAM, 162.7/399.9 GB disk)


# Functions

### Generic

In [49]:
def load_gt(gt_file_path):
    """Load ground-truth field names and boxes from a label JSON file.

    Parameters
    ----------
    gt_file_path : str
        Path to a JSON file containing a list of objects, each with a
        'Field name' entry and a 'Coordinate' entry.

    Returns
    -------
    tuple
        ``(fields, coordinates)`` in file order. Each field name is passed
        through ``remove_special_char``.
    """
    # JSON is UTF-8 by specification. Without an explicit encoding Python
    # falls back to the locale code page (e.g. cp1252 on Windows) and can
    # mis-decode non-ASCII label text.
    with open(gt_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    coordinates = [d['Coordinate'] for d in data]
    fields = [remove_special_char(d['Field name']) for d in data]
    return fields, coordinates

In [6]:
def compute_iou(boxA, boxB):
    """Return the intersection-over-union of two axis-aligned boxes.

    Parameters
    ----------
    boxA, boxB : sequence of 4 numbers
        Boxes given as ``(x_min, y_min, x_max, y_max)``.

    Returns
    -------
    float
        Intersection area divided by union area. ``0.0`` when the boxes do
        not overlap or when the union has zero area.
    """
    # Unpack coordinates
    xA_min, yA_min, xA_max, yA_max = boxA
    xB_min, yB_min, xB_max, yB_max = boxB

    # Corners of the intersection rectangle
    x_left = max(xA_min, xB_min)
    y_top = max(yA_min, yB_min)
    x_right = min(xA_max, xB_max)
    y_bottom = min(yA_max, yB_max)

    # No overlap at all
    if x_right < x_left or y_bottom < y_top:
        return 0.0

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    boxA_area = (xA_max - xA_min) * (yA_max - yA_min)
    boxB_area = (xB_max - xB_min) * (yB_max - yB_min)
    union_area = boxA_area + boxB_area - intersection_area

    # Two touching zero-area boxes pass the overlap check above but have an
    # empty union; report "no overlap" instead of dividing by zero.
    if union_area == 0:
        return 0.0

    return intersection_area / float(union_area)

### YOLO

In [22]:
def run_yolo_inference(model, image_file, conf=0.3, iou=0.4, save=True):
    """Run the detector on one image and return its boxes as plain lists.

    Parameters
    ----------
    model : callable
        Detector called as ``model(image_file, conf=..., iou=..., save=...)``.
        It must return a sequence whose first item has a ``boxes`` attribute.
    image_file : str
        Path of the image to process.
    conf, iou, save
        Forwarded unchanged to the model call. The defaults are the values
        this function previously hard-coded.

    Returns
    -------
    list
        ``boxes.xyxy.tolist()`` of the first result, or ``[]`` when the model
        returned no result or no boxes.
    """
    results = model(image_file, conf=conf, iou=iou, save=save)
    detections = results[0].boxes if len(results) > 0 else None
    boxes = detections.xyxy.tolist() if detections is not None else []
    # Report an empty box set as well as a missing one, so the message
    # appears whenever nothing was found.
    if not boxes:
        print("No handwritten text detected")
    return boxes

### Field Matching

In [31]:
def get_table_coordinates(image_file):
    """Return the bounding box of the largest outer contour in an image.

    The image is contrast-enhanced, converted to grayscale, blurred, run
    through Canny edge detection and dilated. External contours are then
    extracted, and the one whose bounding rectangle has the largest area is
    selected (presumably the border of the form's table -- confirm against
    the scanned forms).

    Parameters
    ----------
    image_file : str
        Path of the image to analyse.

    Returns
    -------
    list
        ``[x1, y1, x2, y2]`` of the selected bounding rectangle.

    Raises
    ------
    ValueError
        If no contour is found in the image.
    """
    # Close the file handle promptly, and force three channels so the
    # RGB->BGR conversion below also works for grayscale, palette or RGBA files.
    with Img.open(image_file) as image:
        rgb_image = image.convert("RGB")

    contrast_image = ImageEnhance.Contrast(rgb_image).enhance(2)  # Enhance contrast
    opencv_image = cv2.cvtColor(np.array(contrast_image), cv2.COLOR_RGB2BGR)  # RGB -> BGR
    gray_image = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2GRAY)  # BGR -> gray
    blurred_image = cv2.GaussianBlur(gray_image, (5, 5), 0)  # Smooth before edge detection
    edged = cv2.Canny(blurred_image, 150, 200)  # Canny edge detection

    kernel = np.ones((3, 3), np.uint8)
    dilated_img = cv2.dilate(edged, kernel, iterations=1)  # Thicken edges so outlines close
    contours, _ = cv2.findContours(dilated_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if len(contours) == 0:
        # Without this guard the failure surfaces as an opaque argmax error.
        raise ValueError(f"No contours found in {image_file!r}; cannot locate the table")

    # Pick the contour whose bounding rectangle covers the largest area
    # (the first one wins on ties).
    bounding_rects = [cv2.boundingRect(cont) for cont in contours]
    x, y, w, h = max(bounding_rects, key=lambda rect: rect[2] * rect[3])
    return [x, y, x + w, y + h]

In [10]:
def norm_coordinates(table_coord, field_coord):
    """Express a box relative to a table box.

    Both arguments are ``(x1, y1, x2, y2)`` sequences. Each field coordinate
    is shifted by the table's top-left corner and divided by the table's
    width (x values) or height (y values). Results are rounded to four
    decimals.

    Returns
    -------
    list
        ``[norm_x1, norm_y1, norm_x2, norm_y2]``.
    """
    fx1, fy1, fx2, fy2 = field_coord
    origin_x, origin_y, far_x, far_y = table_coord

    span_x = far_x - origin_x
    span_y = far_y - origin_y

    relative = (
        (fx1 - origin_x) / span_x,
        (fy1 - origin_y) / span_y,
        (fx2 - origin_x) / span_x,
        (fy2 - origin_y) / span_y,
    )
    return [np.round(value, 4) for value in relative]

In [11]:
def load_ref_coordinates(file_path):
    """Read reference field boxes from an Excel sheet and normalise them.

    The sheet is indexed by its 'Field' column. The row labelled 'Full'
    supplies the table box. Every row after the first is normalised against
    that box with ``norm_coordinates``. The last two values of each row are
    dropped before use.

    NOTE(review): the first row is skipped by position -- presumably it is
    the 'Full' row; confirm against the spreadsheet.

    Returns
    -------
    dict
        Field label -> normalised ``[x1, y1, x2, y2]``.
    """
    ref_df = pd.read_excel(file_path).set_index('Field')
    full_box = list(ref_df.loc['Full'])[:-2]
    row_labels = list(ref_df.index)

    normalised = {}
    for label in row_labels[1:]:
        cell_box = list(ref_df.loc[label])[:-2]
        normalised[label] = norm_coordinates(full_box, cell_box)
    return normalised

In [12]:
def find_matches(table_coordinates, field_coordinates):
    """Assign each box the reference field it overlaps most.

    Parameters
    ----------
    table_coordinates : dict
        Field name -> reference box.
    field_coordinates : iterable
        Boxes to label, in the same coordinate system as the reference boxes.

    Returns
    -------
    list
        One field name per input box: the key with the highest
        ``compute_iou`` score (the first key wins on ties).
    """
    matched_names = []
    for box in field_coordinates:
        overlap_by_field = {
            name: compute_iou(box, ref_box)
            for name, ref_box in table_coordinates.items()
        }
        matched_names.append(max(overlap_by_field, key=overlap_by_field.get))
    return matched_names

In [13]:
def extract_fields(image_file, ref_table_coordinates, pred_boxes):
    """Label detected boxes with the reference field they overlap most.

    Parameters
    ----------
    image_file : str
        Image the boxes were detected on. It is used to locate the table via
        ``get_table_coordinates``.
    ref_table_coordinates : dict
        Field name -> normalised reference box.
    pred_boxes : list
        Detected boxes as ``[x1, y1, x2, y2]`` in image pixels.

    Returns
    -------
    dict
        Field name -> detected box (original pixel coordinates). When several
        boxes match the same field, the one with the highest IoU against the
        reference box is kept. On a tie the later box wins.
    """
    local_table_coordinates = get_table_coordinates(image_file)
    field_coordinates = [
        norm_coordinates(local_table_coordinates, coord) for coord in pred_boxes
    ]
    pred_fields = find_matches(ref_table_coordinates, field_coordinates)

    pred_results = {}
    best_overlap = {}
    for field, norm_box, box in zip(pred_fields, field_coordinates, pred_boxes):
        overlap = compute_iou(norm_box, ref_table_coordinates[field])
        # A plain dict(zip(...)) would let a later, worse match silently
        # replace a better one for the same field.
        if field not in pred_results or overlap >= best_overlap[field]:
            pred_results[field] = box
            best_overlap[field] = overlap
    return pred_results

### TrOCR

In [57]:
def run_ocr(image, processor, ocr_model):
    """Recognise the text in one cropped image.

    Parameters
    ----------
    image
        Image crop handed to the processor.
    processor
        Called as ``processor(images=image, return_tensors="pt")``. The
        result must expose ``pixel_values``. It must also provide
        ``batch_decode``.
    ocr_model
        Object with a ``generate(pixel_values)`` method returning token ids.

    Returns
    -------
    str
        First decoded string for the crop, with special tokens skipped.
    """
    pixel_values = processor(images=image, return_tensors="pt").pixel_values
    generated_ids = ocr_model.generate(pixel_values)  # Generate output token ids
    # Decode generated token ids to a string
    return processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

In [72]:
def extract_text(image_file, field_boxes, processor, ocr_model):
    """Run OCR on every field box of an image and post-process the text.

    Parameters
    ----------
    image_file : str
        Path of the image to read.
    field_boxes : dict
        Field name -> ``[x1, y1, x2, y2]`` box in image pixels.
    processor, ocr_model
        Forwarded to ``run_ocr``.

    Returns
    -------
    dict
        Field name -> ``post_processing([field, text])``.

    Raises
    ------
    OSError
        If the image cannot be read.
    """
    img = cv2.imread(image_file)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f"Could not read image: {image_file}")
    image_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    height, width = image_rgb.shape[:2]

    field_text = {}
    for field, box in field_boxes.items():
        start_x, start_y, end_x, end_y = (int(coord) for coord in box)
        # Negative indices would wrap around in numpy slicing; clamp the box
        # to the image instead.
        start_x, start_y = max(start_x, 0), max(start_y, 0)
        end_x, end_y = min(end_x, width), min(end_y, height)
        if end_x <= start_x or end_y <= start_y:
            text = ""  # Nothing to read from an empty crop
        else:
            cropped_image = image_rgb[start_y:end_y, start_x:end_x]
            text = run_ocr(cropped_image, processor, ocr_model)
        field_text[field] = post_processing([field, text])
    return field_text

### Post Processing

In [67]:
def remove_special_char(text):
    """Drop every character that is not an ASCII letter or digit."""
    return re.sub(r'[^A-Za-z0-9]+', '', text)
def numeric_only(text):
    """Keep only the digits 0-9 of ``text``."""
    return re.sub(r'[^0-9]+', '', text)
def char_space(text):
    """Keep ASCII letters and spaces only, then trim surrounding whitespace."""
    letters_and_spaces = re.sub(r'[^A-Za-z ]+', '', text)
    return letters_and_spaces.strip()
def generic(text):
    """Keep ASCII letters, digits, spaces, '+' and '-', then trim whitespace."""
    kept = re.sub(r'[^A-Za-z0-9 +\-]+', '', text)
    return kept.strip()
def alphabets_only(text):
    """Keep ASCII letters only and return them with just the first letter upper-cased."""
    letters = re.sub(r'[^A-Za-z]+', '', text)
    return letters.capitalize()
def alphanum_specialchar(text):
    """Keep ASCII letters, digits, spaces and the characters ``+ - / , .``, then trim whitespace."""
    kept = re.sub(r'[^A-Za-z0-9 +\-/,.]+', '', text)
    return kept.strip()

In [68]:
def lang_specific(text, camelcase=True):
    """Clean a 'languages known' answer into a comma-separated word list.

    Everything except ASCII letters, spaces and commas is removed. Each
    whitespace-separated word is then optionally capitalised, stripped of
    commas, passed through ``autocorrect_words`` with the 'language'
    vocabulary and suffixed with a comma. The words are joined with spaces
    and the final trailing comma is dropped.

    Parameters
    ----------
    text : str
        Raw OCR text.
    camelcase : bool, default True
        When true, a word in which at most half of the characters are
        capitals is capitalised. Words that are mostly upper-case are left
        as they are.

    Returns
    -------
    str
    """
    text = re.sub(r'[^A-Za-z ,]+', '', text)
    text = re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', text) # Remove special character at start and end
    text = re.sub(r'[^A-Za-z0-9, ]+', ',', text) # Replace any remaining special-character run with a comma
    text = re.sub(r'\s+,', ',', text) # Remove extra spaces before comma
    words = []
    for word in text.split():
        if camelcase:
            # Count capitals in this word only. Counting across the whole text
            # made the decision for one word depend on every other word.
            capital_letters = sum(1 for char in word if char.isupper())
            if capital_letters <= len(word) // 2:
                word = word.capitalize()
        word = autocorrect_words(word.replace(",", ""), 'language') + ","
        words.append(word)
    return " ".join(words)[:-1]
    
def date_format(text):
    """Strip leading zeros from a three-part, slash-separated date.

    Parameters
    ----------
    text : str
        Candidate date such as ``"01/02/2020"``.

    Returns
    -------
    str
        The three parts re-joined after an ``int`` round-trip, e.g.
        ``"1/2/2020"``. The input is returned unchanged when it does not have
        exactly three parts or when any part is not an integer.
    """
    elements = text.split("/")
    if len(elements) != 3:
        return text
    try:
        return "/".join(str(int(ele)) for ele in elements)
    except ValueError:
        # An empty or non-numeric part: leave the text as it is rather than crash.
        return text
        
def date_specific(text):
    """Reduce OCR text to slash-separated digit groups and format it as a date.

    Runs of non-digits become a single '/', leading and trailing separators
    are removed, and the result is passed through ``date_format``.
    """
    substitutions = (
        (r'[^0-9]+', '/'),
        (r'/+', '/'),
        (r'^[^0-9]+|[^0-9]+$', ''),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return date_format(text)

def bloodgroup_specific(text):
    """Normalise OCR text for a blood-group field.

    Keeps only the characters A, B, O, T (either case), 0, 1, '+' and '-'.
    Lower-case 't' is rewritten to '+' and '0' to 'O'. The result is
    upper-cased and trimmed.

    NOTE(review): an upper-case 'T' and the digit '1' are kept but not
    rewritten -- confirm that is intended.
    """
    kept = re.sub(r'[^ABOTabot0-1+\-]+', '', text)
    for wrong, right in (('t', '+'), ('0', 'O')):
        kept = kept.replace(wrong, right)
    return kept.upper().strip()

def address_specific(text):
    """Tidy the punctuation of an address string.

    Applied in order: strip non-alphanumeric characters from both ends,
    replace each run of characters outside ``A-Za-z0-9, /-.`` with a comma,
    and remove whitespace that directly precedes a comma.
    """
    rules = (
        (r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', ''),
        (r'[^A-Za-z0-9, /\-.]+', ','),
        (r'\s+,', ','),
    )
    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text)
    return text

def reference_specific(text):
    """Format a reference entry as ``"<name> - <number>"``.

    The text is first reduced to ASCII letters, digits, spaces and hyphens,
    then split on '-':

    * no hyphen: all letters of the input, then all digits of the input;
    * one hyphen: the left part through ``process_words`` (camel-cased) and
      the digits of the right part;
    * more than one hyphen: the reduced text is returned as is.
    """
    cleaned_text = re.sub(r'[^A-Za-z0-9 \-]+', '', text)
    parts = cleaned_text.split("-")

    if len(parts) == 1:
        letters = re.sub(r'[^A-Za-z]+', '', text)
        digits = re.sub(r'[^0-9]+', '', text)
        return " - ".join([letters, digits])

    if len(parts) == 2:
        name = process_words(parts[0], True).strip()
        number = re.sub(r'[^0-9]+', '', parts[1])
        return " - ".join([name, number])

    return cleaned_text
        
def pan_specific(text):
    """Clean a PAN-card value and repair letter/digit confusions by position.

    After ``remove_special_char``, a 10-character value is treated as
    five letters, four digits and one letter: digits in the letter positions
    go through ``num2alpha`` and letters in the digit positions through
    ``alpha2num``. Values of any other length are only upper-cased.
    """
    cleaned = remove_special_char(text)
    if len(cleaned) != 10:
        return cleaned.upper()

    head, middle, tail = cleaned[:5], cleaned[5:-1], cleaned[-1]
    repaired = num2alpha(head) + alpha2num(middle) + num2alpha(tail)
    return repaired.upper()

In [73]:
# Per-field vocabulary: autocorrect_words looks up field_vocab[field] and
# treats the value as the list of accepted spellings for that field.
# NOTE(review): this is a Kaggle input path, while the other paths in this
# notebook are built from a local root directory -- confirm it is reachable.
# The encoding is explicit because JSON is UTF-8 and the locale default
# (e.g. cp1252 on Windows) can mis-decode non-ASCII entries.
with open('/kaggle/input/supportingfiles/field_vocabulary.json', 'r', encoding='utf-8') as f:
    field_vocab = json.load(f)

''

In [69]:
def autocorrect_words(text, field):
    """Snap ``text`` to the closest vocabulary entry of ``field`` when it is close enough.

    Parameters
    ----------
    text : str
        Word or phrase to correct.
    field
        Key into the module-level ``field_vocab`` mapping.

    Returns
    -------
    str
        The nearest vocabulary entry by ``levenshtein_distance`` when that
        nearest entry is unique and within the allowed distance: 1 for any
        entry, 2 for entries longer than 4 characters, 3 for entries longer
        than 8. Otherwise ``text`` is returned unchanged. It is also returned
        unchanged when the field has no vocabulary or an empty one.
    """
    if field not in field_vocab:
        return text
    training_labels = field_vocab[field]
    if not training_labels:
        # An empty candidate list has no minimum to take.
        return text

    dist = [levenshtein_distance(text, ele) for ele in training_labels]
    min_dist = min(dist)
    corrected = training_labels[dist.index(min_dist)]

    close_enough = (
        min_dist <= 1
        or (min_dist <= 2 and len(corrected) > 4)
        or (min_dist <= 3 and len(corrected) > 8)
    )
    # Only correct when exactly one entry is at the minimum distance.
    if close_enough and dist.count(min_dist) <= 1:
        return corrected
    return text
def process_words2(text, field=None):
    """Capitalise each word of ``text`` and autocorrect it against a field vocabulary.

    Parameters
    ----------
    text : str
        Whitespace-separated words.
    field : optional
        Vocabulary key forwarded to ``autocorrect_words``. The function used
        to read an undefined name ``field``. It is now an explicit parameter
        whose default keeps one-argument calls working.

    Returns
    -------
    str
        The processed words joined by single spaces.
    """
    words = []
    for word in text.split():
        word = autocorrect_words(word.capitalize(), field)
        words.append(word)
    return " ".join(words)

def process_words(text, camelcase=False):
    """Collapse whitespace between words and optionally capitalise them.

    Parameters
    ----------
    text : str
        Whitespace-separated words.
    camelcase : bool, default False
        When true, a word in which at most half of the characters are
        capitals is capitalised (first letter upper, rest lower). Words that
        are mostly upper-case are left untouched.

    Returns
    -------
    str
        The words joined by single spaces.
    """
    words = []
    for word in text.split():
        if camelcase:
            # Count capitals in this word only. Counting across the whole text
            # made the decision for one word depend on every other word.
            capital_letters = sum(1 for char in word if char.isupper())
            if capital_letters <= len(word) // 2:
                word = word.capitalize()
        words.append(word)
    return " ".join(words)

In [70]:
# Letter -> digit substitutions applied by alpha2num to alphabetic characters.
alphatonum = {"o": "0", "O": "0", "b": "6", "z": "2", "Z": "2", "B": "8", "G": "6", "I": "1", "T":"7"}
# Reverse lookup used by num2alpha. Where several letters share a digit, the
# letter listed last in alphatonum wins ('0' -> 'O', '2' -> 'Z', '6' -> 'G').
numtoalpha = {val: key for key, val in alphatonum.items()}
# Non-alphanumeric character -> digit substitutions applied by special2num.
schartonum = {"/": "1", "\\": "1", "(": "1"}


def alpha2num(text):
    """Replace alphabetic characters that have an entry in ``alphatonum`` with the mapped digit.

    All other characters are copied through unchanged.
    """
    return "".join(
        alphatonum.get(ch, ch) if ch.isalpha() else ch
        for ch in text
    )

def num2alpha(text):
    """Replace digits that have an entry in ``numtoalpha`` with the mapped letter.

    All other characters are copied through unchanged.
    """
    return "".join(
        numtoalpha.get(ch, ch) if ch.isdigit() else ch
        for ch in text
    )

def special2num(text):
    """Replace non-alphanumeric characters that have an entry in ``schartonum`` with the mapped digit.

    All other characters are copied through unchanged.
    """
    return "".join(
        ch if ch.isalnum() else schartonum.get(ch, ch)
        for ch in text
    )

In [71]:
def post_processing(data):
    """Clean one OCR result according to the field it belongs to.

    Parameters
    ----------
    data : sequence
        ``data[0]`` is the field name and ``data[1]`` the raw text.

    Returns
    -------
    str
        The cleaned text, or ``""`` for a field name that has no rule.
    """
    field = data[0]

    if field in ('Dateofbirth', 'date'):
        return date_specific(data[1])

    # Single-word categorical fields: letters only, then vocabulary lookup.
    if field in ('nationality', 'gender', 'maritalstatus', 'place'):
        return autocorrect_words(alphabets_only(data[1]), field)

    if field in ('candidatename', 'Fatherhusbandname'):
        return process_words(char_space(data[1]), True)

    # Numeric fields: repair letter/symbol look-alikes before keeping digits.
    if field in ('contactnumber', 'AlternateNo', 'aadhaarcard'):
        return numeric_only(special2num(alpha2num(data[1])))

    if field in ('permanentaddress', 'presentaddress'):
        tidy_address = address_specific(alphanum_specialchar(data[1]))
        return process_words(tidy_address, True)

    if field in ('bloodgroup',):
        return bloodgroup_specific(data[1])

    if field in ('experience', 'experience1'):
        return process_words(generic(data[1]), False)

    if field in ('qualification',):
        return autocorrect_words(process_words(generic(data[1]), True), field)

    if field in ('referencescmob1', 'referencescmob2'):
        return reference_specific(data[1])

    if field in ('pancard',):
        return pan_specific(data[1])

    if field in ('languageknown',):
        return lang_specific(data[1])

    return ""

### Evaluation Functions

In [None]:
# --- METRICS & EFFICIENCY ---
def levenshtein_distance(s1, s2):
    """Return the edit distance between two sequences.

    Strings are compared character by character. Any other sequence (for
    example a list of words) is compared element by element. Insertions,
    deletions and substitutions each cost 1.
    """
    longer = list(s1) if isinstance(s1, str) else s1
    shorter = list(s2) if isinstance(s2, str) else s2
    # Keep the shorter sequence on the column axis.
    if len(longer) < len(shorter):
        longer, shorter = shorter, longer
    if len(shorter) == 0:
        return len(longer)

    prev_row = list(range(len(shorter) + 1))
    for row_no, item_a in enumerate(longer, start=1):
        cur_row = [row_no]
        for col, item_b in enumerate(shorter):
            cur_row.append(min(
                prev_row[col + 1] + 1,              # insertion
                cur_row[col] + 1,                   # deletion
                prev_row[col] + (item_a != item_b), # substitution
            ))
        prev_row = cur_row
    return prev_row[-1]

def word_error_rate(y_true, y_pred):
    """Return total word-level edit distance divided by total reference words.

    Pairs are taken with ``zip``, so extra items in the longer list are
    ignored. Returns ``0.0`` when the references contain no words.
    """
    word_total = 0
    error_total = 0
    for truth, guess in zip(y_true, y_pred):
        truth_tokens = truth.split()
        guess_tokens = guess.split()
        word_total += len(truth_tokens)
        error_total += levenshtein_distance(truth_tokens, guess_tokens)
    if word_total > 0:
        return error_total / word_total
    return 0.0

def char_error_rate(y_true, y_pred):
    """Return total character-level edit distance divided by total reference characters.

    Pairs are taken with ``zip``, so extra items in the longer list are
    ignored. Returns ``0.0`` when the references contain no characters.
    """
    char_total = 0
    error_total = 0
    for truth, guess in zip(y_true, y_pred):
        char_total += len(truth)
        error_total += levenshtein_distance(truth, guess)
    if char_total > 0:
        return error_total / char_total
    return 0.0

def field_accuracy(y_true, y_pred):
    """Return ``sklearn.metrics.accuracy_score(y_true, y_pred)``."""
    # Imported here rather than at the top of the notebook, so scikit-learn
    # is only required when this metric is called.
    from sklearn.metrics import accuracy_score
    return accuracy_score(y_true, y_pred)

def document_level_accuracy(y_true, y_pred, doc_ids):
    """Return the fraction of documents whose fields are all predicted exactly.

    Parameters
    ----------
    y_true, y_pred : iterable
        Reference and predicted values, one per field.
    doc_ids : iterable
        Document identifier of each field, aligned with the two lists.

    Returns
    -------
    float
        Documents whose predictions equal the references in order, divided
        by the number of documents. ``0.0`` when there are none.
    """
    truth_by_doc = {}
    pred_by_doc = {}
    for doc_id, truth, guess in zip(doc_ids, y_true, y_pred):
        truth_by_doc.setdefault(doc_id, []).append(truth)
        pred_by_doc.setdefault(doc_id, []).append(guess)

    if not truth_by_doc:
        return 0.0

    exact_docs = sum(
        1 for doc_id, truths in truth_by_doc.items()
        if truths == pred_by_doc[doc_id]
    )
    return exact_docs / len(truth_by_doc)


In [None]:
# YOLO

In [18]:
current_run = "Run16"
# Path of model
root_dir = r"E:\Nextstep\MLChallenge\DeHaDo_AI"
# The paths below are built from `src_dir`, which is not assigned anywhere in
# the visible notebook, so this cell fails on a fresh kernel.
# NOTE(review): aliased to root_dir because the reference-coordinates file is
# loaded from root_dir + "tokenwise-dehado-ai\assets\..." -- confirm.
src_dir = root_dir

model_path = src_dir + "/" + r"tokenwise-dehado-ai\assets\Models\Custom_Coco"

model_name = "best.pt"
# Path of train and test set
train_path = src_dir + r"\Dataset\train"
test_path = src_dir + r"\Dataset\test"

# Results path
res_path = src_dir + r"\Results" + "/" + current_run
## Predicted bounding box on Image
# Raw string: "\P" is not a valid escape sequence. The value is unchanged.
output_path = res_path + r"\Predict"

In [58]:
# Detector used by run_yolo_inference.
model = YOLO(model_path + "/" + model_name)
ref_table_coordinates = load_ref_coordinates(root_dir + "/" + r"tokenwise-dehado-ai\assets\Ref_Coordinates.xlsx")

processor = TrOCRProcessor.from_pretrained("microsoft/trocr-large-handwritten", use_fast=True)
# Keep the recogniser under its own name. Assigning it to `model` replaced the
# YOLO detector loaded above and left `ocr_model`, which the inference cell
# passes to extract_text, undefined.
ocr_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-large-handwritten")

preprocessor_config.json:   0%|          | 0.00/224 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


tokenizer_config.json:   0%|          | 0.00/1.12k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.13k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.23G [00:00<?, ?B/s]

KeyboardInterrupt: 

In [65]:
# Run field matching and text extraction for a single test image.
image_path = test_path + "/" + "images"
file = "MIT_10.jpg"
file_path = image_path + "/" + file
# NOTE(review): with the detector call below commented out, `pred_boxes` is
# not assigned in this cell and must already exist in the kernel -- confirm.
# pred_boxes = run_yolo_inference(model, file_path)
pred_field_boxes = extract_fields(file_path, ref_table_coordinates, pred_boxes)
# NOTE(review): `ocr_model` is not assigned in this cell either -- verify it
# is defined before this line runs.
pred_field_text = extract_text(file_path, pred_field_boxes, processor, ocr_model)



In [74]:
# Display the field -> predicted box mapping returned by extract_fields.
pred_field_boxes

{'presentaddress': [907.2194213867188,
  1282.40478515625,
  1736.8446044921875,
  1472.6346435546875],
 'permanentaddress': [897.4985961914062,
  1537.2169189453125,
  2098.47119140625,
  1768.4093017578125],
 'experience1': [945.0428466796875,
  1164.7982177734375,
  1961.4866943359375,
  1268.0960693359375],
 'languageknown': [892.9874267578125,
  1949.5166015625,
  1684.2789306640625,
  2053.94580078125],
 'gender': [1824.88720703125,
  850.718017578125,
  2098.48291015625,
  954.9454345703125],
 'experience': [957.2333374023438,
  1046.192626953125,
  2355.276123046875,
  1143.63720703125],
 'referencescmob2': [966.2678833007812,
  2185.861328125,
  2140.096923828125,
  2291.89892578125],
 'nationality': [1845.9581298828125,
  950.673583984375,
  2151.87548828125,
  1045.5997314453125],
 'referencescmob1': [951.04541015625,
  2066.5185546875,
  1997.015869140625,
  2173.115234375],
 'Fatherhusbandname': [911.06982421875,
  540.0111083984375,
  1476.7900390625,
  644.3064575195312]

In [52]:
# Compare every predicted field box with its ground-truth box (IoU).
label_path = test_path + "/" + "labels"
file = "MIT_10.jpg"
# splitext only removes the final extension, so file names containing extra
# dots still map to the right label file.
label_file = os.path.splitext(file)[0] + ".json"
fields, coords = load_gt(label_path + "/" + label_file)
act_box = {f: c for f, c in zip(fields, coords)}
for key, val in pred_field_boxes.items():
    if key not in act_box:
        # A predicted field with no label would otherwise abort the loop
        # with a KeyError.
        print(key, "not present in ground truth")
        continue
    print(key, compute_iou(val, act_box[key]))

presentaddress 0.9469211891575613
permanentaddress 0.9369870302371721
experience1 0.8383726633315985
languageknown 0.9534722547980123
gender 0.9012471859446292
experience 0.9211260247743872
referencescmob2 0.8581930303199218
nationality 0.9028368131749757
referencescmob1 0.8609377627424383
Fatherhusbandname 0.8347391507173885
AlternateNo 0.8737551106463426
qualification 0.8463382862225584
Dateofbirth 0.8996616701850986
candidatename 0.8804360776763397
maritalstatus 0.8541920039297384
date 0.9137287720109768
bloodgroup 0.8275007835186984
place 0.8855232400059172
contactnumber 0.8521053171290589
aadhaarcard 0.904218153451516
