In [1]:
%pip install openpyxl

Note: you may need to restart the kernel to use updated packages.


In [2]:
import kagglehub
import sys
import pandas as pd
import numpy as np
import re
import unicodedata

from typing import Dict, List
from pathlib import Path

# ensure project root is on sys.path for imports when running in a notebook
ROOT = Path.cwd().resolve().parents[0]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from src.DataCleaner import DataCleaner

data_path = ROOT / "data" / "1. Bank Reconciliation Sample.xlsx"

sheets = pd.read_excel(data_path, sheet_name=None, header=4)
print(sheets.keys())

  from .autonotebook import tqdm as notebook_tqdm


dict_keys(['Company Cashbook', 'Bank Statement', 'Bank Reconciliation Statement', 'W-1 (BS to CB)', 'W-2 (CB to BS)'])


In [3]:
df_bs = pd.read_excel(data_path, sheet_name="Bank Statement", header=5)
df_cb = pd.read_excel(data_path, sheet_name="Company Cashbook", header=4)

print(df_bs.shape, df_cb.shape)
print(df_bs.columns)
print(df_cb.columns)

(18, 6) (12, 8)
Index(['Unnamed: 0', 'Date', 'Particulars', 'Debit', 'Credit', 'Balance'], dtype='object')
Index(['Unnamed: 0', 'Date', 'Details', 'Amount ($)', 'Unnamed: 4', 'Date.1',
       'Details.1', 'Amount ($).1'],
      dtype='object')


In [4]:
def drop_empty_unnamed(df):
    """"   
    drop unnamed columns if they are fully empty
    """
    df = df.copy()
    unnamed = [c for c in df.columns if str(c).startswith("Unnamed")]
    df = df.drop(columns=[c for c in unnamed if df[c].isna().all()], errors="ignore")
    return df

def clean_cols(df):
    """
    Clean/ standardize column names
    """
    df = df.copy()
    df.columns = (
        df.columns.astype(str)
        .str.strip()
        .str.replace("\n", " ", regex=False)
        .str.replace(" ", "_")
        .str.replace(".", "_")
        .str.upper()
    )
    return df

In [5]:
df_bs = drop_empty_unnamed(df_bs)
df_cb = drop_empty_unnamed(df_cb)

df_bs = clean_cols(df_bs)
df_cb = clean_cols(df_cb)

print(df_bs.columns)
print(df_cb.columns)

Index(['DATE', 'PARTICULARS', 'DEBIT', 'CREDIT', 'BALANCE'], dtype='object')
Index(['DATE', 'DETAILS', 'AMOUNT_($)', 'DATE_1', 'DETAILS_1', 'AMOUNT_($)_1'], dtype='object')


In [6]:
# df_cb we see DATE and DATE_1 because the data is split into two section - debit and credit
# the last row of each section is the total
# we need to split into debit and credit, and remove the total 


def split_debit_credit(df):
    df = df.copy()

    if "DATE_1" in df.columns:
        debit_cols = ["DATE", "DETAILS", "AMOUNT_($)"]
        credit_cols = ["DATE_1", "DETAILS_1", "AMOUNT_($)_1"]
        debit_rename = {"AMOUNT_($)": "AMOUNT"}
        credit_rename = {"DATE_1": "DATE", "DETAILS_1": "DETAILS", "AMOUNT_($)_1": "AMOUNT"}


    debit = df[debit_cols].rename(columns=debit_rename).copy()
    credit = df[credit_cols].rename(columns=credit_rename).copy()

    # standardize names for rest of pipeline
    debit.columns = ["DATE", "DETAILS", "AMOUNT"]
    credit.columns = ["DATE", "DETAILS", "AMOUNT"]

    return debit, credit

def drop_total_and_blank_rows(df):
    df = df.copy()

    df = df.dropna(how="all")

    # drop "total" rows based on DETAILS text
    if "DETAILS" in df.columns:
        mask_total = df["DETAILS"].astype(str).str.contains(r"\btotal\b", case=False, na=False)
        df = df.loc[~mask_total]

    return df.reset_index(drop=True)


# next we should clean all the columns of data to ensure that they are processed and the right types assigned
cleaner = DataCleaner()

COLUMN_CLEANERS = {
    "DATE": cleaner.apply_date,
    "DETAILS": cleaner.apply_text,
    "PARTICULARS": cleaner.apply_text,
    "AMOUNT": cleaner.apply_number,
    "DEBIT": cleaner.apply_number,
    "CREDIT": cleaner.apply_number,
    "BALANCE": cleaner.apply_number,
}

def apply_cleaner(df):
    df = df.copy()

    for col, clean_func in COLUMN_CLEANERS.items():
        if col in df.columns:
            df[col] = clean_func(df[col])  

    return df


In [7]:
debit, credit = split_debit_credit(df_cb)

debit = drop_total_and_blank_rows(debit)
credit = drop_total_and_blank_rows(credit)

# the opening row of debit and closing row of credit have balances that need to be removed
# the credit and debit databases shpuld be transactions only 
# we can save them seperately if needed later

cb_opening_bal = debit.loc[debit["DETAILS"] == "Balance b/d"]
cb_closing_bal = credit.loc[credit["DETAILS"] == "Balance c/d"]
bs_opening_bal = df_bs.loc[df_bs["PARTICULARS"] == "Opening Balance"]

debit = debit.drop(cb_opening_bal.index)
credit = credit.drop(cb_closing_bal.index)
df_bs = df_bs.drop(bs_opening_bal.index)


# Now apply the cleaner
debit = apply_cleaner(debit)
credit = apply_cleaner(credit)
df_bs = apply_cleaner(df_bs)


  return pd.to_datetime(x, errors="coerce", infer_datetime_format=True)
  return pd.to_datetime(x, errors="coerce", infer_datetime_format=True)
  return pd.to_datetime(x, errors="coerce", infer_datetime_format=True)


In [8]:
# now we can create a unified cashbook dataframe by combining debit and credit
# ! this is a cash account where debit increases cash and credit decreases 

cb_debit = debit.copy()
cb_debit["TYPE"] = "RECIEPT"
cb_debit["SIGNED_AMOUNT"] = cb_debit["AMOUNT"]

cb_credit = credit.copy()
cb_credit["TYPE"] = "PAYMENT"
cb_credit["SIGNED_AMOUNT"] = -cb_credit["AMOUNT"]

df_cb_unified = (
    pd.concat([cb_debit, cb_credit], ignore_index=True)
      .sort_values("DATE")
      .reset_index(drop=True)
)

In [9]:
# combine debit and credit into signed amount in bank statement
df_bs["SIGNED_AMOUNT"] = (
    df_bs["CREDIT"].fillna(0)
    - df_bs["DEBIT"].fillna(0)
)

# standardize column names -> particulars = details in df_bs
df_bs = df_bs.rename(columns={
    "PARTICULARS": "DETAILS"
})

# we want to split the details and the transaction nums to make matching easier
# extract trailing transaction number (if present)
df_bs["TRANSACTION_NO"] = df_bs["DETAILS"].str.extract(r"(\d+)$")
df_bs["DETAILS_TEXT"] = df_bs["DETAILS"].str.replace(r"\s*\d+$", "", regex=True)

df_cb_unified["TRANSACTION_NO"] = df_cb_unified["DETAILS"].str.extract(r"(\d+)$")
df_cb_unified["DETAILS_TEXT"] = df_cb_unified["DETAILS"].str.replace(r"\s*\d+$", "", regex=True)


df_bs["DETAILS"] = df_bs["DETAILS_TEXT"]
df_bs.drop(columns="DETAILS_TEXT", inplace=True)

df_cb_unified["DETAILS"] = df_cb_unified["DETAILS_TEXT"]
df_cb_unified.drop(columns="DETAILS_TEXT", inplace=True)

In [10]:
# make sure all the transaction number column is type int
df_bs["TRANSACTION_NO"] = cleaner.apply_trans_num(df_bs["TRANSACTION_NO"])
df_cb_unified["TRANSACTION_NO"] = cleaner.apply_trans_num(df_cb_unified["TRANSACTION_NO"])

df_bs["DETAILS"] = cleaner.apply_text(df_bs["DETAILS"])
df_cb_unified["DETAILS"] = cleaner.apply_text(df_cb_unified["DETAILS"])

In [11]:
# start with unified cashbook + cleaned bank statement
df_cb_remaining = df_cb_unified.copy()
df_bs_remaining = df_bs.copy()

# keep row ids for removal after matching
df_cb_remaining = df_cb_remaining.reset_index(drop=False).rename(columns={"index": "CB_ROWID"})
df_bs_remaining = df_bs_remaining.reset_index(drop=False).rename(columns={"index": "BS_ROWID"})

In [12]:
df_match = df_cb_remaining.merge(
    df_bs_remaining,
    on=["TRANSACTION_NO", "SIGNED_AMOUNT"],
    how="inner",
    suffixes=("_cb", "_bs")
)

# remove matched rows from remaining datasets
matched_cb_ids = df_match["CB_ROWID"].unique()
matched_bs_ids = df_match["BS_ROWID"].unique()

df_cb_remaining = df_cb_remaining[~df_cb_remaining["CB_ROWID"].isin(matched_cb_ids)].reset_index(drop=True)
df_bs_remaining = df_bs_remaining[~df_bs_remaining["BS_ROWID"].isin(matched_bs_ids)].reset_index(drop=True)

In [None]:
df_match.drop(columns=["DEBIT", "CREDIT"])

In [None]:
# since this data set is simple, we have matched all possible rows
# in real world data sets, I would add more matching logic here based on fuzzy matching of details and date ranges 
# and combinations of signed amounts 

In [18]:
#processing for business insight:

df_cb_remaining["RECON_CATEGORY"] = np.where(
    df_cb_remaining["TYPE"] == "PAYMENT",
    "Unpresented payment",
    np.where(
        df_cb_remaining["TYPE"] == "RECIEPT",
        "Outstanding deposit",
        "Other reconciling item"
    )
)

def classify_bs(details):
    details = str(details).upper()
    if "BANK CHARGES" in details:
        return "Bank charges not recorded"
    elif "DIVIDEND" in details:
        return "Direct income not recorded"
    elif "DIRECT DEBIT" in details:
        return "Direct debit not recorded"
    else:
        return "Other reconciling item"
    
df_bs_remaining["RECON_CATEGORY"] = df_bs_remaining["DETAILS"].apply(classify_bs)

In [19]:
# check final reconcialiation proof

# --- Bank Statment
sum_matched = cleaner.apply_number(df_match["SIGNED_AMOUNT"]).sum() 
sum_bs_unmatched = cleaner.apply_number(df_bs_remaining["SIGNED_AMOUNT"]).sum()

bs_opening_bal = bs_opening_bal["BALANCE"]

bs_check = bs_opening_bal + sum_matched + sum_bs_unmatched

# check if the sum of the opening balance, the sum of all matche dtransactions, and the sum of all unmatched
# are equal to the closing bank balance

bs_check == df_bs["BALANCE"].iloc[-1]

0    True
Name: BALANCE, dtype: bool

In [21]:
# --- Company cashbook

sum_cb_unmatched = cleaner.apply_number(df_cb_remaining["SIGNED_AMOUNT"]).sum()
cb_check = cb_opening_bal["AMOUNT"] + sum_cb_unmatched + sum_matched

closing_cash = float(cb_closing_bal["AMOUNT"])

cb_check == closing_cash

  closing_cash = float(cb_closing_bal["AMOUNT"])


0    True
Name: AMOUNT, dtype: bool

In [28]:
df_cb_unified.to_csv((ROOT / "data") / "df_cb_unified.csv", index=False)
df_match.to_csv((ROOT / "data") / "df_matched.csv", index=False)
df_cb_remaining.to_csv((ROOT / "data") / "df_cb_remaining.csv", index=False)
df_bs_remaining.to_csv((ROOT / "data") / "df_bs_remaining.csv", index=False)