In [2]:
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from google.api_core.client_options import ClientOptions
from google.cloud import documentai  # type: ignore

In [None]:

# Point the Google client libraries at a service-account key file.
# `setdefault` keeps a value that is already present in the environment, so this
# machine-specific fallback path only applies when nothing else configured it.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/mohit/Documents/gen-lang-client-xxx.json",
)


In [None]:
# Google Cloud project that owns the processor (left blank in this sample).
project_id = ""

# Processor region. Format is "us" or "eu".
location = "us"

# Create the processor before running the sample, then put its id here.
processor_id = ""

# Statement to parse and its MIME type.
# Refer to https://cloud.google.com/document-ai/docs/file-types for supported file types.
file_path = "statements/test_statement.pdf"
mime_type = "application/pdf"

# Optional. The fields to return in the Document object.
# field_mask = "text,entities,pages.pageNumber"

# Optional. Processor version to use.
processor_version_id = "pretrained-bankstatement-v3.0-2022-05-16"

In [5]:
# Parse a statement using Document AI
def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> "documentai.Document":
    """Send a local file to a Document AI processor and return the parsed document.

    Args:
        project_id: Google Cloud project id used to build the processor resource name.
        location: Processor location; also used to build the API endpoint host.
        processor_id: Id of the processor to call.
        file_path: Path of the local file whose bytes are uploaded.
        mime_type: MIME type declared for the uploaded bytes.
        field_mask: Optional field mask forwarded unchanged in the request.
        processor_version_id: Optional processor version. When given, the
            request targets that version; otherwise it targets the processor.

    Returns:
        The ``document`` attribute of the processing response.

    Side effects:
        Prints the recognized document text to stdout.
    """
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if processor_version_id:
        # Full resource name of the processor version, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}`
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # Full resource name of the processor, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}`
        name = client.processor_path(project_id, location, processor_id)

    # Read the whole file into memory; the API takes the raw bytes.
    with open(file_path, "rb") as source_file:
        file_content = source_file.read()

    raw_document = documentai.RawDocument(content=file_content, mime_type=mime_type)

    # For additional processing options see:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/ProcessOptions
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_document,
        field_mask=field_mask,
    )
    result = client.process_document(request=request)

    # For a full list of `Document` object attributes, reference this page:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/Document
    document = result.document

    # Echo the text recognition output from the processor.
    print("The document contains the following text:")
    print(document.text)
    return document

In [6]:
# Extract results in specific format
def extract_entity_text(entity: documentai.Document.Entity, document_text: str) -> str:
    """Return the text an entity refers to.

    When the entity has text-anchor segments, the matching slices of
    ``document_text`` are concatenated and stripped. Otherwise the entity's
    ``mention_text`` is returned (or an empty string when that is falsy).
    """
    anchor = entity.text_anchor
    segments = anchor.text_segments if anchor else None
    if not segments:
        return entity.mention_text or ""

    def _slice(segment) -> str:
        # A falsy start means "from the beginning"; a falsy end means "to the end".
        lo = int(segment.start_index) if segment.start_index else 0
        hi = int(segment.end_index) if segment.end_index else len(document_text)
        return document_text[lo:hi]

    return "".join(_slice(segment) for segment in segments).strip()

def _find_known_bank_name(document_text: str, known_names: List[str]) -> str:
    """Return the first known bank name that occurs in the text, title-cased.

    A name only counts when it starts at a word boundary, so "chase" is not
    found inside "purchase". The trailing side is left open on purpose so that
    "Citibank" still resolves to "Citi". Returns 'Not Found' when nothing matches.
    """
    lowered = document_text.lower()
    for candidate in known_names:
        if re.search(r"(?<!\w)" + re.escape(candidate), lowered):
            return candidate.title()
    return 'Not Found'


def parse_bank_statement(document: documentai.Document) -> Tuple[Dict[str, Any], Dict[str, List[Any]]]:
    """Group a document's entities by type and derive basic statement info.

    Args:
        document: Object exposing ``entities`` (each with a ``type_``) and ``text``.

    Returns:
        A ``(statement_info, entities_by_type)`` pair. ``statement_info`` has the
        keys 'bank_name', 'primary_client_name' and 'all_cardholders';
        ``entities_by_type`` maps each entity type to its entities in document order.
    """
    KNOWN_BANK_NAMES = ["citi", "capital one", "chase", "bank of america", "discover", "wells fargo", "american express"]
    # Hardcoding names is a practical solution for a personal script.
    PRIMARY_CLIENT_NAME = "MOHIT AGGARWAL"
    ALL_CARDHOLDERS = ["MOHIT AGGARWAL", "HIMANI SOOD"]

    # Group entities by type, preserving first-seen order of the types.
    entities_by_type: Dict[str, List[Any]] = {}
    for entity in document.entities:
        entities_by_type.setdefault(entity.type_, []).append(entity)

    # Prefer the bank_name entity, but reject it when it is empty or is really
    # a cardholder's name.
    bank_name = ""
    bank_name_entities = entities_by_type.get('bank_name')
    if bank_name_entities:
        extracted = extract_entity_text(bank_name_entities[0], document.text)
        extracted_lower = extracted.lower()
        is_cardholder = any(holder.lower() in extracted_lower for holder in ALL_CARDHOLDERS)
        if extracted and not is_cardholder:
            bank_name = extracted

    # Fall back to scanning the full text for a known bank name.
    if not bank_name:
        bank_name = _find_known_bank_name(document.text, KNOWN_BANK_NAMES)

    statement_info = {
        'bank_name': bank_name,
        'primary_client_name': PRIMARY_CLIENT_NAME,
        'all_cardholders': list(ALL_CARDHOLDERS),
    }
    return statement_info, entities_by_type



def parse_table_items(entities_by_type: Dict, document_text: str, all_cardholders: List[str]) -> List[Dict[str, Any]]:
    """
    Turn every 'table_item' entity into a transaction record.

    Each record carries its position ('item_id'), the cardholder currently in
    effect, the item's raw text, and one key per entity property. The cardholder
    starts as "Unknown" and switches whenever an item's raw text contains one of
    the names in ``all_cardholders``; it then stays in effect for later items.
    Returns an empty list when there is no 'table_item' key.
    """
    if 'table_item' not in entities_by_type:
        return []

    items = entities_by_type['table_item']
    print(f"Found {len(items)} table items")

    records = []
    active_holder = "Unknown"
    for index, item in enumerate(items):
        item_text = extract_entity_text(item, document_text)

        # The processor often groups a section header with the first
        # transaction, so a name inside the text marks a change of cardholder.
        active_holder = next(
            (holder for holder in all_cardholders if holder in item_text),
            active_holder,
        )

        record = {
            'item_id': index,
            'cardholder': active_holder,
            'raw_text': item_text,
        }
        # Copy each property onto the record, keyed by the property type.
        for prop in (item.properties or ()):
            record[prop.type_] = extract_entity_text(prop, document_text)

        records.append(record)

    return records

# Main execution code
def analyze_and_extract_transactions(document):
    """Print a summary of a parsed statement and return its transactions.

    Returns ``(DataFrame, statement_info)`` when transactions were found, after
    displaying the frame and writing it to 'bank_transactions_updated.csv'.
    Returns ``(None, statement_info)`` when nothing was extracted.
    """
    print("=== BANK STATEMENT ANALYSIS ===")

    # Basic statement info plus the entities grouped by type.
    statement_info, entities_by_type = parse_bank_statement(document)

    print("\n=== STATEMENT INFO ===")
    for field_name in statement_info:
        print(f"{field_name}: {statement_info[field_name]}")

    # Extract transactions, passing in the list of known cardholders.
    print("\n=== EXTRACTING TRANSACTIONS ===")
    records = parse_table_items(
        entities_by_type,
        document.text,
        statement_info['all_cardholders'],
    )

    if not records:
        print("❌ No transactions extracted")
        return None, statement_info

    frame = pd.DataFrame(records)

    # Stamp every row with the statement's bank name.
    frame['bank_name'] = statement_info.get('bank_name', 'N/A')

    # Bring the identifying columns to the front, keep the rest in place.
    leading = ['bank_name', 'cardholder', 'item_id']
    trailing = [column for column in frame.columns if column not in leading]
    frame = frame[leading + trailing]

    print("\n=== TRANSACTION SUMMARY ===")
    print(f"Total transactions found: {len(frame)}")
    print(f"Columns: {list(frame.columns)}")

    print("\n=== TRANSACTION DATA === ")
    display(frame)

    frame.to_csv("bank_transactions_updated.csv", index=False)
    print("\n💾 Transactions saved to 'bank_transactions_updated.csv'")

    return frame, statement_info

# To run the code above, first obtain a Document from the API call
# (process_document_sample returns it directly), then execute:
#
# document = process_document_sample(project_id, location, processor_id, file_path, mime_type)
# df, info = analyze_and_extract_transactions(document)

In [9]:
# Pre-process extracted data
# (withdrawal column, deposit column, final column name), in coalescing order.
_TRANSACTION_FIELDS = [
    ('table_item/transaction_withdrawal_description',
     'table_item/transaction_deposit_description',
     'description'),
    ('table_item/transaction_withdrawal',
     'table_item/transaction_deposit',
     'amount'),
    ('table_item/transaction_withdrawal_date',
     'table_item/transaction_deposit_date',
     'transaction_date'),
]

# (strptime format, where to add the default year) tried in this order.
# None means the format already carries a year.
_DATE_FORMATS = [
    ('%m/%d/%Y', None),       # MM/DD/YYYY
    ('%m-%d-%Y', None),       # MM-DD-YYYY
    ('%Y-%m-%d', None),       # YYYY-MM-DD
    ('%b %d', 'suffix'),      # Jun 25
    ('%B %d', 'suffix'),      # June 25
    ('%b %d, %Y', None),      # Jun 25, 2024
    ('%B %d, %Y', None),      # June 25, 2024
    ('%d %b', 'suffix'),      # 25 Jun
    ('%d %B', 'suffix'),      # 25 June
    ('%d %b %Y', None),       # 25 Jun 2024
    ('%d %B %Y', None),       # 25 June 2024
    ('%m/%d', 'prefix'),      # MM/DD
    ('%m-%d', 'prefix'),      # MM-DD
]


def _standardize_date(date_str, default_year):
    """Return ``date_str`` as 'YYYY-MM-DD', or None when it cannot be parsed.

    Formats without a year are completed with ``default_year``.
    """
    if pd.isna(date_str) or date_str == '':
        return None

    text = str(date_str).strip()

    for fmt, year_placement in _DATE_FORMATS:
        if year_placement == 'prefix':
            candidate, full_format = f"{default_year}/{text}", f"%Y/{fmt}"
        elif year_placement == 'suffix':
            candidate, full_format = f"{text} {default_year}", f"{fmt} %Y"
        else:
            candidate, full_format = text, fmt
        try:
            return pd.to_datetime(candidate, format=full_format).strftime('%Y-%m-%d')
        except (ValueError, TypeError):
            continue

    # Last resort: let pandas infer the format. Stays best-effort, but no
    # longer swallows KeyboardInterrupt/SystemExit like a bare `except:` would.
    try:
        parsed = pd.to_datetime(text, errors='coerce')
    except Exception:
        return None
    return parsed.strftime('%Y-%m-%d') if pd.notna(parsed) else None


def preprocess_transactions(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean a raw transaction DataFrame into a simplified schema.

    Steps: drop 'item_id' and 'raw_text'; fill missing withdrawal
    description/amount/date values from the matching deposit columns; rename
    them to 'description', 'amount' and 'transaction_date'; drop rows with no
    amount or a '$0.00' / '+$0.00' amount; convert dates to 'YYYY-MM-DD'
    (dates without a year get the current year) and drop rows whose date
    cannot be parsed.

    Withdrawal or deposit columns that are absent from ``df`` are treated as
    entirely missing instead of raising ``KeyError``; a statement with only one
    kind of transaction produces no columns for the other kind.

    Args:
        df: The input DataFrame with raw transaction data. It is not modified.

    Returns:
        A new, cleaned DataFrame.
    """
    # `drop` returns a new frame, so the caller's DataFrame is left untouched.
    processed_df = df.drop(columns=['item_id', 'raw_text'], errors='ignore')

    for withdrawal_col, deposit_col, _ in _TRANSACTION_FIELDS:
        for column in (withdrawal_col, deposit_col):
            if column not in processed_df.columns:
                # All-missing placeholder so the coalescing below is uniform.
                processed_df[column] = pd.Series(index=processed_df.index, dtype=object)
        processed_df[withdrawal_col] = processed_df[withdrawal_col].fillna(
            processed_df[deposit_col]
        )

    rename_map = {withdrawal: final for withdrawal, _, final in _TRANSACTION_FIELDS}
    deposit_columns = [deposit for _, deposit, _ in _TRANSACTION_FIELDS]
    processed_df = (
        processed_df
        .rename(columns=rename_map)
        .drop(columns=deposit_columns)
        .dropna(subset=['amount'])
    )

    # Drop zero-amount rows; copy so the assignment below writes to an
    # independent frame rather than a filtered view.
    zero_values = ['$0.00', '+$0.00']
    processed_df = processed_df[~processed_df['amount'].isin(zero_values)].copy()

    current_year = pd.Timestamp.now().year
    processed_df['transaction_date'] = processed_df['transaction_date'].apply(
        lambda value: _standardize_date(value, current_year)
    )

    # Drop records with invalid dates.
    return processed_df.dropna(subset=['transaction_date'])

In [None]:
# MAIN BATCH PROCESSING LOGIC
# ==============================================================================

def main(
    project_id: str = "gen-lang-client-0299904904",
    location: str = "us",
    processor_id: str = "bf2685d686b2d8db",
    statements_folder: str = "statements",
    temp_folder: str = "temp",
) -> None:
    """
    Process every PDF statement in a folder and export the combined, cleaned
    transactions to ``<temp_folder>/data.csv``.

    Args:
        project_id: Google Cloud project id passed to the Document AI call.
        location: Processor location passed to the Document AI call.
        processor_id: Processor id passed to the Document AI call.
        statements_folder: Folder scanned (non-recursively) for ``.pdf`` files.
        temp_folder: Output folder; created when it does not exist.

    The defaults reproduce the previously hard-coded configuration, so calling
    ``main()`` with no arguments behaves as before. Files are processed in
    sorted name order so the combined CSV is reproducible across runs. Nothing
    is written when no transactions are found or the folder is missing.
    """
    output_csv_path = os.path.join(temp_folder, "data.csv")

    # Create temp folder if it doesn't exist
    os.makedirs(temp_folder, exist_ok=True)
    print(f"Temp folder '{temp_folder}' ready for output")

    all_cleaned_dfs = []

    print(f"🚀 Starting batch processing for files in '{statements_folder}'...")

    try:
        # os.listdir order is arbitrary; sort for deterministic output.
        file_names = sorted(os.listdir(statements_folder))
    except FileNotFoundError:
        print(f"❌ Error: The directory '{statements_folder}' was not found. Please check the path.")
        return

    for file_name in file_names:
        # Process only PDF files, skipping others (like .DS_Store on macOS)
        if not file_name.lower().endswith(".pdf"):
            continue

        file_path = os.path.join(statements_folder, file_name)
        print(f"\n📄 Processing file: {file_name}")

        # 1. Call Document AI API
        # Note: processor_version_id is omitted, so the request targets the
        # processor itself rather than a specific version.
        document = process_document_sample(
            project_id=project_id,
            location=location,
            processor_id=processor_id,
            file_path=file_path,
            mime_type="application/pdf",
        )
        if not document:
            continue

        # 2. Extract transactions from the result
        df, _ = analyze_and_extract_transactions(document)
        if df is None or df.empty:
            print(f"⚠️ No transactions were extracted from {file_name}.")
            continue

        # 3. Preprocess and clean the DataFrame
        cleaned_df = preprocess_transactions(df)
        all_cleaned_dfs.append(cleaned_df)
        print(f"✅ Successfully cleaned and added {len(cleaned_df)} transactions from {file_name}.")

    if not all_cleaned_dfs:
        print("\n⏹️ No transactions were processed or found in any of the files.")
        return

    # Combine all the individual cleaned DataFrames into one and export it.
    final_df = pd.concat(all_cleaned_dfs, ignore_index=True)
    final_df.to_csv(output_csv_path, index=False)

    print("\n===================================================")
    print("🎉 Batch processing complete!")
    print(f"Total transactions processed: {len(final_df)}")
    print(f"💾 Combined data saved to '{output_csv_path}'")
    print("===================================================")

# --- RUN THE SCRIPT ---
# Invoke main() only when this code runs as the top-level program
# (`__name__` is "__main__"), not when it is imported as a module.
if __name__ == "__main__":
    main()