# Agentic Document Extraction with Snowflake Insertion

This notebook demonstrates a **complete pipeline** for parsing documents using [Agentic Document Extraction (ADE)](https://docs.landing.ai/ade/ade-overview) and inserting structured results into **Snowflake**.

You’ll learn how to:

1) Parse documents into machine-readable markdown and JSON with **maximal parallelism**.
2) Extract structured fields from documents using LandingAI's Agentic Document Extraction (ADE) service.
3) Insert both:
   - the **original document** into a Snowflake external stage,
   - and the **extracted data** into multiple Snowflake tables.
4) Use a **buffered, shard-based insertion approach** for scalable loading.

> This notebook focuses on **invoice extraction**, but the pattern is modular and can be adapted to **any document type**.

In [None]:
# ---
# Title: Agentic Document Extraction with Snowflake Insertion
# Author: Andrea Kropp
# Description: This notebook demonstrates a modular pipeline using Agentic Document Extraction (ADE) to parse invoices and stream structured results into Snowflake. 
# Target Audience: Developers, Snowflake Partners
# Content Type: Workflow Tutorial
# Publish Date: 2025-09-07
# ADE Version: v0.3.1
# Change Log:
#    - v1.0: Initial draft
# ---

## 📁 Project Structure

The pipeline is modular and lives in reusable Python modules:

```bash
project/
├── ade_sf_pipeline_main.py   # 🔁 Main orchestration: parse → transform → stage → insert
├── config.py                 # ⚙️  Centralized settings (from .env or environment variables)
├── sf_utils.py               # 🧊 Snowflake utilities: connect, stage/table naming, DDL setup
├── doc_utils.py              # 📄 Document utilities: page counting, helpers for metadata
├── metrics.py                # ⏱️  Track wall time, parse latency, total pages, OK/FAIL count
├── version_utils.py          # 📦 Resolve installed ADE package version
├── row_builder.py            # 🧱 Build output rows from parsed documents (custom per schema)
├── loader.py                 # 📤 Buffered uploader, handles local → stage → Snowflake COPY
├── invoice_schema.py         # 📑 Defines Pydantic schema of fields to extract (custom per use case)
├── row_utils.py              # 🔧 Shared helpers for row construction (type coercion, dig, etc.)
```

### 🧩 What You Need to Provide

To adapt this pipeline for your specific document type, you only need to supply three components:

1. ✅ **Pydantic Schema Definition**  
   Define a schema class (e.g., `InvoiceExtractionSchema`) in `invoice_schema.py` that specifies the fields to extract from your documents using ADE.

2. ✅ **Row Builder Adapter**  
   Implement a `rows_from_doc()` function in `row_builder.py` that maps a parsed document to structured rows for each target Snowflake table (e.g., main, line items, chunks, markdown).

3. ✅ **Snowflake Column Lists**  
   Provide column lists (`COLS_MAIN`, `COLS_LINES`, etc.) that align exactly with your Snowflake table definitions.

> 📁 **Only modify:** `invoice_schema.py`, `row_builder.py`, and optionally `loader.py`  
These are the only files you need to adapt when working with a new document type or use case.
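
As a rough illustration of how a row builder adapter and column lists fit together, the sketch below flattens a nested extraction dict into one main row and a list of line-item rows. Everything in it (`COLS_MAIN_DEMO`, `COLS_LINES_DEMO`, `dig`, `rows_from_extraction`, and the `line_items` field names) is hypothetical. It stands in for the real `rows_from_doc()` in `row_builder.py`, which takes different arguments and also returns chunk and markdown records.

```python
from typing import Any

# Hypothetical column lists; the real COLS_MAIN / COLS_LINES must match your tables.
COLS_MAIN_DEMO = ["run_id", "invoice_number", "supplier_name"]
COLS_LINES_DEMO = ["run_id", "invoice_number", "line_no", "description", "amount"]


def dig(obj: Any, *keys: str, default: Any = None) -> Any:
    """Walk nested dicts safely, returning `default` if any key is missing."""
    for key in keys:
        if not isinstance(obj, dict) or key not in obj:
            return default
        obj = obj[key]
    return obj


def rows_from_extraction(extraction: dict, run_id: str) -> tuple[dict, list[dict]]:
    """Map one extracted document to a main row plus zero or more line rows."""
    invoice_number = dig(extraction, "invoice_info", "invoice_number")
    main_row = {
        "run_id": run_id,
        "invoice_number": invoice_number,
        "supplier_name": dig(extraction, "company_info", "supplier_name"),
    }
    line_rows = [
        {
            "run_id": run_id,
            "invoice_number": invoice_number,
            "line_no": i,
            "description": item.get("description"),
            "amount": item.get("amount"),
        }
        for i, item in enumerate(extraction.get("line_items") or [], start=1)
    ]
    return main_row, line_rows


# Illustrative input only; the line items are made up for the demo.
sample = {
    "invoice_info": {"invoice_number": "52255"},
    "company_info": {"supplier_name": "CUSTOM ELECTRIC & PLUMBING, INC."},
    "line_items": [
        {"description": "Labor", "amount": 100.0},
        {"description": "Materials", "amount": 50.0},
    ],
}
main_row, line_rows = rows_from_extraction(sample, run_id="DEMO_RUN")
print(main_row)
print(line_rows)
```

Building each row with keys in the same order as its column list keeps the CSV layout aligned with the target table, which is why the column lists and the row builder are adapted together.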


## 🔧 Configuration and Imports

This section loads the required modules for:

- Parsing documents with Agentic Document Extraction from LandingAI
- Configuring and connecting to Snowflake
- Uploading extracted results via `Loader`
- Tracking wall-clock and parse time via `Metrics`

In [2]:
# General utilities
import os, uuid
import random
from pathlib import Path
from datetime import datetime, timezone
from pprint import pprint

# Agentic Document Extraction from LandingAI
from agentic_doc.parse import parse

# Configuration & settings management (pydantic)
from config import Settings

# Snowflake utilities: table naming, setup, SQL execution
from sf_utils import ensure_formats_and_stages, sfcursor, fq_table, put_original_to_raw_stage

# Basic utilities: page counting and reproducibility
from doc_utils import get_doc_pages
from version_utils import get_installed_version

# Core logic: row transformation from parsed doc
from row_builder import rows_from_doc

# CSV buffering and Snowflake loader class
from loader import Loader, COLS_MAIN, COLS_LINES

# High-level pipeline orchestration
from ade_sf_pipeline_main import run_pipeline_streaming

# Custom schema for invoice processing
from invoice_schema import InvoiceExtractionSchema


2025-09-07 15:13:51 [info] Settings loaded: {
  "endpoint_host": "https://api.va.landing.ai",
  "vision_agent_api_key": "OTBiN[REDACTED]",
  "batch_size": 50,
  "max_workers": 2,
  "max_retries": 5,
  "max_retry_wait_time": 30,
  "retry_logging_style": "log_msg",
  "pdf_to_image_dpi": 96,
  "split_size": 10,
  "extraction_split_size": 50
} [agentic_doc.config] (config.py:170)


## ⚙️ Settings & Environment Variables

This notebook has an accompanying `.env` file that follows the example provided under [Configuration Options](https://docs.landing.ai/ade/ade-retries#configuration-options) in the documentation. All other required settings are defined in the `Settings` class in `config.py`. Review the comments in `config.py` to learn how these settings interact with the `.env` settings.
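
To make the idea of "environment value overrides default" concrete, here is a minimal standard-library sketch. `DemoSettings` and `load_demo_settings` are hypothetical names, and the defaults simply mirror the values used in this notebook. The real `Settings` class in `config.py` is pydantic-based and holds many more fields, so treat this only as an illustration of the precedence rule.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical subset of settings; defaults mirror this notebook's values."""
    batch_size: int = 50
    max_workers: int = 2
    max_retries: int = 5
    max_retry_wait_time: int = 30


def load_demo_settings(env: Optional[Mapping[str, str]] = None) -> DemoSettings:
    """Read settings from a mapping (os.environ by default), falling back to defaults."""
    env = os.environ if env is None else env
    defaults = DemoSettings()
    return DemoSettings(
        batch_size=int(env.get("BATCH_SIZE", defaults.batch_size)),
        max_workers=int(env.get("MAX_WORKERS", defaults.max_workers)),
        max_retries=int(env.get("MAX_RETRIES", defaults.max_retries)),
        max_retry_wait_time=int(env.get("MAX_RETRY_WAIT_TIME", defaults.max_retry_wait_time)),
    )


# An explicit mapping is passed here so the demo does not depend on your shell.
demo_default = load_demo_settings({})
demo_override = load_demo_settings({"BATCH_SIZE": "10"})
print(demo_default)
print(demo_override)
```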

In [3]:
# Load configuration (from .env or environment variables)
S = Settings()

# Display resolved settings (for verification)
pprint(S.model_dump(exclude={"VISION_AGENT_API_KEY"}))

{'BATCH_SIZE': 50,
 'MAX_RETRIES': 5,
 'MAX_RETRY_WAIT_TIME': 30,
 'MAX_WORKERS': 2,
 'RETRY_LOGGING_STYLE': 'log_msg',
 'copy_after_files': 8,
 'csv_file_format_name': 'CSV_STD',
 'database': 'DEMOS_ADE_FINANCE',
 'file_exts': {'.png', '.jpg', '.pdf', '.jpeg'},
 'json_file_format_name': 'JSON_STD',
 'max_rows_per_file': 5000,
 'max_sec_per_file': 3.0,
 'max_threads': 16,
 'private_key_file': '/Users/andreakropp/secure_keys/rsa_key.p8',
 'role': 'ADE_DEMOS',
 'snowflake_account_identifier': 'RPWERKO-LAI_SNOW_SALES',
 'snowflake_schema': 'INVOICES',
 'snowflake_user': 'MACHINE_USER_2',
 'stage_ingest_name': 'INGEST_TMP',
 'stage_raw_name': 'PARSED_INVOICES_COMPLETED',
 'table_chunks': 'PARSED_CHUNKS',
 'table_lines': 'INVOICE_LINE_ITEMS',
 'table_main': 'INVOICES_MAIN',
 'table_markdown': 'MARKDOWN',
 'warehouse': 'SNOWFLAKE_TUTORIALS'}


## Ensure Required Snowflake Formats and Stages Exist

This step checks that the necessary tables, internal stages and file formats exist in your Snowflake account. If they don't, it creates them using standard options defined in `sf_utils.py`.

We suggest **creating these manually from inside Snowflake**. See the `.sql` file that accompanies this notebook. You will need to grant the `snowflake_user` specified in `Settings` a Snowflake role with the required permissions.
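
For orientation, the sketch below generates the kind of idempotent DDL such a setup step might run. The helper name `demo_setup_statements` and the specific file format options are assumptions. The actual statements live in `sf_utils.py` and the accompanying `.sql` file, and table DDL is omitted here because the column definitions are not shown in this notebook.

```python
def demo_setup_statements(
    database: str,
    schema: str,
    csv_format: str,
    json_format: str,
    stages: list[str],
) -> list[str]:
    """Build CREATE ... IF NOT EXISTS statements for file formats and stages."""
    prefix = f"{database}.{schema}"
    statements = [
        # Assumed CSV options; adjust to match how your loader writes shards.
        f"CREATE FILE FORMAT IF NOT EXISTS {prefix}.{csv_format} "
        "TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"'",
        f"CREATE FILE FORMAT IF NOT EXISTS {prefix}.{json_format} TYPE = JSON",
    ]
    statements += [f"CREATE STAGE IF NOT EXISTS {prefix}.{name}" for name in stages]
    return statements


# Names taken from the resolved settings printed above.
ddl = demo_setup_statements(
    database="DEMOS_ADE_FINANCE",
    schema="INVOICES",
    csv_format="CSV_STD",
    json_format="JSON_STD",
    stages=["INGEST_TMP", "PARSED_INVOICES_COMPLETED"],
)
for statement in ddl:
    print(statement)
```

Because every statement uses `IF NOT EXISTS`, re-running the step against an account that is already set up should leave existing objects untouched.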

In [4]:
# Ensure file formats & stages exist
ensure_formats_and_stages(S)
print("Formats and stages are ready.")

Snowflake Connector for Python Version: 3.17.2, Python Version: 3.12.11, Platform: macOS-15.5-arm64-arm-64bit (connection.py:521)
Connecting to GLOBAL Snowflake domain (connection.py:1464)
Formats and stages are ready.


## Files to be Processed

In [5]:
# Define input directory path and files to be processed
input_folder = Path(os.getcwd()) / "input_folder2"
file_exts = set(S.file_exts)
files = sorted([str(p) for p in input_folder.iterdir() if p.suffix.lower() in file_exts])
print(f"Found {len(files)} files.")
files[:5]

Found 4 files.


['/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_1.pdf',
 '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_2.pdf',
 '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_3.pdf',
 '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_4.pdf']

## Import Your Schema

A well-defined schema is critical to the success of your use case. Take some time to study the invoice schema in `invoice_schema.py` and review the documentation on [schema development](https://docs.landing.ai/ade/ade-extract-playground#start-a-schema).

This schema was developed iteratively using the Visual Playground provided by LandingAI at [https://va.landing.ai/demo/doc-extraction](https://va.landing.ai/demo/doc-extraction).

Schemas can be defined using `pydantic` or JSON syntax. This notebook uses a `pydantic` schema with one level of nesting. Consult the Agentic Document Extraction documentation for details.
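
To show what "one level of nesting" can look like, here is a deliberately tiny pydantic (v2) sketch. `DemoInvoiceInfo` and `DemoInvoiceSchema` are hypothetical classes, not the contents of `invoice_schema.py`. The field names are borrowed from the extraction output printed later in this notebook, and the descriptions are illustrative.

```python
from datetime import date
from typing import Optional

from pydantic import BaseModel, Field


class DemoInvoiceInfo(BaseModel):
    """Nested group of header-level fields (hypothetical subset)."""
    invoice_number: Optional[str] = Field(
        None, description="Invoice number as printed on the document"
    )
    invoice_date: Optional[date] = Field(
        None, description="Invoice date in YYYY-MM-DD format"
    )


class DemoInvoiceSchema(BaseModel):
    """Top-level schema with one nested model."""
    invoice_info: DemoInvoiceInfo = Field(
        ..., description="Header-level invoice details"
    )


# Validate a sample payload to confirm the nesting and type coercion work.
example = DemoInvoiceSchema(
    invoice_info={"invoice_number": "52255", "invoice_date": "2022-10-03"}
)
print(example.model_dump())
```

Optional fields with `None` defaults let extraction succeed on documents where a value is absent, which matches the `None` values visible in the canary output.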

In [6]:
# Import your Pydantic schema used by parse()
# The schema is already imported in the first code cell, but is repeated here for clarity. 

from invoice_schema import InvoiceExtractionSchema

## 🐤 Canary Parse + Stage

Before streaming all documents, we run a **"canary" document** through the full pipeline. This helps validate that:

- ADE parsing works as expected on real data
- Schema-to-row conversion is correct
- Snowflake stages and COPY commands succeed

🛠️ This operation:
1. Parses a single document using `InvoiceExtractionSchema`
2. Builds main, line-item, chunk, and markdown rows
3. Uploads results to Snowflake stages
4. Inserts into all four target tables

This validates the pipeline end to end (Agentic extraction → Snowflake COPY) before processing many files. The row(s) inserted into Snowflake are clearly marked `_CANARY` for easy removal.

In [None]:
# Select one file randomly and assign it a Run ID for the canary run
one_file = random.choice(files)
print("Selected:", one_file)
paths_str = [str(one_file)]

CANARY_RUN_ID = (
    datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    + "_" + uuid.uuid4().hex[:6] + "_CANARY"
)
print("CANARY_RUN_ID:", CANARY_RUN_ID)

Selected: /Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_3.pdf
CANARY_RUN_ID: 20250907T151355_a6e655_CANARY


In [8]:
# Send the selected file for parsing and extraction (no Snowflake insertion yet)
# See https://docs.landing.ai/ade/ade-parse-docs

sent_at = datetime.now(timezone.utc)
agentic_version = get_installed_version("agentic-doc") 

# 🧭 Step 1: Parse the document using the schema and track time
results = parse(paths_str, extraction_model=InvoiceExtractionSchema)
doc = results[0]

# agentic-doc outputs
markdown = getattr(doc, "markdown", None)
chunks   = getattr(doc, "chunks", None) or []
f        = getattr(doc, "extraction", None)
m        = getattr(doc, "extraction_metadata", None)

print("Parsed markdown:", isinstance(markdown, str), "| chunks:", len(chunks))


2025-09-07 15:13:56 [info] API key is valid. [agentic_doc.utils] (utils.py:42)
2025-09-07 15:13:56 [info] Parsing 1 documents [agentic_doc.parse] (parse.py:280)
2025-09-07 15:13:56 [info] Splitting PDF: '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_3.pdf' into 0 parts under '/var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpwory84x8' [agentic_doc.utils] (utils.py:236)
2025-09-07 15:13:56 [info] Created /var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpwory84x8/invoice_3_1.pdf [agentic_doc.utils] (utils.py:252)
2025-09-07 15:13:56 [info] Start parsing document part: 'File name: invoice_3_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:670)
HTTP Request: POST https://api.va.landing.ai/v1/tools/agentic-document-analysis "HTTP/1.1 200 OK" (_client.py:1025)
2025-09-07 15:14:36 [info] Time taken to successfully parse a document chunk: 39.75 seconds [agentic_doc.parse] (parse.py:823)
2025-09-07 15:14:36 [info] Successfully parsed document part: 'File name: invoice_3_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:679)

Parsing document parts from 'invoice_3.pdf': 100%|██████████| 1/1 [00:39<00:00, 39.78s/it]
Parsing documents: 100%|██████████| 1/1 [00:39<00:00, 39.80s/it]

Parsed markdown: True | chunks: 8

In [9]:
# Inspect the extracted fields and associated metadata
print(f)
print(m)

invoice_info=DocumentInfo(invoice_date_raw='10/03/2022', invoice_date=datetime.date(2022, 10, 3), invoice_number='52255', order_date=None, po_number=None, status=None) customer_info=CustomerInfo(sold_to_name='Leo Vincent', sold_to_address='1000 Sarah Dr\nSummerfield, NC  27358 USA', customer_email=None) company_info=SupplierInfo(supplier_name='CUSTOM ELECTRIC & PLUMBING, INC.', supplier_address='PO Box 533\nSummerfield, NC  27358', representative='Mathew', email='Mat@CustomEandP.com', phone='336.701.5589', gstin=None, pan=None) order_details=TermsAndShipping(payment_terms='by Cash or Check, All sales final. $750.00 deposit at acceptance of contract balance due upon substantial completion of work. Agreement to include conditions of agreement addendum Clarifications: It is agreed that during the performance of the work drywall will/may be cut at the discretion of the contractor, this quotation does not include any drywall or painting repair or touchup. Parking to be provided onsite for a

In [None]:
# Perform the Snowflake insertions for the one processed document
# Query the four target Snowflake tables using the `run_id` to verify that rows were copied successfully.
# If counts are zero or inconsistent, review earlier steps for parsing or staging issues.

# 🧭 Step 2: Build all output rows from the document
main_row, line_rows, chunk_rows, markdown_record, _uuid = rows_from_doc(
    fp=str(one_file), doc=doc, run_id=CANARY_RUN_ID, sent_at=sent_at, agentic_version=agentic_version
)

# 🧭 Step 3: Add rows to loader for staging → Snowflake COPY
loader = Loader(CANARY_RUN_ID, S, cols_main=COLS_MAIN, cols_lines=COLS_LINES)

if main_row: loader.add_main(main_row)
for r in (line_rows or []): loader.add_line(r)
for r in (chunk_rows or []): loader.add_chunk(r)
if markdown_record: loader.add_markdown(markdown_record)

# 🧭 Step 4: Upload the original file to the raw stage (optional archival)
put_original_to_raw_stage(str(one_file), S, loader.conn)

# 🧭 Step 5: Force a final flush and COPY (even with 1 file)
loader.close()

# 🧭 Step 6: Run SELECT COUNT(*) queries for this run_id
with sfcursor(settings=S) as cur:
    cur.execute(f"SELECT COUNT(*) FROM {fq_table(S, S.table_main)} WHERE run_id=%s", (CANARY_RUN_ID,))
    main_ct = cur.fetchone()[0]
    cur.execute(f"SELECT COUNT(*) FROM {fq_table(S, S.table_lines)} WHERE run_id=%s", (CANARY_RUN_ID,))
    lines_ct = cur.fetchone()[0]
    cur.execute(f"SELECT COUNT(*) FROM {fq_table(S, S.table_chunks)} WHERE run_id=%s", (CANARY_RUN_ID,))
    chunks_ct = cur.fetchone()[0]
    cur.execute(f"SELECT COUNT(*) FROM {fq_table(S, S.table_markdown)} WHERE run_id=%s", (CANARY_RUN_ID,))
    md_ct = cur.fetchone()[0]

print(f"COPY complete — rows: main={main_ct}, lines={lines_ct}, chunks={chunks_ct}, markdown={md_ct}")


Snowflake Connector for Python Version: 3.17.2, Python Version: 3.12.11, Platform: macOS-15.5-arm64-arm-64bit (connection.py:521)
Connecting to GLOBAL Snowflake domain (connection.py:1464)
Snowflake Connector for Python Version: 3.17.2, Python Version: 3.12.11, Platform: macOS-15.5-arm64-arm-64bit (connection.py:521)
Connecting to GLOBAL Snowflake domain (connection.py:1464)
COPY complete — rows: main=1, lines=3, chunks=8, markdown=1
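
Since canary rows are tagged for easy removal, a cleanup step could look like the sketch below. `canary_delete_statements` is a hypothetical helper that only builds parameterized `DELETE` statements. It reuses the `run_id` column and `%s` bind style from the count queries above, and refuses any run ID that does not end in `_CANARY` as a guard against deleting real data.

```python
def canary_delete_statements(
    tables: list[str], run_id: str
) -> list[tuple[str, tuple[str]]]:
    """Return (sql, params) pairs that delete one canary run from each table."""
    if not run_id.endswith("_CANARY"):
        raise ValueError(f"Refusing to delete non-canary run_id: {run_id!r}")
    return [(f"DELETE FROM {table} WHERE run_id = %s", (run_id,)) for table in tables]


# Fully qualified names are written out here; in the notebook they would come
# from fq_table(S, S.table_main) and friends.
demo_tables = [
    "DEMOS_ADE_FINANCE.INVOICES.INVOICES_MAIN",
    "DEMOS_ADE_FINANCE.INVOICES.INVOICE_LINE_ITEMS",
    "DEMOS_ADE_FINANCE.INVOICES.PARSED_CHUNKS",
    "DEMOS_ADE_FINANCE.INVOICES.MARKDOWN",
]
cleanup = canary_delete_statements(demo_tables, "20250907T151355_a6e655_CANARY")
for sql, params in cleanup:
    print(sql, params)

# To execute against Snowflake (not run here):
# with sfcursor(settings=S) as cur:
#     for sql, params in cleanup:
#         cur.execute(sql, params)
```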


## 🚀 Streaming Document Processing with Metrics

This cell runs the full pipeline over **all files** in the input folder.

For each document, it:
1. Parses the file using your schema
2. Builds rows for multiple tables
3. Uploads CSVs/JSON to Snowflake stage
4. Performs `COPY INTO` for atomic insertion
5. Captures timing and success/failure counts.


### Parse and Extract Portion of the Pipeline
The `agentic-doc` Python library starts work on up to 50 **documents** simultaneously because `BATCH_SIZE=50` is set in the `.env` file. Each document is assigned `MAX_WORKERS=2`, so within each document up to 2 **pages** are processed in parallel.

These settings work well with invoices because the majority are a single page and very few are more than 4 pages. Adjust the settings and check the timings based on your use case.
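
A back-of-the-envelope way to reason about these two settings is to estimate the upper bound on pages in flight at once. The function below is a hypothetical model that follows the description above and assumes every document has the same page count. Actual scheduling, PDF splitting, and API rate limits in `agentic-doc` may behave differently.

```python
def max_pages_in_flight(
    n_docs: int, pages_per_doc: int, batch_size: int, max_workers: int
) -> int:
    """Upper bound on concurrently processed pages under a two-level model."""
    docs_in_flight = min(n_docs, batch_size)          # document-level parallelism
    pages_per_doc_in_flight = min(pages_per_doc, max_workers)  # page-level parallelism
    return docs_in_flight * pages_per_doc_in_flight


# This notebook's run: 4 single-page invoices.
small_run = max_pages_in_flight(n_docs=4, pages_per_doc=1, batch_size=50, max_workers=2)
# A larger hypothetical run: 250 four-page invoices.
large_run = max_pages_in_flight(n_docs=250, pages_per_doc=4, batch_size=50, max_workers=2)
print(small_run, large_run)
```

For mostly single-page invoices, raising `MAX_WORKERS` adds little under this model because each document has only one page to process, whereas `BATCH_SIZE` directly controls how many documents proceed at once.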

### Snowflake Insertion Portion of the Pipeline
As results from parse() come back, a per-file concurrent pipeline builds rows, stages shards and copies into tables.
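
The buffering idea can be sketched with the standard library alone. `DemoShardBuffer` below is a hypothetical stand-in for the real `Loader`: it accumulates rows and writes a CSV shard when a row-count or age threshold is reached, loosely modeled on the `max_rows_per_file` and `max_sec_per_file` settings. How the real loader interprets those settings, and how `copy_after_files` triggers `COPY INTO`, is an assumption not confirmed here. The age check in this sketch runs only when a row is added, with no background timer.

```python
import csv
import tempfile
import time
from pathlib import Path


class DemoShardBuffer:
    """Buffer rows in memory and flush them to numbered CSV shard files."""

    def __init__(self, out_dir, columns, max_rows=5000, max_sec=3.0):
        self.out_dir = Path(out_dir)
        self.columns = columns
        self.max_rows = max_rows
        self.max_sec = max_sec
        self.rows = []
        self.shards = []
        self.opened_at = None

    def add(self, row):
        if not self.rows:
            self.opened_at = time.monotonic()  # age is measured from the first row
        self.rows.append(row)
        too_many = len(self.rows) >= self.max_rows
        too_old = time.monotonic() - self.opened_at >= self.max_sec
        if too_many or too_old:
            self.flush()

    def flush(self):
        """Write buffered rows to the next shard; return its path or None."""
        if not self.rows:
            return None
        path = self.out_dir / f"shard_{len(self.shards):05d}.csv"
        with path.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=self.columns)
            writer.writeheader()
            writer.writerows(self.rows)
        self.shards.append(path)
        self.rows = []
        return path


with tempfile.TemporaryDirectory() as tmp:
    buf = DemoShardBuffer(tmp, ["run_id", "invoice_number"], max_rows=2, max_sec=60.0)
    for n in range(5):
        buf.add({"run_id": "DEMO_RUN", "invoice_number": str(n)})
    buf.flush()  # final flush picks up the leftover row
    shard_names = [p.name for p in buf.shards]
print(shard_names)
```

In a real loader, each flushed shard would then be uploaded to the ingest stage and loaded with `COPY INTO`. Keeping shards small bounds memory use and lets insertion start before all documents have been parsed.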


In [11]:

# Optional: run a small subset first, e.g., files[:25]
batch_streaming = files[:]  # or files[:25]

metrics_streaming = run_pipeline_streaming(
    batch_streaming,  # or a subset like files[:25]
    schema_cls=InvoiceExtractionSchema,
    rows_from_doc_fn=rows_from_doc,
    settings=S,
    cols_main=COLS_MAIN,
    cols_lines=COLS_LINES,
    # run_id_suffix="",  # Optional: add your own suffix to the generated run_id
)


Snowflake Connector for Python Version: 3.17.2, Python Version: 3.12.11, Platform: macOS-15.5-arm64-arm-64bit (connection.py:521)
Connecting to GLOBAL Snowflake domain (connection.py:1464)
2025-09-07 15:14:48 [info] API key is valid. [agentic_doc.utils] (utils.py:42)
2025-09-07 15:14:48 [info] API key is valid. [agentic_doc.utils] (utils.py:42)
2025-09-07 15:14:48 [info] Parsing 1 documents [agentic_doc.parse] (parse.py:280)
2025-09-07 15:14:48 [info] API key is valid. [agentic_doc.utils] (utils.py:42)
2025-09-07 15:14:48 [info] Parsing 1 documents [agentic_doc.parse] (parse.py:280)
2025-09-07 15:14:48 [info] API key is valid. [agentic_doc.utils] (utils.py:42)
2025-09-07 15:14:48 [info] Parsing 1 documents [agentic_doc.parse] (parse.py:280)
2025-09-07 15:14:48 [info] Parsing 1 documents [agentic_doc.parse] (parse.py:280)
2025-09-07 15:14:48 [info] Splitting PDF: '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_1.pdf' into 0 parts under '/var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpn9_2mc4a' [agentic_doc.utils] (utils.py:236)
2025-09-07 15:14:48 [info] Created /var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpn9_2mc4a/invoice_1_1.pdf [agentic_doc.utils] (utils.py:252)
2025-09-07 15:14:48 [info] Splitting PDF: '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_3.pdf' into 0 parts under '/var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpvdd752cj' [agentic_doc.utils] (utils.py:236)
2025-09-07 15:14:48 [info] Start parsing document part: 'File name: invoice_1_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:670)
2025-09-07 15:14:48 [info] Created /var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpvdd752cj/invoice_3_1.pdf [agentic_doc.utils] (utils.py:252)
2025-09-07 15:14:48 [info] Splitting PDF: '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_4.pdf' into 0 parts under '/var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpc_usu2w0' [agentic_doc.utils] (utils.py:236)
2025-09-07 15:14:48 [info] Splitting PDF: '/Users/andreakropp/Documents/Demos/ADE Demos/Notebooks/Snowflake_Insertion_Demo/input_folder2/invoice_2.pdf' into 0 parts under '/var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmp68r1e1f3' [agentic_doc.utils] (utils.py:236)
2025-09-07 15:14:48 [info] Start parsing document part: 'File name: invoice_3_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:670)
2025-09-07 15:14:48 [info] Created /var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmpc_usu2w0/invoice_4_1.pdf [agentic_doc.utils] (utils.py:252)
2025-09-07 15:14:48 [info] Created /var/folders/wn/5bkqt1cs3x9_tn9h8nbwfpnm0000gn/T/tmp68r1e1f3/invoice_2_1.pdf [agentic_doc.utils] (utils.py:252)
2025-09-07 15:14:48 [info] Start parsing document part: 'File name: invoice_4_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:670)
2025-09-07 15:14:48 [info] Start parsing document part: 'File name: invoice_2_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:670)

HTTP Request: POST https://api.va.landing.ai/v1/tools/agentic-document-analysis "HTTP/1.1 200 OK" (_client.py:1025)
2025-09-07 15:15:11 [info] Time taken to successfully parse a document chunk: 22.57 seconds [agentic_doc.parse] (parse.py:823)
2025-09-07 15:15:11 [info] Successfully parsed document part: 'File name: invoice_1_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:679)
Parsing document parts from 'invoice_1.pdf': 100%|██████████| 1/1 [00:22<00:00, 22.59s/it]
Parsing documents: 100%|██████████| 1/1 [00:22<00:00, 22.62s/it]

HTTP Request: POST https://api.va.landing.ai/v1/tools/agentic-document-analysis "HTTP/1.1 200 OK" (_client.py:1025)
2025-09-07 15:15:16 [info] Time taken to successfully parse a document chunk: 27.13 seconds [agentic_doc.parse] (parse.py:823)
2025-09-07 15:15:16 [info] Successfully parsed document part: 'File name: invoice_2_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:679)
Parsing document parts from 'invoice_2.pdf': 100%|██████████| 1/1 [00:27<00:00, 27.14s/it]
Parsing documents: 100%|██████████| 1/1 [00:27<00:00, 27.24s/it]

HTTP Request: POST https://api.va.landing.ai/v1/tools/agentic-document-analysis "HTTP/1.1 200 OK" (_client.py:1025)
2025-09-07 15:15:26 [info] Time taken to successfully parse a document chunk: 37.37 seconds [agentic_doc.parse] (parse.py:823)
2025-09-07 15:15:26 [info] Successfully parsed document part: 'File name: invoice_3_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:679)
Parsing document parts from 'invoice_3.pdf': 100%|██████████| 1/1 [00:37<00:00, 37.32s/it]
Parsing documents: 100%|██████████| 1/1 [00:37<00:00, 37.42s/it]

HTTP Request: POST https://api.va.landing.ai/v1/tools/agentic-document-analysis "HTTP/1.1 200 OK" (_client.py:1025)
2025-09-07 15:15:28 [info] Time taken to successfully parse a document chunk: 39.37 seconds [agentic_doc.parse] (parse.py:823)
2025-09-07 15:15:28 [info] Successfully parsed document part: 'File name: invoice_4_1.pdf	Page: [0:0]' [agentic_doc.parse] (parse.py:679)
Parsing document parts from 'invoice_4.pdf': 100%|██████████| 1/1 [00:39<00:00, 39.38s/it]
Parsing documents: 100%|██████████| 1/1 [00:39<00:00, 39.48s/it]


## 📊 Results Summary

Final metrics show the overall performance of the pipeline run.

Includes:
- ✅ Count of successfully parsed documents
- ❌ Count of failures
- 📄 Total pages processed
- ⏱️ Wall-clock time and parse time
- 📈 Average time per document and per page

Check the **wall time per document and per page**. The optimal settings depend on your document mix. To test the timing properly, we suggest a batch of documents that is 5× the `BATCH_SIZE`. Here `BATCH_SIZE=50`, so testing with 250 invoices would best demonstrate how Agentic Document Extraction begins processing the next file as soon as a prior file completes, while Snowflake streaming insertion runs continuously.

The timing reported for `parse()` from Agentic Document Extraction covers both parsing and field extraction. Longer documents and longer schemas require more time.

In [12]:
print(metrics_streaming.summary())

Run ID: 20250907T151447_5c7a95
Files: 4 OK / 0 failed / 4 total
Pages: 4

Total times (seconds):
  Wall clock:   47.58
  Parse:        129.07
  COPY:         7.51

Avg time per PAGE (s):
  Parse:        32.267
  Pipeline:     34.146
  Wall clock:   11.895

Avg time per DOC (s):
  Parse:        32.267
  Pipeline:     34.146
  Wall clock:   11.895
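
The per-page averages can be reproduced from the totals, which also helps when comparing runs with different settings. The snippet below uses only the figures printed in this summary. The variable names are ad hoc, and "effective parallelism" is an informal ratio introduced here, not a metric reported by `metrics.py`.

```python
# Totals copied from the summary above.
pages = 4
wall_clock_s = 47.58
parse_total_s = 129.07

avg_parse_per_page = parse_total_s / pages
avg_wall_per_page = wall_clock_s / pages

# Summed parse time exceeds wall-clock time because documents are parsed
# concurrently; the ratio gives a rough sense of how much work overlapped.
effective_parallelism = parse_total_s / wall_clock_s

print(f"Parse per page:        {avg_parse_per_page:.3f} s")
print(f"Wall clock per page:   {avg_wall_per_page:.3f} s")
print(f"Effective parallelism: {effective_parallelism:.2f}x")
```

With only 4 files the ratio stays well below the configured `BATCH_SIZE`, which is one reason a larger batch gives a more representative timing test.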



## 🧾 Conclusions & Support

This notebook provides a working reference for using ADE with Snowflake to parse, transform, and persist structured data from documents like invoices.

Because it is modular, it can be easily adapted to other document types — simply update:
- The schema (e.g., PO, receipts, resumes)
- The `rows_from_doc()` logic
- The Snowflake table structure

### 💬 Need Help?

📚 [Documentation](https://docs.landing.ai/ade/ade-overview)  
👾 [Discord Support Server](https://docs.landing.ai/ade/ade-support#discord-channel)  
📥 Submit a ticket or talk to the bot on Discord.

We’re here to help!
