# DeepSeek-OCR-2 Invoice Processing

This notebook processes invoice images using DeepSeek-OCR-2 (3B) with GPU acceleration on Kaggle.
It uses the custom `model.infer()` API of the `deepseek-ai/DeepSeek-OCR-2` model.

In [None]:
# Cell 1: Install dependencies
# transformers and tokenizers are pinned to the exact versions below.
# NOTE(review): flash-attn is not installed by this cell; Cell 3 loads the model
# with _attn_implementation='eager', so it is not used by the notebook as written.
# PyMuPDF and img2pdf are presumably here for PDF handling.
# NOTE(review): confirm that model.infer() actually accepts PDF paths directly.
!pip install transformers==4.46.3 tokenizers==0.20.3 --quiet
!pip install einops addict easydict Pillow numpy PyMuPDF img2pdf --quiet

In [None]:
# Cell 2: Verify GPU
import torch
# Report the PyTorch build, whether CUDA is usable, and (if so) the name and
# total memory of device 0.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    # total_memory is in bytes; 1024**3 converts it to GiB.
    print(f"VRAM: {gpu_props.total_memory / 1024**3:.1f} GB")

In [None]:
# Cell 3: Setup and initialize DeepSeek-OCR-2
import os
import json
import time
from pathlib import Path
from transformers import AutoModel, AutoTokenizer

# Configuration
# INPUT_DIR is scanned for files to OCR; OUTPUT_DIR receives one JSON per file,
# with the raw payloads under OUTPUT_DIR / "raw".
INPUT_DIR = Path("/kaggle/input/synthetic-invoices-test")
OUTPUT_DIR = Path("/kaggle/working/deepseek-ocr-2")
# parents=True also creates missing ancestors, so the cell does not fail with
# FileNotFoundError when the parent of OUTPUT_DIR does not exist yet.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
(OUTPUT_DIR / "raw").mkdir(parents=True, exist_ok=True)

# Hugging Face model identifier passed to from_pretrained().
MODEL_NAME = "deepseek-ai/DeepSeek-OCR-2"

# Initialize DeepSeek-OCR-2
# - trust_remote_code=True: required for the custom model architecture
# - _attn_implementation='eager': the attention backend selected here
#   (flash-attn is not installed by this notebook)
# - the model is switched to eval mode, moved to the GPU and cast to BF16
print("Loading DeepSeek-OCR-2 model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
model = AutoModel.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    use_safetensors=True,
    _attn_implementation='eager',
)
model = model.eval()
model = model.cuda()
model = model.to(torch.bfloat16)
print("DeepSeek-OCR-2 loaded!")

In [None]:
# Cell 4: Define processing function
import io
import contextlib

def process_image(image_path, model, tokenizer):
    """Run DeepSeek-OCR-2 on a single file and return a result dict.

    ``model.infer()`` may print the recognized text instead of returning it,
    so stdout is captured for the duration of the call. A non-empty string
    return value (other than the literal ``"None"``) takes precedence;
    otherwise the captured stdout is used after dropping lines that start
    with ``=``, ``BASE:`` or ``PATCHES:``.

    Args:
        image_path: Path of the file to process; converted with ``str()``
            before being handed to the model.
        model: Object exposing ``infer(tokenizer, prompt=..., image_file=...,
            ...)``.
        tokenizer: Forwarded unchanged as the first argument of
            ``model.infer()``.

    Returns:
        A dict with ``success``, ``raw_text``, ``raw_data`` (a one-element
        list wrapping the text, or an empty list) and
        ``processing_time_seconds`` (rounded to three decimals). On failure
        ``success`` is False, the text fields are empty and ``error`` holds
        the exception message followed by the traceback. Exceptions derived
        from ``Exception`` are reported this way rather than raised.
    """
    start_time = time.time()

    # "<image>" is the image placeholder and "<|grounding|>" the grounding tag;
    # both use single angle brackets.
    prompt = "<image>\n<|grounding|>Convert the document to markdown."

    try:
        # Capture stdout since model.infer() prints output instead of returning it
        stdout_capture = io.StringIO()
        with contextlib.redirect_stdout(stdout_capture):
            result = model.infer(
                tokenizer,
                prompt=prompt,
                image_file=str(image_path),
                output_path="/tmp/deepseek_ocr_tmp",
                base_size=1024,
                image_size=768,
                crop_mode=True,
                save_results=False
            )

        # Strip the debug lines (BASE:, PATCHES:, === separators) from the
        # captured output; what remains is the OCR text.
        debug_prefixes = ('=', 'BASE:', 'PATCHES:')
        captured_lines = stdout_capture.getvalue().strip().split('\n')
        output_text = '\n'.join(
            line for line in captured_lines if not line.startswith(debug_prefixes)
        ).strip()

        # Prefer the return value when it is a real, non-empty string.
        if isinstance(result, str) and result.strip() and result.strip() != "None":
            output_text = result.strip()

        return {
            "success": True,
            "raw_text": output_text,
            "raw_data": [{"text": output_text}] if output_text else [],
            "processing_time_seconds": round(time.time() - start_time, 3)
        }
    except Exception as e:
        # Imported here because the traceback is only needed on failure.
        import traceback
        return {
            "success": False,
            "error": f"{e}\n{traceback.format_exc()}",
            "raw_text": "",
            "raw_data": [],
            # Rounded like the success path so both report the same precision.
            "processing_time_seconds": round(time.time() - start_time, 3)
        }

In [None]:
# Cell 5: Test on a single image before batch processing
# Gather every PNG, JPEG and PDF directly inside INPUT_DIR, sorted by path.
image_files = sorted(
    path
    for pattern in ("*.png", "*.jpeg", "*.pdf")
    for path in INPUT_DIR.glob(pattern)
)
print(f"Found {len(image_files)} files total")

if not image_files:
    print("ERROR: No images found in input directory!")
else:
    # Run the first file through the pipeline as a smoke test before the batch.
    test_img = image_files[0]
    print(f"\nTesting on: {test_img.name}")

    test_result = process_image(test_img, model, tokenizer)
    print(f"Success: {test_result['success']}")
    print(f"Processing time: {test_result['processing_time_seconds']:.2f}s")

    if not test_result["raw_text"]:
        print("\nWARNING: No text extracted!")
        # Failed runs carry an "error" entry; show it when present.
        if test_result.get("error"):
            print(f"Error: {test_result['error']}")
    else:
        # Only the first 500 characters are shown to keep the output short.
        preview = test_result["raw_text"][:500]
        print(f"Text preview ({len(test_result['raw_text'])} chars):\n{preview}")
        print("\nOCR pipeline is working correctly!")

In [None]:
# Cell 6: Process all images
print(f"Processing {len(image_files)} files...")

results = []
empty_count = 0
for i, image_path in enumerate(image_files):
    # Progress line; left open so a warning can be appended to it.
    print(f"Processing {i+1}/{len(image_files)}: {image_path.name}", end="")

    result = process_image(image_path, model, tokenizer)

    # Count files that produced no text and flag them on the progress line.
    if not result.get("raw_text"):
        empty_count += 1
        print(" - WARNING: no text extracted!", end="")

    print()

    # Per-file record written to disk and collected in `results`.
    output = {
        "filename": image_path.name,
        "model_name": "deepseek-ocr-2",
        "raw_text": result.get("raw_text", ""),
        "processing_time_seconds": result.get("processing_time_seconds", 0),
        "file_size_bytes": image_path.stat().st_size,
        "success": result.get("success", False)
    }
    if not result["success"]:
        output["error"] = result.get("error", "Unknown error")
    results.append(output)

    # The extension is folded into the file name, so inputs that share a stem
    # but differ in extension get separate output files.
    result_file = OUTPUT_DIR / f"{image_path.stem}_{image_path.suffix.lstrip('.')}.json"
    result_file.write_text(json.dumps(output, indent=2))

    # Raw response: the text plus the detections list from process_image().
    raw_payload = {
        "content": result.get("raw_text", ""),
        "detections": result.get("raw_data", [])
    }
    raw_file = OUTPUT_DIR / "raw" / f"{image_path.stem}_{image_path.suffix.lstrip('.')}_raw.json"
    raw_file.write_text(json.dumps(raw_payload, indent=2))

print(f"\nCompleted! Processed {len(results)} files")

In [None]:
# Cell 7: Save combined results and summary
# Write every per-file record into one combined JSON file.
all_results_file = OUTPUT_DIR / "all_results.json"
with open(all_results_file, "w") as f:
    json.dump(results, f, indent=2)

# Print summary
successful = sum(1 for r in results if r.get("success", False))
with_text = sum(1 for r in results if r.get("raw_text", ""))
total_time = sum(r.get("processing_time_seconds", 0) for r in results)
# An empty run (no input files found) must not crash on the division below.
average_time = total_time / len(results) if results else 0.0
print("\nSummary:")
print(f"  Total files: {len(results)}")
print(f"  Successful: {successful}")
print(f"  With text extracted: {with_text}")
print(f"  Empty results: {empty_count}")
print(f"  Failed: {len(results) - successful}")
print(f"  Total processing time: {total_time:.1f}s")
print(f"  Average time per file: {average_time:.2f}s")
print(f"\nResults saved to: {OUTPUT_DIR}")

In [None]:
# Cell 8: Create downloadable ZIP
import shutil

# Pack the contents of OUTPUT_DIR into /kaggle/working/deepseek_ocr2.zip;
# make_archive appends the ".zip" extension to base_name itself.
shutil.make_archive(
    base_name="/kaggle/working/deepseek_ocr2",
    format='zip',
    root_dir=OUTPUT_DIR,
)
print("ZIP created: /kaggle/working/deepseek_ocr2.zip")
print("Download from the Output tab after notebook completes.")