In [1]:
import numpy as np
import pandas as pd
import time
from scipy.linalg import solve, kron
from scipy import stats
from scipy.stats import norm
import math
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

## Backtest by rolling window

### parameters

In [2]:
wsdata = '/Users/taka/Library/CloudStorage/GoogleDrive-ktaka8128@gmail.com/My Drive/Princeton/Spring2025/COS513/Final/'
inputfile = 'daily_smith_wilson_20000101_.csv'

In [None]:
# initial parameters
para0 = np.concatenate((
    np.array([0.26]),
    np.array([-3.46944695e-17,  7.50000000e-02, 1.80000000e-02,  7.60000000e-02,  9.30000000e-02,  6.40000000e-02, -3.46944695e-18,  6.00000000e-02,  1.20000000e-01,]),
    np.array([9.93753000e-01,9.92190000e-01,  9.97835000e-01]),
    np.array([2.37000000e+00, -2.29500000e+00,-2.65000000e+00]),
    np.array([8.40000000e-02, -8.10000000e-02,  2.80000000e-02,-1.19000000e-01, -1.10000000e-01,  1.36000000e-01])
))
paraStep = np.concatenate((
    np.array([-30]),
    np.repeat(-3, 6),
    np.repeat(-2, 3),
    np.repeat(-4, 3),
    np.repeat(-2, 3),
    np.repeat(-3, 6)
))
maturities = ['1','5','7','10','15','20','25','30','40']
maturities_float = np.array([float(m) for m in maturities])
maturities2 = np.arange(1, 51) # for generating yield curves from the calibrated parameters
maturities2_float = np.array([float(m) for m in maturities2])
grid = np.array([1, 5, 7, 10, 15, 20, 25, 30, 40])
gridn = len(grid)
lambdastep = np.arange(0.21, 0.32 + 0.01, 0.01)
maxstepn = 10000
initial_lambdastep = np.arange(0.28, 0.32, 0.1)

### Helper funcations

In [4]:
# utility functions
def lyapunov(N, phi, Q):
    # Lyapunov解： P = solve((I - kron(phi,phi)), vec(Q)) を N×N行列にリシェイプ
    I = np.eye(N * N)
    K_mat = np.kron(phi, phi)
    vecP = np.linalg.solve(I - K_mat, Q.flatten())
    return vecP.reshape(N, N)

def Nelson_Siegel_factor_loadings(l, m):
    # m はリストまたは配列
    m = np.array(m)
    column1 = np.ones(len(m))
    column2 = (1 - np.exp(-l * m)) / (l * m)
    column3 = column2 - np.exp(-l * m)
    lambmat = np.column_stack((column1, column2, column3))
    return lambmat

def Kfilter(logLik, N, T, Y, Z, a_t, P_t, H, a_tt, P_tt, v2, v1, phi, mu, Q, prev, M, Yf, lik):
    for t in range(T):
        v = Y[t, :] - Z @ a_t[t, :]
        F = Z @ P_t[t, :, :] @ Z.T + H
        detF = np.linalg.det(F)
        if detF <= 1e-30 or np.isnan(detF) or np.isinf(detF):
            logLik = -10**15
            break
        else:
            F_inv = np.linalg.inv(F)
            logLik = logLik - 0.5 * (np.log(detF) + v.T @ F_inv @ v)
        a_tt[t, :] = a_t[t, :] + P_t[t, :, :] @ Z.T @ F_inv @ v
        P_tt[t, :, :] = P_t[t, :, :] - P_t[t, :, :] @ Z.T @ F_inv @ Z @ P_t[t, :, :]
        v1[t, :] = Z @ a_tt[t, :]
        v2[t, :] = Y[t, :] - Z @ a_tt[t, :]
        a_t[t+1, :] = phi @ a_tt[t, :] + (np.eye(N) - phi) @ mu.flatten()
        P_t[t+1, :, :] = phi @ P_tt[t, :, :] @ phi.T + Q

    if prev:
        # ループが終了した時点は、最後の時点 t0 = T-1 であると考える
        t0 = T - 1
        for m_i in range(1, M+1):
            Yf[t0 + m_i, :] = Z @ a_t[t0 + m_i, :]
            a_tt[t0 + m_i, :] = a_t[t0 + m_i, :]
            P_tt[t0 + m_i, :, :] = P_t[t0 + m_i, :, :]
            a_t[t0 + m_i + 1, :] = phi @ a_tt[t0 + m_i, :] + (np.eye(N) - phi) @ mu.flatten()
            P_t[t0 + m_i + 1, :, :] = phi @ P_tt[t0 + m_i, :, :] @ phi.T + Q

    if lik:
        return -logLik
    else:
        return {'a_tt': a_tt, 'a_t': a_t, 'P_tt': P_tt, 'P_t': P_t, 'v2': v2, 'v1': v1, 'Yf': Yf}

def kalman(para, Y, lik, prev, ahead, grid):
    # para: パラメータベクトル
    l = para[0]
    m = grid  # gridはNelson–Siegel用の m の値のリスト
    mlen = len(m)
    M = ahead
    if prev:
        T = Y.shape[0]
        Yf = Y.copy()
        Yf[T-M:T, :] = np.nan
        Y = Y[:T-M, :]
        T = Y.shape[0]
    else:
        T = Y.shape[0]
        Yf = None
    
    pars = {}
    W = Y.shape[1]
    N = 3
    pars['mu'] = np.full((N, 1), np.nan)
    pars['phi'] = np.eye(N)
    pars['H'] = np.eye(W)
    pars['Q'] = np.eye(N)
    
    # Loading matrix
    pars['Z'] = Nelson_Siegel_factor_loadings(l, m)
    
    for i in range(mlen):
        pars['H'][i, i] = para[1 + i]
    H = pars['H'] ** 2
    
    # VAR(1) coefficient matrix
    pars['phi'][0, 0] = para[mlen+1]
    pars['phi'][0, 1] = 0
    pars['phi'][0, 2] = 0
    pars['phi'][1, 0] = 0
    pars['phi'][1, 1] = para[mlen+2]
    pars['phi'][1, 2] = 0
    pars['phi'][2, 0] = 0
    pars['phi'][2, 1] = 0
    pars['phi'][2, 2] = para[mlen+3]
    
    pars['mu'][0, 0] = para[mlen+4]
    pars['mu'][1, 0] = para[mlen+5]
    pars['mu'][2, 0] = para[mlen+6]
    
    pars['Q'][0, 0] = para[mlen+7]
    pars['Q'][1, 0] = para[mlen+8]
    pars['Q'][1, 1] = para[mlen+9]
    pars['Q'][2, 0] = para[mlen+10]
    pars['Q'][2, 1] = para[mlen+11]
    pars['Q'][2, 2] = para[mlen+12]
    
    Q = pars['Q'] @ pars['Q'].T
    
    # Initialize matrices
    if prev:
        a_t = np.full((T+M, N), np.nan)
        a_tt = np.full((T, N), np.nan)
        P_t = np.full((T+M, N, N), np.nan)
        P_tt = np.full((T, N, N), np.nan)
    else:
        a_t = np.full((T+1, N), np.nan)
        a_tt = np.full((T, N), np.nan)
        P_t = np.full((T+1, N, N), np.nan)
        P_tt = np.full((T, N, N), np.nan)
    v1 = np.full((T, W), np.nan)
    v2 = np.full((T, W), np.nan)
    
    if prev and Yf is None:
        Yf = Y.copy()
    
    a_t[0, :] = pars['mu'].flatten()
    P_t[0, :, :] = lyapunov(N, pars['phi'], Q)
    
    logLik = -0.5 * T * W * math.log(2 * math.pi)
    
    out = Kfilter(logLik, N, T, Y, pars['Z'], a_t, P_t, H, a_tt, P_tt, v2, v1, pars['phi'], pars['mu'], Q, prev, M, Yf, lik)
    return out

def ns_loadings(tau, lam):
    level = 1.0
    slope = (1 - np.exp(-lam * tau)) / (lam * tau) if tau > 1e-12 else 1.0
    curvature = slope - np.exp(-lam * tau)
    return np.array([level, slope, curvature])

def lyapunov(N, phi, Q):
    I = np.eye(N * N)
    K_mat = np.kron(phi, phi)
    vecP = np.linalg.solve(I - K_mat, Q.flatten())
    return vecP.reshape(N, N)

def Nelson_Siegel_factor_loadings(l, m):
    m = np.array(m)
    column1 = np.ones(len(m))
    column2 = (1 - np.exp(-l * m)) / (l * m)
    column3 = column2 - np.exp(-l * m)
    return np.column_stack((column1, column2, column3))

def compute_yield_curve(beta, lam, tau_array):
    return np.array([beta @ ns_loadings(t, lam) for t in tau_array])

def build_M(K, Sigma):
    SS = Sigma @ Sigma.T
    K_diag = np.array(K)
    out = np.zeros((3, 3))
    for i in range(3):
        for j in range(3):
            kij = K_diag[i] + K_diag[j]
            out[i, j] = (1 - np.exp(-kij)) / kij if kij > 1e-12 else 0.0
    SSout = SS * out
    M = np.linalg.cholesky(SSout)
    return M

def build_N_diag(LOT, a, b):
    return np.diag([LOT, a, b])

def sum_slope_curve(lam, LOT):
    s = 0.0
    for tau in range(1, LOT+1):
        s += ns_loadings(tau, lam)[1]
    return s

def sum_curvature_curve(lam, LOT):
    s = 0.0
    for tau in range(1, LOT+1):
        s += ns_loadings(tau, lam)[2]
    return s

def calc_theta(lam, LOT, M_, e1, e2):
    sum_h1 = 0.0
    sum_h2 = 0.0
    for tau in range(1, LOT+1):
        load = ns_loadings(tau, lam)
        h1 = load @ (M_ @ e1)
        h2 = load @ (M_ @ e2)
        sum_h1 += h1
        sum_h2 += h2
    if abs(sum_h1) < 1e-12:
        return np.pi/2 if sum_h2 > 0 else -np.pi/2
    return np.arctan(sum_h2/sum_h1)

def level_up_scenario(K, Sigma, lam, LOT):
    M_ = build_M(K, Sigma)
    a_ = sum_slope_curve(lam, LOT)
    b_ = sum_curvature_curve(lam, LOT)
    N_ = build_N_diag(LOT, a_, b_)
    NM = N_ @ M_
    NtN = NM @ NM.T
    eigvals, eigvecs = np.linalg.eig(NtN)
    idx = np.argsort(eigvals)[::-1]
    e1 = eigvecs[:, idx[0]]
    e2 = eigvecs[:, idx[1]]
    theta_ = calc_theta(lam, LOT, M_, e1, e2)
    Me1 = M_ @ e1
    Me2 = M_ @ e2
    sl_vec = np.cos(theta_) * Me1 + np.sin(theta_) * Me2
    load_LOT = ns_loadings(LOT, lam)
    shift_LOT = np.dot(sl_vec, load_LOT)
    s_ = 1.0 if shift_LOT >= 0 else -1.0
    return sl_vec, s_, theta_

def twist_scenario_parameters(K, Sigma, lam, LOT):
    # Level Upシナリオと同様に M, N, 固有値分解を行う
    M_ = build_M(K, Sigma)
    a_ = sum_slope_curve(lam, LOT)
    b_ = sum_curvature_curve(lam, LOT)
    N_ = build_N_diag(LOT, a_, b_)
    NM = N_ @ M_
    NtN = NM @ NM.T
    eigvals, eigvecs = np.linalg.eig(NtN)
    idx = np.argsort(eigvals)[::-1]
    e1 = eigvecs[:, idx[0]]
    e2 = eigvecs[:, idx[1]]
    theta_ = calc_theta(lam, LOT, M_, e1, e2)
    # twist_vector の推計： Level Up の式とは異なり、符号が逆転する
    twist_vector = np.cos(theta_)*(M_ @ e2) - np.sin(theta_)*(M_ @ e1)
    return twist_vector, theta_


In [12]:
# calibration utilizing previous lambda
def calibrate_model(train_data, prev_lambda=None, num_lambda=3):
    """
    DNSモデルのカルマンフィルタのキャリブレーション
      - 初回は固定のlambdastepを用いる
      - 前日の最適λが与えられた場合、その前後0.1の範囲内で num_lambda 個の候補を作成して探索する
    """
    resultsall = []
    results = []
    
    # λの探索候補の設定
    if prev_lambda is None:
        lambdastep = initial_lambdastep.copy()
    else:
        lambdastep = np.linspace(prev_lambda - 0.02, prev_lambda + 0.02,5)
    
    for k in lambdastep:
        para0[0] = k
        print("Trying Lambda:", k)
        startTime = time.time()
        NoP = len(para0)
        # 初期尤度計算
        lik0 = kalman(para=para0, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
        paraNext = para0.copy()
        likNext = lik0
        
        # 座標方向の更新ループ
        for j in range(1, maxstepn+1):
            if j % 1000 == 0:
                print("Iteration:", j)
            i = j % NoP  # 更新するパラメータのインデックス
            paraP = paraNext.copy()
            paraM = paraNext.copy()
            paraP[i] = paraNext[i] + 10.0**(paraStep[i])
            paraM[i] = paraNext[i] - 10.0**(paraStep[i])
            
            likP = kalman(para=paraP, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
            likM = kalman(para=paraM, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
            
            if likP < likNext:
                paraNext = paraP.copy()
                likNext = likP
            elif likM < likNext:
                paraNext = paraM.copy()
                likNext = likM
            
            resultsall.append(np.concatenate(([j], paraNext, [likNext])))
            # 収束判定：直前の更新と尤度が同じ場合
            if j > NoP+1 and abs(resultsall[-1][gridn+15-1] - resultsall[-NoP][gridn+15-1]) < 1e-2: #j > NoP+1 and resultsall[-1][gridn+15-1] == resultsall[-NoP][gridn+15-1]:
                break
        
        results.append(np.concatenate(([j], paraNext, [likNext])))
        endTime = time.time()
        print("Lambda", k, "calibration time:", endTime - startTime)
    
    resultsall = np.array(resultsall)
    results = np.array(results)
    min_index = np.argmin(results[:, gridn+15-1])
    minresults_test = results[min_index, :]
    return minresults_test

In [6]:
def calibrate_params_no_lambda(train_data, fixed_lambda):
    """
    estimate parameters other than lambda
    """
    para = para0.copy()
    para[0] = fixed_lambda
    likNext = kalman(para=para, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
    
    NoP = len(para)   # 22
    lik_history = []
    
    for j in range(1, maxstepn+1):
        i = (j % (NoP-1)) + 1
        
        paraP = para.copy()
        paraM = para.copy()
        paraP[i] += 10.0**(paraStep[i])
        paraM[i] -= 10.0**(paraStep[i])
        
        likP = kalman(para=paraP, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
        likM = kalman(para=paraM, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
        
        if likP < likNext:
            para, likNext = paraP, likP
        elif likM < likNext:
            para, likNext = paraM, likM
        
        lik_history.append(likNext)
        if j > (NoP-1)+1 and abs(lik_history[-1] - lik_history[-(NoP-1)]) < 1e-2:
            break
    
    return para


def online_update_params(prev_para, train_data):
    """
    weekly online update
    """
    para = prev_para.copy()
    likNext = kalman(para=para, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
    
    NoP = len(para)
    # 1～NoP-1 の各パラメータを一度だけ試行
    for i in range(1, NoP):
        paraP = para.copy()
        paraM = para.copy()
        paraP[i] += 10.0**(paraStep[i])
        paraM[i] -= 10.0**(paraStep[i])
        
        likP = kalman(para=paraP, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
        likM = kalman(para=paraM, Y=train_data, lik=True, prev=False, ahead=1, grid=grid)
        
        if likP < likNext:
            para, likNext = paraP, likP
        elif likM < likNext:
            para, likNext = paraM, likM
    
    return para


In [7]:
# calculate yield curves for ICS risk calculation
def calculate_yield_curves(para_opt, LSC_result, maturities):
    lambda_val = para_opt[0]
    K_est = (np.ones(3) - para_opt[10:13]) * tstep
    mu_est = para_opt[13:16] / 100
    
    # Q行列の構築（下三角パラメータから正定値行列へ）
    Q_mat = np.zeros((3, 3))
    Q_mat[0, 0] = para_opt[16] * np.sqrt(tstep) / 100
    Q_mat[1, 0] = para_opt[17] * np.sqrt(tstep) / 100
    Q_mat[1, 1] = para_opt[18] * np.sqrt(tstep) / 100
    Q_mat[2, 0] = para_opt[19] * np.sqrt(tstep) / 100
    Q_mat[2, 1] = para_opt[20] * np.sqrt(tstep) / 100
    Q_mat[2, 2] = para_opt[21] * np.sqrt(tstep) / 100

    beta_calibrated = LSC_result['a_tt'][-1, :] / 100
    base_yields = compute_yield_curve(beta_calibrated, lambda_val, maturities)
    
    # Mean Reversion シナリオ
    LOT = 40
    Delta = (1 - np.exp(-K_est)) * (mu_est - beta_calibrated)
    stressed_yields_mr = base_yields + np.array([np.dot(Delta, ns_loadings(t, lambda_val))
                                                 for t in maturities])
    
    # Level Up / Down シナリオ
    sl_vector, s_sign, theta_val = level_up_scenario(K_est, Q_mat, lambda_val, LOT)
    n_inv = norm.ppf(0.995)
    shift_curve_level = np.array([s_sign * n_inv * np.dot(sl_vector, ns_loadings(t, lambda_val))
                                  for t in maturities])
    stressed_yields_level_up   = base_yields + shift_curve_level
    stressed_yields_level_down = base_yields - shift_curve_level

    # Twist シナリオ
    twist_vector, theta_twist = twist_scenario_parameters(K_est, Q_mat, lambda_val, LOT)
    stressed_yields_twist_up   = base_yields + np.array([np.dot(twist_vector, ns_loadings(t, lambda_val))
                                                         for t in maturities])
    stressed_yields_twist_down = base_yields - np.array([np.dot(twist_vector, ns_loadings(t, lambda_val))
                                                         for t in maturities])
    yields_dict = {
        "base": base_yields,
        "mean_reversion": stressed_yields_mr,
        "level_up": stressed_yields_level_up,
        "level_down": stressed_yields_level_down,
        "twist_up": stressed_yields_twist_up,
        "twist_down": stressed_yields_twist_down,
    }
    return yields_dict

### 1-day risk

In [7]:
data0 = pd.read_csv(wsdata + inputfile, header=0, index_col=0, encoding='cp1252')
data0.index = pd.to_datetime(data0.index)
data0 = data0[maturities]
data = data0.values * 100

In [None]:
tstep = 1

backtest_start_date = '2000-01-04'
backtest_end_date   = '2024-12-31'
window_size       = 504
forecast_horizon  = 1 # should be 1 at the calibarion, time horizon of risk calculation is adjusted by risk calculation phase

backtest_dates = data0.loc[backtest_start_date:backtest_end_date].index
num_windows = len(backtest_dates) - window_size - forecast_horizon + 1


calibrate_lambda_freq = 63
num_lambda = 5

yield_curves_data = []
calibrated_params_list = []
prev_lambda = None

for i in range(num_windows):
    window_start = backtest_dates[i]
    window_end   = backtest_dates[i + window_size - 1]
    test_date    = backtest_dates[i + window_size]
    
    print(f"Window {i+1}/{num_windows}: Train {window_start.date()} to {window_end.date()}, Test: {test_date.date()}")
    train_data = data0.loc[window_start:window_end].values * 100
    
    if i % calibrate_lambda_freq == 0 or prev_lambda is None:
        # search λ based on the previous λ to reduce the search space
        minresults = calibrate_model(train_data, prev_lambda=prev_lambda, num_lambda=num_lambda)
        para_opt = minresults[1:23]
        prev_lambda = para_opt[0]
        print(f"Calibrated new lambda: {prev_lambda}")
    else:
        # reuse the previous λ to reduce the search space (λ does not change much and not having serious impact to the risk calculation)
        para_opt = calibrated_params_list[-1]
        print(f"Reusing previous lambda: {prev_lambda}")
    
    calibrated_params_list.append(para_opt)
    
    LSC_result = kalman(para=para_opt, Y=train_data, lik=False, prev=False, ahead=forecast_horizon, grid=grid)
    
    yields_dict = calculate_yield_curves(para_opt, LSC_result, maturities2_float)
    
    for j, mat in enumerate(maturities2_float):
         yield_curves_data.append({
             'Test_Date': test_date,
             'Maturity': mat,
             'Base Yield': yields_dict["base"][j],
             'Mean Reversion': yields_dict["mean_reversion"][j],
             'Level Up': yields_dict["level_up"][j],
             'Level Down': yields_dict["level_down"][j],
             'Twist Up-to-Down': yields_dict["twist_up"][j],
             'Twist Down-to-Up': yields_dict["twist_down"][j]
         })

yield_curves_df = pd.DataFrame(yield_curves_data)
yield_curves_df.to_csv("DNS40_ICS_1-day_risk_yields.csv", index=False)

Window 1/5625: Train 2000-01-04 to 2002-01-18, Test: 2002-01-21
Trying Lambda: 0.28
Iteration: 1000
Lambda 0.28 calibration time: 124.38686227798462
Calibrated new lambda: 0.28
Window 2/5625: Train 2000-01-05 to 2002-01-21, Test: 2002-01-22
Reusing previous lambda: 0.28
Window 3/5625: Train 2000-01-06 to 2002-01-22, Test: 2002-01-23
Reusing previous lambda: 0.28
Window 4/5625: Train 2000-01-07 to 2002-01-23, Test: 2002-01-24
Reusing previous lambda: 0.28
Window 5/5625: Train 2000-01-11 to 2002-01-24, Test: 2002-01-25
Reusing previous lambda: 0.28
Window 6/5625: Train 2000-01-12 to 2002-01-25, Test: 2002-01-28
Reusing previous lambda: 0.28
Window 7/5625: Train 2000-01-13 to 2002-01-28, Test: 2002-01-29
Reusing previous lambda: 0.28
Window 8/5625: Train 2000-01-14 to 2002-01-29, Test: 2002-01-30
Reusing previous lambda: 0.28
Window 9/5625: Train 2000-01-17 to 2002-01-30, Test: 2002-01-31
Reusing previous lambda: 0.28
Window 10/5625: Train 2000-01-18 to 2002-01-31, Test: 2002-02-01
Reusin

### 1-week risk (using daily data)

In [None]:
data0 = pd.read_csv(wsdata + inputfile, header=0, index_col=0, encoding='cp1252')
data0.index = pd.to_datetime(data0.index)
data0 = data0[maturities]
data = data0.values * 100

In [None]:
tstep = 5

backtest_start_date = '2000-01-04'
backtest_end_date   = '2024-12-31'
window_size       = 504
forecast_horizon  = 1 # should be 1 at the calibarion, time horizon of risk calculation is adjusted by risk calculation phase

backtest_dates = data0.loc[backtest_start_date:backtest_end_date].index
num_windows = len(backtest_dates) - window_size - forecast_horizon + 1


calibrate_lambda_freq = 63
num_lambda = 5

yield_curves_data = []
calibrated_params_list = []
prev_lambda = None

for i in range(num_windows):
    window_start = backtest_dates[i]
    window_end   = backtest_dates[i + window_size - 1]
    test_date    = backtest_dates[i + window_size]
    
    print(f"Window {i+1}/{num_windows}: Train {window_start.date()} to {window_end.date()}, Test: {test_date.date()}")
    train_data = data0.loc[window_start:window_end].values * 100
    
    if i % calibrate_lambda_freq == 0 or prev_lambda is None:
        # search λ based on the previous λ to reduce the search space
        minresults = calibrate_model(train_data, prev_lambda=prev_lambda, num_lambda=num_lambda)
        para_opt = minresults[1:23]
        prev_lambda = para_opt[0]
        print(f"Calibrated new lambda: {prev_lambda}")
    else:
        # reuse the previous λ to reduce the search space (λ does not change much and not having serious impact to the risk calculation)
        para_opt = calibrated_params_list[-1]
        print(f"Reusing previous lambda: {prev_lambda}")
    
    calibrated_params_list.append(para_opt)
    
    LSC_result = kalman(para=para_opt, Y=train_data, lik=False, prev=False, ahead=forecast_horizon, grid=grid)
    
    yields_dict = calculate_yield_curves(para_opt, LSC_result, maturities2_float)
    
    for j, mat in enumerate(maturities2_float):
         yield_curves_data.append({
             'Test_Date': test_date,
             'Maturity': mat,
             'Base Yield': yields_dict["base"][j],
             'Mean Reversion': yields_dict["mean_reversion"][j],
             'Level Up': yields_dict["level_up"][j],
             'Level Down': yields_dict["level_down"][j],
             'Twist Up-to-Down': yields_dict["twist_up"][j],
             'Twist Down-to-Up': yields_dict["twist_down"][j]
         })

yield_curves_df = pd.DataFrame(yield_curves_data)
yield_curves_df.to_csv("DNS40_ICS_1-week_risk_yields.csv", index=False)

Window 1/5625: Train 2000-01-04 to 2002-01-18, Test: 2002-01-21
Trying Lambda: 0.28
Iteration: 1000
Lambda 0.28 calibration time: 72.15206599235535
Calibrated new lambda: 0.28
Window 2/5625: Train 2000-01-05 to 2002-01-21, Test: 2002-01-22
Reusing previous lambda: 0.28
Window 3/5625: Train 2000-01-06 to 2002-01-22, Test: 2002-01-23
Reusing previous lambda: 0.28
Window 4/5625: Train 2000-01-07 to 2002-01-23, Test: 2002-01-24
Reusing previous lambda: 0.28
Window 5/5625: Train 2000-01-11 to 2002-01-24, Test: 2002-01-25
Reusing previous lambda: 0.28
Window 6/5625: Train 2000-01-12 to 2002-01-25, Test: 2002-01-28
Reusing previous lambda: 0.28
Window 7/5625: Train 2000-01-13 to 2002-01-28, Test: 2002-01-29
Reusing previous lambda: 0.28
Window 8/5625: Train 2000-01-14 to 2002-01-29, Test: 2002-01-30
Reusing previous lambda: 0.28
Window 9/5625: Train 2000-01-17 to 2002-01-30, Test: 2002-01-31
Reusing previous lambda: 0.28
Window 10/5625: Train 2000-01-18 to 2002-01-31, Test: 2002-02-01
Reusing

### monthly risk (using weekly data)

In [None]:
data1 = pd.read_csv(wsdata + inputfile, header=0, index_col=0, encoding='cp1252')
data1.index = pd.to_datetime(data1.index)
data1 = data1.resample("W-FRI").last()
data1 = data1.fillna(method='ffill')
data1 = data1[maturities]
data2 = data1.values * 100

In [None]:
tstep = 4

backtest_start_date = '2000-01-04'
backtest_end_date   = '2024-12-31'
window_size       = 104
forecast_horizon  = 1 # should be 1 at the calibarion, time horizon of risk calculation is adjusted by risk calculation phase

backtest_dates = data1.loc[backtest_start_date:backtest_end_date].index
num_windows = len(backtest_dates) - window_size - forecast_horizon + 1


calibrate_lambda_freq = 12
num_lambda = 5

yield_curves_data = []
calibrated_params_list = []
prev_lambda = None

for i in range(num_windows):
    window_start = backtest_dates[i]
    window_end   = backtest_dates[i + window_size - 1]
    test_date    = backtest_dates[i + window_size]
    
    print(f"Window {i+1}/{num_windows}: Train {window_start.date()} to {window_end.date()}, Test: {test_date.date()}")
    train_data = data1.loc[window_start:window_end].values * 100
    
    if i % calibrate_lambda_freq == 0 or prev_lambda is None:
        # search λ based on the previous λ to reduce the search space
        minresults = calibrate_model(train_data, prev_lambda=prev_lambda, num_lambda=num_lambda)
        para_opt = minresults[1:23]
        prev_lambda = para_opt[0]
        print(f"Calibrated new lambda: {prev_lambda}")
    else:
        # reuse the previous λ to reduce the search space (λ does not change much and not having serious impact to the risk calculation)
        para_opt = calibrated_params_list[-1]
        print(f"Reusing previous lambda: {prev_lambda}")
    
    calibrated_params_list.append(para_opt)
    
    LSC_result = kalman(para=para_opt, Y=train_data, lik=False, prev=False, ahead=forecast_horizon, grid=grid)
    
    yields_dict = calculate_yield_curves(para_opt, LSC_result, maturities2_float)
    
    for j, mat in enumerate(maturities2_float):
         yield_curves_data.append({
             'Test_Date': test_date,
             'Maturity': mat,
             'Base Yield': yields_dict["base"][j],
             'Mean Reversion': yields_dict["mean_reversion"][j],
             'Level Up': yields_dict["level_up"][j],
             'Level Down': yields_dict["level_down"][j],
             'Twist Up-to-Down': yields_dict["twist_up"][j],
             'Twist Down-to-Up': yields_dict["twist_down"][j]
         })

yield_curves_df = pd.DataFrame(yield_curves_data)
yield_curves_df.to_csv("DNS40_ICS_1-month_risk_yields.csv", index=False)

Window 1/1200: Train 2000-01-07 to 2001-12-28, Test: 2002-01-04
Trying Lambda: 0.28
Iteration: 1000
Lambda 0.28 calibration time: 14.22026014328003
Calibrated new lambda: 0.28
Window 2/1200: Train 2000-01-14 to 2002-01-04, Test: 2002-01-11
Reusing previous lambda: 0.28
Window 3/1200: Train 2000-01-21 to 2002-01-11, Test: 2002-01-18
Reusing previous lambda: 0.28
Window 4/1200: Train 2000-01-28 to 2002-01-18, Test: 2002-01-25
Reusing previous lambda: 0.28
Window 5/1200: Train 2000-02-04 to 2002-01-25, Test: 2002-02-01
Reusing previous lambda: 0.28
Window 6/1200: Train 2000-02-11 to 2002-02-01, Test: 2002-02-08
Reusing previous lambda: 0.28
Window 7/1200: Train 2000-02-18 to 2002-02-08, Test: 2002-02-15
Reusing previous lambda: 0.28
Window 8/1200: Train 2000-02-25 to 2002-02-15, Test: 2002-02-22
Reusing previous lambda: 0.28
Window 9/1200: Train 2000-03-03 to 2002-02-22, Test: 2002-03-01
Reusing previous lambda: 0.28
Window 10/1200: Train 2000-03-10 to 2002-03-01, Test: 2002-03-08
Reusing

In [11]:
yield_curves_df.to_csv("DNS40_ICSmonthly_risk_yields.csv", index=False)

### 1-year (using weekly data)

In [17]:
# load the data
data_y = pd.read_csv(wsdata + inputfile, header=0, index_col=0, encoding='cp1252')
data_y.index = pd.to_datetime(data_y.index)
data_y = data_y.resample("W-FRI").last().ffill()[maturities]

# set initial calibration period
init_start = '2000-01-01'
init_end   = '2018-03-31'
init_data  = data_y.loc[init_start:init_end].values * 100

print(f"Initial full calibration: {init_start} to {init_end}")

init_minres   = calibrate_model(init_data,
                                 prev_lambda=None,
                                 num_lambda=5)
prev_para     = init_minres[1:23].copy()
prev_lambda   = prev_para[0]


bt_start = pd.to_datetime(init_end) - pd.Timedelta(days=1)
data_bt   = data_y.loc[bt_start:]
dates     = data_bt.index
forecast_horizon = 1

quarterly_freq = 13
annual_freq    = 52

yield_curves = []
calibrated   = []
tstep = 52

for i in range(len(dates) - forecast_horizon):
    train_end = dates[i]
    train_data = data_y.loc[init_start:train_end].values * 100
    test_date  = dates[i + forecast_horizon]

    if i % annual_freq == 0:
        print(f"[{test_date.date()}] Annual full calibration")
        minres     = calibrate_model(train_data,
                                     prev_lambda=prev_lambda,
                                     num_lambda=5)
        para_opt   = minres[1:23].copy()
        prev_lambda = para_opt[0]
    elif i % quarterly_freq == 0:
        print(f"[{test_date.date()}] Quarterly calibration (λ={prev_lambda:.4f})")
        para_opt   = calibrate_params_no_lambda(train_data,
                                                fixed_lambda=prev_lambda)
    else:
        print(f"[{test_date.date()}] Weekly online update")
        para_opt   = online_update_params(prev_para, train_data)

    prev_para = para_opt.copy()
    calibrated.append(para_opt)

    lsc = kalman(para=para_opt,
                 Y=train_data,
                 lik=False,
                 prev=False,
                 ahead=forecast_horizon,
                 grid=grid)
    yields = calculate_yield_curves(para_opt, lsc, maturities2_float)

    for j, mat in enumerate(maturities2_float):
        yield_curves.append({
            'Test_Date':        test_date,
            'Maturity':         mat,
            'Base Yield':       yields["base"][j],
            'Mean Reversion':   yields["mean_reversion"][j],
            'Level Up':         yields["level_up"][j],
            'Level Down':       yields["level_down"][j],
            'Twist Up-to-Down': yields["twist_up"][j],
            'Twist Down-to-Up': yields["twist_down"][j],
        })

pd.DataFrame(yield_curves).to_csv("DNS40_ICS_1-year_risk_yields_v2.1.csv", index=False)
pd.DataFrame(calibrated).to_csv("DNS40_calibrated_params_v2.1.csv", index=False)


Initial full calibration: 2000-01-01 to 2018-03-31
Trying Lambda: 0.28
Lambda 0.28 calibration time: 170.65362882614136
[2018-04-06] Annual full calibration
Trying Lambda: 0.26
Lambda 0.26 calibration time: 209.75116896629333
Trying Lambda: 0.27
Lambda 0.27 calibration time: 152.41616415977478
Trying Lambda: 0.28
Lambda 0.28 calibration time: 172.85258889198303
Trying Lambda: 0.29000000000000004
Lambda 0.29000000000000004 calibration time: 134.2913327217102
Trying Lambda: 0.30000000000000004
Lambda 0.30000000000000004 calibration time: 175.6195011138916
[2018-04-13] Weekly online update
[2018-04-20] Weekly online update
[2018-04-27] Weekly online update
[2018-05-04] Weekly online update
[2018-05-11] Weekly online update
[2018-05-18] Weekly online update
[2018-05-25] Weekly online update
[2018-06-01] Weekly online update
[2018-06-08] Weekly online update
[2018-06-15] Weekly online update
[2018-06-22] Weekly online update
[2018-06-29] Weekly online update
[2018-07-06] Quarterly calibrati