In [1]:
import numpy as np
import pandas as pd
import os
import math
from datetime import date as pdate
import statistics

In [2]:
file_path = os.path.abspath("data/hist_data.xlsm")
notional_stock = 1000000
notional_swap = 100000000
swap_rate = 0.042
day_count_frac = 1

In [3]:
def get_swap_data(file_path):
    """
    read SOFR curve data from excel file 

    Args:
    file_path(str): path to the excel file

    Returns:
    pd.DataFrame: preprocessed SOFR curve data
    """
    df_sofr_curve_raw = pd.read_excel(file_path, sheet_name="SofrCurve", engine = 'openpyxl', parse_dates=False)
    # clean up column names (timestamp)
    df_sofr_curve_raw.columns = [str(col) for col in df_sofr_curve_raw.columns]
    df_sofr_curve_raw.columns = [col.split()[0] if '-' in col else col for col in df_sofr_curve_raw.columns]
    df_sofr_curve_raw.drop(columns=['Tenor'], inplace=True)
    df_sofr_curve_raw['T'] = df_sofr_curve_raw['T'].astype(int) # convert T to int
    # extract out relevant zero curves (1Y-10Y)
    df_sofr_curve = df_sofr_curve_raw.iloc[6:16].copy()
    
    return df_sofr_curve


In [4]:
def get_stocks_data(file_path):
    """
    read stock data from excel file

    Args:
    file_path(str): path to the excel file

    Returns: 
    pd.DataFrame: preprocessed stock data
    """
    sheet_names = ["AAPL", "MSFT", "F", "BAC"]
    df_stocks = {}
    # read stock data in sheet names lst 
    for sheet_name in sheet_names:
        df_stocks[sheet_name] = pd.read_excel(file_path, sheet_name=sheet_name, engine = 'openpyxl', parse_dates=False)
        # rename columns in sheet to avoid dupes 
        df_stocks[sheet_name].rename(columns = {"Adj Close": f"Adj Close_{sheet_name}"}, inplace = True)
    # merge stock data into dataframe based on date
    df_stocks_merged = pd.merge(df_stocks["AAPL"], df_stocks["MSFT"], on="Date", how="inner")
    df_stocks_merged = pd.merge(df_stocks_merged, df_stocks["F"], on="Date", how="inner")
    df_stocks_merged = pd.merge(df_stocks_merged, df_stocks["BAC"], on="Date", how="inner")

    df_stocks_merged['Date'] = pd.to_datetime(df_stocks_merged['Date'])

    # calculate relative return for each stock
    for col in df_stocks_merged.columns:
        if "Adj Close" in col:  # Apply only to price columns
            df_stocks_merged[f"Return_{col.split('_')[-1]}"] = df_stocks_merged[col].pct_change()
    
    df_stocks_merged = df_stocks_merged.iloc[:,5:] # filter returns only
    df_stocks_merged = df_stocks_merged.iloc[1:] # remove the first row since it is NaN

    return df_stocks_merged

In [5]:
def get_discount_curve(zero_rates, tenors):
    """
    discount factors based on zero rates

    Args:
    zero_rates(list): list of zero rates
    tenors(list): list of tenors

    Returns:
    np.array: discount factors
    """
    assert len(zero_rates) == len(tenors), f"Expect {len(tenors)} zero rates, got {len(zero_rates)}."
    Z = np.array(zero_rates)
    T = np.array(tenors)
    return np.exp(-Z*T)


def get_forward_curve(zero_rates, tenors, day_count_frac=1):
    """
    forward rates based on zero rates

    Args:
    zero_rates(list): list of zero rates
    tenors(list): list of tenors
    day_count_frac(float): day count fraction   

    Returns:
    np.array: forward rates
    """
    DF = get_discount_curve(zero_rates, tenors)
    DF_start = np.concatenate([[1], DF[:-1]]) # first DF=1. we dont consider forward swap here
    DF_end = DF
    F = (DF_start - DF_end) / (DF_end * day_count_frac)
    return F
    

def get_payer_swap_pv(zero_rates, tenors, forward_rates=None, swap_rate=0.042, day_count_frac=1, notional=100000000):
    """
    calculate the present value of a payer swap

    Args:
    zero_rates(list): list of zero rates
    tenors(list): list of tenors
    forward_rates(list): list of forward rates
    swap_rate(float): fixed swap rate
    day_count_frac(float): day count fraction
    notional(float): notional amount

    Returns:
    float: present value of a swap
    """
    DF = get_discount_curve(zero_rates, tenors)
    F = get_forward_curve(zero_rates, tenors) if forward_rates is None else np.array(forward_rates)
    pv_fix = swap_rate*sum(day_count_frac*DF)
    pv_flt = sum(day_count_frac*F*DF)
    return notional*(pv_flt - pv_fix)


def swap_pnl_1d_full(zero_rates_t0, zero_rates_t1, tenors):
    """
    1d PNL of swap - full revaluation

    Args:
    zero_rates_t0(list): list of zero rates at t0 (current zero rates)
    zero_rates_t1(list): list of zero rates at t1 (previous historical zero rates)
    tenors(list): list of tenors

    Returns:
    float: 1d PNL of swap
    """

    pv_t0 = get_payer_swap_pv(zero_rates_t0, tenors)
    pv_t1 = get_payer_swap_pv(zero_rates_t1, tenors)
    return pv_t1 - pv_t0


def swap_pnl_1d_sens(zero_rates_t0, zero_rates_chng, tenors, swap_rate = 0.042, notional=100*1000000):
    """
    1d PNL of swap - sensitivity based approach

    Args:
    zero_rates_t0(list): list of zero rates at t0 
    zero_rates_chng(list): list of zero rates change
    tenors(list): list of tenors
    swap_rate(float): fixed swap rate
    notional(float): notional amount

    Returns:
    float: 1d PNL of swap
    """ 
    zero_rates_chng = np.array(zero_rates_chng)
    tenors = np.array(tenors)
    DF_t0 = get_discount_curve(zero_rates_t0, tenors) # exponential discount factor
    W = (notional * swap_rate * tenors * DF_t0)# weight of risk factors in PnL sensitivity
    W[-1] = notional * (1+swap_rate) * tenors[-1] * DF_t0[-1]
    return W @ zero_rates_chng

In [6]:
df_stocks_merged = get_stocks_data(file_path)
df_sofr_curve = get_swap_data(file_path)
df_sofr_curve.set_index(df_sofr_curve.iloc[:,0].values)
ten_year_sofr_curve = df_sofr_curve.drop(columns=["T"]).T
ten_year_sofr_curve.columns = df_sofr_curve['T']

In [7]:
tenors = df_sofr_curve.iloc[:,0].values
sofr_curve_t0 = ten_year_sofr_curve.iloc[-1]
pct_change_sofr_rates = ten_year_sofr_curve.diff().dropna().values

swap_hist_full_pnl = []
swap_hist_sensi_pnl = []

for current_sofr_curve in pct_change_sofr_rates:
    zero_rates_delta = np.array(current_sofr_curve)
    zero_rates_t1 = np.array(sofr_curve_t0) + zero_rates_delta
    pnl_full_evaulation = swap_pnl_1d_full(
        zero_rates_t0=sofr_curve_t0,
        zero_rates_t1=zero_rates_t1,
        tenors=tenors
    )
    pnl_swap_evaluation = swap_pnl_1d_sens(
        zero_rates_t0=sofr_curve_t0,
        zero_rates_chng=zero_rates_delta,
        tenors=tenors
    )
    swap_hist_full_pnl.append(pnl_full_evaulation)
    swap_hist_sensi_pnl.append(pnl_swap_evaluation)

In [8]:
# stocks full revaluation 1d pnl evaluation 
def stocks_pnl1d_full(stocks_returns, w = [1e6, 1e6, 1e6, 1e6]):
    """
    stocks_returns: DataFrame of stocks returns
    w: list of notional weights
    """
    return (w[0]*((1+stocks_returns[0])-1) + w[1]*((1+stocks_returns[1])-1) + w[2]*((1+stocks_returns[2])-1) + w[3]*((1+stocks_returns[3])-1))

# stocks sensitivity based 1d pnl evaluation 
def stocks_pnl1d_sensi(stocks_returns, w = [1e6, 1e6, 1e6, 1e6]):
    """
    stocks_returns: DataFrame of stocks returns
    w: list of notional weights
    """
    return stocks_returns @ w

In [9]:
# historical VaR
confidence_level = 95

stocks_pnl1d_full_hist = [stocks_pnl1d_full(s) for s in df_stocks_merged.values]   
stocks_pnl1d_sensi_hist = [stocks_pnl1d_sensi(s) for s in df_stocks_merged.values]

pnl1d_full_hist = np.array(stocks_pnl1d_full_hist) + np.array(swap_hist_full_pnl)
var1d_full_hist = np.abs(np.percentile(pnl1d_full_hist, 100-confidence_level))

pnl1d_sensi_hist = np.array(stocks_pnl1d_sensi_hist) + (np.array(swap_hist_sensi_pnl).reshape(-1,1)) 
var1d_sensi_hist = np.abs(np.percentile(pnl1d_sensi_hist, 100-confidence_level))

print("")
print("")
print("============================================================================================================================")
print("Historical VaR:")
print(f"VaR [1d, {confidence_level}%], Full Revaluation: {var1d_full_hist:,.0f}") 
print(f"VaR [1d, {confidence_level}%], Sensitivity: {var1d_sensi_hist:,.0f}") 
print("============================================================================================================================")




Historical VaR:
VaR [1d, 95%], Full Revaluation: 966,173
VaR [1d, 95%], Sensitivity: 972,734
