In [1]:
# Update edit_df to include real-time suggestions for 'answer'
import pandas as pd
# import streamlit as st
from difflib import SequenceMatcher, get_close_matches
from paddleocr import PaddleOCR
from PIL import Image, ImageDraw
import numpy as np
from rapidfuzz import fuzz
# from sentence_transformers import SentenceTransformer
# import torch.nn.functional as F
import torch
from transformers import AutoModel, AutoTokenizer
import json
import cv2
from PIL import Image

# Functions
def get_matches_with_confidence(word, possibilities, n=3, cutoff=0.6):
    """
    Returns matches from `get_close_matches` with their confidence scores.
    
    Args:
        word (str): The word to match.
        possibilities (list of str): A list of potential matches.
        n (int): Maximum number of close matches to return.
        cutoff (float): Minimum similarity ratio for a match to be included.

    Returns:
        list of tuples: A list of tuples where each tuple is (match, confidence_score).
    """
    # Generate matches using get_close_matches
    matches = get_close_matches(word, possibilities, n=n, cutoff=cutoff)
    
    # Calculate the confidence score for each match
    results = []
    for match in matches:
        score = SequenceMatcher(None, word, match).ratio()
        results.append((match, score))
    
    return results

def calculate_containment_ratio(inner_box, outer_box):
    """
    Calculate the containment ratio of `inner_box` within `outer_box`.

    Args:
        inner_box (list): [x1, y1, x2, y2] for the inner box.
        outer_box (list): [x1, y1, x2, y2] for the outer box.

    Returns:
        float: Containment ratio between 0 and 1.
    """
    # Extract coordinates
    x1_inner, y1_inner, x2_inner, y2_inner = inner_box
    x1_outer, y1_outer, x2_outer, y2_outer = outer_box

    # Calculate intersection coordinates
    inter_x1 = max(x1_inner, x1_outer)
    inter_y1 = max(y1_inner, y1_outer)
    inter_x2 = min(x2_inner, x2_outer)
    inter_y2 = min(y2_inner, y2_outer)

    # Compute intersection area
    inter_width = max(0, inter_x2 - inter_x1)
    inter_height = max(0, inter_y2 - inter_y1)
    intersection_area = inter_width * inter_height

    # Compute area of the inner box
    inner_area = (x2_inner - x1_inner) * (y2_inner - y1_inner)

    # Avoid division by zero
    if inner_area == 0:
        return 0.0

    # Containment ratio
    return intersection_area / inner_area

def find_best_contained_bbox(target_bbox, normalized_bboxes, threshold=0.5):
    """
    Find the best contained bbox in normalized_bboxes based on containment ratio.

    Args:
        target_bbox (list): [x1, y1, x2, y2] for the target box.
        normalized_bboxes (list): List of bounding boxes to compare against.
        threshold (float): Minimum containment ratio to consider as a match.

    Returns:
        tuple: (best_match_index, best_containment_ratio) or (None, 0.0) if no match exceeds the threshold.
    """
    best_match_idx = None
    best_ratio = 0.0
    scores = []
    for idx, bbox in enumerate(normalized_bboxes):
        ratio = calculate_containment_ratio(bbox, target_bbox)
        scores.append(ratio)
        if ratio > best_ratio and ratio >= threshold:
            best_ratio = ratio
            best_match_idx = idx

    return best_match_idx, best_ratio, scores



def levenshtein_similarity(word, phrases):
    """
    Calculate normalized Levenshtein similarity between `word` and each phrase in `phrases`.

    Args:
        word (str): The target word or phrase.
        phrases (list of str): List of candidate phrases.

    Returns:
        list: List of tuples (phrase, similarity_score), sorted by descending similarity.
    """
    results = []

    for phrase in phrases:
        # Calculate Levenshtein similarity
        score = fuzz.ratio(word.lower(), phrase.lower())
        results.append((phrase, score / 100))  # Normalize to 0-1 range

    # Sort by descending similarity
    return sorted(results, key=lambda x: -x[1])

def detect_rotation_angle(image):
    """Detect rotation angle using text alignment."""
    # Ensure the input is a NumPy array
    if not isinstance(image, np.ndarray):
        image = np.array(image)
    
    # Handle grayscale images
    if len(image.shape) == 2:
        grayscale = image
    else:
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    edges = cv2.Canny(grayscale, 50, 150, apertureSize=3)
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=100, minLineLength=100, maxLineGap=10)
    
    angles = []
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
            angles.append(angle)

    if angles:
        median_angle = np.median(angles)
        return median_angle
    return 0  # Assume no rotation if no angles detected

def find_best_and_all_matching_bboxes(target_bbox, normalized_bboxes, threshold=0.5):
    """
    Find the best and all matching bboxes in normalized_bboxes based on containment ratio.

    Args:
        target_bbox (list): [x1, y1, x2, y2] for the target box.
        normalized_bboxes (list): List of bounding boxes to compare against.
        threshold (float): Minimum containment ratio to consider as a match.

    Returns:
        tuple:
            - best_match (dict): {'index': best_match_index, 'ratio': best_containment_ratio}.
            - all_matches (list of dict): [{'index': idx, 'ratio': ratio} for each match above threshold].
    """
    best_match = {'index': None, 'ratio': 0.0}
    all_matches = []

    for idx, bbox in enumerate(normalized_bboxes):
        ratio = calculate_containment_ratio(bbox, target_bbox)
        if ratio >= threshold:
            all_matches.append({'index': idx, 'ratio': ratio})
            if ratio > best_match['ratio']:
                best_match = {'index': idx, 'ratio': ratio}

    return best_match, all_matches

def calculate_containment_ratio(bbox, target_bbox):
    """Example logic for containment ratio calculation."""
    x1, y1, x2, y2 = bbox
    tx1, ty1, tx2, ty2 = target_bbox

    intersection_x1 = max(x1, tx1)
    intersection_y1 = max(y1, ty1)
    intersection_x2 = min(x2, tx2)
    intersection_y2 = min(y2, ty2)

    # Compute area of intersection
    intersection_area = max(0, intersection_x2 - intersection_x1) * max(0, intersection_y2 - intersection_y1)

    # Compute area of the bbox
    bbox_area = (x2 - x1) * (y2 - y1)

    # Return containment ratio (intersection over bbox area)
    return intersection_area / bbox_area if bbox_area > 0 else 0.0

def draw_bboxes(image, bboxes, color="red", width=3):
    """
    Draws bounding boxes on an image.

    Args:
        image (PIL.Image.Image): The input image.
        bboxes (list of lists): List of bounding boxes [[x1, y1, x2, y2], ...].
        color (str): Color of the bounding boxes. Default is "red".
        width (int): Line width for the bounding boxes. Default is 3.

    Returns:
        PIL.Image.Image: Image with bounding boxes drawn.
    """
    # Make a copy of the image to draw on
    draw_image = image.copy()
    draw = ImageDraw.Draw(draw_image)
    
    for bbox in bboxes:
        x1, y1, x2, y2 = bbox
        # Draw the rectangle
        draw.rectangle([x1, y1, x2, y2], outline=color, width=width)
    
    return draw_image


In [4]:
from ppocr.utils.logging import get_logger
import logging
logger = get_logger()
logger.setLevel(logging.ERROR)

In [1]:
import paddle
gpu_available  = paddle.device.is_compiled_with_cuda()
print("GPU available:", gpu_available)

GPU available: False


# Doc Start

In [1]:
from fast_processor import PDFHandler, ClassifyExtract
from tqdm import tqdm
import pandas as pd

p = PDFHandler(r'test_pages\drivers_license_test4.pdf')

df_pages = p.df_pages

clf_results = []
clf_confidence = []
extraction_results = []
for _, row in tqdm(df_pages.iterrows()):
    # fp = rf"{row['preprocessed']}"
    # print(fp)
    c = ClassifyExtract(row)
    page_label = c.page_label
    page_score = c.page_score
    clf_results.append(page_label)
    clf_confidence.append(page_score)
    if page_label != 'Unknown':
        res = c.process_image()
        res.index.name = 'key'
        res = res.reset_index()
        res['page_label'] = page_label
        res['page_num'] = row['page_number']
        extraction_results.append(res)

df_pages['page_label'] = clf_results
df_pages['page_score'] = clf_confidence

Some weights of the model checkpoint at Snowflake/snowflake-arctic-embed-l-v2.0 were not used when initializing XLMRobertaModel: ['pooler.dense.bias', 'pooler.dense.weight']
- This IS expected if you are initializing XLMRobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing XLMRobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
converting pages...: 1it [00:02,  2.40s/it]
1it [00:00, 95.12it/s]


In [2]:
df_extracted = pd.concat(extraction_results)

ValueError: No objects to concatenate

In [None]:
df_extracted

In [41]:
c.show_bboxes()

In [None]:
p.df_pages['preprocessed'][0]

In [None]:
from fast_processor import ClassifyExtract
# image_path = r'debug_images\1120S_p1_2024_12_18_224445_957\page_1\2_denoised.png'
# image_path = r'debug_images\1040_p1_2024_12_27_085517_449\page_1\2_denoised.png'
# img = Image.open(image_path)
t = ClassifyExtract(image_path=p.df_pages['preprocessed'][0])
results, page_label, page_score = t.process_image()

In [None]:
page_label

In [2]:
t.show_bboxes()

In [None]:
t.keys

In [None]:
results

In [9]:
t.show_bboxes()

In [4]:
t.show_image()

In [None]:
t.bbox_draw_list

In [500]:
from fast_classifier import load_template_database, classify_document_with_confidence
from datetime import datetime

template_db = load_template_database()

In [None]:
label, score, _, alls = classify_document_with_confidence(image_path=image_path,template_db=template_db)

In [None]:
ocr = PaddleOCR(lang="en", cls=False)


In [None]:
image = Image.open(image_path).convert('RGB')

image_width, image_height = image.size

ocr_results = ocr.ocr(image_path)[0]

words = [line[1][0] for line in ocr_results]
bboxes = []
for box in [line[0] for line in ocr_results]:
    x1 = min(point[0] for point in box)
    y1 = min(point[1] for point in box)
    x2 = max(point[0] for point in box)
    y2 = max(point[1] for point in box)
    bboxes.append([x1, y1, x2, y2])

normalized_bboxes = [
    [
        (x1 / image_width),
        (y1 / image_height),
        (x2 / image_width),
        (y2 / image_height),
    ]
    for x1, y1, x2, y2 in bboxes
]

# Start Extraction

In [None]:
# Check CUDA availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the model and tokenizer
model_name = 'Snowflake/snowflake-arctic-embed-l-v2.0'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name, add_pooling_layer=False)
model.eval().to(device)  # Move model to GPU

with open('labels.json', 'r') as f:
    k = json.load(f)

# Define queries and documents
query_prefix = 'query: '
keys = [t['key'] for t in k[label]]
queries = [t['question'] for t in k[label]]
coords = [t['target_coords'] for t in k[label]]


In [522]:
# Get special instructions
instructs = []
search_areas = []
ignore_words = []
for t in k[label]:
    try:
        instructs.append(t['prep'])
    except:
        instructs.append(None)

for t in k[label]:
    try:
        search_areas.append(t['search_coords'])
    except:
        search_areas.append(None)

for t in k[label]:
    try:
        ignore_words.append(t['ignore_words'])
    except:
        ignore_words.append(None)

In [523]:

# Prepare input tokens
queries_with_prefix = [f"{query_prefix}{i}" for i in queries]
query_tokens = tokenizer(queries_with_prefix, padding=True, truncation=True, return_tensors='pt', max_length=8192).to(device)
document_tokens = tokenizer(words, padding=True, truncation=True, return_tensors='pt', max_length=8192).to(device)

# Compute token embeddings
with torch.no_grad():
    query_embeddings = model(**query_tokens)[0][:, 0]
    document_embeddings = model(**document_tokens)[0][:, 0]

# Normalize embeddings
query_embeddings = torch.nn.functional.normalize(query_embeddings, p=2, dim=1)
document_embeddings = torch.nn.functional.normalize(document_embeddings, p=2, dim=1)

# Compute cosine similarity
scores = torch.mm(query_embeddings, document_embeddings.transpose(0, 1))

normalized_scores = (scores + 1) / 2

search_masks = []
word_masks = []
# Carve out the relevant scores that have bboxes that fall into the search coords
for s in search_areas:
    if s:
        # Get search area
        a1, a2, a3, a4 = eval(s)
        # Iterate through bboxes
        masks = []
        for b in normalized_bboxes:
            b1, b2, b3, b4 = b
            val = 1 if np.all([a1 <= b1, a2 <= b2, a3 >= b3, a4 >= b4]) else 0
            masks.append(val)
        search_masks.append(masks)
    else:
        search_masks.append([1 for i in range(len(bboxes))])

for s in ignore_words:
    if s:
        # Get list of words for removal
        igs = eval(s)
        # Iterate through bboxes
        # For each word to ignore,
        masks = []
        for s in igs:
            submask = []
            # Iterate through each word in the document
            for b in words:
                # Assign 0 if a word contains a word to ignore, 1 otherwise
                val = 0 if s.lower() in b.lower() else 1
                # Append the value to an array
                submask.append(val)
            masks.append(submask)
            # Multiply all arrays in logics so that we end up with the mask for which scores to keep
        masks = np.prod(masks, axis=0)
        word_masks.append(masks)
    else:
        word_masks.append([1 for i in range(len(bboxes))])


In [524]:
all_masks = [np.array(t) * np.array(z) for t, z in zip(search_masks, word_masks)]

In [525]:
filtered_scores = [[float(n) for n in s] * np.array(z) for s, z in zip(normalized_scores, all_masks)]

In [526]:
best_matches = [(words[np.argmax(s).item()], s[np.argmax(s).item()]) for s in filtered_scores]

In [529]:
bbox_list = []
target_bbox_list = []
label_data  = {}
all_answers = []
counter = 0

for t in best_matches:
    labels = {}
    # Match value for the label
    match_word = t[0]
    # Match confidence for the label
    match_confidence = t[1]
    # Find the index of the matching word in words
    match_idx = [i for i, w in enumerate(words) if w == match_word][0]
    # Find the corresponding bbox of the label
    match_bbox = normalized_bboxes[match_idx]
    # Store label info in dictionary
    label_data[keys[counter]] = {}
    label_data[keys[counter]]['label'] = match_word
    label_data[keys[counter]]['label_bbox'] = match_bbox
    label_data[keys[counter]]['label_confidence'] = match_confidence
    counter+=1

In [530]:
# Now that you have the label data stored, it can be used in reference 
counter = 0
for t, bb in zip(label_data.keys(), coords):
    bbox_list.append(label_data[t]['label_bbox'] * np.array([image_width, image_height, image_width, image_height]))
    match_bbox = label_data[t]['label_bbox']
    x1, y1, x2, y2 = match_bbox
    target_bbox = eval(bb)
    search_area = np.array(target_bbox) * np.array([image_width, image_height, image_width, image_height])
    bbox_list.append(search_area)
    best_match, all_matches = find_best_and_all_matching_bboxes(target_bbox, normalized_bboxes, threshold=0.5)
    match_words = []
    match_scores = []
    match_area = []
    for i in all_matches:
        match_words.append(words[i['index']])
        match_scores.append(i['ratio'])
        match_area.append(bboxes[i['index']])
        bbox_list.append(bboxes[i['index']])

    label_data[t]['value'] = match_words #' '.join(match_words)
    if instructs[counter]:
        if '[' in instructs[counter]:
            func = eval(instructs[counter])
            label_data[t]['value'] = func(match_words)
        else:
            label_data[t]['value'] = [eval(instructs[counter])(a) for a in label_data[t]['value']][0]
    else:
        label_data[t]['value'] = ' '.join(label_data[t]['value'])
    label_data[t]['value_bbox'] = [
        min(x[0] for x in match_area),  # Min of element 1
        min(x[1] for x in match_area),  # Min of element 2
        max(x[2] for x in match_area),  # Max of element 3
        max(x[3] for x in match_area)   # Max of element 4
    ] if match_area else [0,0,0,0]
    bbox_list.append(label_data[t]['value_bbox'])
    label_data[t]['value_confidence'] = np.mean(match_scores)
    # all_answers.append(answers)
    counter+=1

In [None]:
df_results = pd.DataFrame.from_dict(label_data,orient='index')

# End Extraction

In [532]:


# tx1, ty1, tx2, ty2 = target_bbox

# bbox_list = [
#     bboxes[match_idx],
#     bboxes[best_idx] if best_idx else [0,0,0,0],
#     [tx1 * image_width, ty1 * image_height, tx2 * image_width, ty2 * image_height]
# ]
# Draw bounding boxes
annotated_image = draw_bboxes(image, bbox_list, color="red", width=4)

# Show and save the annotated image
annotated_image.show()  # Open in default viewer
annotated_image.save("annotated_image.jpg")


In [None]:
XXX

In [None]:
[num for num, w in enumerate(words) if 'assels' in w]

In [None]:
scores

In [None]:
XXX

In [None]:
from rapidfuzz import fuzz, process

def robust_fuzzy_matching(word, phrases):
    """
    Robust fuzzy matching using RapidFuzz.
    
    Args:
        word (str): The word/phrase to match.
        phrases (list): List of candidate phrases.
        
    Returns:
        list: Sorted list of (phrase, confidence) tuples.
    """
    return process.extract(word, phrases, scorer=fuzz.ratio, limit=3)

# Example usage
word = "gross profit"
phrases = [
    "Gross receipts or sales",
    "Gross profit.Subtract line 2 from line 1c.",
    "Print"
]
results = robust_fuzzy_matching(word, phrases)
print(results)


In [None]:
from custom_pipeline import layoutlm_paddleocr_pipeline

layoutlm_paddleocr_pipeline(image_path=image_path, question='What is gross profit?')

In [10]:
from sentence_transformers import SentenceTransformer

# Load the model
model_name = 'Snowflake/snowflake-arctic-embed-l-v2.0'
model = SentenceTransformer(model_name)

# Define the queries and documents
queries = [
    'Locate the line with the word "Balance"',
    'Locate the cost of goods sold line.',
    'Locate the gross profit line.',
    'Locate the "Interest" line for interest paid.',
    'Locate the "Depreciation" line',
    'Locate the line for total deductions.'
    'Locate the line for net profit.'
    ]

# Compute embeddings: use prompt_name="query" to encode queries!
query_embeddings = model.encode(queries, prompt_name="query") 
document_embeddings = model.encode(words)

# Compute cosine similarity scores
scores = model.similarity(query_embeddings, document_embeddings)



In [None]:
float(best_matches[0][1])

In [None]:
sm = F.softmax(scores, dim=-1)
[(words[torch.argmax(s).item()], s[torch.argmax(s).item()]) for s in sm]

In [None]:
print([words[np.argmax(s)] for s in scores])

In [None]:
print([(words[np.argmax(s)], s[np.argmax(s)]) for s in scores])

In [None]:

# Output the results
for query, query_scores in zip(queries, scores):
    doc_score_pairs = list(zip(words, query_scores))
    doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
    print("Query:", query)
    for document, score in doc_score_pairs:
        print(score, document)


In [None]:
import torch
torch.cuda.is_available()