In [55]:
import pandas as pd
import datetime
import os
from dataclasses import dataclass

In [56]:
# Number of calendar months to summarize. Passed to iterate_months() as
# `months_back`; the current (possibly partial) month counts as the first one.
lookback_months = 8

###### CSV Header Names ######
## Canonical column names. merge_events() renames the source-specific headers
## below to these so every data source shares the same schema.
category = 'Category'
retailer = 'Description'
date = 'Date'
cost = 'Amount'

## Credit Card CSV Columns (renamed to `date` / `cost` by merge_events)
transaction_date = 'Transaction Date'
transaction_cost = 'Debit'

## Bank Account CSV Columns (already identical to the canonical names)
banking_date = "Date"
banking_cost = "Amount"

## Capital One 360 Account CSV Columns
co_360_date = 'Transaction Date'
co_360_retailer = 'Transaction Description'
co_360_cost = 'Transaction Amount'
# NOTE(review): co_360_balance is not referenced anywhere else in the visible notebook.
co_360_balance = 'Balance'

## Grocery retailer names as they appear in the transactions CSV descriptions.
## enrich_grocery() matches them case-insensitively against the `retailer`
## column and skips any row whose description also contains 'FUEL'.
grocery_keywords = ['KROGER', 'GIANT', 'SAFEWAY', 'HELLOFRESH', 'WEGMANS', 'FOOD LION']

###### Mappings and Lookups ######
# Description fragment -> category. merge_events() applies these with
# `str.contains` to sources that have no category column of their own.
# Entries are applied in dict order, so when two fragments match the same row
# the later entry wins.
banking_to_income_lookup = {
    "SAFECO": "Car Insurance",
    "PAYMENT AT CAPITAL ONE ONLINE PMT": "Credit Card",
    "PAYMENT AT CAPITAL ONE CRCARDPMT": "Credit Card",
    "PAYMENT AT CAPITAL ONE MOBILE PMT" : "Credit Card",
    "Preauthorized Deposit from FIFTH THIRD BANK" : "HYSA Transfer",
    "PAYMENT AT CAPITAL ONE TRANSFER": "HYSA Transfer",
    "PAYMENT AT SCHWAB": "Investments",
    "SCHWAB BROKERAGE MONEYLINK" : "Investments",
    "PAYMENT AT GOVERNORS GREEN": "Rent",
    "HUMANA PAYROLL": "Payroll",
    "XAVIER UNIVERSIT PAYROLL" : "Payroll",
    "PLANET FIT CLUB FEES" : "Fitness",
    "COMCAST": "Internet",
    "RECURRING PURCHASE AT SPECTRUM" : "Internet",
    "5/3 ONLINE TRANSFER" : "Fifth Third Transfer",
    "ZELLE PMT" : "Zelle",
    "VENMO" : "Venmo",
    "MOBILE DEPOSIT" : "Deposit"
}
# Credit-card category -> display name.
# NOTE(review): this mapping is not referenced anywhere else in the visible
# notebook; define_expenses() matches the raw names ("Gas/Automotive",
# "Health Care") directly.
transaction_to_expenses_lookup = {
    "Gas/Automotive" : "Gas",
    "Health Care" : "Healthcare",
    "Entertainment" : "Other"
}

In [57]:
# strptime pattern filter_events_by_date() uses to parse the canonical date column.
date_format = "%Y-%m-%d"
# Input directories. merge_events() reads every regular file found directly
# inside each one with pd.read_csv.
transaction_path = "./Transactions"
banking_path = "./Banking"
# For paths containing "COC" or "HYSA", merge_events() uses the path with "./"
# removed as the default category; build_cashflow_model() relies on the
# resulting "HYSA" label for its savings total.
coc_path = "./COC"
hysa_path = "./HYSA"
# Workbook written by export_to_excel().
excel_filename = "budget.xlsx"
# main() only writes the workbook when this is True.
excel = True

In [58]:
def main():
    """Run the full pipeline: load each source, build the monthly summary, export.

    Loads the four input directories through merge_events, tags grocery
    purchases on the credit-card transactions, computes the per-month "Net"
    table for the last `lookback_months` months and, when `excel` is True,
    writes every table to `excel_filename`. Export failures are reported on
    stdout rather than raised.
    """
    source_paths = {
        "transactions": transaction_path,
        "banking": banking_path,
        "coc": coc_path,
        "hysa": hysa_path,
    }
    merged_event_dfs = {
        source: merge_events(path) for source, path in source_paths.items()
    }
    # Only the credit-card feed carries grocery purchases that need re-tagging.
    merged_event_dfs["transactions"] = enrich_grocery(merged_event_dfs["transactions"])

    net_df = iterate_months(merged_event_dfs, lookback_months)

    sheet_sources = [
        ("All Transactions", "transactions"),
        ("All Banking", "banking"),
        ("All Cap One Checking", "coc"),
        ("All HYSA", "hysa"),
    ]
    dataframe_sheets = {"Net": net_df}
    for sheet_title, source in sheet_sources:
        dataframe_sheets[sheet_title] = merged_event_dfs[source]

    # Deliberately an equality test against True (not truthiness), matching
    # the existing behaviour of the flag.
    if excel != True:
        print()
        return

    try:
        export_to_excel(dataframe_sheets)
    except Exception as e:
        print("Failed to create Excel file:", e)
    else:
        print("Excel file created successfully.", excel_filename)

In [59]:
@dataclass
class MonthlyModel:
    """Cash-flow summary for a single calendar month.

    Instances are created by build_cashflow_model(); iterate_months() turns a
    list of them into the DataFrame that main() exports as the "Net" sheet,
    so the field order below is also the column order of that sheet.
    """
    # First day of the month. NOTE(review): annotated `str`, but
    # iterate_months() passes a pandas Timestamp through get_state() and
    # build_cashflow_model(); it only becomes a string after sort_df_by_date().
    month: str
    # Sum of the positive cash-flow rows returned by get_cash_flow().
    income: float
    # Sum of the negative cash-flow rows returned by get_cash_flow().
    expenses: float
    # Sums of the three buckets produced by define_expenses().
    fixed_expenses: float
    variable_expenses: float
    discretionary_expenses: float
    # Absolute value of the summed "Investments" rows.
    investments: float
    # Signed sum of the rows whose category is "HYSA".
    hy_savings: float

In [60]:
def sort_df_by_date(df, date_field):
    """Return a copy of ``df`` sorted oldest-first by ``date_field``.

    The date column is parsed with ``pd.to_datetime`` for correct chronological
    ordering and then written back as ``YYYY-MM-DD`` strings. The index of the
    result is reset to 0..n-1.

    The input frame is left untouched: the work is done on a copy, so callers
    do not see their date column silently change dtype and pandas does not
    emit SettingWithCopyWarning when ``df`` is a filtered slice.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``date_field``.
    date_field : str
        Name of the column holding date-like values.

    Returns
    -------
    pandas.DataFrame
        New frame, sorted ascending by date, with string dates.
    """
    # Copy first so the column assignment below never reaches the caller's data.
    ordered = df.copy()
    ordered[date_field] = pd.to_datetime(ordered[date_field])
    ordered = ordered.sort_values(by=date_field, ascending=True).reset_index(drop=True)
    ordered[date_field] = ordered[date_field].dt.strftime('%Y-%m-%d')
    return ordered

In [61]:
def merge_events(input_file_path):
    """Load every CSV in a directory into one normalized events frame.

    Steps:
      1. Read each regular, non-hidden file in ``input_file_path`` with
         ``pd.read_csv`` (in sorted name order) and concatenate them.
      2. Rename source-specific cost / retailer / date headers to the
         canonical ``cost`` / ``retailer`` / ``date`` column names.
      3. Drop rows that duplicate an earlier (cost, retailer, date) triple.
      4. Drop rows that have no cost value.
      5. If the source has no ``category`` column, derive one from the
         description via ``banking_to_income_lookup`` (falling back to the
         directory name for COC/HYSA paths, then to "Other") and sort the
         result by date.

    Parameters
    ----------
    input_file_path : str
        Directory containing the CSV exports for one account.

    Returns
    -------
    pandas.DataFrame
        The merged, normalized events.

    Raises
    ------
    ValueError
        If the directory contains no input files.
    """
    # os.listdir order is arbitrary; sorting keeps the "first duplicate wins"
    # rule below reproducible. Dotfiles (.DS_Store and friends) are not data.
    csv_paths = [
        os.path.join(input_file_path, name)
        for name in sorted(os.listdir(input_file_path))
        if not name.startswith(".")
        and os.path.isfile(os.path.join(input_file_path, name))
    ]
    if not csv_paths:
        raise ValueError(f"No input files found in {input_file_path!r}")
    merged_events = pd.concat([pd.read_csv(path) for path in csv_paths])

    # Normalize column names: for each canonical column, the first candidate
    # header present in the data is renamed to it.
    column_field_mapping = {
        cost : [banking_cost, transaction_cost, co_360_cost],
        retailer : [co_360_retailer],
        date : [banking_date, transaction_date, co_360_date]
    }
    for col, candidates in column_field_mapping.items():
        for field in candidates:
            if field in merged_events.columns:
                merged_events = merged_events.rename(columns={field: col})
                break

    # Dedup
    merged_events = merged_events.drop_duplicates(subset=[cost, retailer, date])
    # Drop rows with a missing cost (for credit-card exports the cost column
    # comes from 'Debit', so rows without a debit value are removed here).
    # .copy() detaches the result so the assignments below act on an
    # independent frame instead of a slice.
    merged_events = merged_events[merged_events[cost].notna()].copy()

    # Fill in categories
    if category not in merged_events.columns:
        if "COC" in input_file_path or "HYSA" in input_file_path:
            # Default category for Capital One sources is the directory name.
            merged_events[category] = str(input_file_path).replace("./", "")
        else:
            # Create the column up front so the lookups below only ever
            # overwrite existing cells.
            merged_events[category] = None
        # Populate the category from the description. Keys are matched as
        # literal substrings, and rows with a missing description never match
        # (same na=False convention as enrich_grocery).
        descriptions = merged_events[retailer]
        for lookup, category_value in banking_to_income_lookup.items():
            matches = descriptions.str.contains(lookup, regex=False, na=False)
            merged_events.loc[matches, category] = category_value
        # Fill in "Other" for events not defined in the lookup
        merged_events.loc[merged_events[category].isna(), category] = "Other"
        # NOTE(review): sorting only happens for sources without their own
        # category column; frames that already have one are returned in file
        # order. Confirm that is intended.
        merged_events = sort_df_by_date(merged_events, date)
    return merged_events

In [62]:
def filter_events_by_date(start_date, end_date, merged_events):
    """Return the events dated within [start_date, end_date], newest first.

    The canonical date column is parsed with ``date_format`` for the
    comparison; both bounds are inclusive. The returned rows keep their
    original index labels and are ordered by the date column, descending.
    """
    event_dates = pd.to_datetime(merged_events[date], format=date_format)
    in_window = event_dates.between(start_date, end_date, inclusive="both")
    return merged_events[in_window].sort_values(by=date, ascending=False)

In [63]:
def enrich_grocery(merged_transactions):
    """Re-label grocery-store purchases as category 'Grocery'.

    A row is re-labelled when its description contains any entry of
    ``grocery_keywords`` (case-insensitive) and does not contain 'FUEL'.
    The frame is modified in place and also returned.
    """
    descriptions = merged_transactions[retailer]
    # The fuel test does not depend on the keyword, so evaluate it once.
    is_fuel = descriptions.str.contains('FUEL', case=False, na=False)
    has_non_fuel_row = not is_fuel.all()

    for keyword in grocery_keywords:
        mentions_keyword = descriptions.str.contains(keyword, case=False, na=False)
        if has_non_fuel_row and mentions_keyword.any():
            merged_transactions.loc[mentions_keyword & ~is_fuel, category] = 'Grocery'
    return merged_transactions

In [64]:
def merge_cash_flow(dfs, negate_cost_for=None):
    """Stack several event frames into one cash-flow frame sorted by date.

    Only the canonical ``date``, ``retailer``, ``category`` and ``cost``
    columns are kept. Sources named in ``negate_cost_for`` have the sign of
    their cost flipped first (get_state() uses this for the credit-card feed
    so that its costs become negative amounts). Input frames are not modified.

    Parameters
    ----------
    dfs : dict[str, pandas.DataFrame]
        Source name -> events frame.
    negate_cost_for : container of str, optional
        Source names whose cost should be negated. ``None`` (the default)
        means no source is negated.

    Returns
    -------
    pandas.DataFrame
        Combined frame with a fresh index, sorted ascending by ``date``.
        Empty (but with the four columns) when ``dfs`` is empty.
    """
    # The default used to be tested with `in` directly, which raised
    # TypeError for None; treat None as "negate nothing".
    names_to_negate = () if negate_cost_for is None else negate_cost_for
    wanted_columns = [date, retailer, category, cost]

    frames = []
    for name, df in dfs.items():
        if name in names_to_negate:
            # assign() returns a new frame, leaving the caller's data intact.
            df = df.assign(**{cost: -df[cost]})
        frames.append(df[wanted_columns])

    if not frames:
        # pd.concat refuses an empty list; an empty input is an empty cash flow.
        return pd.DataFrame(columns=wanted_columns)
    return pd.concat(frames, ignore_index=True).sort_values(by=date)

In [65]:
def get_cash_flow(event_dfs: pd.DataFrame):
    """Split events into income and expense rows, ignoring internal transfers.

    Rows categorised as "HYSA Transfer", "Investments" or "Credit Card" are
    excluded. Of the remainder, rows with a positive cost are income and rows
    with a negative cost are expenses; zero-cost rows land in neither.

    Returns
    -------
    tuple[pandas.DataFrame, pandas.DataFrame]
        ``(income_df, expenses_df)``.
    """
    transfer_categories = ["HYSA Transfer", "Investments", "Credit Card"]
    is_transfer = event_dfs[category].isin(transfer_categories)
    cash_events = event_dfs.loc[~is_transfer]

    amounts = cash_events[cost]
    return cash_events[amounts > 0], cash_events[amounts < 0]

In [66]:
def define_expenses(expenses_df):
    """Partition expenses into fixed, variable and discretionary rows.

    Fixed covers rent, insurance, health care and internet; variable covers
    gas, groceries and professional services; every other row (including rows
    with no category) is discretionary.

    Returns
    -------
    tuple of three pandas.DataFrame
        ``(fixed, variable, discretionary)``.
    """
    fixed_categories = ["Rent", "Car Insurance", "Health Care", "Internet"]
    variable_categories = ["Gas/Automotive", "Grocery", "Professional Services"]

    labels = expenses_df[category]
    is_fixed = labels.isin(fixed_categories)
    is_variable = labels.isin(variable_categories)
    # Whatever is neither fixed nor variable: dining, coffee, entertainment, ...
    is_discretionary = ~(is_fixed | is_variable)

    return (
        expenses_df[is_fixed],
        expenses_df[is_variable],
        expenses_df[is_discretionary],
    )

In [67]:
def build_cashflow_model(month_start, master_events_df, income_df, expenses_df):
    """Summarise one month of events as a MonthlyModel.

    Income and the expense buckets are sums over the already-split frames.
    Investments and high-yield savings are read from the full
    ``master_events_df`` because get_cash_flow() removes those categories
    from the income/expense frames.
    """
    fixed_df, variable_df, discretionary_df = define_expenses(expenses_df)

    def category_total(label):
        # Signed sum of every event carrying exactly this category.
        has_label = master_events_df[category] == label
        return master_events_df.loc[has_label, cost].sum()

    return MonthlyModel(
        month=month_start,
        income=income_df[cost].sum(),
        expenses=expenses_df[cost].sum(),
        fixed_expenses=fixed_df[cost].sum(),
        variable_expenses=variable_df[cost].sum(),
        discretionary_expenses=discretionary_df[cost].sum(),
        # Reported as a magnitude regardless of the sign in the source data.
        investments=abs(category_total("Investments")),
        hy_savings=category_total("HYSA"),
    )

In [68]:
def get_state(merged_event_dfs, month_start, month_end):
    """Build the MonthlyModel for the window [month_start, month_end].

    Each of the four sources is restricted to the window, the results are
    merged into a single cash-flow frame (credit-card costs negated), and
    that frame is split and summarised.
    """
    source_names = ("transactions", "banking", "coc", "hysa")
    events_in_window = {
        name: filter_events_by_date(month_start, month_end, merged_event_dfs[name])
        for name in source_names
    }

    master_events_df = merge_cash_flow(events_in_window, negate_cost_for={"transactions"})
    income_df, expenses_df = get_cash_flow(master_events_df)
    return build_cashflow_model(month_start, master_events_df, income_df, expenses_df)

In [69]:
def iterate_months(merged_event_dfs, months_back: int):
    """Compute one summary row per month for the last ``months_back`` months.

    Starts at the current calendar month (offset 0) and walks backwards.
    Returns a DataFrame with title-cased column names, sorted oldest-first
    by "Month".
    """
    first_of_this_month = datetime.date.today().replace(day=1)
    # Counting months linearly turns "go back N months" into plain
    # subtraction, including across year boundaries.
    month_ordinal = first_of_this_month.year * 12 + (first_of_this_month.month - 1)

    states = []
    for offset in range(months_back):
        year, month_index = divmod(month_ordinal - offset, 12)
        month_start = datetime.date(year, month_index + 1, 1)

        # Day 28 plus four days always falls in the following month; stepping
        # back by that date's day-of-month lands on this month's last day.
        spill_over = month_start.replace(day=28) + datetime.timedelta(days=4)
        month_end = spill_over - datetime.timedelta(days=spill_over.day)

        states.append(
            get_state(
                merged_event_dfs,
                pd.to_datetime(month_start),
                pd.to_datetime(month_end),
            )
        )

    state_df = pd.DataFrame(states)
    state_df.columns = [col.title() for col in state_df.columns]
    return sort_df_by_date(state_df, "Month")

In [70]:
def recurring_expenses(expenses_df):
    """Placeholder: currently ignores ``expenses_df`` and returns an empty DataFrame."""
    return pd.DataFrame()

In [71]:
def export_to_excel(dataframe_sheets, write_index=False):
    """Write each DataFrame to its own sheet of ``excel_filename``.

    Parameters
    ----------
    dataframe_sheets : dict[str, pandas.DataFrame]
        Sheet name -> frame; sheets are written in dict order.
    write_index : bool, default False
        Whether to include each frame's row index as the first column.
        Applied uniformly to every sheet. (Previously the sheet's position in
        the dict was passed as ``index``, so the first sheet omitted the index
        and every later sheet included it.)
    """
    with pd.ExcelWriter(excel_filename) as writer:
        for sheet_name, frame in dataframe_sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=write_index)

In [72]:
# Entry point: run the whole pipeline when this cell/script is executed
# directly rather than imported as a module.
if __name__ == "__main__":
    main()

  df[date_field] = pd.to_datetime(df[date_field])
  df[date_field] = pd.to_datetime(df[date_field])


Excel file created successfully. budget.xlsx
