In [1]:
import pandas as pd
import numpy as np


In [2]:
# Load the balance sheet: one row per loan or deposit position
# (columns: ID, Type, Product_Type, Balance, Rate, Maturity_Years, Rate_Type).
df = pd.read_csv("Balance_sheet.csv")

In [3]:
# Preview the first rows to check column names and value formats.
df.head()

Unnamed: 0,ID,Type,Product_Type,Balance,Rate,Maturity_Years,Rate_Type
0,Mortgage_1,Loan,Mortgage,670924,0.0492,23.0,Fixed
1,Mortgage_2,Loan,Mortgage,571403,0.045,20.0,Fixed
2,Mortgage_3,Loan,Mortgage,429520,0.0465,16.0,Fixed
3,Mortgage_4,Loan,Mortgage,636973,0.0469,27.0,Fixed
4,Mortgage_5,Loan,Mortgage,713300,0.049,29.0,Fixed


In [4]:
# Total balance on each side of the balance sheet (deposits vs. loans).
df.groupby("Type", as_index=False)["Balance"].sum()

Unnamed: 0,Type,Balance
0,Deposit,55603572
1,Loan,47466902


In [5]:
# Non-null value count per column for every (Type, Product_Type) pair.
# A zero in Maturity_Years means the product has no contractual maturity.
df.groupby(["Type", "Product_Type"], as_index=False).count()

Unnamed: 0,Type,Product_Type,ID,Balance,Rate,Maturity_Years,Rate_Type
0,Deposit,Chequing_Account,300,300,300,0,300
1,Deposit,GIC,200,200,200,200,200
2,Deposit,HISA,100,100,100,0,100
3,Deposit,Savings_Account,400,400,400,0,400
4,Loan,Auto_Loan,10,10,10,10,10
5,Loan,HELOC,15,15,15,15,15
6,Loan,Loan,25,25,25,25,25
7,Loan,Mortgage,50,50,50,50,50


In [6]:
def prepayment_rate(product_type, loan_rate, market_rate, rate_shock):
    """Return the annual conditional prepayment rate (CPR) for a loan product.

    The CPR is modelled as a linear function of the gap between the loan's
    own rate and the shocked market rate; every other prepayment driver is
    ignored.

    Args:
        product_type: One of 'Mortgage', 'Auto_Loan', 'Loan' or 'HELOC'.
        loan_rate: Contract rate of the loan as a decimal (0.05 means 5%).
        market_rate: Market rate before the shock, as a decimal.
        rate_shock: Shift added to ``market_rate``, as a decimal.

    Returns:
        The CPR, floored at 0 and capped at 0.30.

    Raises:
        KeyError: If ``product_type`` has no calibration entry.
    """
    # At most 30% of principal may prepay in one year.
    cpr_ceiling = 0.30

    # Per-product calibration. 'sensitivity' is the change in CPR for every
    # 100 bps by which the loan rate exceeds the shocked market rate.
    calibration = {
        # residential mortgage loans
        'Mortgage': {'base_cpr': 0.08, 'sensitivity': 0.02},
        # auto loans (typically shorter-term and less rate-sensitive)
        'Auto_Loan': {'base_cpr': 0.05, 'sensitivity': 0.005},
        # general-purpose commercial or personal term loans, medium base CPR
        'Loan': {'base_cpr': 0.06, 'sensitivity': 0.01},
        # home equity line of credit
        'HELOC': {'base_cpr': 0.1, 'sensitivity': 0.015},
    }
    params = calibration[product_type]

    # Positive when the loan is more expensive than the shocked market rate.
    shocked_market_rate = market_rate + rate_shock
    incentive = loan_rate - shocked_market_rate

    # incentive is a decimal, so * 100 expresses it in units of 100 bps.
    shift = params['sensitivity'] * incentive * 100
    unclamped = params['base_cpr'] + shift

    capped = min(cpr_ceiling, unclamped)
    return max(0, capped)

In [7]:
# +200 bps shock: the 5% mortgage is now below the 8% market rate,
# so prepayment falls under the 8% base CPR.
cpr = prepayment_rate('Mortgage', loan_rate=0.05, market_rate=0.06, rate_shock=0.02)
print(f"cpr: {cpr}")

cpr: 0.020000000000000004


In [8]:
# -200 bps shock: the 5% mortgage is now above the 4% market rate,
# so prepayment rises over the 8% base CPR.
cpr = prepayment_rate('Mortgage', loan_rate=0.05, market_rate=0.06, rate_shock=-0.02)
print(f"cpr: {cpr}")

cpr: 0.10000000000000002


In [9]:
# For Non maturity deposit 
def nmd_decay_rate(product_type, nmd_rate, market_rate, rate_shock):
    """Return the blended annual decay (run-off) rate of a non-maturity deposit.

    The balance is split into a core and a non-core part. Each part has a
    base decay rate that moves linearly with the gap between the shocked
    market rate and the rate paid on the deposit. Both part rates are
    clamped to [0, 0.30], blended by the core proportion, and the blend is
    clamped again.

    Args:
        product_type: 'Chequing_Account', 'Savings_Account' or 'HISA'.
        nmd_rate: Rate paid on the deposit, as a decimal.
        market_rate: Market rate before the shock, as a decimal.
        rate_shock: Shift added to ``market_rate``, as a decimal.

    Returns:
        The blended decay rate, between 0 and 0.30.

    Raises:
        KeyError: If ``product_type`` has no behaviour entry.
    """
    # Upper cap on any decay rate.
    decay_ceiling = 0.30

    behaviour = {
        # 20% of balance assumed stable
        'Chequing_Account': {'core_proportion': 0.2,
                             'base_rate_core': 0.05, 'sensitivity_core': 0.01,
                             'base_rate_non_core': 0.30, 'sensitivity_non_core': 0.015},
        # 70% of balance assumed stable
        'Savings_Account': {'core_proportion': 0.7,
                            'base_rate_core': 0.03, 'sensitivity_core': 0.005,
                            'base_rate_non_core': 0.15, 'sensitivity_non_core': 0.02},
        # 50% of balance assumed stable
        'HISA': {'core_proportion': 0.5,
                 'base_rate_core': 0.05, 'sensitivity_core': 0.01,
                 'base_rate_non_core': 0.15, 'sensitivity_non_core': 0.03},
    }
    params = behaviour[product_type]

    def _bounded(value):
        # Keep a rate inside [0, decay_ceiling].
        return max(0, min(decay_ceiling, value))

    # Positive when the shocked market rate is above the deposit's own rate.
    spread = (market_rate + rate_shock - nmd_rate)

    # spread is a decimal, so * 100 expresses it in units of 100 bps.
    core_shift = params['sensitivity_core'] * spread * 100
    non_core_shift = params['sensitivity_non_core'] * spread * 100

    core_decay = _bounded(params['base_rate_core'] + core_shift)
    non_core_decay = _bounded(params['base_rate_non_core'] + non_core_shift)

    blended = (params['core_proportion'] * core_decay
               + (1 - params['core_proportion']) * non_core_decay)

    # Make sure the blend does not leave the allowed range either.
    return _bounded(blended)

In [10]:
# HISA paying 3% against a 6% market rate, no shock.
nmd_decay_rate(product_type="HISA", nmd_rate=0.03, market_rate=0.06, rate_shock=0)

0.16

In [11]:
def pvif(r, n):
    """Present value interest factor of a single sum.

    Returns the value today of one unit received after ``n`` periods when
    discounting at rate ``r`` per period, i.e. ``(1 + r) ** -n``.
    """
    growth_per_period = 1 + r
    return growth_per_period ** (-n)
        

In [12]:
def calculate_eve(cashflows, market_rate):
    """Discount a yearly cash-flow series and return the total present value.

    ``cashflows[0]`` is treated as the year-1 flow, ``cashflows[1]`` as the
    year-2 flow, and so on. Each flow is multiplied by ``pvif(market_rate,
    year)`` and the products are summed. An empty series yields 0.
    """
    discounted_total = 0
    n_years = len(cashflows)
    # Years are counted from 1, so the flow for `year` sits at index year - 1.
    for year in range(1, n_years + 1):
        discounted_total = discounted_total + cashflows[year - 1] * pvif(market_rate, year)
    return discounted_total

In [13]:
# Scratch check of the shock grid: -200 bps to +200 bps in 25 bp steps.
# The stop value 0.021 sits just past 0.02 so the +200 bps endpoint is kept;
# rounding to 4 decimals hides floating-point noise from the float step.
np.round(np.arange(-0.02, 0.021, 0.0025), 4)

array([-0.02  , -0.0175, -0.015 , -0.0125, -0.01  , -0.0075, -0.005 ,
       -0.0025, -0.    ,  0.0025,  0.005 ,  0.0075,  0.01  ,  0.0125,
        0.015 ,  0.0175,  0.02  ])

In [14]:
# Scratch check of np.arange endpoint handling with a float step.
# Without rounding, the middle element comes out as -5.55e-17 and not 0.
# NOTE(review): this range (-0.2 .. 0.2) is ten times wider than the
# +/-0.02 shock grid used by the simulation; presumably exploratory only.
np.arange(-0.2, 0.2 +0.025  , 0.025)

array([-2.00000000e-01, -1.75000000e-01, -1.50000000e-01, -1.25000000e-01,
       -1.00000000e-01, -7.50000000e-02, -5.00000000e-02, -2.50000000e-02,
       -5.55111512e-17,  2.50000000e-02,  5.00000000e-02,  7.50000000e-02,
        1.00000000e-01,  1.25000000e-01,  1.50000000e-01,  1.75000000e-01,
        2.00000000e-01])

In [15]:

# Interest-rate stress test.
#
# For every parallel rate shock this cell projects yearly cash flows for each
# loan and deposit on the balance sheet, then records year-1 net interest
# income (NII), the discounted value of all projected flows (EVE) and year-1
# balance / liquidity figures. One row per scenario is written to
# "Stress_Test_Output.csv".

current_market_rate = 0.045

# Projection horizon (years) for positions with no contractual maturity
# (chequing, savings, HISA).
nmd_horizon_years = 10

# Shock grid from -200 bps to +200 bps in 25 bp steps, as decimals.
# It is built from integer basis points: a float-step np.arange gives a
# "zero" shock of about -5.55e-17 and an endpoint that depends on rounding.
shock_range = np.arange(-200, 200 + 25, 25) / 10000
results = []

df = pd.read_csv("Balance_sheet.csv")

original_loans = df.loc[df["Type"] == "Loan", "Balance"].sum()
original_deposits = df.loc[df["Type"] == "Deposit", "Balance"].sum()

# The balance-sheet rows are the same in every scenario, so convert them to
# plain dicts once instead of iterating the DataFrame inside each scenario.
accounts = df.to_dict("records")


def _project_runoff(balance, rate, runoff_rate, years):
    """Project yearly (interest, principal run-off) amounts for one position.

    Each year interest accrues on the opening balance at ``rate`` and the
    fraction ``runoff_rate`` of the opening balance leaves (loan prepayment
    or deposit withdrawal), so the balance declines geometrically.
    Both amounts are returned as positive numbers; the caller applies signs.

    NOTE(review): whatever balance is left after ``years`` is not returned
    as a final principal flow -- confirm whether repayment at maturity
    should be part of the EVE cash flows.
    """
    flows = []
    outstanding = balance
    for year_idx in range(years):
        interest = outstanding * rate
        principal = balance * (runoff_rate * (1 - runoff_rate) ** year_idx)
        flows.append((interest, principal))
        outstanding = outstanding * (1 - runoff_rate)
    return flows


# Loop through each shock scenario
for rate_shock in shock_range:
    market_rate = current_market_rate + rate_shock
    loan_cashflows = []       # one list of yearly inflows per loan
    deposit_cashflows = []    # one list of yearly outflows (negative) per deposit

    # Year-1 aggregates
    current_loan_balance_year1 = 0
    current_deposit_balance_year1 = 0
    interest_cf_year1 = 0        # net interest income in year 1
    loan_interest_year1 = 0      # loan interest earned in year 1
    deposit_interest_year1 = 0   # deposit interest paid in year 1 (positive)
    prepaid_principal_year1 = 0  # loan principal prepaid in year 1
    nmd_withdrawal_year1 = 0     # amount withdrawn from NMDs in year 1

    # Loop through each balance sheet record
    for account in accounts:
        original_balance = account["Balance"]
        var_rate = account["Rate"]
        var_product_type = account["Product_Type"]
        maturity_years = account["Maturity_Years"]

        # Positions with no maturity get the fixed projection horizon.
        if pd.isna(maturity_years):
            years = nmd_horizon_years
        else:
            years = int(maturity_years)

        # Floating-rate positions reprice to the shocked market rate;
        # everything else keeps its original rate.
        if account.get("Rate_Type") == "Floating":
            rate_used = market_rate
        else:
            rate_used = var_rate

        if account["Type"] == "Loan":
            cpr = prepayment_rate(var_product_type, var_rate, current_market_rate, rate_shock)
            flows = _project_runoff(original_balance, rate_used, cpr, years)

            if flows:
                interest, principal_paid = flows[0]
                interest_cf_year1 += interest
                loan_interest_year1 += interest
                prepaid_principal_year1 += principal_paid
                current_loan_balance_year1 += original_balance * (1 - cpr)

            # Interest and prepaid principal are both cash received.
            loan_cashflows.append([interest + principal for interest, principal in flows])

        elif account["Type"] == "Deposit":
            # GICs are term deposits: no behavioural withdrawal is modelled.
            is_nmd = var_product_type != "GIC"
            decay = 0
            if is_nmd:
                decay = nmd_decay_rate(var_product_type, var_rate, current_market_rate, rate_shock)
            flows = _project_runoff(original_balance, rate_used, decay, years)

            if flows:
                interest, principal_withdraw = flows[0]
                interest_cf_year1 -= interest          # interest paid reduces NII
                deposit_interest_year1 += interest
                current_deposit_balance_year1 += original_balance * (1 - decay)
                if is_nmd:
                    nmd_withdrawal_year1 += principal_withdraw

            # Interest paid and balances withdrawn are both cash leaving the
            # bank, so the whole deposit flow is negative. (The withdrawal
            # used to be added with a positive sign, which made deposit
            # run-off increase EVE.)
            deposit_cashflows.append([-(interest + principal) for interest, principal in flows])

    # Sum all positions year by year. The horizon is the longest projected
    # series, so this also works when the NMD horizon exceeds every
    # contractual maturity or when one side of the balance sheet is empty.
    all_cashflows = loan_cashflows + deposit_cashflows
    max_years = max((len(cf) for cf in all_cashflows), default=0)
    total_cf = np.zeros(max_years)
    for cf in all_cashflows:
        total_cf[:len(cf)] += cf

    nii_12m = interest_cf_year1  # year 1 net interest income
    eve = calculate_eve(total_cf, market_rate)

    # Net liquidity position after 1 year
    net_cash_on_hand_y1 = (
        current_deposit_balance_year1
        - current_loan_balance_year1
        + loan_interest_year1
        - deposit_interest_year1
    )

    results.append({
        "Market_Rate": market_rate,
        "Shock_bps": round(rate_shock * 10000),
        "Original_Loan": original_loans,
        "Original_Deposit": original_deposits,
        "Current_Loan_Y1": current_loan_balance_year1,
        "Current_Deposit_Y1": current_deposit_balance_year1,
        "NII_12M": nii_12m,
        "EVE": eve,
        "Prepaid_Principal_Y1": prepaid_principal_year1,
        "NMD_Withdraw_Y1": nmd_withdrawal_year1,
        "Loan_Interest_Y1": loan_interest_year1,
        "Deposit_Interest_Y1": deposit_interest_year1,
        "Net_Cash_On_Hand_Y1": net_cash_on_hand_y1
    })

df_final = pd.DataFrame(results)
df_final.to_csv("Stress_Test_Output.csv", index=False)


