# PDF extraction
## Option 1: Process all JPEG files in stage using AI_PARSE_DOCUMENT
### Step 1: Load files into stage




In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import Session

# Import required libraries
import snowflake.connector
import re
import time

session = get_active_session()


In [None]:
-- This step needs to be done from local machine
-- The stage must follow this guide:

In [None]:
if session.sql('select * from raw.pdf.file_list') is None:

    session.sql(f'''
    CREATE OR REPLACE TABLE raw.pdf.file_list as
    SELECT distinct        
        metadata$filename as file_name,
        metadata$file_last_modified as file_last_modified,
        split_part(file_name, '_',1) as from_where,
        split_part(split_part(file_name, '/',2),'.',1) as table_name
    FROM @RAW.PDF.BNK
    where metadata$filename ilike '%jpeg'
    ''')

file_list = session.sql('select * from raw.pdf.file_list').to_pandas()
file_list


### Step 2: Parse everything into tables

In [None]:

# process_count = 0
# total_files = len(file_list)

for x in file_list.iterrows():
    file_name = x[1]['FILE_NAME']
    table_name = x[1]['TABLE_NAME']
    print(file_name, table_name)
    parse_query = f"""
        create or replace table RAW.PDF.{table_name} as (
        SELECT AI_PARSE_DOCUMENT (
            TO_FILE('@"RAW"."PDF"."BNK"','{file_name}'),
            {{'mode': 'LAYOUT', 'page_split': false}}) AS content);
            """
    session.sql(parse_query).collect()
    session.sql(f"select * from RAW.PDF.{table_name}").collect()

print("Completed loading to tables")

In [None]:
process_count = 0
total_files = len(file_list)
print(total_files)
query = ""


for x in file_list.iterrows():
    table_name = x[1]['TABLE_NAME']
    
    process_count +=1
    if process_count < total_files:
        query += f"""
            select '{table_name}' as file_source, content from RAW.PDF.{table_name} union all  --{process_count}
        """
    else:
        query += f"""
            select '{table_name}' as file_source, content from RAW.PDF.{table_name}  --{process_count}
        """

print(query)


In [None]:

    create database transform;
    create schema transform.intermediate;
    -- create table transform.intermediate.unioned_pdf as         

In [None]:
session.sql(f"""
    create or replace table transform.intermediate.unioned_pdf as 
    (with unioned as ({query})
    
    select u.*,
        fl.from_where
    from unioned as u
    left join raw.pdf.file_list as fl
        on u.file_source = fl.table_name)
""")
# print(query)

In [None]:
select * from transform.intermediate.unioned_pdf

### Step 3: Create BankA table out of PDFs

In [None]:
create or replace table transform.intermediate.statements_bank_a as (
with joined as (
    select *
    from transform.intermediate.unioned_pdf
    where from_where ilike '%bank_a%'
),
cleansed as (
    select
        from_where,
        file_source,
        index,
        mod(index - 1, 11) as position_in_group,
        value,
        row_number() over(partition by file_source, position_in_group order by index) as record_group
    from joined,
    lateral split_to_table(content['content']::string,'|')
    order by file_source,index 
),
-- select * from cleansed;
transformed_data AS (
    SELECT 
        file_source,
        record_group,
        MAX(CASE WHEN position_in_group = 1 THEN trim(value) END) AS transaction_date,
        MAX(CASE WHEN position_in_group = 2 THEN trim(value) END) AS remitter,
        MAX(CASE WHEN position_in_group = 3 THEN trim(value) END) AS remitter_bank,
        MAX(CASE WHEN position_in_group = 4 THEN trim(value) END) AS details,
        MAX(CASE WHEN position_in_group = 5 THEN trim(value) END) AS transaction_no,
        MAX(CASE WHEN position_in_group = 6 THEN trim(value) END) AS debit,
        MAX(CASE WHEN position_in_group = 7 THEN trim(value) END) AS credit,
        MAX(CASE WHEN position_in_group = 8 THEN trim(value) END) AS fee_interest,
        MAX(CASE WHEN position_in_group = 9 THEN trim(value) END) AS tax,
        MAX(CASE WHEN position_in_group = 10 THEN trim(value) END) AS balance
    from cleansed
    group by all
    having len(transaction_date) = 10
    
)
-- select * from transformed_data order by file_source, record_group ;

select 
    file_source,
    to_date(transaction_date, 'dd/mm/yyyy') AS transaction_date,
    remitter,
    remitter_bank,
    details,
    transaction_no,
    TRY_CAST(REPLACE(debit, ',', '') AS DECIMAL(18,2)) AS debit,
    TRY_CAST(REPLACE(credit, ',', '') AS DECIMAL(18,2)) AS credit,
    TRY_CAST(REPLACE(fee_interest, ',', '') AS DECIMAL(18,2)) AS fee_interest,
    TRY_CAST(REPLACE(tax, ',', '') AS DECIMAL(18,2)) AS tax,
    TRY_CAST(REPLACE(balance, ',', '') AS DECIMAL(18,2)) AS balance,
    row_number() over(order by file_source,record_group) AS record_sequence
from transformed_data 
order by file_source, record_group 
)

## Option 2: Process all JPEG files in stage using DOCUMENT AI
### Step 1: Set up new db, schema, roles for Document AI


In [None]:
-- Set up new db and schema and roles for Document AI
CREATE DATABASE doc_ai_db;
CREATE SCHEMA doc_ai_db.doc_ai_schema;

USE ROLE ACCOUNTADMIN;

CREATE ROLE doc_ai_role;

GRANT DATABASE ROLE SNOWFLAKE.DOCUMENT_INTELLIGENCE_CREATOR TO ROLE doc_ai_role;
GRANT USAGE, OPERATE ON WAREHOUSE COMPUTE_WH TO ROLE doc_ai_role;
GRANT USAGE ON DATABASE doc_ai_db TO ROLE doc_ai_role;
GRANT USAGE ON SCHEMA doc_ai_db.doc_ai_schema TO ROLE doc_ai_role;
GRANT CREATE STAGE ON SCHEMA doc_ai_db.doc_ai_schema TO ROLE doc_ai_role;
GRANT CREATE SNOWFLAKE.ML.DOCUMENT_INTELLIGENCE ON SCHEMA doc_ai_db.doc_ai_schema TO ROLE doc_ai_role;
GRANT CREATE MODEL ON SCHEMA doc_ai_db.doc_ai_schema TO ROLE doc_ai_role;
GRANT CREATE STREAM, CREATE TABLE, CREATE TASK, CREATE VIEW ON SCHEMA doc_ai_db.doc_ai_schema TO ROLE doc_ai_role;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE doc_ai_role;
GRANT ROLE doc_ai_role TO USER <user_name>;

### Step 2: Create new staging for the files

In [None]:

use role doc_ai_role;
use database doc_ai_db;
use schema doc_ai_schema;

-- create document processing stage
CREATE OR REPLACE STAGE my_pdf_stage
  DIRECTORY = (ENABLE = TRUE)
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

-- create stream on stage
CREATE OR REPLACE STREAM my_pdf_stream ON STAGE my_pdf_stage;
ALTER STAGE my_pdf_stage REFRESH;

-- create pdf_reviews to store information about docs
CREATE OR REPLACE TABLE pdf_reviews (
  file_name VARCHAR,
  file_size VARIANT,
  last_modified VARCHAR,
  snowflake_file_url VARCHAR,
  json_content VARCHAR
);

### Step 3: Train documents in Document AI
- Use the doc_ai_role just created for these steps
- Build a Document AI model
- Upload sample files into Document (choose Table extraction)
- Enter key, column names, then click Extract
- Validate the tables extracted
- Go to the build and Publish if satisfactory

After this step is done you can try out the model on new documents uploaded onto your stage


In [None]:

---testing on 1 single file
SELECT DOC_AI_DB.DOC_AI_SCHEMA.BANK_PDF_TABLE!PREDICT(
  GET_PRESIGNED_URL('@my_pdf_stage', 'IMG_5281.jpeg'), 1);

### Step 4: Apply the Document AI model onto all files

In [None]:
-- Storing the extraction result into pdf_reviews table
CREATE OR REPLACE TABLE pdf_reviews AS (
  SELECT
    RELATIVE_PATH AS file_name,
    size AS file_size,
    last_modified,
    file_url AS snowflake_file_url,
    BANK_PDF_TABLE!PREDICT(GET_PRESIGNED_URL('@my_pdf_stage', RELATIVE_PATH), 1) AS json_content
  FROM my_pdf_stream
  WHERE METADATA$ACTION = 'INSERT'
);

In [None]:
create or replace table pdf_result as (
--flatten json to column-based rows
with base as (
    select 
        file_name,
        json_content,
        content.*
    from pdf_reviews,
    lateral flatten(input => json_content) as content
    -- where file_name = 'IMG_5281.jpeg'
),
--flatten next to column-record-based rows
staging as (
    select 
        base.file_name,
        v.seq as column_ordinal,
        lower(replace(replace(base.key,'transaction|',''),' ','_')) as column_name,
        coalesce(base.value['ocrScore'], v.value['score'])::float as ocr_score,
        v.value['value']::string as value,
        v.index as _row_number
    from base,
    lateral flatten(input => value) as v
),
-- select * from staging;
meta_data as (
    select
        file_name,
        case when column_name = '__documentmetadata' then ocr_score end as ocr_score_on_file,
    from staging
    where _row_number is null
),
-- select * from meta_data;
pivoted as (
    select 
        staging.file_name,
        _row_number,
        meta_data.ocr_score_on_file,
        max(case when column_name = 'sequence_no' then value end)as seq_no,
        max(case when column_name = 'transaction_code' then value end)as txn_code,
        -- max(case when column_name = 'cheque_no' then value end)as cheque_no, --has no value
        max(case when column_name = 'transaction_date' then value end)as txn_date,
        max(case when column_name = 'effective_date' then value end)as effective_date,
        max(case when column_name = 'withdrawal' then value end)as withdrawal,
        max(case when column_name = 'deposit' then value end)as deposit,
        max(case when column_name = 'balance' then value end)as balance,
        max(case when column_name = 'remarks' then value end)as remarks
    
    from staging
    left join meta_data on staging.file_name = meta_data.file_name
    where _row_number is not null
    group by all
    
)
select
    file_name,
    _row_number,
    ocr_score_on_file,
    seq_no,
    txn_code,
    try_to_date(txn_date, 'dd/mm/yyyy') as txn_date,
    try_to_date(effective_date, 'dd/mm/yyyy') as effective_date,
    TRY_CAST(REPLACE(withdrawal, ',', '') AS DECIMAL(18,2)) AS withdrawal,
    TRY_CAST(REPLACE(deposit, ',', '') AS DECIMAL(18,2)) AS deposit,
    TRY_CAST(REPLACE(balance, ',', '') AS DECIMAL(18,2)) AS balance,
    remarks
from pivoted
order by file_name, _row_number
)