# Test Split-Classify on Epson Workforce Scanner PDFs

This notebook tests the **split-classify** pipeline on real scanned PDFs from the
Epson Workforce scanner. These PDFs often contain multiple document types per file
(receipts, bills, letters, etc.) from a single scanning session.

Pipeline: **Ingest** → **Parse** → **Split & Classify** → **Inspect results**

In [1]:
import sys, os

# Resolve the project root (parent of the kernel's starting working directory)
# only once per kernel. This cell ends with os.chdir(PROJECT_ROOT), so
# recomputing the root from os.getcwd() on a re-run would climb one more
# directory level every time the cell is executed.
if "PROJECT_ROOT" not in globals():
    PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))

# Make project packages importable and run the rest of the notebook from the root.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)
os.chdir(PROJECT_ROOT)

# Load environment variables from the project's .env file.
from dotenv import load_dotenv
load_dotenv(os.path.join(PROJECT_ROOT, ".env"))
print(f"CWD: {os.getcwd()}")

CWD: /Users/vykhand/DEV/mydocs2


In [2]:
from lightodm import MongoConnection

# Create the connection object and obtain its async client up front; the
# confirmation message below is only reached if that call does not raise.
# NOTE(review): presumably the Document queries in later cells rely on this
# connection having been established — confirm against lightodm.
conn = MongoConnection()
await conn.get_async_client()
print("Connected to MongoDB")

Connected to MongoDB


## Configuration

Set the scanner directory and sample size. PDFs are randomly selected with a fixed seed, so the same files are drawn on every run as long as the directory contents do not change.

In [3]:
import random
from pathlib import Path

# Folder holding the scanned PDFs produced by the Epson Workforce
WORKFORCE_DIR = Path.home() / "OneDrive/Documents/SCANS/epson/workforce"
SAMPLE_SIZE = 20               # upper bound on the number of PDFs drawn
TAG = "split-classify-test"    # tag attached to documents ingested by this notebook
SEED = 42                      # seed for the sampling RNG

assert WORKFORCE_DIR.is_dir(), f"Directory not found: {WORKFORCE_DIR}"

# Collect every PDF in the folder in sorted order, so that the seeded draw
# below always starts from the same sequence.
all_pdfs = list(WORKFORCE_DIR.glob("*.pdf"))
all_pdfs.sort()
print(f"Found {len(all_pdfs)} PDFs in {WORKFORCE_DIR}")

# Draw without replacement; never ask for more files than exist.
rng = random.Random(SEED)
sample_count = min(SAMPLE_SIZE, len(all_pdfs))
sample_pdfs = rng.sample(all_pdfs, sample_count)
print(f"Selected {len(sample_pdfs)} PDFs for testing:")
for pdf_file in sample_pdfs:
    print(f"  {pdf_file.name}")

Found 230 PDFs in /Users/vykhand/OneDrive/Documents/SCANS/epson/workforce
Selected 20 PDFs for testing:
  Receipt_20241016_0001.pdf
  Document_20240913_0004.pdf
  Document_20001110_0001.pdf
  Receipt_20250307_0001.pdf
  Document_20250415_0002.pdf
  Document_20250304_0002.pdf
  Document_20250303_0001.pdf
  Document_20241206_0001.pdf
  Receipt_20250305_0002.pdf
  Document_20240913_0002.pdf
  Receipt_20250302_0001.pdf
  Receipt_20260207_0001.pdf
  JYSK_20241109_0001.pdf
  Document_20231006_0001.pdf
  Receipt_20240112_0002.pdf
  Document_20260105_0013.pdf
  Document_20070222_0001.pdf
  Document_20001206_0001.pdf
  Document_20240102_0001.pdf
  Document_20250302_0005.pdf


## Ingest + Parse

Ingest the sample PDFs as external documents (skipping already-ingested ones),
then parse any unparsed documents.

In [4]:
from mydocs.models import Document, DocumentPage, StorageModeEnum
from mydocs.parsing.pipeline import ingest_files, parse_document

# Ingest sample PDFs (skips already-ingested files)
documents, skipped = await ingest_files(
    source=[str(p) for p in sample_pdfs],
    storage_mode=StorageModeEnum.EXTERNAL,
    tags=[TAG],
)

print(f"Ingested: {len(documents)}, Skipped (already exist): {len(skipped)}")

# Collect all document IDs (both new and existing)
doc_ids = [str(d.id) for d in documents]

# Look up already-existing documents that were skipped
for pdf_path in sample_pdfs:
    abs_path = str(pdf_path.resolve())
    existing = await Document.afind({"original_path": abs_path})
    for ex in existing:
        if str(ex.id) not in doc_ids:
            doc_ids.append(str(ex.id))

print(f"Total documents to process: {len(doc_ids)}")

[2026-02-18 23:14:36] [[32mINFO[0m] [2m[pipeline.ingest_files:90][0m[2m[0m Discovered 20 files from 20 source(s)
[2026-02-18 23:14:36] [[32mINFO[0m] [2m[pipeline.ingest_files:119][0m[2m [file_name=Receipt_20241016_0001.pdf][0m Ingesting file: Receipt_20241016_0001.pdf
[2026-02-18 23:14:48] [[32mINFO[0m] [2m[pipeline.ingest_files:172][0m[2m [file_name=Receipt_20241016_0001.pdf][0m Ingested document dc658277108566bf53c497ae0021e57c for file Receipt_20241016_0001.pdf -> Receipt_20241016_0001.pdf
[2026-02-18 23:14:48] [[32mINFO[0m] [2m[pipeline.ingest_files:119][0m[2m [file_name=Document_20240913_0004.pdf][0m Ingesting file: Document_20240913_0004.pdf
[2026-02-18 23:14:50] [[32mINFO[0m] [2m[pipeline.ingest_files:172][0m[2m [file_name=Document_20240913_0004.pdf][0m Ingested document ae0e3d54594bfe9f2bc4d563e5b227ed for file Document_20240913_0004.pdf -> Document_20240913_0004.pdf
[2026-02-18 23:14:50] [[32mINFO[0m] [2m[pipeline.ingest_files:119][0m[2m [fil

In [5]:
# Parse any unparsed documents
parsed_count = 0
for doc_id in doc_ids:
    doc = await Document.aget(doc_id)
    if doc and doc.status != "parsed":
        try:
            await parse_document(doc_id)
            parsed_count += 1
            print(f"  Parsed: {doc.original_file_name}")
        except Exception as e:
            print(f"  FAILED: {doc.original_file_name}: {e}")

print(f"\nNewly parsed: {parsed_count}, Already parsed: {len(doc_ids) - parsed_count}")

[2026-02-18 23:15:25] [[32mINFO[0m] [2m[base_config.apply_yaml_config:70][0m[2m [document_id=dc658277108566bf53c497ae0021e57c file_name=Receipt_20241016_0001.pdf][0m Successfully loaded and applied configuration from: /Users/vykhand/DEV/mydocs2/config/parser.yml
[2026-02-18 23:15:25] [[32mINFO[0m] [2m[base_parser.__aenter__:46][0m[2m [document_id=dc658277108566bf53c497ae0021e57c file_name=Receipt_20241016_0001.pdf][0m Locking document: dc658277108566bf53c497ae0021e57c, config hash: 29fad80768d46df867b5b3d3e31c9745e97a349d0bd6bec0c029526f95b686a5
[2026-02-18 23:15:25] [[32mINFO[0m] [2m[parser.parse:47][0m[2m [document_id=dc658277108566bf53c497ae0021e57c file_name=Receipt_20241016_0001.pdf][0m Processing file: /Users/vykhand/Library/CloudStorage/OneDrive-Personal/Documents/SCANS/epson/workforce/Receipt_20241016_0001.pdf
[2026-02-18 23:15:25] [[32mINFO[0m] [2m[parser._aprocess_file:73][0m[2m [document_id=dc658277108566bf53c497ae0021e57c file_name=Receipt_20241016_00

## Split & Classify

Load the split-classify prompt config and run `split_and_classify()` on each document.

In [None]:
from mydocs.extracting.prompt_utils import get_split_classify_prompt
from mydocs.extracting.splitter import split_and_classify
from mydocs.extracting.models import SplitClassifyResult

# Load prompt config
prompt_config = get_split_classify_prompt("generic")
print(f"Prompt config loaded: {prompt_config.name}")
print(f"  model: {prompt_config.model}")
print(f"  batch_size: {prompt_config.batch_size}")
print(f"  overlap_factor: {prompt_config.overlap_factor}")
print(f"  output_schema: {prompt_config.output_schema}")

# Run split-classify on each document
results: dict[str, SplitClassifyResult] = {}

for doc_id in doc_ids:
    doc = await Document.aget(doc_id)
    if not doc:
        print(f"  Document {doc_id} not found, skipping")
        continue
    try:
        result = await split_and_classify(
            document_id=doc_id,
            prompt_config=prompt_config,
            case_type="generic",
        )
        results[doc_id] = result
        seg_summary = ", ".join(
            f"{s.document_type}(pp{s.page_numbers[0]}-{s.page_numbers[-1]})"
            for s in result.segments
        )
        print(f"  {doc.original_file_name}: {len(result.segments)} segments — {seg_summary}")
    except Exception as e:
        print(f"  FAILED {doc.original_file_name}: {e}")

print(f"\nCompleted: {len(results)}/{len(doc_ids)} documents")

Prompt config loaded: generic_split_classify
  model: azure/gpt-4.1
  batch_size: 12
  overlap_factor: 3
  output_schema: split_classify
[2026-02-18 23:20:18] [[32mINFO[0m] [2m[splitter.split_and_classify:273][0m[2m[0m Splitting document dc658277108566bf53c497ae0021e57c: 84 pages
[2026-02-18 23:20:18] [[32mINFO[0m] [2m[splitter.split_and_classify:279][0m[2m[0m Created 9 batches (batch_size=12, overlap=3)
[2026-02-18 23:20:21] [[36mDEBUG[0m] [2m[splitter.split_and_classify:289][0m[2m[0m Batch 1/9: 10 segments
[2026-02-18 23:20:23] [[36mDEBUG[0m] [2m[splitter.split_and_classify:289][0m[2m[0m Batch 2/9: 12 segments
[2026-02-18 23:20:24] [[36mDEBUG[0m] [2m[splitter.split_and_classify:289][0m[2m[0m Batch 3/9: 11 segments
[2026-02-18 23:20:27] [[36mDEBUG[0m] [2m[splitter.split_and_classify:289][0m[2m[0m Batch 4/9: 12 segments
[2026-02-18 23:20:28] [[36mDEBUG[0m] [2m[splitter.split_and_classify:289][0m[2m[0m Batch 5/9: 12 segments
[2026-02-18 23:20:30

## Results Summary

Aggregate statistics: document type distribution, multi-document PDF counts.

In [None]:
from collections import Counter
from html import escape
from IPython.display import display, HTML

# Aggregate stats over all split-classify results
type_counter = Counter()   # document_type -> number of segments
multi_doc_count = 0        # PDFs that were split into more than one segment
total_segments = 0

for result in results.values():
    n_segments = len(result.segments)
    total_segments += n_segments
    if n_segments > 1:
        multi_doc_count += 1
    type_counter.update(seg.document_type for seg in result.segments)

# Guard the share computation: `results` is empty when every document failed
# (or none was processed), and dividing by len(results) would then raise.
multi_doc_pct = multi_doc_count / len(results) * 100 if results else 0

# Summary table. document_type is produced by the model, so it is HTML-escaped
# before being interpolated into the markup.
rows = ""
for doc_type, count in type_counter.most_common():
    pct = count / total_segments * 100 if total_segments else 0
    rows += f"<tr><td>{escape(f'{doc_type}')}</td><td>{count}</td><td>{pct:.1f}%</td></tr>\n"

html = f"""
<h3>Split-Classify Results</h3>
<p><b>Documents processed:</b> {len(results)}</p>
<p><b>Total segments:</b> {total_segments}</p>
<p><b>Multi-document PDFs:</b> {multi_doc_count} ({multi_doc_pct:.0f}% of processed)</p>
<table border='1' cellpadding='4'>
<tr><th>Document Type</th><th>Count</th><th>%</th></tr>
{rows}</table>
"""
display(HTML(html))

## Visualization

Render PDF pages with colored borders and labels for each classified segment.

In [None]:
try:
    import fitz  # PyMuPDF
except ImportError:
    raise ImportError("Install PyMuPDF: pip install pymupdf")

from IPython.display import display, Image as IPImage

# RGB colour (each component between 0 and 1) used to mark each document type
TYPE_COLORS = dict(
    receipt=(0.0, 0.6, 0.0),         # green
    bill=(0.0, 0.4, 0.8),            # blue
    invoice=(0.8, 0.4, 0.0),         # orange
    letter=(0.6, 0.0, 0.8),          # purple
    form=(0.8, 0.0, 0.4),            # magenta
    certificate=(0.0, 0.7, 0.7),     # teal
    contract=(0.5, 0.5, 0.0),        # olive
    id_document=(0.8, 0.0, 0.0),     # red
    bank_statement=(0.3, 0.3, 0.8),  # slate blue
    other=(0.5, 0.5, 0.5),           # gray
)

def get_type_color(doc_type: str) -> tuple:
    """Return the RGB tuple for ``doc_type``; types missing from TYPE_COLORS get gray."""
    if doc_type in TYPE_COLORS:
        return TYPE_COLORS[doc_type]
    return (0.5, 0.5, 0.5)

In [None]:
from mydocs.extracting.models import SplitSegment


def render_document_segments(
    pdf_path: str,
    segments: list[SplitSegment],
    dpi: int = 120,
) -> list[bytes]:
    """Render PDF pages with colored borders and labels per segment.

    Every page that belongs to a segment gets a border in the colour of the
    segment's document type and a label showing the type and the segment's
    first and last page number. Pages not covered by any segment are rendered
    unchanged. The drawing is applied to the opened document in memory only;
    this function never saves it back to ``pdf_path``.

    Args:
        pdf_path: Path of the PDF file to open.
        segments: Segments whose ``page_numbers`` are 1-based page numbers.
            If a page appears in several segments, the last one listed wins.
        dpi: Resolution of the rendered images (72 dpi corresponds to zoom 1).

    Returns:
        List of PNG bytes, one per page, in page order.
    """
    # Build page_number -> segment mapping
    page_to_segment: dict[int, SplitSegment] = {}
    for seg in segments:
        for pn in seg.page_numbers:
            page_to_segment[pn] = seg

    # The zoom matrix depends only on dpi, so build it once instead of per page.
    zoom = dpi / 72
    mat = fitz.Matrix(zoom, zoom)
    images: list[bytes] = []

    # The context manager closes the document even if drawing or rendering raises.
    with fitz.open(pdf_path) as pdf:
        for page_idx in range(len(pdf)):
            page = pdf[page_idx]
            page_num = page_idx + 1  # 1-based
            seg = page_to_segment.get(page_num)

            if seg:
                color = get_type_color(seg.document_type)
                # Draw a thick border around the entire page
                rect = page.rect
                border = fitz.Rect(rect.x0 + 2, rect.y0 + 2, rect.x1 - 2, rect.y1 - 2)
                page.draw_rect(border, color=color, width=4)

                # Label in top-left corner
                label = f"[{seg.document_type}] pp {seg.page_numbers[0]}-{seg.page_numbers[-1]}"
                # Draw a filled background for the label
                label_rect = fitz.Rect(rect.x0 + 4, rect.y0 + 4, rect.x0 + 250, rect.y0 + 22)
                page.draw_rect(label_rect, color=color, fill=color)
                page.insert_text(
                    fitz.Point(rect.x0 + 6, rect.y0 + 18),
                    label,
                    fontsize=10,
                    color=(1, 1, 1),  # white text on colored background
                )

            pix = page.get_pixmap(matrix=mat)
            images.append(pix.tobytes("png"))

    return images

In [None]:
# Display rendered pages for the first 5 documents
MAX_DISPLAY = 5

for i, (doc_id, result) in enumerate(results.items()):
    if i >= MAX_DISPLAY:
        break

    doc = await Document.aget(doc_id)
    if not doc:
        continue

    pdf_path = doc.managed_path or doc.original_path
    if not pdf_path or not os.path.isfile(pdf_path):
        print(f"PDF not found for {doc.original_file_name}")
        continue

    seg_summary = ", ".join(
        f"{s.document_type}(pp{s.page_numbers[0]}-{s.page_numbers[-1]})"
        for s in result.segments
    )
    print(f"\n{'='*60}")
    print(f"{doc.original_file_name}: {seg_summary}")
    print(f"{'='*60}")

    images = render_document_segments(pdf_path, result.segments)
    for img_bytes in images:
        display(IPImage(data=img_bytes))

## Run on Any File

Set `TEST_PDF_PATH` to any PDF and run the full pipeline: ingest → parse → split-classify → render.

In [None]:
# Set this to any PDF path
TEST_PDF_PATH = str(WORKFORCE_DIR / "Receipt_20260129_0002.pdf")
assert os.path.isfile(TEST_PDF_PATH), f"File not found: {TEST_PDF_PATH}"

# Ingest (skip if already exists)
test_docs, test_skipped = await ingest_files(
    source=[TEST_PDF_PATH],
    storage_mode=StorageModeEnum.EXTERNAL,
    tags=[TAG],
)
if test_docs:
    test_doc = test_docs[0]
elif test_skipped:
    existing = await Document.afind({"original_path": os.path.abspath(TEST_PDF_PATH)})
    test_doc = existing[0]
else:
    raise RuntimeError("Ingest returned no documents and no skipped entries")

TEST_DOC_ID = str(test_doc.id)
print(f"Document: {test_doc.original_file_name} (id={TEST_DOC_ID})")

# Parse if needed
if test_doc.status != "parsed":
    test_doc = await parse_document(TEST_DOC_ID)
    print(f"Parsed: {test_doc.status}")

# Split-classify
test_result = await split_and_classify(
    document_id=TEST_DOC_ID,
    prompt_config=prompt_config,
    case_type="generic",
)

for seg in test_result.segments:
    print(f"  {seg.document_type}: pages {seg.page_numbers}")

# Render
test_pdf_path = test_doc.managed_path or test_doc.original_path
images = render_document_segments(test_pdf_path, test_result.segments)
for img in images:
    display(IPImage(data=img))

## SubDocument Inspection

Reload the document and inspect the persisted `SubDocument` objects.

In [None]:
# Inspect persisted subdocuments on any processed document
# Change this to any doc_id from the results
INSPECT_DOC_ID = TEST_DOC_ID

doc = await Document.aget(INSPECT_DOC_ID)
if doc and doc.subdocuments:
    print(f"Document: {doc.original_file_name}")
    print(f"SubDocuments: {len(doc.subdocuments)}\n")
    for sd in doc.subdocuments:
        page_nums = [pr.page_number for pr in sd.page_refs]
        print(f"  id: {sd.id}")
        print(f"  case_type: {sd.case_type}")
        print(f"  document_type: {sd.document_type}")
        print(f"  pages: {page_nums}")
        print(f"  created_at: {sd.created_at}")
        print()
else:
    print(f"No subdocuments found on document {INSPECT_DOC_ID}")

## Cleanup

Uncomment to delete test documents and associated data from MongoDB.

In [None]:
# # Cleanup: remove all documents tagged with the test tag
# from mydocs.extracting.models import FieldResultRecord
#
# tagged_docs = await Document.afind({"tags": TAG})
# for d in tagged_docs:
#     did = str(d.id)
#     await FieldResultRecord.adelete_many({"document_id": did})
#     await DocumentPage.adelete_many({"document_id": did})
#     await Document.adelete_one({"_id": did})
#     print(f"Deleted: {d.original_file_name} ({did})")
#
# print(f"\nCleaned up {len(tagged_docs)} documents")