# Experiment 7: LightOnOCR

This notebook implements the OCR pipeline using **LightOnOCR-2-1B** (Hugging Face) instead of PaddleOCR.
LightOnOCR is an end-to-end Vision-Language Model optimized for document transcription.

**Key Changes:**
- Replaced PaddleOCR with LightOnOCR-2-1B.
- Using Hugging Face Transformers for inference.
- Comparing the OCR output against ground-truth transcripts using the Character Error Rate (CER).

In [None]:
# Install required packages if not already installed
# %pip install git+https://github.com/huggingface/transformers.git torch torchvision pillow accelerate ollama pandas matplotlib seaborn opencv-python

import os
import cv2
import matplotlib.pyplot as plt
import time
import glob
import numpy as np
import pandas as pd
import subprocess
import torch
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor

### DATASET

In [None]:
# Dataset folder; the images and their ground-truth .txt transcripts are both
# looked up in this same directory.
DATASET_DIR = r'f:/projek dosen/tutoring/Agentic Multimodal Tutor - SLL/dataset/UTS/SOAL2'
IMAGES_DIR = GT_DIR = DATASET_DIR

# ===================== LIMIT PROCESSING =====================
# Maximum number of images handled when the limit is switched on; a small
# value keeps trial runs short (LightOn might be slower on CPU).
LIMIT_COUNT = 10
# True caps the run at LIMIT_COUNT images, False processes every image found.
USE_LIMIT = True

### CER METRIC

In [None]:
def levenshtein_distance(s1, s2):
    """Return the Levenshtein (edit) distance between two sequences.

    The distance is the minimum number of single-element insertions,
    deletions and substitutions needed to turn one sequence into the other.

    Args:
        s1: First sequence (typically a string).
        s2: Second sequence (typically a string).

    Returns:
        The edit distance as a non-negative integer.
    """
    # Iterate over the longer sequence and keep DP rows sized by the shorter
    # one, so only two rows of len(shorter) + 1 entries are ever alive.
    longer, shorter = (s1, s2) if len(s1) >= len(s2) else (s2, s1)
    if len(shorter) == 0:
        return len(longer)

    prev = list(range(len(shorter) + 1))
    for row_no, ch_long in enumerate(longer, start=1):
        row = [row_no]
        for col, ch_short in enumerate(shorter):
            cost_insert = prev[col + 1] + 1
            cost_delete = row[col] + 1
            # A bool adds 0 for a match and 1 for a mismatch.
            cost_subst = prev[col] + (ch_long != ch_short)
            row.append(min(cost_insert, cost_delete, cost_subst))
        prev = row
    return prev[-1]

def calculate_cer(reference, hypothesis):
    """Compute the Character Error Rate of ``hypothesis`` against ``reference``.

    Both texts are whitespace-normalised first (runs of whitespace collapse to
    a single space, leading/trailing whitespace is dropped). The CER is the
    Levenshtein distance between the normalised texts divided by the length of
    the normalised reference, so it can exceed 1.0 when the hypothesis is much
    longer than the reference.

    Args:
        reference: Ground-truth text. ``None``, empty and whitespace-only
            references all count as "no reference".
        hypothesis: OCR output to score. ``None`` is treated as empty text.

    Returns:
        The CER as a float, or ``0.0`` when there is no reference to compare
        against.
    """
    # Normalise before the emptiness check: a whitespace-only reference
    # collapses to "" and would otherwise cause a division by zero below.
    ref = " ".join((reference or "").split())
    if not ref:
        return 0.0
    hyp = " ".join((hypothesis or "").split())
    return levenshtein_distance(ref, hyp) / len(ref)

### GROUND TRUTH LOADER

In [None]:
def read_ground_truth(filename_base):
    """Load the ground-truth transcript that belongs to an image.

    Args:
        filename_base: Image file name without its extension; the transcript
            is expected at ``<GT_DIR>/<filename_base>.txt``.

    Returns:
        The UTF-8 decoded file content with surrounding whitespace stripped,
        or an empty string when no such file exists.
    """
    gt_path = os.path.join(GT_DIR, f"{filename_base}.txt")
    if not os.path.exists(gt_path):
        return ""
    with open(gt_path, "r", encoding="utf-8") as handle:
        contents = handle.read()
    return contents.strip()

### LIGHTON OCR INITIALIZATION

In [None]:
MODEL_NAME = "lightonai/LightOnOCR-2-1B"
# Report the GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

print(f"Initializing LightOnOCR on {DEVICE}...")
try:
    processor = AutoProcessor.from_pretrained(MODEL_NAME, trust_remote_code=True)
    # Load the weights in bfloat16 when CUDA is available, float32 otherwise.
    if torch.cuda.is_available():
        _weights_dtype = torch.bfloat16
    else:
        _weights_dtype = torch.float32
    model = AutoModelForVision2Seq.from_pretrained(
        MODEL_NAME,
        torch_dtype=_weights_dtype,
        device_map="auto",
        trust_remote_code=True,
    )
    print("LightOnOCR Model Loaded successfully.")
except Exception as load_error:
    # Nothing downstream can run without the model: report and re-raise.
    print(f"Error loading LightOnOCR: {load_error}")
    raise

### LLM REFINEMENT (Optional)
We can optionally pass the OCR output through Qwen (served by Ollama) for a refinement pass, or use the LightOnOCR output directly as the final text, since LightOnOCR is itself a language model.

In [None]:
def run_llm_refinement(prompt, model="qwen2.5:3b-instruct", timeout=None):
    """Send ``prompt`` to a local Ollama model and return its reply.

    The prompt is piped to ``ollama run <model>`` on stdin. This is a
    best-effort step: any failure is reported on stdout and signalled with
    ``None`` so the caller can fall back to the unrefined text.

    Args:
        prompt: Text passed to the model on standard input.
        model: Name of the Ollama model to run.
        timeout: Optional limit in seconds for the subprocess; ``None`` waits
            indefinitely.

    Returns:
        The model's stdout with surrounding whitespace stripped, or ``None``
        if the command could not be run, timed out, or exited non-zero.
    """
    try:
        # `encoding` already switches the pipes to text mode; invalid bytes in
        # the model output are replaced instead of raising a decode error.
        result = subprocess.run(
            ["ollama", "run", model],
            input=prompt,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except Exception as exc:
        # Deliberately broad: refinement is optional and must never abort the
        # OCR run, but the reason for the failure should still be visible.
        print(f"  [LLM ERROR] {exc}")
        return None

    if result.returncode != 0:
        details = (result.stderr or "").strip()
        print(f"  [LLM ERROR] ollama exited with code {result.returncode}: {details}")
        return None
    return result.stdout.strip()

### DATA LOADING

In [None]:
# Collect every supported image in IMAGES_DIR. glob returns matches in an
# arbitrary, platform-dependent order, so sort them: this keeps the processing
# order (and which files survive the limit below) reproducible across runs.
image_files = sorted(
    glob.glob(os.path.join(IMAGES_DIR, "*.jpg")) +
    glob.glob(os.path.join(IMAGES_DIR, "*.png")) +
    glob.glob(os.path.join(IMAGES_DIR, "*.jpeg"))
)

# Filled by the main processing loop with one dict per processed image.
results = []

# Report the total before truncating so the message reflects what is on disk.
print(f"Found {len(image_files)} images.")

if USE_LIMIT and LIMIT_COUNT > 0:
    print(f"Limiting processing to first {LIMIT_COUNT} images.")
    image_files = image_files[:LIMIT_COUNT]

### MAIN PROCESSING LOOP

In [None]:
# Partial results are written to ./results after every image; create the
# folder up front so the first save does not fail when it does not exist yet.
os.makedirs('results', exist_ok=True)

for idx, image_path in enumerate(image_files):
    filename = os.path.basename(image_path)
    filename_base = os.path.splitext(filename)[0]
    gt_text = read_ground_truth(filename_base)

    print(f"\nProcessing [{idx+1}/{len(image_files)}]: {filename}...")
    if not gt_text:
        # calculate_cer returns 0.0 without a reference, which would otherwise
        # look like a perfect score in the results.
        print("  [WARN] No ground truth found; CER will be reported as 0.")
    start_time = time.time()

    # ---------- LIGHTON OCR ----------
    try:
        # Close the file handle as soon as the pixel data has been converted.
        with Image.open(image_path) as img:
            image = img.convert("RGB")

        # Single-turn chat: one image plus the transcription instruction.
        conversation = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": "Transcribe this document into Markdown."}
                ]
            }
        ]

        text_prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
        # Move the inputs to the model's device and cast floating-point
        # tensors to the model's dtype, so they match weights that were
        # loaded in bfloat16 on GPU.
        inputs = processor(
            text=[text_prompt],
            images=[image],
            padding=True,
            return_tensors="pt"
        ).to(model.device, dtype=model.dtype)

        # Generate
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=1024,
            do_sample=False  # Greedy typically best for OCR
        )

        # generate() returns the prompt tokens followed by the new tokens.
        # Drop the prompt part so the echoed chat turn does not end up in the
        # transcription and inflate the CER.
        prompt_length = inputs["input_ids"].shape[1]
        completion_ids = generated_ids[:, prompt_length:]

        raw_text = processor.batch_decode(
            completion_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False
        )[0]

    except Exception as e:
        # Keep going with the remaining images; an empty transcription makes
        # the failure visible in the metrics.
        print(f"  [OCR ERROR] {e}")
        raw_text = ""

    # ---------- LLM REFINEMENT (Optional) ----------
    # LightOn is already an LLM, so 'final_text' can just be 'raw_text'.
    # But to match previous experiment structure:
    final_text = raw_text

    # Uncomment below to enable secondary refinement if desired
    # if raw_text:
    #     prompt = f"Fix formatting errors in this OCR text:\n\n{raw_text}"
    #     llm_out = run_llm_refinement(prompt)
    #     if llm_out: final_text = llm_out

    # ---------- METRICS ----------
    elapsed = time.time() - start_time
    cer_raw = calculate_cer(gt_text, raw_text)
    cer_refined = calculate_cer(gt_text, final_text)

    print(
        f"  Length: {len(raw_text)} | "
        f"CER: {cer_raw:.2%} | "
        f"Time: {elapsed:.2f}s"
    )

    results.append({
        "filename": filename,
        "time": elapsed,
        "cer_raw": cer_raw,
        "cer_refined": cer_refined,
        "raw_text": raw_text,
        "final_text": final_text,
        "ground_truth": gt_text
    })

    # Save partial results after every image so an interrupted run keeps
    # everything processed so far.
    pd.DataFrame(results).to_csv('results/exp7_lighton_results.csv', index=False)

print("\nDONE. Total processed:", len(results))

In [None]:
# ===================== VISUALIZE METRICS =====================
# Print the average time and CER, then draw one CER bar per image.
if results:
    df = pd.DataFrame(results)
    print(f"Average Time: {df['time'].mean():.4f}s")
    print(f"Average CER (LightOn): {df['cer_raw'].mean():.2%}")

    # Visualization
    try:
        # savefig does not create missing folders.
        os.makedirs('results', exist_ok=True)

        plt.figure(figsize=(12, 6))
        # Plain matplotlib bars: seaborn is not imported in this notebook, so
        # the former sns.barplot call raised NameError and no chart was drawn.
        # One viridis colour per bar keeps the intended palette.
        bar_colors = plt.get_cmap('viridis')(np.linspace(0, 1, len(df)))
        plt.bar(df['filename'], df['cer_raw'], color=bar_colors)
        plt.title('CER per Image (LightOnOCR-2-1B)')
        plt.xlabel('Filename')
        plt.ylabel('Character Error Rate')
        # Tick labels become unreadable with many images, so hide them then.
        if len(df) > 20:
            plt.xticks([])
        else:
            plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig('results/cer_lighton.png')
        plt.show()
    except Exception as e:
        # Plotting is optional; report the problem without failing the cell.
        print(f"Error plotting: {e}")