In [303]:
import os
import json
from PIL import ImageDraw, Image
from fuzzywuzzy import fuzz
from tqdm import tqdm

from doctr.io import DocumentFile
from doctr.models import ocr_predictor

In [304]:
# Build the OCR predictor: db_resnet50 as the detection architecture and
# crnn_vgg16_bn as the recognition architecture, requested with pretrained=True.
model = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)

In [305]:
# Dataset tag; only used to build the output directory path below.
NAME = "BHASHINI"

# Maximum number of matched regions kept per question/answer pair.
MAX_MATCHES = 5

# Minimum combined fuzzy score a region needs in order to be kept.
CUT_OFF_THRESHOLD = 70
# Weights used when combining the question-term score with the answer score.
QUESTION_WEIGHT = 0.2
ANSWER_WEIGHT = 0.8

# Grounding granularity: "line" or "word" (the grounding loop tests LEVEL == "word").
LEVEL = "line"

# Input images, their QnA annotations, and the root of everything written out.
# (An earlier configuration pointing at data/input/{NAME}/ and data2.json was
# overwritten by these assignments and has been dropped as dead code.)
IMG_DIR = "/data/BADRI/FINAL/THESIS/GRVQA/data/CircularsVQA/BHASHINI_TESTSET/final/"
JSON_FILE = "/data/BADRI/FINAL/THESIS/GRVQA/data/CircularsVQA/BHASHINI_TESTSET/final_annotations.json"
OUT_DIR = f"/data/BADRI/FINAL/THESIS/GRVQA/gr-doc-vqa-grounding/data/output/grounding/{NAME}/doctr/{LEVEL}/"

OUT_DET_DIR = os.path.join(OUT_DIR, "detections")
OUT_IMG_DIR = os.path.join(OUT_DIR, "images")
OUT_JSON_DIR = os.path.join(OUT_DIR, "json")

# exist_ok=True makes the cell safe to re-run and avoids the check-then-create race.
for out_dir in (OUT_DIR, OUT_IMG_DIR, OUT_JSON_DIR, OUT_DET_DIR):
    os.makedirs(out_dir, exist_ok=True)

# Question words that are ignored when scoring question terms against OCR text.
stop_words = {'what', 'is', 'the', 'this', 'that', 'these', 'those', 'which', 'how', 'why', 'where', 'when', 'who', 'will', 'be', 'and', 'or', 'in', 'at', 'to', 'for', 'of', 'with', 'by'}

# Read json data (iterated later as image name -> list of question/answer pairs).
with open(JSON_FILE, "r", encoding="utf-8") as f:
    data = json.load(f)




In [306]:

def _scale_bbox(geometry, dim):
    """Scale a two-corner geometry by ``dim`` and return ``[x1, y1, x2, y2]``.

    Args:
        geometry: Sequence whose first two items are the two corners of a box.
            Presumably docTR relative coordinates ((xmin, ymin), (xmax, ymax))
            -- TODO confirm against the docTR version in use.
        dim: Per-axis scale factors, paired element-wise with each corner.

    Returns:
        A list of four plain Python floats rounded to two decimals.
    """
    first_corner = [coord * size for coord, size in zip(geometry[0], dim)]
    second_corner = [coord * size for coord, size in zip(geometry[1], dim)]
    # float(...) works for NumPy scalars and plain Python floats alike, whereas
    # the previous `.astype(float)` raised AttributeError on plain floats.
    return [
        float(round(value, 2))
        for value in (first_corner[0], first_corner[1], second_corner[0], second_corner[1])
    ]


# Run OCR on every file in IMG_DIR. For each file, write the recognised lines
# (with their words) to OUT_JSON_DIR and a copy of the image with the line
# boxes drawn on it to OUT_DET_DIR.
for file in tqdm(os.listdir(IMG_DIR)):
    IMG_PATH = os.path.join(IMG_DIR, file)
    doc = DocumentFile.from_images(IMG_PATH)
    result = model(doc)

    predictions = []

    # The context manager closes the image handle once the annotated copy is saved.
    with Image.open(IMG_PATH) as image:
        draw = ImageDraw.Draw(image)

        for page in result.pages:
            # page.dimensions is reversed so it pairs with the (x, y) corner order.
            dim = tuple(reversed(page.dimensions))
            for block in page.blocks:
                for line in block.lines:
                    line_bbox = _scale_bbox(line.geometry, dim)
                    words_data = [
                        {'bbox': _scale_bbox(word.geometry, dim), 'text': word.value}
                        for word in line.words
                    ]
                    predictions.append({
                        'bbox': line_bbox,
                        'text': " ".join(word_data['text'] for word_data in words_data),
                        'words': words_data,
                    })

                    # Draw bounding boxes on the image
                    draw.rectangle(line_bbox, outline='red', width=2)

        # save predictions in json
        with open(os.path.join(OUT_JSON_DIR, f"{file}.json"), "w") as f:
            json.dump(predictions, f, indent=4)

        # save image
        image.save(os.path.join(OUT_DET_DIR, f"{file}.png"))


100%|██████████| 502/502 [15:24<00:00,  1.84s/it]


In [307]:
def get_matched_regions(question_text, target_text, predictions):
    """Return the OCR regions that best match an answer, best first.

    A region whose text contains ``target_text`` (case-insensitively) is an
    exact match with score 100. Any other region gets a fuzzy answer score and
    a fuzzy question-term score, combined with ANSWER_WEIGHT / QUESTION_WEIGHT,
    and is kept only if the combined score reaches CUT_OFF_THRESHOLD.

    Args:
        question_text: The question; its whitespace-separated words, minus
            ``stop_words``, are scored against each region.
        target_text: The answer text to locate.
        predictions: Iterable of region dicts, each with at least a 'text' key.

    Returns:
        At most MAX_MATCHES shallow copies of the matching regions, each with
        added 'match_score' and 'match_details' keys, sorted by descending
        'match_score'. Returns an empty list when ``target_text`` is empty or
        whitespace-only.
    """
    # An empty string is a substring of every string (and a lone space of any
    # multi-word line), so a blank answer used to mark every region as an
    # exact match. There is nothing to ground in that case.
    if not target_text.strip():
        return []

    target_lower = target_text.lower()
    target_len = len(target_text)
    question_terms = [word.lower() for word in question_text.split() if word.lower() not in stop_words]

    matched_regions = []
    for region in predictions:
        region_text = region['text']
        region_lower = region_text.lower()

        if target_lower in region_lower:
            region_copy = region.copy()
            region_copy['match_score'] = 100
            region_copy['match_details'] = {
                    'exact_match': True,
                    'answer_score': 100,
                    'question_score': 100
                }
            matched_regions.append(region_copy)
            continue

        partial_score = fuzz.partial_ratio(target_lower, region_lower)
        token_score = fuzz.token_set_ratio(target_lower, region_lower)

        # Length factor: regions much shorter than the answer (answer length
        # capped at 50 characters) score lower; the factor itself is capped at 1.0.
        region_len = len(region_text)
        length_factor = min(1.0, region_len / min(50, target_len))

        # Longer regions weight token matching more; short regions weight the
        # length factor more.
        if region_len > 10:
            answer_score = (partial_score * 0.3) + (token_score * 0.5) + (length_factor * 100 * 0.2)
        else:
            answer_score = (partial_score * 0.3) + (token_score * 0.4) + (length_factor * 100 * 0.3)
            if region_len < 5 and partial_score < 100:
                answer_score *= 0.5  # Penalize very short inexact matches

        # Penalize short regions.
        # NOTE(review): regions under 5 characters with an inexact partial
        # match are halved twice (here and above) -- confirm this is intended.
        if region_len < 5:
            answer_score *= 0.5

        # Best fuzzy score of any single question term against the region.
        best_partial_question = max(
            (fuzz.partial_ratio(term, region_lower) for term in question_terms), default=0
        )
        best_token_question = max(
            (fuzz.token_set_ratio(term, region_lower) for term in question_terms), default=0
        )
        question_score = (best_partial_question * 0.4) + (best_token_question * 0.6)

        # Combine scores (giving more weight to answer matches)
        combined_score = (answer_score * ANSWER_WEIGHT) + (question_score * QUESTION_WEIGHT)

        if combined_score >= CUT_OFF_THRESHOLD:
            region_copy = region.copy()
            region_copy['match_score'] = combined_score
            region_copy['match_details'] = {
                'exact_match': False,
                'answer_score': answer_score,
                'question_score': question_score,
                'answer_weight': ANSWER_WEIGHT,
                'question_weight': QUESTION_WEIGHT
            }
            matched_regions.append(region_copy)

    matched_regions.sort(key=lambda x: x['match_score'], reverse=True)
    return matched_regions[:MAX_MATCHES]
        
        
def longest_consecutive_range(indices):
    """Return the longest run of consecutive integers found in ``indices``.

    Duplicates are ignored and order does not matter. When several runs share
    the maximum length, the one with the smallest values is returned. An empty
    input yields an empty list.
    """
    if not indices:
        return []

    ordered = sorted(set(indices))
    total = len(ordered)

    best_start, best_len = 0, 0
    run_start = 0
    # Walk one position past the end so the final run is closed like the others.
    for pos in range(1, total + 1):
        run_ended = pos == total or ordered[pos] != ordered[pos - 1] + 1
        if not run_ended:
            continue
        run_len = pos - run_start
        if run_len > best_len:  # strict: the earliest run wins ties
            best_start, best_len = run_start, run_len
        run_start = pos

    return ordered[best_start:best_start + best_len]


def get_word_level_matches(answer_text, top_k_matches):
    """Collect word boxes that cover the answer inside each matched region.

    For every region in ``top_k_matches``, the words whose lowercased text is a
    substring of the lowercased ``answer_text`` are located; only the longest
    run of adjacent such words contributes its 'bbox' values to the result.

    Returns:
        A flat list of word bounding boxes, in region order then word order.
    """
    collected = []
    for region in top_k_matches:
        words = region['words']
        hit_positions = [
            pos
            for pos, entry in enumerate(words)
            if entry['text'].lower() in answer_text.lower()
        ]
        collected.extend(
            words[pos]['bbox'] for pos in longest_consecutive_range(hit_positions)
        )
    return collected

In [308]:
# For every annotated image, load its saved OCR predictions and write one
# annotated image per question/answer pair into OUT_IMG_DIR.
for image_name, qna_pairs in tqdm(data.items()):
    IMG_PATH = os.path.join(IMG_DIR, image_name)

    json_file = os.path.join(OUT_JSON_DIR, f"{image_name}.json")
    with open(json_file, "r") as f:
        predictions = json.load(f)

    qna_count = 0
    for qna_pair in qna_pairs:
        question_text = qna_pair['question']
        answer_text = qna_pair['answer']

        top_k_matches = get_matched_regions(question_text, answer_text, predictions)

        # A fresh copy of the image per pair, closed as soon as it is saved.
        with Image.open(IMG_PATH) as image:
            # Create the drawing context up front. It used to be created only
            # inside the bbox loops, so a pair with no matches either raised
            # NameError or wrote its text onto the previous pair's image.
            draw = ImageDraw.Draw(image)

            if LEVEL == "word":
                for bbox in get_word_level_matches(answer_text, top_k_matches):
                    draw.rectangle(bbox, outline='green', width=2)
            else:
                for match in top_k_matches:
                    draw.rectangle(match['bbox'], outline='blue', width=2)

            # write qna pair in the image
            draw.text((10, 10), "Question:" + question_text, fill='red')
            draw.text((10, 25), "Answer: "+answer_text, fill='red')

            image.save(os.path.join(OUT_IMG_DIR, f"{image_name}_{qna_count}.png"))
        qna_count += 1
        

100%|██████████| 502/502 [19:18<00:00,  2.31s/it]
