In [None]:
!pip install --upgrade transformers
!pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [3]:
from transformers import LiltForTokenClassification, LayoutLMv3Processor
from PIL import Image, ImageDraw, ImageFont
import torch
 
# load model and processor from huggingface hub (both are read from the same repository id)
MODEL_ID = "philschmid/lilt-en-funsd"
model = LiltForTokenClassification.from_pretrained(MODEL_ID)
processor = LayoutLMv3Processor.from_pretrained(MODEL_ID)
 
 
# helper function to unnormalize bboxes for drawing onto the image
def unnormalize_box(bbox, width, height):
    """Rescale a box expressed on a 0-1000 grid to a ``width`` x ``height`` grid.

    Args:
        bbox: Indexable holding at least four entries. Entries 0 and 2 are
            scaled by ``width``, entries 1 and 3 by ``height``.
        width: Horizontal extent of the target image.
        height: Vertical extent of the target image.

    Returns:
        A list with the four rescaled entries, in the same order as the input.
    """
    # Extent that applies to each of the four entries, by position.
    axis_extents = (width, height, width, height)
    return [extent * (bbox[idx] / 1000) for idx, extent in enumerate(axis_extents)]
 
 
# Outline colour per label: the "B-" and "I-" variants of an entity share one colour.
label2color = {
    f"{prefix}-{entity}": color
    for prefix in ("B", "I")
    for entity, color in (("HEADER", "blue"), ("QUESTION", "red"), ("ANSWER", "green"))
}
# draw results onto the image
def draw_boxes(image, boxes, predictions):
    """Outline every labelled box on ``image`` and write its label next to it.

    Drawing happens directly on ``image`` (it is modified in place); the same
    object is returned for convenience.

    Args:
        image: PIL image to annotate.
        boxes: Boxes in the 0-1000 form accepted by ``unnormalize_box``, one
            per entry of ``predictions``.
        predictions: Label strings aligned with ``boxes``. Entries equal to
            ``"O"`` are skipped.

    Returns:
        The annotated ``image``.
    """
    width, height = image.size
    pixel_boxes = [unnormalize_box(box, width, height) for box in boxes]

    # draw predictions over the image
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    for prediction, box in zip(predictions, pixel_boxes):
        if prediction == "O":
            continue
        # Labels without a configured colour fall back to black instead of
        # aborting the whole drawing pass with a KeyError.
        color = label2color.get(prediction, "black")
        draw.rectangle(box, outline=color)
        draw.text((box[0] + 10, box[1] - 10), text=prediction, fill=color, font=font)
    return image
 
 
# run inference
def run_inference(image, model=model, processor=processor, output_image=True):
    """Predict a label for every token of a document image.

    Args:
        image: Image handed to ``processor``; also the canvas that gets
            annotated when ``output_image`` is true.
        model: Token-classification model; defaults to the module-level ``model``.
        processor: Processor that turns the image into model inputs; defaults
            to the module-level ``processor``.
        output_image: When true, return the result of ``draw_boxes`` (the
            annotated ``image``); otherwise return the label strings.

    Returns:
        The annotated image, or a list with one label string per token.
    """
    # create model input
    encoding = processor(image, return_tensors="pt")
    # the model is called without the pixel values the processor produces
    del encoding["pixel_values"]
    # run inference; prediction needs no gradient bookkeeping
    with torch.no_grad():
        outputs = model(**encoding)
    # Select the single batch entry explicitly: squeeze() would also collapse
    # a sequence of length one, making tolist() return a bare int.
    predictions = outputs.logits.argmax(-1)[0].tolist()
    # get labels
    labels = [model.config.id2label[prediction] for prediction in predictions]
    if output_image:
        return draw_boxes(image, encoding["bbox"][0], labels)
    return labels
 
 
# run_inference(dataset["test"][34]["image"])
 

In [None]:
from PIL import Image, ImageDraw, ImageFont

# Load image from a file
image_path = "./pages/image.jpg"  # Update this to your image's file path
jpg = Image.open(image_path)
image = jpg.convert("RGB")

print("## Input")
# Only the last expression of a cell is rendered automatically, so the input
# has to be displayed explicitly. It is shown before inference because
# run_inference returns the image it was given with the boxes drawn onto it.
display(image)

# Run inference
output_image = run_inference(image)

print("## Output")
# Display the annotated image in Jupyter
output_image


In [12]:
def run_inference_with_details(image, model=model, processor=processor):
    """Return text, box and label for every token that is not labelled ``"O"``.

    Labels and boxes are produced per token, so the text is decoded per token
    as well; this keeps the three aligned. With a tokenizer that splits words
    into pieces, a single word can therefore yield several entries.

    Args:
        image: Image handed to ``processor``.
        model: Token-classification model; defaults to the module-level ``model``.
        processor: Processor providing the model inputs and the tokenizer;
            defaults to the module-level ``processor``.

    Returns:
        A list of dicts with the keys ``"ocr_value"`` (decoded token text),
        ``"position"`` (``x_min``/``y_min``/``x_max``/``y_max`` as plain
        numbers, in the coordinates produced by the processor) and
        ``"data_type"`` (the predicted label).

    Raises:
        ValueError: If the processor output lacks ``input_ids`` or ``bbox``.
    """
    encoding = processor(image, return_tensors="pt", padding=True, truncation=True)

    if "input_ids" not in encoding or "bbox" not in encoding:
        raise ValueError("Missing required encoding fields: 'input_ids' or 'bbox'")

    # As in run_inference, the model is called without the processor's pixel values.
    encoding.pop("pixel_values", None)

    try:
        # Prediction needs no gradient bookkeeping.
        with torch.no_grad():
            outputs = model(**encoding)
    except Exception as e:
        print(f"Error during model inference: {e}")
        raise

    # Select the single batch entry; plain Python values keep the result JSON-serialisable.
    predictions = outputs.logits.argmax(-1)[0].tolist()
    labels = [model.config.id2label[pred] for pred in predictions]
    bboxes = encoding["bbox"][0].tolist()
    token_ids = encoding["input_ids"][0].tolist()

    tokenizer = processor.tokenizer
    special_ids = set(tokenizer.all_special_ids)

    # Collect detailed results
    results = []
    for label, bbox, token_id in zip(labels, bboxes, token_ids):
        # Skip non-entity tokens and the tokenizer's special tokens.
        if label == "O" or token_id in special_ids:
            continue
        results.append({
            "ocr_value": tokenizer.decode([token_id]).strip(),
            "position": {
                "x_min": bbox[0],
                "y_min": bbox[1],
                "x_max": bbox[2],
                "y_max": bbox[3],
            },
            "data_type": label
        })

    return results


In [16]:
import os
import json
from pathlib import Path
from PIL import Image

# Ensure the output directory exists
input_dir = "./pages"
output_dir = "./annotated"
os.makedirs(output_dir, exist_ok=True)

# List all PNG and JPEG files in the input directory (the extension check ignores case);
# sorting makes the processing order independent of the directory listing order
files = sorted(f for f in os.listdir(input_dir) if f.lower().endswith((".png", ".jpeg", ".jpg")))

# Process each file
for file in files:
    file_path = os.path.join(input_dir, file)
    output_image_path = os.path.join(output_dir, f"annotated_{file}")
    # Path.stem removes only the final extension, so names containing dots
    # ("scan.v2.png") are not truncated at the first dot.
    output_data_path = os.path.join(output_dir, f"{Path(file).stem}_data.json")

    # Load the image
    image = Image.open(file_path).convert("RGB")

    # Run inference: labels first, then the pass that draws onto the image
    labels = run_inference(image, output_image=False)
    annotated_image = run_inference(image, output_image=True)

    # Save the annotated image
    annotated_image.save(output_image_path)

    # Save the returned labels (data) as JSON
    with open(output_data_path, "w") as f:
        json.dump(labels, f, indent=4)

    print(f"Processed and saved: {file}")


Processed and saved: image.jpg


In [17]:
import os
import json
from pathlib import Path
from PIL import Image

# Folder the page images are read from and folder the results are written to
input_dir, output_dir = "./pages", "./annotated"
# Create the output folder up front; an already existing one is left untouched
os.makedirs(output_dir, exist_ok=True)

# Helper function to match questions and answers based on position
def match_question_answer(data):
    """Pair each question entry with the nearest answer entry that starts below it.

    Args:
        data: Iterable of dicts carrying ``"data_type"``, ``"ocr_value"`` and
            ``"position"`` (with ``"y_min"`` / ``"y_max"``). Entries whose
            ``"data_type"`` is neither a question nor an answer label are ignored.

    Returns:
        A list of dicts with the keys ``"question"``, ``"answer"``,
        ``"question_position"`` and ``"answer_position"``, in question order.
        Questions without any answer starting below them are left out, and an
        answer is used for at most one question.
    """
    question_labels = ("B-QUESTION", "I-QUESTION")
    answer_labels = ("B-ANSWER", "I-ANSWER")

    question_entries = [item for item in data if item["data_type"] in question_labels]
    unused_answers = [item for item in data if item["data_type"] in answer_labels]

    matches = []
    for question_entry in question_entries:
        # Candidates are the still-unused answers whose top edge lies strictly
        # below the bottom edge of the question.
        candidates = [
            candidate
            for candidate in unused_answers
            if candidate["position"]["y_min"] > question_entry["position"]["y_max"]
        ]
        if not candidates:
            continue

        # min() keeps the first of several equally close candidates.
        nearest = min(candidates, key=lambda candidate: candidate["position"]["y_min"])
        matches.append({
            "question": question_entry["ocr_value"],
            "answer": nearest["ocr_value"],
            "question_position": question_entry["position"],
            "answer_position": nearest["position"]
        })
        # A matched answer is not offered to later questions.
        unused_answers.remove(nearest)

    return matches

# Updated run_inference function to return detailed data
def run_inference_with_details(image, model=model, processor=processor):
    """Return text, box and label for every token that is not labelled ``"O"``.

    Labels and boxes are produced per token, so the text is decoded per token
    as well; this keeps the three aligned. With a tokenizer that splits words
    into pieces, a single word can therefore yield several entries.

    Args:
        image: Image handed to ``processor``.
        model: Token-classification model; defaults to the module-level ``model``.
        processor: Processor providing the model inputs and the tokenizer;
            defaults to the module-level ``processor``.

    Returns:
        A list of dicts with the keys ``"ocr_value"`` (decoded token text),
        ``"position"`` (``x_min``/``y_min``/``x_max``/``y_max`` as plain
        numbers, in the coordinates produced by the processor) and
        ``"data_type"`` (the predicted label).
    """
    # Prepare model input
    encoding = processor(image, return_tensors="pt")
    del encoding["pixel_values"]
    # Run inference; prediction needs no gradient bookkeeping
    with torch.no_grad():
        outputs = model(**encoding)
    # Select the single batch entry explicitly: squeeze() would also collapse
    # a sequence of length one, making tolist() return a bare int.
    predictions = outputs.logits.argmax(-1)[0].tolist()
    # Get labels
    labels = [model.config.id2label[pred] for pred in predictions]
    bboxes = encoding["bbox"][0].tolist()
    token_ids = encoding["input_ids"][0].tolist()

    tokenizer = processor.tokenizer
    special_ids = set(tokenizer.all_special_ids)

    # Collect detailed results
    results = []
    for label, bbox, token_id in zip(labels, bboxes, token_ids):
        # Skip non-entity tokens and the tokenizer's special tokens.
        if label == "O" or token_id in special_ids:
            continue
        results.append({
            "ocr_value": tokenizer.decode([token_id]).strip(),
            "position": {
                "x_min": bbox[0],
                "y_min": bbox[1],
                "x_max": bbox[2],
                "y_max": bbox[3],
            },
            "data_type": label
        })
    return results

# List all JPEG and PNG files in the input directory (the extension check ignores case);
# sorting makes the processing order independent of the directory listing order
files = sorted(f for f in os.listdir(input_dir) if f.lower().endswith((".png", ".jpeg", ".jpg")))

# Process each file
for file in files:
    file_path = os.path.join(input_dir, file)
    output_image_path = os.path.join(output_dir, f"annotated_{file}")
    # Path.stem removes only the final extension, so names containing dots
    # ("scan.v2.png") are not truncated at the first dot.
    output_data_path = os.path.join(output_dir, f"{Path(file).stem}_data.json")

    # Load the image
    image = Image.open(file_path).convert("RGB")

    # Run inference
    detailed_data = run_inference_with_details(image)
    paired_data = match_question_answer(detailed_data)  # Match questions and answers
    annotated_image = run_inference(image, output_image=True)

    # Save the annotated image
    annotated_image.save(output_image_path)

    # Save the detailed question-answer matched data as JSON
    with open(output_data_path, "w") as f:
        json.dump(paired_data, f, indent=4)

    print(f"Processed and saved: {file}")


Processed and saved: image.jpg


In [29]:
import json
import logging  # used by the error handler below
import os

# Example loop to process all images
image_folder = "pages"
output_folder = "annotated"
os.makedirs(output_folder, exist_ok=True)

for filename in os.listdir(image_folder):
    file_path = os.path.join(image_folder, filename)

    # Process only image files; the extension check ignores case
    if not file_path.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    try:
        image = Image.open(file_path).convert("RGB")

        # Run inference and get detailed data
        detailed_data = run_inference_with_details(image)

        if detailed_data is not None:

            detailed_data_fileName = os.path.join(output_folder, f"detailed_data_{filename}.json")
            with open(detailed_data_fileName, "w") as f:
                json.dump(detailed_data, f, indent=4)
                print(f"wrote {detailed_data_fileName}")

            # Match question and answer
            paired_data = match_question_answer(detailed_data)

            # Annotate the image and save the result
            annotated_image = run_inference(image, output_image=True)
            annotated_image.save(os.path.join(output_folder, f"annotated_{filename}"))

            # Optionally save the paired data
            with open(os.path.join(output_folder, f"paired_{filename}.json"), 'w') as f:
                json.dump(paired_data, f, indent=4)
        else:
            print(f"Skipping file {filename} due to error.")

    except Exception as e:
        # Report the failure and move on to the next file
        logging.error(f"Error processing file {file_path}: {e}")
        continue


wrote annotated/detailed_data_image.jpg.json
