Automated purchase order email processor. Takes .eml files with PO attachments (scanned images, PDFs, DOCX), extracts structured data via Gemini, resolves entities against a Supabase product catalog, and validates the result with business rules + Gemini cross-check.
Suppliers send purchase orders as email attachments — scanned images, PDFs, Word documents. The data is messy:
- SKUs may not match the catalog exactly (
SKU-13is actuallySKU-1-3) - Email body may override fields from the PO document ("push back delivery for SKU-1-3 to 02/01/2027")
- Unknown products, changed quantities, past dates, missing fields
- Different suppliers, different PO formats
The pipeline must extract everything, match it to the right products, flag anomalies, and determine what's safe to write to the database vs. what needs human review.
- Python 3.13 (3.14 has langfuse compatibility issues)
- uv package manager
- Supabase project with the schema from
task/supabase-schema.sql - Gemini API key
# Create venv and install
uv venv --python 3.13 .venv
source .venv/bin/activate
uv pip install -e ".[dev,stress]"
# Configure credentials
cat > .env.local << 'EOF'
GEMINI_API_KEY=your-gemini-key
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-anon-key
# Optional: Langfuse observability (see langfuse.md)
LANGFUSE_PUBLIC_KEY=pk-lf-xxx
LANGFUSE_SECRET_KEY=sk-lf-xxx
LANGFUSE_BASE_URL=https://cloud.langfuse.com
EOF# Process a single email
.venv/bin/python -m po_processor.main process task/sample.eml -v
# With Rich terminal output
.venv/bin/python -m po_processor.main process task/sample.eml --rich
# Stress test (mock mode — no Gemini costs, 22 scenarios)
.venv/bin/python -m po_processor.main stress-test --mode mock
# Stress test (real mode — full E2E with Gemini)
.venv/bin/python -m po_processor.main stress-test --mode real --scenarios sc01,sc02,sc03
# Run tests
.venv/bin/python -m pytest po_processor/tests/ -v.eml file
|
v
[Stage 1] Email Parser ─────── stdlib `email` module
| Extracts sender, subject, body, attachments
v
[Stage 2] Format Router ────── Docling for DOCX/PPTX conversion
| Routes images/PDFs directly to Gemini
v
[Stage 3] Gemini Extraction ── Single multimodal API call
| Returns structured JSON (header + line items + email overrides)
| Extracts UOM per line and currency from PO header
v
[Stage 4] Entity Resolution ── 4-step cascade with self-learning
| exact → supplier_sku → fuzzy → Gemini semantic
| Embedding pre-filter: e5-base-v2 narrows Gemini candidates
| Domain-based supplier fallback when exact email fails
| Write-back: saves learned Gemini aliases to DB for future use
v
[Stage 5] Validation ────────── Layer 1: deterministic business rules
| Layer 2: Gemini cross-validation
v
PipelineResult (JSON)
Each extracted SKU goes through a 4-step resolution cascade against the Supabase product catalog:
| Step | Method | Example | Confidence |
|---|---|---|---|
| 1 | Exact match | SKU-1 == product.sku |
1.0 |
| 2 | Supplier SKU | SKU-13 → normalize → SKU13 matches supplier_product.supplier_sku → SKU-1-3 |
0.95 |
| 3 | RapidFuzz fuzzy | SKU-to-SKU + title-to-title scoring (threshold: 85.0) | varies |
| 4 | Gemini semantic | Pre-filtered by embedding top-N candidates, validated against catalog, must meet auto_resolve_confidence (0.9) |
varies |
| - | Unresolved | No match found — flagged as CRITICAL | 0.0 |
Embedding pre-filter: The local intfloat/e5-base-v2 model computes cosine similarity to narrow candidates before Gemini — it does not auto-resolve SKUs. Passage text is enriched with known supplier aliases to improve similarity scoring.
Supplier resolution also has a fallback: if the sender email doesn't match any supplier exactly, the pipeline extracts the domain and looks for a single supplier on that domain (confidence 0.85).
Self-learning: When a SKU is resolved via Gemini (confidence ≥ 0.7), the alias is written back to supplier_product so future emails resolve it instantly via step 2. Write-back only updates supplier_sku — existing price_per_unit values are preserved.
Layer 1 (deterministic, always runs):
| Category | Severity | Trigger |
|---|---|---|
unknown_sku |
CRITICAL | SKU unresolved after all cascade steps |
unknown_supplier |
CRITICAL | Sender email not in supplier table — blocks all lines |
quantity_mismatch |
WARNING/CRITICAL | PO qty differs from DB; CRITICAL if >5x change |
date_in_past |
WARNING | Delivery date is before today |
missing_date |
WARNING | No delivery date on line |
missing_price |
WARNING | No total price on line |
uom_mismatch |
WARNING | Multiple different units of measure across lines |
supplier_domain_match |
WARNING | Supplier resolved by domain only (not exact email) |
sku_alias_resolved |
INFO | SKU matched via supplier_sku, fuzzy, or Gemini |
email_override_applied |
INFO | Email body changed a field (date or quantity) |
new_po_line |
INFO | Product not in existing PO lines |
missing_uom |
INFO | No unit of measure on line |
Layer 2 (Gemini cross-validation): reviews all Layer 1 findings and extraction data, flags anything missed.
Lines with CRITICAL findings are marked ready_for_db = False and require human review. Global criticals (unknown supplier, Layer-2 global findings) block all lines.
Proposed actions: After validation, the pipeline generates a proposed_actions list — concrete DB operations (create_po, create_line, update_qty, update_date, no_change, review) by comparing validated lines against current DB state.
po_processor/
├── config.py # Settings, env vars, Langfuse init
├── exceptions.py # Custom exception hierarchy
├── main.py # CLI entrypoint (process + stress-test subcommands)
├── models/
│ ├── database.py # Pydantic models for Supabase tables
│ ├── extraction.py # ExtractionResult, ExtractedLineItem (+UOM), EmailOverride
│ ├── resolution.py # EntityResolutionResult, ResolvedSKU, MatchType (5 types)
│ └── validation.py # ValidationResult, ReviewItem, ProposedAction, ActionType
├── db/
│ └── client.py # Typed Supabase wrapper (reads + writes + alias upsert)
├── pipeline/
│ ├── email_parser.py # .eml parsing → ParsedEmail
│ ├── format_router.py # MIME routing, Docling conversion
│ ├── extractor.py # Gemini multimodal extraction
│ ├── embedding_cache.py # Product embedding cache (e5-base-v2, disk-cached)
│ ├── entity_resolver.py # 4-step SKU resolution cascade + write-back
│ ├── validator.py # Business rules + Gemini cross-check
│ └── orchestrator.py # Pipeline coordinator
├── prompts/
│ ├── extraction.py # Gemini extraction prompt template
│ └── validation.py # Gemini validation prompt template
├── stress/
│ ├── scenarios.py # 22 edge-case scenario definitions
│ ├── mock_generator.py # Scenario → ExtractionResult (skip Gemini)
│ ├── email_builder.py # .eml assembly from scenarios
│ ├── document_renderer.py # PNG/PDF/DOCX PO document generation
│ └── runner.py # Batch orchestrator with callbacks
├── cli/
│ └── display.py # Rich terminal output (progress bars, tables)
└── tests/
├── conftest.py # Shared fixtures, mock DB, sample data
├── test_email_parser.py # 10 tests
├── test_format_router.py # 5 tests
├── test_extractor.py # 5 tests
├── test_entity_resolver.py # 20 tests (incl. Gemini validation, domain, write-back)
├── test_validator.py # 18 tests (incl. actions, unknown supplier, L2 blocking)
├── test_orchestrator.py # 2 tests
├── test_redteam.py # 22 tests
└── test_stress.py # 11 tests
.venv/bin/python -m po_processor.main process <eml_path> [options]| Flag | Description |
|---|---|
-v, --verbose |
Debug logging to stderr |
-o, --output FILE |
Write JSON to file instead of stdout |
--model MODEL |
Override Gemini model (default: gemini-3-flash-preview) |
--rich |
Rich terminal output with colored tables |
Backward compatibility: python -m po_processor.main task/sample.eml -v still works (auto-routes to process).
.venv/bin/python -m po_processor.main stress-test [options]| Flag | Description |
|---|---|
--mode {mock,real} |
mock skips Gemini (fast), real is full E2E (default: mock) |
--seed N |
RNG seed for reproducible scenarios (default: 42) |
--scenarios IDS |
Comma-separated scenario IDs (default: all 22) |
--output-dir DIR |
Directory for generated .eml and document files |
-v, --verbose |
Debug logging |
--model MODEL |
Override Gemini model |
Mock mode generates ExtractionResult objects directly from scenario definitions and injects them into the pipeline, skipping stages 2-3. Runs entity resolution and validation against the real Supabase DB. No Gemini API costs.
Real mode renders PO documents (PNG/PDF/DOCX), builds .eml files, and runs the full pipeline including Gemini extraction.
22 scenarios covering all edge cases, built against the seed data:
| # | ID | Tests | Format |
|---|---|---|---|
| 1 | sc01_happy_path |
All exact SKUs, quantities match | PNG |
| 2 | sc02_sku_alias |
SKU-13 → SKU-1-3 via supplier_sku | PNG |
| 3 | sc03_unknown_sku |
SKU-99 not in catalog | |
| 4 | sc04_multi_unknown_sku |
SKU-99 + SKU-88 | PNG |
| 5 | sc05_email_date_override |
Email overrides delivery date | DOCX |
| 6 | sc06_email_qty_override |
Email overrides quantity | PNG |
| 7 | sc07_multi_override |
Two overrides on different SKUs | |
| 8 | sc08_qty_small_change |
+10% qty change (WARNING) | PNG |
| 9 | sc09_qty_large_change |
+300% qty change (WARNING) | |
| 10 | sc10_qty_extreme |
250x qty change (CRITICAL) | PNG |
| 11 | sc11_date_in_past |
All dates 2024-01-01 | DOCX |
| 12 | sc12_missing_all_dates |
delivery_date=None everywhere | PNG |
| 13 | sc13_missing_all_prices |
total_price=None everywhere | |
| 14 | sc14_missing_titles |
title=None for all lines | PNG |
| 15 | sc15_unknown_sender |
unknown@random.com | |
| 16 | sc16_new_po_line |
SKU-4 not in PO-12 | DOCX |
| 17 | sc17_different_po |
PO-35 from Small Supplier | PNG |
| 18 | sc18_kitchen_sink |
Alias + unknown + override + past date + missing price + new line | |
| 19 | sc19_zero_overlap_sku |
VENDOR-ALPHA-004 → SKU-4 via supplier_sku (zero lexical overlap) | PNG |
| 20 | sc20_supplier_domain_match |
Supplier resolved by domain fallback | PNG |
| 21 | sc21_uom_present |
Mixed UOMs (pcs + kg) → uom_mismatch WARNING | PNG |
| 22 | sc22_embedding_resolution |
PROD-ONE-GLOBAL → embedding similarity (real mode only) | PNG |
See task/supabase-schema.sql for the full schema. Key tables:
product (id, sku, title)
supplier (id, name, email)
purchase_order (id, reference_num, supplier_id, delivery_date)
purchase_order_line(id, purchase_order_id, product_id, quantity, delivery_date)
supplier_product (supplier_id, product_id, supplier_sku, price_per_unit)
Seed data includes 5 products (SKU-1 through SKU-4 + SKU-1-3), 3 suppliers, 3 POs, and supplier SKU aliases (SKU13 → SKU-1-3, VENDOR-ALPHA-004 → SKU-4).
All settings are in po_processor/config.py as a frozen dataclass:
| Setting | Default | Description |
|---|---|---|
gemini_model |
gemini-3-flash-preview |
Gemini model for extraction + validation |
fuzzy_match_threshold |
85.0 |
RapidFuzz SKU matching threshold (WRatio) |
fuzzy_title_threshold |
70.0 |
RapidFuzz title matching threshold |
embedding_model |
intfloat/e5-base-v2 |
Sentence-transformer model for embedding resolution |
embedding_top_n |
30 |
Number of embedding candidates to consider |
embedding_similarity_threshold |
0.7 |
Cosine similarity threshold for embedding pre-filter candidates |
embedding_cache_dir |
"" (auto) |
Disk cache directory for embeddings (default: .embedding_cache/) |
gemini_writeback_min_confidence |
0.7 |
Min confidence to write-back learned SKU alias to DB |
quantity_extreme_multiplier |
5.0 |
Qty change ratio above this = CRITICAL |
auto_resolve_confidence |
0.9 |
Min confidence for Gemini semantic resolution to be accepted |
gemini_max_retries |
3 |
Max Gemini API retries |
po_reference_prefix |
PO- |
Prepended to raw PO reference for DB lookup |
Langfuse integration provides LLM observability. See langfuse.md for setup and usage.
Pipeline stages are decorated with @observe() — when Langfuse keys are configured, every run produces a trace with nested spans and Gemini call details visible in the Langfuse dashboard. Without keys, tracing is silently disabled.
Core:
google-genai— Gemini API clientdocling— DOCX/PPTX to markdown conversionsupabase— Database clientpydantic— Data validationrapidfuzz— Fuzzy string matchingsentence-transformers— Local embedding model (e5-base-v2) for semantic SKU resolutionnumpy— Embedding similarity computationpillow— Image handlinglangfuse+openinference-instrumentation-google-genai— LLM observability
Stress testing (pip install -e ".[stress]"):
rich— Terminal UI (progress bars, colored tables)reportlab— PDF generationpython-docx— DOCX generation
Development (pip install -e ".[dev]"):
pytest,pytest-asyncio
# All 97 tests
.venv/bin/python -m pytest po_processor/tests/ -v
# Just stress tests
.venv/bin/python -m pytest po_processor/tests/test_stress.py -v
# Specific test module
.venv/bin/python -m pytest po_processor/tests/test_validator.py -vAll tests use mocked DB and Gemini clients — no real API calls, no credentials needed.
Processing task/sample.eml (PO-12 from Big Supplier with scanned PO image):
SUMMARY: Processed 5 lines: 3 ready for DB, 2 need human review.
Total findings: 12 (1 critical, 4 warnings, 7 info)
Items needing human review:
[CRITICAL] SKU 'SKU-7' could not be matched to any product in the catalog
The pipeline correctly:
- Resolves
SKU-13→SKU-1-3via supplier_sku normalization - Applies the email override:
SKU-1-3delivery date →2027-02-01 - Flags
SKU-7as unknown (CRITICAL, blocks DB write) - Detects quantity changes as WARNINGs
- Detects past dates as WARNINGs