
General DEPREC Code - Shared Utility Functions

This module contains common utility functions used across the DEPREC reconciliation system.
These functions handle data cleaning, type conversion, and common data transformations.

## ***Section 1 - Imports and Configuration***


In [1]:


# Standard library imports
import os
import re
import math
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime

# Third-party imports
import pandas as pd
import numpy as np
from rapidfuzz import process, fuzz

# Telegram (optional - only if sending messages)
from telegram import Bot
from telegram.constants import ParseMode


## **2) Data Import & Cleanup - Getting to Central Bank Statement DataFrame**

This section handles the complete pipeline from raw bank statement files to a unified `central_df`:

### **Steps:**
1. **Import raw bank statements** - Automatically loads every CSV/Excel file in the directory, skipping any file whose name contains `GWID` or `Rubric` (case-insensitive)
   - Cleans numeric columns (removes `$`, `%`, commas)
   - Converts date columns to datetime format
   - Removes "Beginning balance" rows
   - Drops blank rows/columns and "Unnamed" columns

2. **Detect bank type** - Identifies whether each file is Chase, BofA, or Unknown based on column structure
   - Chase: `['Details', 'Posting Date', 'Description', 'Amount', 'Type']`
   - BofA: `['Date', 'Description', 'Amount']`

3. **Split by bank & validate** - Separates files by bank type, concatenates each bank's files
   - Creates `chase_central` and `bofa_central` DataFrames
   - Validates row counts, column structure, date ranges, and amount totals

4. **Standardize BofA format** - Transforms `bofa_central` to match Chase column structure
   - Maps `Date` → `Posting Date`
   - Derives `Details` from amount sign (negative = DEBIT, zero or positive = CREDIT)
   - Adds a `Type` column built from that `Details` value with an `ACH_` prefix (`ACH_DEBIT` / `ACH_CREDIT`)

5. **Create unified central_df** - Concatenates `chase_central` + `bofa_central_standardized`
   - **Output:** Single DataFrame (`central_df`) with all bank statements in standardized format
   - Columns: `['Details', 'Posting Date', 'Description', 'Amount', 'Type']`

**Note:** AI-powered bank detection functions are defined but not yet integrated into the main pipeline.

---


In [2]:
import os
import pandas as pd
import numpy as np
from typing import Dict, Optional, List

def _drop_fully_blank_and_unnamed(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strip placeholder columns from a loaded statement.

    Two kinds of columns are discarded: those in which every cell is NaN, and
    those whose label (trimmed, case-insensitive) is empty or begins with
    "unnamed:".
    """
    # Pass 1: columns that carry no data at all.
    populated = df.dropna(axis=1, how='all')

    # Pass 2: columns whose header is only a placeholder.
    placeholder_cols = []
    for label in populated.columns:
        normalized = str(label).strip().lower()
        if normalized == '' or normalized.startswith("unnamed:"):
            placeholder_cols.append(label)

    return populated.drop(columns=placeholder_cols)

def _drop_fully_blank_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return *df* without the rows in which every cell is NaN.
    """
    trimmed = df.dropna(how='all', axis='index')
    return trimmed

def _remove_beginning_balance_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop every row in which some cell mentions 'Beginning balance'.

    The match is a case-insensitive substring test against the text form of
    each cell, across all columns.

    Args:
        df: DataFrame to clean (left unmodified)

    Returns:
        A copy of the DataFrame without the matching rows
    """
    needle = 'beginning balance'
    result = df.copy()

    # Render every cell as text once, then scan the rows of the text grid.
    cell_text = result.astype(str).to_numpy()
    hits = pd.Series(
        [any(needle in str(cell).lower() for cell in row) for row in cell_text],
        index=result.index,
        dtype=bool,
    )

    hit_count = hits.sum()
    if not hit_count > 0:
        return result

    print(f"  Removed {hit_count} row(s) containing 'Beginning balance'")
    return result[~hits]

def _clean_numeric_columns(df: pd.DataFrame, min_numeric_ratio: float = 0.5) -> pd.DataFrame:
    """
    Convert text columns that hold formatted numbers into numeric columns.

    A text (object/string) column is a candidate when any of its non-null
    values contains '$', '%' or ','. Those characters are stripped and the
    result is parsed with ``pd.to_numeric(errors='coerce')``. The parsed
    values replace the column only when at least ``min_numeric_ratio`` of the
    non-null, non-empty values actually parsed; otherwise the column is left
    exactly as it was. This keeps free-text columns that merely contain a
    comma (e.g. "ACME, INC") from being turned into all-NaN.

    Args:
        df: DataFrame to clean (left unmodified)
        min_numeric_ratio: Minimum share (0..1) of non-blank values that must
            parse as numbers for the column to be converted. Pass 0.0 to
            convert every candidate column unconditionally.

    Returns:
        A copy of the DataFrame with the qualifying columns converted to numeric
    """
    df_cleaned = df.copy()

    for col in df_cleaned.columns:
        column = df_cleaned[col]
        dtype = column.dtype

        # Only text-like columns can carry formatted numbers.
        if not (pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype)):
            continue

        present = column.notna()
        if not present.any():
            continue

        # Look at the whole column, not just the first rows, so formatting
        # that first shows up late in the file is still detected.
        as_text = column.astype(str)
        if not as_text[present].str.contains(r'[$%,]', regex=True).any():
            continue

        stripped = as_text.str.replace(r'[$%,]', '', regex=True).str.strip()
        converted = pd.to_numeric(stripped, errors='coerce')

        # Cells that are empty after stripping say nothing about whether the
        # column is numeric, so they are excluded from the ratio.
        informative = present & (stripped != '')
        if not informative.any():
            continue

        numeric_ratio = converted[informative].notna().mean()
        if numeric_ratio < min_numeric_ratio:
            # Mostly non-numeric text: converting would destroy the data.
            continue

        df_cleaned[col] = converted

    return df_cleaned

def _clean_date_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert columns whose name contains 'date' (case-insensitive) to datetime.

    Values that cannot be parsed become NaT. Because the name test is a plain
    substring match, it also hits names such as "Updated By"; to avoid wiping
    such a column, the conversion is discarded when the column had values but
    none of them parsed as a date.

    Args:
        df: DataFrame to clean (left unmodified)

    Returns:
        A copy of the DataFrame with the date columns converted to datetime
    """
    df_cleaned = df.copy()

    for col in df_cleaned.columns:
        if 'date' not in str(col).lower():
            continue

        original = df_cleaned[col]
        try:
            parsed = pd.to_datetime(original, errors='coerce')
        except Exception as e:
            # Best-effort step: report the problem and keep the column as loaded.
            print(f"Warning: Could not convert column '{col}' to datetime: {e}")
            continue

        if original.notna().any() and not parsed.notna().any():
            # Nothing parsed, so this is not a date column despite its name.
            print(f"Warning: Column '{col}' has no parseable dates; leaving it unchanged")
            continue

        df_cleaned[col] = parsed

    return df_cleaned

def bank_statements_retriever(directory: Optional[str] = None) -> Dict[str, pd.DataFrame]:
    """
    Load every CSV/Excel statement found directly inside a directory.

    Files are selected by extension (.csv, .xlsx, .xls, case-insensitive) and
    skipped when the filename contains 'gwid' or 'rubric' (case-insensitive).
    Filenames are processed in sorted order so the returned dict, and anything
    concatenated from it, is reproducible; os.listdir itself returns entries
    in arbitrary order. Directory entries that are not regular files are
    ignored.

    Each loaded frame goes through, in order:
    - numeric cleanup ($, %, commas removed and converted to numeric)
    - date cleanup (columns with 'date' in the name converted to datetime)
    - removal of rows containing 'Beginning balance'
    - removal of fully blank / "Unnamed" columns, then fully blank rows

    A file that fails to load or clean is reported with a printed warning and
    skipped. Frames that end up with no rows are omitted.

    Args:
        directory: Path to the folder to scan. Defaults to current working directory.

    Returns:
        Dict[str, pd.DataFrame]: original filename -> cleaned DataFrame
    """
    directory = directory or os.getcwd()
    supported_exts = (".csv", ".xlsx", ".xls")
    excluded_markers = ("gwid", "rubric")

    # Identify candidate files in a deterministic order.
    candidates: List[str] = []
    for file_name in sorted(os.listdir(directory)):
        lower_name = file_name.lower()
        if not lower_name.endswith(supported_exts):
            continue
        if any(marker in lower_name for marker in excluded_markers):
            continue
        if not os.path.isfile(os.path.join(directory, file_name)):
            continue  # e.g. a folder whose name happens to end in ".csv"
        candidates.append(file_name)

    # Load dataframes with filename as key
    dataframes: Dict[str, pd.DataFrame] = {}
    for file_name in candidates:
        file_path = os.path.join(directory, file_name)
        ext = os.path.splitext(file_name)[1].lower()

        try:
            if ext == ".csv":
                df = pd.read_csv(file_path)
            elif ext in (".xlsx", ".xls"):
                df = pd.read_excel(file_path)
            else:
                continue  # Should not happen due to filter, but guard anyway

            df = _clean_numeric_columns(df)
            df = _clean_date_columns(df)
            df = _remove_beginning_balance_rows(df)
            df = _drop_fully_blank_and_unnamed(df)
            df = _drop_fully_blank_rows(df)

            # Nothing left after cleanup: do not report this file at all.
            if df.shape[0] == 0:
                continue

            # Use filename as key (not modified)
            dataframes[file_name] = df

        except Exception as exc:
            # Best-effort import: one unreadable file must not stop the rest.
            print(f"Warning: Failed to read file '{file_name}': {exc}")
            continue

    return dataframes

# Load every supported statement file from the current working directory
# (no directory argument is passed), then print each frame's column labels
# as a quick visual check of what was picked up.
x = bank_statements_retriever()
for name, df in x.items():
    print(df.columns)

  Removed 1 row(s) containing 'Beginning balance'
Index(['Gateway ID', 'Gateway', 'Provider Name', 'Currency',
       'Global Monthly Cap', 'Transactions', 'Orders',
       'Gross Approved Revenue', 'Refund - Partial #', 'Refund - Full #',
       'Void Refund Amount', 'Total $', 'Chargeback #', 'Chargeback Lost Rev',
       'Chargeback %', 'Decline #', 'Decline %', 'Pending #', 'Pending $',
       'Customer Approval %', 'Affiliate Breakdown'],
      dtype='object')
Index(['details', 'postingdate', 'description', 'amount', 'type'], dtype='object')
Index(['Details', 'Posting Date', 'Description', 'Amount', 'Type'], dtype='object')
Index(['Details', 'Posting Date', 'Description', 'Amount', 'Type'], dtype='object')
Index(['Details', 'Posting Date', 'Description', 'Amount', 'Type'], dtype='object')
Index(['Details', 'Posting Date', 'Description', 'Amount', 'Type'], dtype='object')
Index(['Details', 'Posting Date', 'Description', 'Amount', 'Type'], dtype='object')
Index(['Unnamed 0', 'Gatewa

In [3]:

def bank_selector(dataframes_dict: Dict[str, pd.DataFrame]) -> Dict[str, str]:
    """
    Bank Selector Function

    Identify which bank each loaded file belongs to, based only on its column
    names.

    For each DataFrame the column labels are converted to text, trimmed and
    lowercased, then compared (ignoring order) against:
    - Chase: ['details', 'posting date', 'description', 'amount', 'type']
    - BofA:  ['date', 'description', 'amount']

    The match is exact on the normalized names: a file with extra or missing
    columns, or a differently spelled header such as 'postingdate', is
    reported as "Unknown". Non-string labels (for example integer headers)
    are handled by converting them to text first.

    Args:
        dataframes_dict: Dictionary where keys are filenames and values are pandas DataFrames

    Returns:
        Dict[str, str]: Dictionary mapping filename -> "Chase", "BofA", or "Unknown"
    """
    # Sorted once up front so each file needs a single sort and comparison.
    chase_signature = sorted(['details', 'posting date', 'description', 'amount', 'type'])
    bofa_signature = sorted(['date', 'description', 'amount'])

    bank_mapping: Dict[str, str] = {}
    for filename, df in dataframes_dict.items():
        normalized = sorted(str(col).strip().lower() for col in df.columns)

        if normalized == chase_signature:
            bank_mapping[filename] = "Chase"
        elif normalized == bofa_signature:
            bank_mapping[filename] = "BofA"
        else:
            bank_mapping[filename] = "Unknown"

    return bank_mapping

# Classify each loaded file by its column layout: filename -> "Chase", "BofA" or "Unknown".
dataframe_files = bank_selector(x)

In [4]:
def _date_range_failures(individual_dfs: List[pd.DataFrame], central_df: pd.DataFrame) -> List[str]:
    """Compare the min/max of every 'date'-named column between the parts and the concatenated frame."""
    failures: List[str] = []

    date_columns = [col for col in central_df.columns if 'date' in str(col).lower()]
    if not date_columns:
        print("ℹ No date columns found to validate")
        return failures

    for date_col in date_columns:
        parsed_parts = [
            pd.to_datetime(df[date_col], errors='coerce').dropna()
            for df in individual_dfs
            if date_col in df.columns
        ]
        parsed_parts = [part for part in parsed_parts if not part.empty]
        if not parsed_parts:
            continue  # no source frame has usable dates in this column

        individual_min = min(part.min() for part in parsed_parts)
        individual_max = max(part.max() for part in parsed_parts)

        central_dates = pd.to_datetime(central_df[date_col], errors='coerce').dropna()
        central_min = central_dates.min()
        central_max = central_dates.max()

        if pd.isna(central_min) or pd.isna(central_max):
            failures.append(f"Date column '{date_col}': Central has no valid dates")
            continue

        min_match = individual_min == central_min
        max_match = individual_max == central_max
        if min_match and max_match:
            print(f"✓ Date column '{date_col}': Min ({individual_min}) and Max ({individual_max}) match")
            continue
        if not min_match:
            failures.append(f"Date column '{date_col}': Min mismatch - Individual = {individual_min}, Central = {central_min}")
        if not max_match:
            failures.append(f"Date column '{date_col}': Max mismatch - Individual = {individual_max}, Central = {central_max}")

    return failures


def _amount_stat_failures(individual_dfs: List[pd.DataFrame], central_df: pd.DataFrame, tolerance: float = 0.01) -> List[str]:
    """Compare the total and standard deviation of every 'amount'-named column between the parts and the concatenated frame."""
    failures: List[str] = []

    amount_columns = [col for col in central_df.columns if 'amount' in str(col).lower()]
    if not amount_columns:
        print("ℹ No amount columns found to validate")
        return failures

    for amount_col in amount_columns:
        numeric_parts = [
            pd.to_numeric(df[amount_col], errors='coerce').dropna()
            for df in individual_dfs
            if amount_col in df.columns
        ]
        numeric_parts = [part for part in numeric_parts if not part.empty]
        if not numeric_parts:
            continue  # no source frame has usable numbers in this column

        individual_total = sum(part.sum() for part in numeric_parts)
        # Std over all source values pooled together, not an average of per-file stds.
        individual_std = pd.concat(numeric_parts, ignore_index=True).std()

        central_values = pd.to_numeric(central_df[amount_col], errors='coerce').dropna()
        if central_values.empty:
            failures.append(f"Amount column '{amount_col}': Central has no valid numeric values")
            continue

        central_total = central_values.sum()
        central_std = central_values.std()

        total_match = abs(individual_total - central_total) <= tolerance
        # With a single value the sample std is NaN on both sides; NaN never
        # compares as "close", so treat "both undefined" as agreement.
        std_undefined = bool(pd.isna(individual_std) and pd.isna(central_std))
        std_match = std_undefined or abs(individual_std - central_std) <= tolerance

        if total_match and std_match:
            print(f"✓ Amount column '{amount_col}': Total ({individual_total:.2f}) and Std Dev ({individual_std:.2f}) match")
            continue
        if not total_match:
            failures.append(f"Amount column '{amount_col}': Total mismatch - Individual = {individual_total:.2f}, Central = {central_total:.2f}")
        if not std_match:
            failures.append(f"Amount column '{amount_col}': Std deviation mismatch - Individual = {individual_std:.2f}, Central = {central_std:.2f}")

    return failures


def _validate_concatenation(individual_dfs: List[pd.DataFrame], central_df: pd.DataFrame, bank_name: str) -> List[str]:
    """Run all concatenation checks for one bank and return the failure messages (empty list = all passed)."""
    failed_checks: List[str] = []

    if not individual_dfs or central_df.empty:
        print(f"⚠ {bank_name}: No data to validate")
        return failed_checks  # Skip validation if empty

    print(f"\n=== Validating {bank_name} ===")

    # Check 1: Row counts
    individual_rows = sum(len(df) for df in individual_dfs)
    central_rows = len(central_df)
    if individual_rows != central_rows:
        failed_checks.append(f"Row count mismatch: Individual total = {individual_rows}, Central = {central_rows}")
    else:
        print(f"✓ Row count: {individual_rows} rows match")

    # Check 2: Column names (union over the source frames vs. the concatenated frame)
    individual_columns = set()
    for df in individual_dfs:
        individual_columns.update(df.columns)
    central_columns = set(central_df.columns)

    if individual_columns != central_columns:
        missing_in_central = individual_columns - central_columns
        extra_in_central = central_columns - individual_columns
        if missing_in_central:
            failed_checks.append(f"Column names mismatch: Missing in central = {missing_in_central}")
        if extra_in_central:
            failed_checks.append(f"Column names mismatch: Extra in central = {extra_in_central}")
    else:
        print(f"✓ Column names: {len(individual_columns)} columns match")

    # Check 3: Date columns - min and max
    failed_checks.extend(_date_range_failures(individual_dfs, central_df))

    # Check 4: Amount columns - totals and standard deviations
    failed_checks.extend(_amount_stat_failures(individual_dfs, central_df))

    return failed_checks


def filter_and_split_banks(bank_mapping: Dict[str, str], dataframes_dict: Dict[str, pd.DataFrame]) -> Dict[str, object]:
    """
    Filter and Split Banks Function

    Split the loaded statements by detected bank and build one concatenated
    DataFrame per bank.

    - Keeps only entries whose bank name (trimmed, case-insensitive) is
      "chase" or one of "bofa" / "bank of america" / "boa"; everything else
      is ignored.
    - A filename that is in bank_mapping but not in dataframes_dict is still
      listed in the returned filename mapping, but contributes no rows.
    - Concatenates each bank's frames (index reset) into chase_central and
      bofa_central; a bank with no frames yields an empty DataFrame.
    - Validates each concatenation against its source frames:
        * Row counts match
        * Column names match
        * Date columns: min and max values match
        * Amount columns: totals and standard deviations match (within 0.01);
          a standard deviation that is undefined on both sides counts as a match

    Args:
        bank_mapping: Dictionary where keys are filenames and values are bank names
        dataframes_dict: Dictionary where keys are filenames and values are pandas DataFrames

    Returns:
        Dictionary with four keys:
            - "Chase": Dictionary of Chase filenames -> bank name
            - "BofA": Dictionary of Bank of America filenames -> bank name
            - "chase_central": Concatenated DataFrame of all Chase accounts
            - "bofa_central": Concatenated DataFrame of all BofA accounts

    Raises:
        ValueError: If any validation checks fail, specifying which checks failed.
            Chase is validated first, so a Chase failure is raised before BofA is checked.
    """
    bofa_aliases = {"bofa", "bank of america", "boa"}

    chase_files: Dict[str, str] = {}
    bofa_files: Dict[str, str] = {}
    chase_dataframes: List[pd.DataFrame] = []
    bofa_dataframes: List[pd.DataFrame] = []

    for filename, bank_name in bank_mapping.items():
        # Normalize bank name for comparison (case-insensitive)
        bank_lower = bank_name.lower().strip()

        if bank_lower == "chase":
            files, frames = chase_files, chase_dataframes
        elif bank_lower in bofa_aliases:
            files, frames = bofa_files, bofa_dataframes
        else:
            continue

        files[filename] = bank_name
        if filename in dataframes_dict:
            frames.append(dataframes_dict[filename])

    chase_central = pd.concat(chase_dataframes, ignore_index=True) if chase_dataframes else pd.DataFrame()
    bofa_central = pd.concat(bofa_dataframes, ignore_index=True) if bofa_dataframes else pd.DataFrame()

    for bank_label, frames, central, central_name in (
        ("Chase", chase_dataframes, chase_central, "chase_central"),
        ("BofA", bofa_dataframes, bofa_central, "bofa_central"),
    ):
        failures = _validate_concatenation(frames, central, bank_label)
        if failures:
            details = "\n".join(f"  - {check}" for check in failures)
            raise ValueError(f"Mismatch between {bank_label} individual files and {central_name}:\n{details}")
        print(f"✓ All {bank_label} validation checks passed!\n")

    return {
        "Chase": chase_files,
        "BofA": bofa_files,
        "chase_central": chase_central,
        "bofa_central": bofa_central
    }

# Split the loaded frames by detected bank and build the per-bank concatenated
# frames; a failed validation check raises ValueError.
cc = filter_and_split_banks(dataframe_files, x)


=== Validating Chase ===
✓ Row count: 2247 rows match
✓ Column names: 5 columns match
✓ Date column 'Posting Date': Min (2025-09-02 00:00:00) and Max (2025-10-06 00:00:00) match
✓ Amount column 'Amount': Total (840093.47) and Std Dev (321.91) match
✓ All Chase validation checks passed!


=== Validating BofA ===
✓ Row count: 380 rows match
✓ Column names: 3 columns match
✓ Date column 'Date': Min (2025-07-02 00:00:00) and Max (2025-10-29 00:00:00) match
✓ Amount column 'Amount': Total (2851.14) and Std Dev (153.14) match
✓ All BofA validation checks passed!



In [5]:
# Display the concatenated Chase frame.
cc["chase_central"]

Unnamed: 0,Details,Posting Date,Description,Amount,Type
0,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,25.28,ACH_CREDIT
1,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,100.71,ACH_CREDIT
2,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,175.30,ACH_CREDIT
3,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,295.55,ACH_CREDIT
4,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,358.75,ACH_CREDIT
...,...,...,...,...,...
2242,CREDIT,2025-09-02,ORIG CO NAME:EMS ORIG ID:12...,770.62,ACH_CREDIT
2243,CREDIT,2025-09-02,ORIG CO NAME:EMS ORIG ID:12...,804.69,ACH_CREDIT
2244,CREDIT,2025-09-02,ORIG CO NAME:BANKCARD DEP ORIG ID:20...,1567.69,ACH_CREDIT
2245,CREDIT,2025-09-02,ORIG CO NAME:BANKCARD DEP ORIG ID:20...,1606.47,ACH_CREDIT


In [6]:
# Display the concatenated BofA frame.
cc["bofa_central"]

Unnamed: 0,Date,Description,Amount
0,2025-09-02,BANKCARD DES:MTOT DISC ID:422899490017607 INDN...,-402.02
1,2025-07-16,LQ MERCHANT DES:ADJUSTMENT ID:584600000388686 ...,-335.00
2,2025-07-16,LQ MERCHANT DES:ADJUSTMENT ID:584600000388736 ...,-335.00
3,2025-07-02,LQ MERCHANT DES:MERCH FEES ID:584600000388744 ...,-310.00
4,2025-07-02,LQ MERCHANT DES:MERCH FEES ID:584600000388686 ...,-310.00
...,...,...,...
375,2025-10-03,RETURN OF POSTED CHECK / ITEM (RECEIVED ON 10-...,310.00
376,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017605 INDN:...,368.94
377,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017608 INDN:...,427.28
378,2025-07-07,WISE US INC DES:SMART IMPA ID:SMART IMPA INDN:...,1376.90


In [7]:
def bofa_standardizer(bofa_central: Optional[pd.DataFrame] = None, chase_central: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """
    BofA Standardizer Function
    
    This function transforms the bofa_central DataFrame to match the format and structure of chase_central.
    Column mappings:
    - Details: Based on Amount column (negative = DEBIT, positive = CREDIT)
    - Posting Date: From Date column in bofa
    - Description: Same as Description column
    - Amount: Same as Amount column
    - Type: From Details column with "ACH_" prefix (e.g., ACH_DEBIT, ACH_CREDIT)
    
    The output DataFrame will have the exact same column names and order as chase_central.
    Includes robust validation checks to verify all transformations.
    
    Args:
        bofa_central: BofA central DataFrame (defaults to global bofa_central if not provided)
        chase_central: Chase central DataFrame (defaults to global chase_central if not provided)
    
    Returns:
        pd.DataFrame: Standardized BofA DataFrame matching Chase format with same column names
    
    Raises:
        ValueError: If validation checks fail, listing which checks failed
    """
    # Access from global scope if not provided
    if bofa_central is None:
        try:
            bofa_central = globals().get('cc', {}).get('bofa_central')
            if bofa_central is None:
                raise ValueError("bofa_central not found. Please provide it as parameter or ensure 'cc' dictionary exists.")
        except:
            raise ValueError("bofa_central not found. Please provide it as parameter.")
    
    if chase_central is None:
        try:
            chase_central = globals().get('cc', {}).get('chase_central')
            if chase_central is None:
                raise ValueError("chase_central not found. Please provide it as parameter or ensure 'cc' dictionary exists.")
        except:
            raise ValueError("chase_central not found. Please provide it as parameter.")
    
    if bofa_central.empty:
        raise ValueError("bofa_central is empty, cannot standardize")
    
    if chase_central.empty:
        raise ValueError("chase_central is empty, cannot use as template")
    
    # Get column names from chase_central (use exact names and order)
    chase_columns = list(chase_central.columns)
    
    # Find column mappings in chase_central (case-insensitive matching)
    details_col = None
    posting_date_col = None
    description_col = None
    amount_col_chase = None
    type_col = None
    
    for col in chase_columns:
        col_lower = col.lower().strip()
        if col_lower == 'details':
            details_col = col
        elif 'posting' in col_lower and 'date' in col_lower:
            posting_date_col = col
        elif col_lower == 'description':
            description_col = col
        elif col_lower == 'amount':
            amount_col_chase = col
        elif col_lower == 'type':
            type_col = col
    
    # Validate we found all required Chase columns
    missing_chase_cols = []
    if not details_col:
        missing_chase_cols.append("Details")
    if not posting_date_col:
        missing_chase_cols.append("Posting Date")
    if not description_col:
        missing_chase_cols.append("Description")
    if not amount_col_chase:
        missing_chase_cols.append("Amount")
    if not type_col:
        missing_chase_cols.append("Type")
    
    if missing_chase_cols:
        raise ValueError(f"Missing required columns in chase_central: {missing_chase_cols}")
    
    # Get bofa column names (case-insensitive)
    bofa_columns_lower = [col.lower().strip() for col in bofa_central.columns]
    bofa_date_col = None
    bofa_description_col = None
    bofa_amount_col = None
    
    for col in bofa_central.columns:
        col_lower = col.lower().strip()
        if col_lower == 'date':
            bofa_date_col = col
        elif col_lower == 'description':
            bofa_description_col = col
        elif col_lower == 'amount':
            bofa_amount_col = col
    
    # Validate required columns exist in bofa
    missing_bofa_cols = []
    if not bofa_date_col:
        missing_bofa_cols.append("Date")
    if not bofa_description_col:
        missing_bofa_cols.append("Description")
    if not bofa_amount_col:
        missing_bofa_cols.append("Amount")
    
    if missing_bofa_cols:
        raise ValueError(f"Missing required columns in bofa_central: {missing_bofa_cols}")
    
    # Create new DataFrame with exact same structure as chase_central
    standardized_df = pd.DataFrame(index=bofa_central.index)
    
    # Map columns in the same order as chase_central
    for col in chase_columns:
        if col == details_col:
            # Details column: Based on Amount (negative = DEBIT, positive = CREDIT)
            standardized_df[col] = bofa_central[bofa_amount_col].apply(
                lambda x: "DEBIT" if pd.notna(x) and float(x) < 0 else "CREDIT" if pd.notna(x) else None
            )
        elif col == posting_date_col:
            # Posting Date column: From Date column
            standardized_df[col] = bofa_central[bofa_date_col]
        elif col == description_col:
            # Description column: Same as Description
            standardized_df[col] = bofa_central[bofa_description_col]
        elif col == amount_col_chase:
            # Amount column: Same as Amount
            standardized_df[col] = bofa_central[bofa_amount_col]
        elif col == type_col:
            # Type column: Based on Details with "ACH_" prefix
            details_values = bofa_central[bofa_amount_col].apply(
                lambda x: "DEBIT" if pd.notna(x) and float(x) < 0 else "CREDIT" if pd.notna(x) else None
            )
            standardized_df[col] = details_values.apply(
                lambda x: f"ACH_{x}" if pd.notna(x) else None
            )
        else:
            # For any other columns in chase_central, fill with NaN
            standardized_df[col] = None
    
    # Ensure column order matches chase_central exactly
    standardized_df = standardized_df[chase_columns]
    
    # Reset index to match chase format
    standardized_df = standardized_df.reset_index(drop=True)
    
    # Robust validation checks
    print("\n=== Validating BofA Standardization ===")
    failed_checks = []
    
    # Check 1: Row count matches
    if len(standardized_df) != len(bofa_central):
        failed_checks.append(f"Row count mismatch: Original = {len(bofa_central)}, Standardized = {len(standardized_df)}")
    else:
        print(f"✓ Row count: {len(standardized_df)} rows match")
    
    # Check 2: Column names and order match chase_central exactly
    if list(standardized_df.columns) != chase_columns:
        failed_checks.append(f"Column names/order mismatch: Expected {chase_columns}, Got {list(standardized_df.columns)}")
    else:
        print(f"✓ Column structure: Matches chase_central exactly ({len(chase_columns)} columns)")
    
    # Check 3: Details column values are DEBIT or CREDIT
    details_values = standardized_df[details_col].dropna().unique()
    valid_details = set(['DEBIT', 'CREDIT'])
    if not set(details_values).issubset(valid_details):
        invalid = set(details_values) - valid_details
        failed_checks.append(f"Details column has invalid values: {invalid}")
    else:
        print(f"✓ Details column: All values are DEBIT or CREDIT")
    
    # Check 4: Details matches Amount sign (negative = DEBIT, positive = CREDIT)
    amount_details_match = True
    for idx in standardized_df.index:
        amount_val = standardized_df.loc[idx, amount_col_chase]
        details_val = standardized_df.loc[idx, details_col]
        if pd.notna(amount_val) and pd.notna(details_val):
            expected_detail = "DEBIT" if amount_val < 0 else "CREDIT"
            if details_val != expected_detail:
                amount_details_match = False
                break
    
    if not amount_details_match:
        failed_checks.append("Details column does not match Amount sign (negative should be DEBIT, positive should be CREDIT)")
    else:
        print(f"✓ Details-Amount consistency: Details correctly match Amount sign")
    
    # Check 5: Posting Date matches original Date
    date_match = standardized_df[posting_date_col].equals(bofa_central[bofa_date_col].reset_index(drop=True))
    if not date_match:
        # Check if they're equal after conversion
        date_match = (
            pd.to_datetime(standardized_df[posting_date_col], errors='coerce')
            .reset_index(drop=True)
            .equals(pd.to_datetime(bofa_central[bofa_date_col], errors='coerce').reset_index(drop=True))
        )
    if not date_match:
        failed_checks.append("Posting Date does not match original Date column")
    else:
        print(f"✓ Posting Date: Matches original Date column")
    
    # Check 6: Description matches original Description
    desc_match = standardized_df[description_col].equals(bofa_central[bofa_description_col].reset_index(drop=True))
    if not desc_match:
        failed_checks.append("Description does not match original Description column")
    else:
        print(f"✓ Description: Matches original Description column")
    
    # Check 7: Amount matches original Amount
    amount_match = standardized_df[amount_col_chase].equals(bofa_central[bofa_amount_col].reset_index(drop=True))
    if not amount_match:
        # Check with tolerance for floating point
        amount_diff = abs(standardized_df[amount_col_chase].reset_index(drop=True) - bofa_central[bofa_amount_col].reset_index(drop=True)).max()
        if amount_diff > 0.01:
            failed_checks.append(f"Amount does not match original Amount column (max diff: {amount_diff})")
        else:
            print(f"✓ Amount: Matches original Amount column (within tolerance)")
    else:
        print(f"✓ Amount: Matches original Amount column")
    
    # Check 8: Type column has ACH_ prefix
    type_values = standardized_df[type_col].dropna().unique()
    all_have_prefix = all(str(val).startswith('ACH_') for val in type_values if pd.notna(val))
    if not all_have_prefix:
        invalid_types = [val for val in type_values if not str(val).startswith('ACH_')]
        failed_checks.append(f"Type column has values without ACH_ prefix: {invalid_types}")
    else:
        print(f"✓ Type column: All values have ACH_ prefix")
    
    # Check 9: Type matches Details with ACH_ prefix
    type_details_match = True
    for idx in standardized_df.index:
        details_val = standardized_df.loc[idx, details_col]
        type_val = standardized_df.loc[idx, type_col]
        if pd.notna(details_val) and pd.notna(type_val):
            expected_type = f"ACH_{details_val}"
            if type_val != expected_type:
                type_details_match = False
                break
    
    if not type_details_match:
        failed_checks.append("Type column does not match Details with ACH_ prefix")
    else:
        print(f"✓ Type-Details consistency: Type correctly matches Details with ACH_ prefix")
    
    # Raise error if any checks failed
    if failed_checks:
        error_msg = "BofA standardization validation failed:\n" + "\n".join(f"  - {check}" for check in failed_checks)
        raise ValueError(error_msg)
    else:
        print(f"✓ All BofA standardization checks passed!\n")
    
    return standardized_df


# Reshape the BofA statements into the Chase column layout. The standardizer
# prints a validation report and raises ValueError if any of its checks fail.
bofa_central_standardized = bofa_standardizer(cc["bofa_central"], cc["chase_central"])


=== Validating BofA Standardization ===
✓ Row count: 380 rows match
✓ Column structure: Matches chase_central exactly (5 columns)
✓ Details column: All values are DEBIT or CREDIT
✓ Details-Amount consistency: Details correctly match Amount sign
✓ Posting Date: Matches original Date column
✓ Description: Matches original Description column
✓ Amount: Matches original Amount column
✓ Type column: All values have ACH_ prefix
✓ Type-Details consistency: Type correctly matches Details with ACH_ prefix
✓ All BofA standardization checks passed!



In [8]:
# Notebook display: render the standardized BofA frame for a visual spot check.
bofa_central_standardized

Unnamed: 0,Details,Posting Date,Description,Amount,Type
0,DEBIT,2025-09-02,BANKCARD DES:MTOT DISC ID:422899490017607 INDN...,-402.02,ACH_DEBIT
1,DEBIT,2025-07-16,LQ MERCHANT DES:ADJUSTMENT ID:584600000388686 ...,-335.00,ACH_DEBIT
2,DEBIT,2025-07-16,LQ MERCHANT DES:ADJUSTMENT ID:584600000388736 ...,-335.00,ACH_DEBIT
3,DEBIT,2025-07-02,LQ MERCHANT DES:MERCH FEES ID:584600000388744 ...,-310.00,ACH_DEBIT
4,DEBIT,2025-07-02,LQ MERCHANT DES:MERCH FEES ID:584600000388686 ...,-310.00,ACH_DEBIT
...,...,...,...,...,...
375,CREDIT,2025-10-03,RETURN OF POSTED CHECK / ITEM (RECEIVED ON 10-...,310.00,ACH_CREDIT
376,CREDIT,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017605 INDN:...,368.94,ACH_CREDIT
377,CREDIT,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017608 INDN:...,427.28,ACH_CREDIT
378,CREDIT,2025-07-07,WISE US INC DES:SMART IMPA ID:SMART IMPA INDN:...,1376.90,ACH_CREDIT


In [9]:
def create_central_df(chase_central: Optional[pd.DataFrame] = None, bofa_central_standardized: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """
    Build the unified central DataFrame from Chase and standardized BofA data.

    Both inputs are copied, tagged with a "Bank Name" column ("CHASE" for
    Chase rows, "BofA" for BofA rows) and concatenated with a fresh 0..n-1
    index. The inputs themselves are never modified. A summary and the
    validation results are printed to stdout.

    Args:
        chase_central: Chase central DataFrame. When omitted, it is looked up
            as ``cc['chase_central']`` in this module's globals.
        bofa_central_standardized: BofA DataFrame already converted to the
            Chase column layout. When omitted, the module-level global of the
            same name is used.

    Returns:
        pd.DataFrame: Chase rows followed by BofA rows, with the added
        "Bank Name" column.

    Raises:
        ValueError: If an input cannot be found, is empty, the two inputs do
            not have the same set of column names, or the post-concatenation
            row / Bank Name counts are inconsistent.
    """
    # Fall back to notebook globals when an argument is omitted.
    if chase_central is None:
        cc_lookup = globals().get('cc')
        # 'cc' may be absent or not dict-like; both count as "not found".
        if hasattr(cc_lookup, 'get'):
            chase_central = cc_lookup.get('chase_central')
        if chase_central is None:
            raise ValueError("chase_central not found. Please provide it as parameter or ensure 'cc' dictionary exists.")

    if bofa_central_standardized is None:
        bofa_central_standardized = globals().get('bofa_central_standardized')
        if bofa_central_standardized is None:
            raise ValueError("bofa_central_standardized not found. Please provide it as parameter or ensure it exists.")

    if chase_central.empty:
        raise ValueError("chase_central is empty, cannot concatenate")

    if bofa_central_standardized.empty:
        raise ValueError("bofa_central_standardized is empty, cannot concatenate")

    # Validate that both DataFrames have the same columns (excluding Bank Name which we'll add).
    # Order is irrelevant here because pd.concat aligns columns by name.
    chase_columns = set(chase_central.columns)
    bofa_columns = set(bofa_central_standardized.columns)

    if chase_columns != bofa_columns:
        missing_in_bofa = chase_columns - bofa_columns
        missing_in_chase = bofa_columns - chase_columns
        error_msg = "Column mismatch between chase_central and bofa_central_standardized:\n"
        if missing_in_bofa:
            error_msg += f"  Missing in bofa_central_standardized: {missing_in_bofa}\n"
        if missing_in_chase:
            error_msg += f"  Missing in chase_central: {missing_in_chase}"
        raise ValueError(error_msg)

    # Work on copies so the callers' DataFrames are left untouched
    chase_df = chase_central.copy()
    bofa_df = bofa_central_standardized.copy()

    # Tag every row with its source bank
    chase_df['Bank Name'] = 'CHASE'
    bofa_df['Bank Name'] = 'BofA'

    # Chase rows first, then BofA rows, with a fresh continuous index
    central_df = pd.concat([chase_df, bofa_df], ignore_index=True)

    # Print summary
    print("\n=== Central DataFrame Created ===")
    print(f"✓ Chase rows: {len(chase_central)}")
    print(f"✓ BofA rows: {len(bofa_central_standardized)}")
    print(f"✓ Total rows in central_df: {len(central_df)}")
    print(f"✓ Columns: {list(central_df.columns)}")
    print(f"✓ Total columns: {len(central_df.columns)}")

    # Validate row count
    expected_rows = len(chase_central) + len(bofa_central_standardized)
    if len(central_df) != expected_rows:
        raise ValueError(
            f"Row count mismatch: Expected {expected_rows} rows, got {len(central_df)} rows"
        )
    print(f"✓ Row count validation passed: {expected_rows} rows")

    # Validate Bank Name column
    chase_count = (central_df['Bank Name'] == 'CHASE').sum()
    bofa_count = (central_df['Bank Name'] == 'BofA').sum()
    if chase_count != len(chase_central) or bofa_count != len(bofa_central_standardized):
        raise ValueError(
            f"Bank Name column mismatch: Expected {len(chase_central)} CHASE and {len(bofa_central_standardized)} BofA, "
            f"got {chase_count} CHASE and {bofa_count} BofA"
        )
    print(f"✓ Bank Name column validation passed: {chase_count} CHASE rows, {bofa_count} BofA rows")

    print("\n✓ Central DataFrame creation complete!\n")

    return central_df

# Merge Chase and the standardized BofA rows into the single working frame,
# then let the notebook render it as the cell's output.
central_df = create_central_df(
    chase_central=cc["chase_central"],
    bofa_central_standardized=bofa_central_standardized,
)
central_df


=== Central DataFrame Created ===
✓ Chase rows: 2247
✓ BofA rows: 380
✓ Total rows in central_df: 2627
✓ Columns: ['Details', 'Posting Date', 'Description', 'Amount', 'Type', 'Bank Name']
✓ Total columns: 6
✓ Row count validation passed: 2627 rows
✓ Bank Name column validation passed: 2247 CHASE rows, 380 BofA rows

✓ Central DataFrame creation complete!



Unnamed: 0,Details,Posting Date,Description,Amount,Type,Bank Name
0,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,25.28,ACH_CREDIT,CHASE
1,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,100.71,ACH_CREDIT,CHASE
2,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,175.30,ACH_CREDIT,CHASE
3,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,295.55,ACH_CREDIT,CHASE
4,CREDIT,2025-10-06,ORIG CO NAME:LQ MERCHANT CO ENTRY DESCR:D...,358.75,ACH_CREDIT,CHASE
...,...,...,...,...,...,...
2622,CREDIT,2025-10-03,RETURN OF POSTED CHECK / ITEM (RECEIVED ON 10-...,310.00,ACH_CREDIT,BofA
2623,CREDIT,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017605 INDN:...,368.94,ACH_CREDIT,BofA
2624,CREDIT,2025-10-17,BANKCARD DES:MTOT DEP ID:422899490017608 INDN:...,427.28,ACH_CREDIT,BofA
2625,CREDIT,2025-07-07,WISE US INC DES:SMART IMPA ID:SMART IMPA INDN:...,1376.90,ACH_CREDIT,BofA


## **3) MID Extraction & Enrichment - Transaction Classification Pipeline**

This section extracts Merchant IDs (MIDs) and credit charge types from transaction descriptions, then enriches each transaction with GWID, Processor, and Corp information through fuzzy matching.

### **What this section does:**

1. **Extract metadata from descriptions** (regex-based pattern matching):
   - **MID (Merchant ID)**: Extracts digits between `INDID` and (`INDNAME` or `ORIGID`) tokens
   - **Credit charge type**: Extracts letters between `COENTRYDESCR` and `SEC` tokens (e.g., `merchdep`, `deposit`, `tpresrel`)
   - Creates `clean_description` (lowercase, no spaces/special chars), `midid`, and `credit_charge` columns

2. **Fuzzy match MIDs to canonical reference** (`match_mid_score` function):
   - Normalizes extracted MIDs to digits-only
   - Uses RapidFuzz `partial_ratio` with 85% score cutoff
   - Matches against GWID reference DataFrame to find canonical MID
   - Returns best match + confidence score

3. **Hierarchical lookups** (MID → GWID → Processor → Corp):
   - **MID → GWID**: Maps matched MID to Gateway ID
   - **GWID → Processor**: Maps Gateway ID to processor name (e.g., "Luqra Evolve", "EMS")
   - **GWID/MID → Corp**: Maps to corporate entity when available
   - Appends `matched_mid`, `gwid`, `processor`, and `corp` columns to each transaction

4. **Diagnostics & validation**:
   - Reports rows with missing MIDs and categorizes failure reasons:
     - Missing `INDID` token
     - Missing end token (`INDNAME`/`ORIGID`)
     - Invalid extraction window/regex miss
   - Logs low-confidence matches (score < 85%)
   - Provides sample output for inspection

5. **Output**:
   - Returns enriched DataFrame (`result_df`) with columns:
     - Original: `Details`, `Posting Date`, `Description`, `Amount`, `Type`, `Bank Name`
     - Added: `clean_description`, `midid`, `credit_charge`, `matched_mid`, `gwid`, `processor`, `corp`
   - Ready for aggregation into pivot tables by GWID, Processor, or Corp

### **Key functions:**
- `single_pass_extract_mid_credit()`: Vectorized extraction + fuzzy matching in one pass
- `match_mid_score()`: Fuzzy MID matcher with caching (from Section 1 imports)

### **Dependencies:**
- Requires `gwids_df` reference DataFrame with columns: `mid`, `gwid`, `processor`, `corp`
- Uses `rapidfuzz` library for fuzzy string matching


In [10]:
# Confirm that the Anthropic API key is visible to this kernel.
# The key must be inherited from the shell that launched Jupyter -
# do NOT assign it in the notebook itself.
api_key_check = os.getenv("ANTHROPIC_API_KEY")

if not api_key_check:
    # Emit the setup instructions as one block; the text is identical to
    # printing each line separately.
    print("\n".join([
        "⚠ API key NOT found!",
        "\nTo set it properly:",
        "1. Close this Jupyter notebook",
        "2. In PowerShell, run: $env:ANTHROPIC_API_KEY = 'your-key-here'",
        "3. Start Jupyter from that same PowerShell window: jupyter notebook",
        "\nOR use Option 2 below (python-dotenv with .env file)",
    ]))
else:
    print("✓ API key found in environment")

⚠ API key NOT found!

To set it properly:
1. Close this Jupyter notebook
2. In PowerShell, run: $env:ANTHROPIC_API_KEY = 'your-key-here'
3. Start Jupyter from that same PowerShell window: jupyter notebook

OR use Option 2 below (python-dotenv with .env file)


In [11]:
# Option 2: Use python-dotenv to load from .env file (recommended for development)
# First install: pip install python-dotenv
# Then create a .env file in your project root with: ANTHROPIC_API_KEY=your-key-here
# Make sure .env is in .gitignore!

try:
    # Only the import is guarded, so an ImportError can only mean "not installed".
    from dotenv import load_dotenv
except ImportError:
    print("python-dotenv not installed. Install with: pip install python-dotenv")
    print("Or use Option 1 (set env var before starting Jupyter)")
else:
    # Remember whether the key was already present: load_dotenv() does not
    # override existing variables by default, so a pre-set key did NOT come
    # from the .env file and must not be reported as such.
    _key_was_preset = bool(os.getenv("ANTHROPIC_API_KEY"))
    load_dotenv()  # Loads variables from .env file

    if _key_was_preset:
        print("✓ API key already set in environment (.env not needed)")
    elif os.getenv("ANTHROPIC_API_KEY"):
        print("✓ API key loaded from .env file")
    else:
        print("⚠ No .env file found or API key not in .env")
        print("Create a .env file with: ANTHROPIC_API_KEY=your-key-here")


⚠ No .env file found or API key not in .env
Create a .env file with: ANTHROPIC_API_KEY=your-key-here


In [12]:
import anthropic
import base64
import os
from pathlib import Path
from typing import Optional

def detect_bank_from_screenshot(excel_file_path: str, api_key: Optional[str] = None) -> str:
    """
    Ask Claude Haiku which bank an Excel statement comes from.

    Despite the name, no screenshot is taken: the first 10 rows of the
    workbook are read with pandas, rendered as plain text, and sent to the
    model together with a set of column-based identification rules.

    Args:
        excel_file_path: Path to the Excel file to analyze
        api_key: Anthropic API key. If not provided (or empty), falls back to
                 the ANTHROPIC_API_KEY environment variable.

    Returns:
        The model's reply with surrounding whitespace stripped; the prompt
        asks for the bank name only (e.g., "Chase", "Bank of America").

    Raises:
        ValueError: If no non-empty API key is provided and ANTHROPIC_API_KEY
            is unset or empty.
    """

    # Resolve the API key from the parameter or the environment variable.
    # An empty string is treated as missing so we fail here with a clear
    # message instead of sending a blank credential to the API.
    if not api_key:
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError(
                "API key not provided. Either pass it as a parameter or set the "
                "ANTHROPIC_API_KEY environment variable."
            )

    client = anthropic.Anthropic(api_key=api_key)

    # Read first 10 rows as preview
    df_preview = pd.read_excel(excel_file_path, nrows=10)
    preview_text = df_preview.to_string()

    prompt = f"""Identify the bank from this statement preview. 

RULES:
- If columns include "Posting Date", "Description", "Amount", "Type", "Balance" → Chase
- If columns include "Posted Date", "Payee", "Address" → Bank of America  
- If columns include "Date", "Check Number", "Description", "Withdrawal", "Deposit" → Wells Fargo
- If you see "CHASE" or "JPMorgan" anywhere → Chase
- If you see "Bank of America" or "BofA" → Bank of America

Preview:
{preview_text}

Respond with ONLY the bank name, nothing else."""

    # max_tokens is kept small because only a short bank name is expected back
    message = client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=20,
        messages=[{"role": "user", "content": prompt}]
    )

    return message.content[0].text.strip()