In [35]:
import pandas as pd
import numpy as np

In [36]:
data = pd.read_csv('/Users/vittoriomanfriani/Desktop/BSIC/Backtesting series - Part 3/CMT 2015-2024')

In [37]:
data.head()

Unnamed: 0,timestamp,US_CMT_10Y,US_CMT_2Y,US_CMT_30Y,US_CMT_5Y,US_CMT_7Y
0,2015-01-01,2.172,0.666,2.752,1.654,1.972
1,2015-01-02,2.111,0.667,2.688,1.608,1.915
2,2015-01-05,2.033,0.659,2.599,1.565,1.846
3,2015-01-06,1.941,0.627,2.503,1.479,1.758
4,2015-01-07,1.969,0.611,2.529,1.479,1.775


In [38]:
# Rename columns to match maturities in years
data.columns = [col.split('_')[-1].replace("Y", "") for col in data.columns]

# Define maturities
maturities = np.array(data.columns[1:], dtype = float)

In [40]:
# Apply Nelson-Siegel Model
def nelson_siegel(params, maturities):
    beta0, beta1, beta2, lambd = params
    t = maturities
    return beta0 + beta1 * (1 - np.exp(-lambd * t)) / (lambd * t) + beta2 * ((1 - np.exp(-lambd * t)) / (lambd * t) - np.exp(-lambd * t))


In [41]:
# Error function to minimize to find optimal params
def error_function(params, maturities, data):
    data_hat = nelson_siegel(params, maturities)
    return np.sum((data - data_hat) ** 2)

In [120]:
from scipy.optimize import minimize

# Initial parameters for Nelson-Siegel model
initial_params = [0.03, -0.01, 0.01, 0.1]

# # Store results in a DataFrame
fitted_results = []

for _, row in data.iterrows():
    date = row['timestamp']
    yields = row[1:].values 
    
    # minimize error function
    result = minimize(
        error_function,
        initial_params,
        args=(maturities, yields),
        method="L-BFGS-B",
        bounds=[(0, 10), (-10, 10), (-10, 10), (0.01, 1)],
    )
    
    # find fitted params and compute residuals
    fitted_params = result.x
    residuals = yields - nelson_siegel(fitted_params, maturities)
    
    # store results in a dictionary
    fitted_results.append({
        "Date": date,
        "Beta0 (Level)": fitted_params[0],
        "Beta1 (Slope)": fitted_params[1],
        "Beta2 (Curvature)": fitted_params[2],
        "Lambda": fitted_params[3],
        "Residuals": residuals.tolist()
    })

# convert results to a DataSet
fitted_results_df = pd.DataFrame(fitted_results)


In [121]:
fitted_results_df

Unnamed: 0,Date,Beta0 (Level),Beta1 (Slope),Beta2 (Curvature),Lambda,Residuals
0,2015-01-01,2.972459,-3.599186,-0.000436,0.487436,"[-0.06762248871993926, -0.007142844066070131, ..."
1,2015-01-02,2.909103,-3.432089,0.000164,0.465031,"[-0.06715627810926295, -0.007853430214975199, ..."
2,2015-01-05,2.810050,-3.270550,0.000448,0.458035,"[-0.07042239463105826, -0.009361082757208261, ..."
3,2015-01-06,2.719071,-3.109015,-0.000006,0.431486,"[-0.0671657823334979, -0.009381428750120269, 0..."
4,2015-01-07,2.751933,-3.199886,0.000028,0.437066,"[-0.060066818322533466, -0.007605156763876297,..."
...,...,...,...,...,...,...
2516,2024-08-26,4.259525,0.734367,-3.010710,0.491643,"[-0.004964065860943556, 0.00025702320184306515..."
2517,2024-08-27,4.264242,0.775733,-3.146151,0.521919,"[-0.006553001917954937, 0.0002889991734384978,..."
2518,2024-08-28,4.278190,0.506771,-2.787718,0.494254,"[-0.0038850982911320386, 0.0001965312268779051..."
2519,2024-08-29,4.356656,-0.068016,-2.024356,0.379519,"[-0.0012320604113513767, 0.05790459701858719, ..."


In [122]:
# get dataset of bond prices
prices_df = pd.DataFrame(index = data.timestamp)
for i in data.columns:
    if i == ('timestamp'):
        continue
    prices_df[i] = np.array(100/(1 + data[i]/100)**float(i))

In [123]:
# clean the dataset of factors
factors_df = pd.DataFrame(index = fitted_results_df.Date)
factors_df['Beta0 (Level)'] = np.array(fitted_results_df['Beta0 (Level)']/100)
factors_df['Beta1 (Slope)'] = np.array(fitted_results_df['Beta1 (Slope)']/100)
factors_df['Beta2 (Curvature)'] = np.array(fitted_results_df['Beta2 (Curvature)']/100)

# get returns dataset
returns_df = prices_df.pct_change().dropna()

# align the index of two dataset
factors_df = factors_df[1:]

In [124]:
import statsmodels.api as sm

# regress returns over the factors
def rolling_regression(returns_df, factors_df, window_size=252):
    # Initialize a dictionary to store rolling loadings
    loadings = {col: [] for col in ['const'] + list(factors_df.columns)}
    
    # List to store dates corresponding to each regression
    dates = []
    
    # Iterate over each asset
    for col in returns_df.columns:
        y = returns_df[col]  # Dependent variable
        
        # Iterate over the data using a rolling window approach
        for i in range(window_size, len(returns_df)):  # Adjusted range
            # Select the current rolling window of data
            y_window = y.iloc[i - window_size:i]  
            X_window = factors_df.iloc[i - window_size:i]
            X_window = sm.add_constant(X_window)  # Add constant for intercept
            
            # Perform regression
            model = sm.OLS(y_window, X_window).fit() 
            
            # Use the end date of the current rolling window
            if col == returns_df.columns[0]:  # Append dates only once
                dates.append(returns_df.index[i])
            
            # Store coefficients for each loading
            for factor, loading in model.params.items():
                loadings[factor].append(loading)

    # Create DataFrames for each loading
    loading_datasets = {}
    for factor, loading in loadings.items():
        loading_datasets[factor] = pd.DataFrame(
            np.array(loading).reshape(len(dates), len(returns_df.columns)),
            index=dates,
            columns=returns_df.columns
        )

    return loading_datasets

In [125]:
loading_datasets = rolling_regression(returns_df, factors_df)