# PDF extraction from JPEG files
- Method: Process all JPEG files in stage using AI_PARSE_DOCUMENT
- Steps:
1. Load jpeg files into respective stages
2. Run AI_PARSE_DOCUMENT on the JPEG files in each stage; the parsed `content` of each file is written to its own table
3. Union all into 1 table
4. Transform parsed `content` into respective VIB and TCB tables, preserving original tabular structure in images
5. Test output




In [None]:
# Import python packages
import streamlit as st
import pandas as pd

from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import Session

import snowflake.connector
import re
import time


# Snowpark session shared by every cell below (all SQL and write_pandas calls go through it).
session = get_active_session()


In [None]:
# --- Raw layer: database/schema holding the stages and the per-file parse tables ---
DATABASE = 'RAW_DB'
SCHEMA = 'PDF'
TABLE_NAME = 'HISTORY_FILE_LIST'  # table that receives the staged-file listing
TCB_STAGE = 'TCB_HISTORY_STAGE'
VIB_STAGE = 'VIB_HISTORY_STAGE'
CREDIT_STAGE = 'TCB_CREDIT_STAGE'

# --- Transform layer: schemas are derived from the database name ---
TRANSFORM_DB = 'TRANSFORM_DB'
TRANSFORM_BASE = f'{TRANSFORM_DB}.BASE'
TRANSFORM_INT = f'{TRANSFORM_DB}.INT'
TRANSFORM_MART = f'{TRANSFORM_DB}.MART'
UNION_TABLE = 'UNIONED_HISTORICAL_PDF'

# --- Run switches ---
FRESH_START = False  # True -> the setup cell creates the databases, schemas and stages
REDO = False  # True -> the parse-all cell re-runs AI_PARSE_DOCUMENT for every listed file

# Names ("<BANK>_<file stem>") that the redo cell re-parses one by one.
# Leave the list empty when nothing needs redoing; keep the name defined,
# because the redo cell reads REDO_LIST unconditionally.
# Entries used in earlier runs:
#   'VIB_IMG_5282', 'VIB_IMG_5283', 'VIB_IMG_5285', 'VIB_IMG_5286',
#   'VIB_IMG_5290', 'VIB_IMG_5291', 'VIB_IMG_5300', 'VIB_IMG_5302',
#   'VIB_IMG_5303', 'VIB_IMG_5304', 'VIB_IMG_5306', 'TCB_IMG_5111',
#   'TCB_IMG_5080', 'VIB_IMG_5284', 'VIB_IMG_5288', 'VIB_IMG_5289',
#   'VIB_IMG_5294', 'VIB_IMG_5296', 'VIB_IMG_5299', 'TCB_IMG_5072',
REDO_LIST = [
    'TCB_IMG_5114',
    'TCB_IMG_5116',
]


In [None]:
# One-off environment setup: databases, schemas and the three internal stages.
# Runs only when FRESH_START is True.
#
# session.sql() only builds a lazy DataFrame; nothing is sent to Snowflake
# until an action such as .collect() is called. Each call is also given a
# single statement, so the DDL is issued one statement at a time.
if FRESH_START:
    database_statements = [
        f'CREATE DATABASE {DATABASE}',
        f'CREATE SCHEMA {DATABASE}.{SCHEMA}',
        # NOTE: "create or replace" drops any existing TRANSFORM_DB content.
        f'create or replace database {TRANSFORM_DB}',
        f'create or replace schema {TRANSFORM_BASE}',
        f'create or replace schema {TRANSFORM_INT}',
        f'create or replace schema {TRANSFORM_MART}',
    ]
    for statement in database_statements:
        session.sql(statement).collect()
    print(f'Setting up {DATABASE}.{SCHEMA} completed')

    stage_statements = [
        "ALTER ACCOUNT SET CORTEX_ENABLED_CROSS_REGION = 'ANY_REGION'",
    ]
    # Server-side encryption with a directory table enabled on every stage.
    stage_statements += [
        f'''
        CREATE OR REPLACE STAGE {DATABASE}.{SCHEMA}.{stage}
          ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
          DIRECTORY = ( ENABLE = TRUE )
        '''
        for stage in (TCB_STAGE, VIB_STAGE, CREDIT_STAGE)
    ]
    for statement in stage_statements:
        session.sql(statement).collect()
    print('Setting up stages completed. Go ahead and load files into stages')
else:
    print('No setup required')

In [None]:
# List every file in the VIB and TCB stages as a pandas DataFrame with columns
# FILE_NAME, FILE_LAST_MODIFIED, FROM_WHERE (stage name) and TABLE_NAME
# ("<BANK>_" + the part of the file name before the first dot).
file_list = session.sql(f'''
    SELECT distinct
        metadata$filename as file_name,
        metadata$file_last_modified as file_last_modified,
        '{VIB_STAGE}' as from_where,
        'VIB_' || split_part(file_name, '.',1) as table_name
    FROM @"{DATABASE}"."{SCHEMA}"."{VIB_STAGE}"
    union all
    SELECT distinct
        metadata$filename as file_name,
        metadata$file_last_modified as file_last_modified,
        '{TCB_STAGE}' as from_where,
        'TCB_' || split_part(file_name, '.',1) as table_name
    FROM @"{DATABASE}"."{SCHEMA}"."{TCB_STAGE}"
''').to_pandas()

# to_pandas() always returns a DataFrame, never None, so the previous
# "is None" check could never trigger the write. Persist the listing whenever
# there is something to persist.
if file_list.empty:
    print('No files found in the stages; file list table not written')
else:
    session.write_pandas(
        df=file_list,
        database=DATABASE,
        schema=SCHEMA,
        table_name=TABLE_NAME,
        auto_create_table=True,
        overwrite=True
    )
    print(f'Wrote {len(file_list)} file(s) to {DATABASE}.{SCHEMA}.{TABLE_NAME}')

### Step 2: Parse everything into tables

In [None]:
# Parse every staged file into its own table {DATABASE}.{SCHEMA}.<TABLE_NAME>.
# The loop runs when no TCB*/VIB* table exists yet, or when REDO is True.
#
# The existence check uses the same DATABASE/SCHEMA the tables are created in,
# so tables with a TCB/VIB prefix in other schemas do not suppress the
# initial parse.
parsed_table_count = session.sql(f'''
    select count(*) as record from {DATABASE}.information_schema.tables
    where
        table_schema = '{SCHEMA}'
        and (
            table_name like 'TCB%'
            or table_name like 'VIB%'
        )
    ''').to_pandas()['RECORD'][0]

if parsed_table_count == 0 or REDO:
    for _, row in file_list.iterrows():
        file_name = row['FILE_NAME']
        table_name = row['TABLE_NAME']
        stage_name = row['FROM_WHERE']
        print(f'Processing {file_name} into {table_name}')
        # Doubled braces are literal braces for the AI_PARSE_DOCUMENT options object.
        parse_query = f"""
            create or replace table {DATABASE}.{SCHEMA}.{table_name} as (
            SELECT AI_PARSE_DOCUMENT (
                TO_FILE('@"{DATABASE}"."{SCHEMA}"."{stage_name}"','{file_name}'),
                {{'mode': 'LAYOUT', 'page_split': false}}) AS content);
                """
        session.sql(parse_query).collect()

    print("Completed loading images to tables")
else:
    print('Already parsed historical images last time')

In [None]:
# Re-parse only the entries named in REDO_LIST and overwrite their tables.
# Each entry is "<BANK>_<file stem>": the part before the first underscore
# selects the "<BANK>_HISTORY_STAGE" stage, the rest plus ".jpeg" is the file.
redo_requested = REDO_LIST is not None

for redo_name in (REDO_LIST if redo_requested else []):
    name_parts = redo_name.split('_', 1)
    stage_name = name_parts[0] + '_HISTORY_STAGE'
    file_name = name_parts[1] + '.jpeg'
    table_name = redo_name

    print(f'Processing {file_name} into {table_name}')

    # Parse on the server, pull the single-row result into pandas ...
    parsed_df = session.sql(f"""
            SELECT AI_PARSE_DOCUMENT (
                TO_FILE('@"{DATABASE}"."{SCHEMA}"."{stage_name}"','{file_name}'),
                {{'mode': 'LAYOUT', 'page_split': false}})::object AS content
                """).to_pandas()

    # ... and write it back over the table for this file.
    session.write_pandas(
        df=parsed_df,
        database=DATABASE,
        schema=SCHEMA,
        table_name=table_name,
        overwrite=True,
    )

if redo_requested:
    print("Completed loading images to tables")

### Step 3: Union all contents into 1 table

In [None]:
# Build the text of one big "select ... union all select ..." over every
# per-file table listed in file_list. Each branch is tagged with the table name
# as file_source and ends with a "--<n>" SQL comment carrying its ordinal.
process_count = 0
total_files = len(file_list)
print(total_files)

select_indent = " " * 12
closing_indent = " " * 8
branches = []

for _, row in file_list.iterrows():
    file_name = row['FILE_NAME']
    table_name = row['TABLE_NAME']

    process_count += 1
    # Every branch except the last is followed by "union all".
    joiner = " union all" if process_count < total_files else ""
    branches.append(
        f"\n{select_indent}select '{table_name}' as file_source, content::string as content"
        f"\n{select_indent}from {DATABASE}.{SCHEMA}.{table_name}{joiner}  --{process_count}"
        f"\n{closing_indent}"
    )

query = "".join(branches)

print(query)


In [None]:
# Materialise the unioned per-file contents as one table with columns
# file_source and content (content re-parsed from string into a variant).
#
# Two fixes compared with the earlier version of this cell:
#   * .collect() is required; session.sql() alone only builds a lazy DataFrame
#     and the CREATE TABLE would never be executed.
#   * the table is created in TRANSFORM_BASE, which is where the cells below
#     read it from (TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF).
if not query.strip():
    raise ValueError('Union query is empty: no files were listed, nothing to union')

session.sql(f"""
    create or replace table {TRANSFORM_BASE}.{UNION_TABLE} as 
    (with unioned as ({query})
    
     select 
            file_source,
            try_parse_json(content) as content
        
            from unioned)
""").collect()

In [None]:
%%sql -r dataframe_3
-- Sanity check: count the rows in the unioned table.
-- The two commented-out queries below preview the rows of a single source file.
-- select top 10 * from TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF where file_source = 'VIB_IMG_5282';
-- select top 10 * from TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF where file_source = 'TCB_IMG_5080';
select count(*) from TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF ;

### Step 4: Create TCB table out of PDFs

In [None]:
# Turn the parsed TCB contents into a typed statement table.
#
# SQL outline:
#   joined           - rows of the unioned table whose file_source starts with 'TCB'
#   cleansed         - content['content'] split on '|'; each piece gets a position
#                      (mod(index - 1, 11)) and a record_group number per position
#   transformed_data - pieces at positions 1..10 pivoted into named text columns
#   final select     - dates and amounts cast with try_* functions, rows without a
#                      parsable balance dropped, record_sequence assigned
# NOTE(review): the 11-piece cycle presumably mirrors a 10-column table in the
# parsed layout output - confirm against a sample of content.
tcb_sql = '''
with joined as (
    select *,
        'TCB' as from_where
    from TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF  
    where file_source like 'TCB%'
),
cleansed as (
    select
        from_where,
        file_source,
        index,
        mod(index - 1, 11) as position_in_group,
        value,
        row_number() over(partition by file_source, position_in_group order by index) as record_group
    from joined,
    lateral split_to_table(content['content']::string,'|')
    order by file_source,index 
),
transformed_data AS (
    SELECT 
        file_source,
        record_group,
        MAX(CASE WHEN position_in_group = 1 THEN trim(value) END) AS transaction_date,
        MAX(CASE WHEN position_in_group = 2 THEN trim(value) END) AS remitter,
        MAX(CASE WHEN position_in_group = 3 THEN trim(value) END) AS remitter_bank,
        MAX(CASE WHEN position_in_group = 4 THEN trim(value) END) AS details,
        MAX(CASE WHEN position_in_group = 5 THEN trim(value) END) AS transaction_no,
        MAX(CASE WHEN position_in_group = 6 THEN trim(value) END) AS debit,
        MAX(CASE WHEN position_in_group = 7 THEN trim(value) END) AS credit,
        MAX(CASE WHEN position_in_group = 8 THEN trim(value) END) AS fee_interest,
        MAX(CASE WHEN position_in_group = 9 THEN trim(value) END) AS tax,
        MAX(CASE WHEN position_in_group = 10 THEN trim(value) END) AS balance_tmp
    from cleansed
    group by all
    
)

select 
    file_source,
    try_to_date(transaction_date, 'dd/mm/yyyy') AS transaction_date,
    remitter,
    remitter_bank,
    details,
    transaction_no,
    TRY_CAST(REPLACE(debit, ',', '') AS DECIMAL(18,2)) AS debit,
    TRY_CAST(REPLACE(credit, ',', '') AS DECIMAL(18,2)) AS credit,
    TRY_CAST(REPLACE(fee_interest, ',', '') AS DECIMAL(18,2)) AS fee_interest,
    TRY_CAST(REPLACE(tax, ',', '') AS DECIMAL(18,2)) AS tax,
    TRY_CAST(REPLACE(balance_tmp, ',', '') AS DECIMAL(18,2)) AS balance,
    row_number() over(order by file_source,record_group) AS record_sequence
from transformed_data
where balance is not null
order by file_source, record_group 

'''

# Run the transformation and bring the result into pandas.
tcb_df = session.sql(tcb_sql).to_pandas()

# Overwrite (or create) TRANSFORM_DB.INT.HISTORY_TCB_BS with the result.
tcb_target = dict(database='TRANSFORM_DB', schema='INT', table_name='HISTORY_TCB_BS')
session.write_pandas(
    df=tcb_df,
    auto_create_table=True,
    overwrite=True,
    **tcb_target,
)

### Step 4: Create VIB table out of PDFs

In [None]:
# Turn the parsed VIB contents into a typed statement table.
#
# SQL outline:
#   joined           - rows of the unioned table whose file_source starts with 'VIB'
#   cleansed         - content['content'] split on '|'; each piece gets a position
#                      (mod(index - 1, 10)) and a record_group number per position
#   transformed_data - pieces at positions 1..9 pivoted into named text columns
#   casting          - numbers/dates cast with try_* functions, empty strings -> null
#   ffill            - key columns forward-filled per file in record_group order
#   final select     - rows sharing the same filled keys collapsed into one row,
#                      their descriptions concatenated
#
# Determinism fixes:
#   * every LISTAGG now has WITHIN GROUP (ORDER BY record_group); without it the
#     concatenation order is unspecified, so multi-line descriptions could come
#     back shuffled between runs.
#   * record_sequence is ordered by (file_source, record_group), like the TCB
#     cell, instead of record_group alone, which left ties between files arbitrary.
vib_df = session.sql('''
with joined as (
    select *,
        'VIB' as from_where
    from TRANSFORM_DB.BASE.UNIONED_HISTORICAL_PDF
    where file_source like 'VIB%'
),
cleansed as (
    select
        from_where,
        file_source,
        index,
        mod(index - 1, 10) as position_in_group,
        value,
        row_number() over(partition by file_source, position_in_group order by index) as record_group
    from joined,
    lateral split_to_table(content['content']::string,'|')
    -- having position_in_group > 0
    order by file_source,index 
)
-- select * from cleansed;
,transformed_data AS (
    SELECT 
        file_source,
        record_group,
        MAX(CASE WHEN position_in_group = 1 THEN trim(value) END) AS seq_no,
        MAX(CASE WHEN position_in_group = 2 THEN trim(value) END) AS transaction_date,
        MAX(CASE WHEN position_in_group = 3 THEN trim(value) END) AS effective_date,
        MAX(CASE WHEN position_in_group = 4 THEN trim(value) END) AS transaction_type,
        MAX(CASE WHEN position_in_group = 5 THEN trim(value) END) AS cheque_ref,
        MAX(CASE WHEN position_in_group = 6 THEN trim(value) END) AS debit,
        MAX(CASE WHEN position_in_group = 7 THEN trim(value) END) AS credit,
        MAX(CASE WHEN position_in_group = 8 THEN trim(value) END) AS balance,
        MAX(CASE WHEN position_in_group = 9 THEN trim(value) END) AS description,
    from cleansed
    group by all
    
)
-- select * from transformed_data order by file_source, record_group ;
, casting as (
select 
    file_source,
    record_group,
    row_number() over(order by file_source, record_group) AS record_sequence,
    try_to_decimal(seq_no) as seq_no, --removed the TK doi ung texts
    try_to_date(transaction_date, 'dd/mm/yyyy') AS transaction_date,
    try_to_date(effective_date, 'dd/mm/yyyy') AS effective_date,
    case when len(transaction_type) = 0 then null else transaction_type end as transaction_type,
    case when len(description) = 0 then null else description end as description,
    TRY_CAST(REPLACE(debit, ',', '') AS DECIMAL(18,2)) AS debit,
    TRY_CAST(REPLACE(credit, ',', '') AS DECIMAL(18,2)) AS credit,
    TRY_CAST(REPLACE(balance, ',', '') AS DECIMAL(18,2)) AS balance,
    
from transformed_data
-- where len(seq_no) = 10 
order by file_source, record_group
)
-- select * from casting;

, ffill as (
select
    file_source,
    record_group,
    record_sequence,
    interpolate_ffill(seq_no) over(partition by file_source order by record_group) as seq_no,
    interpolate_ffill(transaction_date) over(partition by file_source order by record_group)  as transaction_date,
    interpolate_ffill(effective_date) over(partition by file_source order by record_group)  as effective_date,
    interpolate_ffill(transaction_type) over(partition by file_source order by record_group)  as transaction_type,
    description,
    interpolate_ffill(debit) over(partition by file_source order by record_group)  as debit,
    interpolate_ffill(credit) over(partition by file_source order by record_group)  as credit,
    interpolate_ffill(balance) over(partition by file_source order by record_group)  as balance,
    
from casting 
order by file_source, seq_no 
)
-- select * from ffill;
select
    file_source,
    listagg(record_group, ',') within group (order by record_group) as record_group_agg,
    listagg(record_sequence, ',') within group (order by record_group) as record_sequence_agg,
    seq_no,
    transaction_date,
    effective_date,
    transaction_type,
    listagg(description, ' ') within group (order by record_group) as description,
    debit,
    credit,
    balance
    
from ffill
where balance is not null --good filter condition, only lost last page 5306 which doesn't contain extra info anw
group by all
order by file_source, seq_no

''').to_pandas()

# Overwrite (or create) TRANSFORM_DB.INT.HISTORY_VIB_BS with the result.
session.write_pandas(
    df=vib_df,
    database='TRANSFORM_DB',
    schema='INT',
    table_name='HISTORY_VIB_BS',
    auto_create_table=True,
    overwrite=True
)

### Step 5: Check Result

In [None]:
# Compare, per stage, how many files were staged (file_list) with how many
# distinct source files made it into the final VIB / TCB tables.
fc_dest = session.sql(f'''
    select '{VIB_STAGE}' as from_where, count(distinct file_source) as file_count 
    from TRANSFORM_DB.INT.HISTORY_VIB_BS
    union all
    select '{TCB_STAGE}' as from_where, count(distinct file_source) as file_count 
    from TRANSFORM_DB.INT.HISTORY_TCB_BS
''').to_pandas()
# A bare expression in the middle of a cell is not displayed, so print explicitly.
print(fc_dest)

fc_target = file_list.groupby('FROM_WHERE').count()
print(fc_target)

# Look counts up by stage name rather than by row position, so the check does
# not depend on how the groups happen to be sorted, and report the outcome of
# the comparison instead of discarding it.
loaded_by_stage = fc_dest.set_index('FROM_WHERE')['FILE_COUNT']
staged_by_stage = fc_target['FILE_NAME']

for stage in (VIB_STAGE, TCB_STAGE):
    staged = int(staged_by_stage.get(stage, 0))
    loaded = int(loaded_by_stage.get(stage, 0))
    verdict = 'OK' if staged == loaded else 'MISMATCH'
    print(f"{stage} is {staged} (loaded: {loaded}) -> {verdict}")

In [None]:
# Compare the row count per source file in the final tables with manually
# recorded counts, and store the comparison as an audit table.
from snowflake.snowpark import functions as F

# Rows per file_source currently present in the VIB and TCB result tables.
loaded_counts_sql = '''
    select distinct file_source::string as file_source, count(*) as row_count 
    from TRANSFORM_DB.INT.HISTORY_VIB_BS
    group by all
    union all
    select distinct file_source::string as file_source, count(*) as row_count 
    from TRANSFORM_DB.INT.HISTORY_TCB_BS
    group by all
'''
fc_dest = session.sql(loaded_counts_sql).to_df(['file_source', 'row_count'])

# Manually recorded row counts per file_source.
manual_counts_sql = f'''
        select file_source::string as file_source,
            manual_row_count
        from {DATABASE}.AUDIT_FILES.HISTORY_MANUAL_ROW_COUNT
        '''
fc_target = session.sql(manual_counts_sql).to_df(['file_source', 'manual_row_count'])

# Left join keeps every manually counted file, even if it produced no rows.
check_rc = fc_target.join(
    fc_dest,
    on='file_source',
    how='left',
    lsuffix='target',
    rsuffix='dest',
).sort('file_source')

# diff = manual count minus loaded count.
row_count_gap = check_rc['manual_row_count'] - check_rc['row_count']
check_rc = check_rc.withColumn('diff', F.lit(row_count_gap))

session.write_pandas(
    check_rc.to_pandas(),
    database=TRANSFORM_DB,
    schema='AUDIT_FILES',
    table_name='CHECK_ROW_COUNT_HISTORY_BS',
    auto_create_table=True,
    overwrite=True,
)
