------This Jupyter notebook is authored by J. Antonio Sidaoui, jas2545@columbia.edu------

In [None]:
import statsmodels.api as sm
import numpy as np

def fit_multivariate_regression(x, y):
  """
  Regress every response column of y on the same design matrix x.

  One separate OLS model is estimated per column of y. No constant column
  is appended to x here, so x is used exactly as it is passed in.

  Parameters:
  - x: Independent variables (n x p matrix)
  - y: Dependent variables (n x q matrix)

  Returns:
  - List with one fitted OLS results object per column of y, in column order
  """
  n_responses = y.shape[1]
  return [sm.OLS(y[:, col], x).fit() for col in range(n_responses)]

def predict_multivariate_regression(fitted_eq, x):
  """
  Evaluate every fitted equation at one new point and stack the results.

  Each model's predict(x) is called and only its first element is kept,
  so the i-th output entry is the prediction of the i-th equation.
  No constant is appended to x.

  Parameters:
  - fitted_eq: List of fitted model objects (each exposing predict)
  - x: New data point (1 x p matrix)

  Returns:
  - Float array with one predicted value per fitted equation
  """
  return np.array([eq.predict(x)[0] for eq in fitted_eq], dtype=float)

# Dynamic PCA Historical Backtest

In [None]:
def pca_kalman_filter(x_data, W, F, Q, R_x, z0, P0):
    """
    Run a linear Kalman filter over the rows of x_data.

    State-space model used by the recursion:
        z_t = F z_{t-1} + process noise (covariance Q)
        x_t = W z_t     + observation noise (covariance R_x)

    Parameters:
    - x_data: Observations, one row per time step (T x n matrix)
    - W: Observation / loading matrix (n x k)
    - F: State transition matrix (k x k)
    - Q: Process noise covariance (k x k)
    - R_x: Observation noise covariance (n x n)
    - z0: Initial state (length k)
    - P0: Initial state covariance (k x k)

    Returns:
    - z_est: Filtered state estimates (T x k)
    - P_est: Filtered state covariances (T x k x k)

    Raises:
    - ValueError: if x_data is not two-dimensional
    - numpy.linalg.LinAlgError: if an innovation covariance is singular
    """
    T, _ = x_data.shape
    k = W.shape[1]

    z_est = np.zeros((T, k))
    P_est = np.zeros((T, k, k))

    z_prev, P_prev = z0, P0

    for t in range(T):
        # Prediction step
        z_pred = F @ z_prev
        P_pred = F @ P_prev @ F.T + Q

        # Update step. The gain solves K S = P_pred W^T; transposing gives
        # S^T K^T = W P_pred^T, which is solved directly instead of forming
        # the explicit inverse of S.
        S = W @ P_pred @ W.T + R_x
        K = np.linalg.solve(S.T, W @ P_pred.T).T
        z_post = z_pred + K @ (x_data[t] - W @ z_pred)
        P_post = P_pred - K @ W @ P_pred

        # Store estimates and carry them into the next iteration
        z_est[t] = z_post
        P_est[t] = P_post
        z_prev, P_prev = z_post, P_post

    return z_est, P_est

In [None]:
def historical_backtest_dynamic_pca(x, y, scenario_vars, crisis_dates, dynamic_pca_dim, start_row,
                                    *, n_samples=10000, refit_interval=50):
    """
    Historical backtest of the dynamic PCA stress test (PCA + VAR(1) + Kalman filter).

    The PCA loadings, the VAR(1) state dynamics and the Kalman filter are
    re-fitted every refit_interval rows. For every backtest date t, the next
    latent state is sampled conditionally on the realised scenario variables,
    lifted to factor space, mapped to returns with a regression fitted on the
    previous s rows, and the equally weighted portfolio value is averaged over
    the Monte Carlo samples.
    Scenario stress variables are the FED stress test variables.

    Parameters:
    - x: Input data (T x features)
    - y: Response data (T x features)
    - scenario_vars: Indices of stress scenario variables
    - crisis_dates: List of dates indicating crisis periods
    - dynamic_pca_dim: Number of principal components (latent state dimension)
    - start_row: Starting row for the out-of-sample prediction
    - n_samples: Number of Monte Carlo samples per backtest date (keyword-only)
    - refit_interval: Number of rows between two re-fits (keyword-only)

    Returns:
    - abs_error_pca: Absolute errors for Dynamic PCA predictions
    - pca_val: Dynamic PCA portfolio values
    - true_val: Actual portfolio values
    """

    T = crisis_dates[-1] - start_row  # Predict out-of-sample up to the end of the crisis
    s = crisis_dates[0] - start_row  # Train on all past dates up to the first crisis date
    n_steps = T - s  # Number of backtest dates

    abs_error_pca = np.zeros(n_steps)
    pca_val = np.zeros(n_steps)
    true_val = np.zeros(n_steps)

    step = 0
    z_est_full = np.zeros((T, dynamic_pca_dim))  # Store full z_hat for all periods

    # A refit is only useful if at least one backtest date t = refit_start + s < T
    # follows it, hence the bound n_steps rather than T. Refits past that bound
    # produced no output and could fail on a too-short tail window.
    for refit_start in range(0, n_steps, refit_interval):
        refit_end = min(refit_start + refit_interval + s, T)
        x_window = x[refit_start:refit_end]

        # ===== Re-fit the Kalman filter every refit_interval rows =====
        # NOTE(review): the window extends to refit_end, i.e. it contains rows at
        # or after the backtest dates evaluated below — confirm this is intended.

        # PCA on the centered factors
        scaler = preprocessing.StandardScaler(with_std=False).fit(x_window)
        factors_centered = scaler.transform(x_window)
        pca = PCA(n_components=dynamic_pca_dim)
        z_data = pca.fit_transform(factors_centered)  # PC embeddings
        W_pca = pca.components_.T  # PCA loading matrix

        # Fit a VAR(1) model to the PC embeddings
        results = VAR(z_data).fit(1)
        F_pca = results.coefs[0]  # State transition matrix (k x k)
        Q_pca = np.cov(results.resid.T)  # Process noise covariance (k x k)

        # Initialize parameters
        R_x_pca = np.diag(np.var(x_window, axis=0))
        z0 = np.zeros(W_pca.shape[1])  # Initial state
        P0_pca = np.eye(W_pca.shape[1])  # Initial state covariance

        # Run the Kalman filter (the state covariances are not needed afterwards)
        z_est, _ = pca_kalman_filter(x_window, W_pca, F_pca, Q_pca, R_x_pca, z0, P0_pca)
        z_est_full[refit_start:refit_end] = z_est

        # ===== Backtest the dates covered by this refit =====
        for t in range(refit_start + s, refit_end):
            # Regression of returns on factors over the previous s rows
            fitted_eq = fit_multivariate_regression(x[t-s:t], y[t-s:t])

            # Quantities that do not change across Monte Carlo samples.
            # Assumes conditional_sample_H does not modify its arguments.
            z_last = z_est_full[t - 1]  # latest precomputed latent state estimate
            scenario_values = np.array([x[t][i] for i in scenario_vars])  # realised scenario at t
            n_assets = len(fitted_eq)
            weights = np.full(n_assets, 1 / n_assets)  # 1/N rule

            portfolio_values_pca = np.zeros(n_samples)
            for k in range(n_samples):
                # Sample z_t+1 | scenario
                z_est_t1 = conditional_sample_H(F_pca, Q_pca, W_pca, z_last, scenario_vars, scenario_values)
                x_t1_pca = W_pca @ z_est_t1  # lift predicted PC embedding to original factor space
                y_t1_pca = predict_multivariate_regression(fitted_eq, x_t1_pca)  # predicted returns
                portfolio_values_pca[k] = np.dot(weights, y_t1_pca)  # portfolio value

            val_pca = np.mean(portfolio_values_pca)  # average over Monte Carlo samples
            pca_val[step] = val_pca

            # Actual value
            val_actual = np.dot(weights, y[t])
            true_val[step] = val_actual

            # Absolute error Dynamic PCA
            abs_error_pca[step] = np.abs(val_actual - val_pca)

            print(f"Iteration {step}/{n_steps}")
            step += 1

    return abs_error_pca, pca_val, true_val

# SSA Historical Backtest

In [None]:
def historical_backtest_ssa(x, y, scenario_vars, crisis_dates, start_row):
    """
    Historical backtest of the SSA stress test.

    For every backtest date t, the factor vector is built by keeping the
    previous row of x for the unstressed factors and setting the scenario
    variables to their realised value at t. Returns are then predicted with a
    regression fitted on the previous s rows, and the equally weighted
    portfolio value is compared with the realised one.
    Scenario stress variables are the FED stress test variables.

    The input array x is never modified.

    Parameters:
    - x: Input data (T x features)
    - y: Response data (T x features)
    - scenario_vars: Indices of stress scenario variables
    - crisis_dates: List of dates indicating crisis periods
    - start_row: Starting row for the out-of-sample prediction

    Returns:
    - abs_error_ssa: Absolute errors for SSA predictions
    - ssa_val: SSA portfolio values
    - true_val: Actual portfolio values
    """

    T = crisis_dates[-1] - start_row  # Predict out-of-sample up to the end of the crisis
    s = crisis_dates[0] - start_row  # Train on all past dates up to the first crisis date
    n_steps = T - s  # Number of backtest dates

    abs_error_ssa = np.zeros(n_steps)
    ssa_val = np.zeros(n_steps)
    true_val = np.zeros(n_steps)

    for step, t in enumerate(range(s, T)):

        # SSA: unstressed factors keep their previous value (change is set to zero).
        # Work on a copy: indexing a row of x returns a view, and writing the
        # scenario values into it would overwrite the caller's data and the
        # regression training window below.
        x_t1_ssa = np.array(x[t - 1], copy=True)
        for i in scenario_vars:
            x_t1_ssa[i] = x[t][i]  # set to actual scenario in t+1

        fitted_eq = fit_multivariate_regression(x[t-s:t], y[t-s:t])

        # Predict returns given the stressed factor vector
        y_t1_ssa = predict_multivariate_regression(fitted_eq, x_t1_ssa)
        weights = 1 / y_t1_ssa.shape[0] * np.ones(y_t1_ssa.shape[0])  # 1/N rule
        val_ssa = np.dot(weights, y_t1_ssa)
        ssa_val[step] = val_ssa

        # Actual value
        y_t1_actual = y[t]
        weights = 1 / y_t1_actual.shape[0] * np.ones(y_t1_actual.shape[0])
        val_actual = np.dot(weights, y_t1_actual)
        true_val[step] = val_actual

        # Absolute error SSA
        abs_error_ssa[step] = np.abs(val_actual - val_ssa)

        print(f"Iteration {step}/{n_steps}")

    return abs_error_ssa, ssa_val, true_val

# Static PCA Historical Backtest

In [None]:
def historical_backtest_with_static_pca(x, y, scenario_vars, PCDims, crisis_dates, start_row):
    """
    Perform the historical backtest with static PCA projections of the stress.

    For every backtest date t, PCA is fitted on the centered first differences
    of the previous s rows of x. The stress perturbation (scenario variables
    moved to their realised value at t, all other factors unchanged) is
    projected onto the leading principal directions, applied to the previous
    factor row, and mapped to returns with a regression fitted on the same
    s rows. This is repeated for every requested PCA dimension.

    The input array x is never modified.

    Parameters:
    - x: Factor matrix (T x N)
    - y: Return matrix (T x M)
    - scenario_vars: Indices of stressed scenario variables
    - PCDims: List of PCA dimensions to evaluate (e.g., [5, 10, 30, 50, "all"]);
      "all" is resolved to the number of columns of x
    - crisis_dates: List of dates indicating crisis periods
    - start_row: Starting row for the out-of-sample prediction

    Returns:
    - abs_error_static_pca_results: Dictionary mapping each (resolved) PCA
      dimension to the array of absolute errors
    - static_pca_val_results: Dictionary mapping each (resolved) PCA dimension
      to the array of static PCA portfolio values
    """

    T = crisis_dates[-1] - start_row  # Predict out-of-sample up to the end of the crisis
    s = crisis_dates[0] - start_row  # Train on all past dates up to the first crisis date
    n_steps = T - s  # Number of backtest dates
    abs_error_static_pca_results = {}
    static_pca_val_results = {}

    for step, t in enumerate(range(s, T)):
        x_train = x[t-s:t]

        # Center (no scaling) the first differences of the training window
        x_diff = np.diff(x_train, axis=0)  # Shape: (s-1, N)
        scaler = preprocessing.StandardScaler(with_std=False).fit(x_diff)
        factors_centered = scaler.transform(x_diff)

        # Fit multivariate regression
        fitted_eq = fit_multivariate_regression(x_train, y[t-s:t])

        # Stressed factor vector: previous values, scenario variables set to the
        # realised value at t. A copy is required: a row of x is a view, so
        # writing into it would both overwrite the caller's data and make the
        # perturbation below identically zero.
        x_prev = x[t - 1]
        x_t1_ssa = np.array(x_prev, copy=True)
        for i in scenario_vars:
            x_t1_ssa[i] = x[t][i]

        # Perturbation vector; it does not depend on the PCA dimension.
        # NOTE(review): the perturbation is centered with the mean of the first
        # differences and that mean is added back after projection — confirm
        # this is the intended treatment.
        delta_x = x_t1_ssa - x_prev
        delta_x_centered = scaler.transform(delta_x.reshape(1, -1)).flatten()

        # Actual returns
        y_t1_actual = y[t]
        weights_actual = 1 / y_t1_actual.shape[0] * np.ones(y_t1_actual.shape[0])
        val_actual = np.dot(weights_actual, y_t1_actual)

        # Static PCA for each specified dimensionality
        for pca_dim in PCDims:
            dim = x.shape[1] if pca_dim == "all" else pca_dim  # "all" -> use all components

            pca = PCA(n_components=dim)
            pca.fit(factors_centered)
            W = pca.components_.T  # PCA loading matrix

            # Static PCA stress test
            proj_delta = W @ (W.T @ delta_x_centered)  # Project stress onto PCA space
            x_t1_static_pca = x_prev + proj_delta + scaler.mean_  # Apply perturbation
            y_t1_static_pca = predict_multivariate_regression(fitted_eq, x_t1_static_pca)

            # Construct portfolio using equal weights
            weights = 1 / y_t1_static_pca.shape[0] * np.ones(y_t1_static_pca.shape[0])
            val_static_pca = np.dot(weights, y_t1_static_pca)

            # Allocate the result arrays the first time a dimension is seen
            if dim not in static_pca_val_results:
                static_pca_val_results[dim] = np.zeros(n_steps)
                abs_error_static_pca_results[dim] = np.zeros(n_steps)

            static_pca_val_results[dim][step] = val_static_pca
            abs_error_static_pca_results[dim][step] = np.abs(val_actual - val_static_pca)

        print(step)

    return abs_error_static_pca_results, static_pca_val_results

# JDKF Historical Backtest

In [None]:
def historical_backtest_jdkf(x, y, scenario_vars, crisis_dates, DMdim, start_row,
                             *, n_samples=10000, refit_interval=50):
    """
    Historical backtest of the JDKF stress test (diffusion map + EM Kalman filter).

    The diffusion-map embedding and the Kalman filter are re-fitted every
    refit_interval rows. For every backtest date t, the next latent state is
    sampled conditionally on the realised scenario variables, mapped to
    returns, and the equally weighted portfolio value is averaged over the
    Monte Carlo samples.
    Scenario stress variables are the FED stress test variables.

    Parameters:
    - x: Input data (T x features)
    - y: Response data (T x features)
    - scenario_vars: Indices of stress scenario variables
    - crisis_dates: List of dates indicating crisis periods
    - DMdim: Dimension of the diffusion map
    - start_row: Starting row for the out-of-sample prediction
    - n_samples: Number of Monte Carlo samples per backtest date (keyword-only)
    - refit_interval: Number of rows between two re-fits (keyword-only)

    Returns:
    - abs_error_dmk: Absolute errors for JDKF predictions
    - dmk_val: JDKF portfolio values
    - true_val: Actual portfolio values
    """

    T = crisis_dates[-1] - start_row  # Predict out-of-sample up to the end of the crisis
    s = crisis_dates[0] - start_row  # Train on all past dates up to the first crisis date
    n_steps = T - s  # Number of backtest dates

    psi_dim = DMdim
    abs_error_dmk = np.zeros(n_steps)
    dmk_val = np.zeros(n_steps)
    true_val = np.zeros(n_steps)

    step = 0
    psi_hat_full = np.zeros((T, psi_dim))  # Store full psi_hat for all periods

    # A refit is only useful if at least one backtest date t = refit_start + s < T
    # follows it, hence the bound n_steps rather than T. Refits past that bound
    # produced no output and could fail on a too-short tail window.
    for refit_start in range(0, n_steps, refit_interval):
        refit_end = min(refit_start + refit_interval + s, T)
        x_window = x[refit_start:refit_end]
        y_window = y[refit_start:refit_end]

        # ===== Re-fit the Kalman filter every refit_interval rows =====
        # NOTE(review): the window extends to refit_end, i.e. it contains rows at
        # or after the backtest dates evaluated below — confirm this is intended.
        scaler = preprocessing.StandardScaler(with_std=False).fit(x_window)
        factors_centered = scaler.transform(x_window)
        D = compute_distances(factors_centered)
        sigma = np.median(D)  # kernel bandwidth
        W = compute_affinity_matrix(D, 'gaussian', sigma=sigma)
        diff_vec, diff_eig = diff_map_info(W)
        psi = diff_vec[:, :psi_dim]  # leading diffusion coordinates

        lambda_k = -(2 / (sigma ** 2)) * np.log(diff_eig[:psi_dim])
        F = np.eye(psi_dim) + np.diag(-lambda_k)
        H_x = x_window.T @ psi
        Q = np.diag(np.var(psi @ np.diag(diff_eig[:psi_dim]), axis=0))
        x0 = psi[0]
        P0 = np.eye(psi_dim)

        # Run the EM Kalman filter on the refit window; only the return map
        # A_opt and the latent state estimates psi_hat are used afterwards.
        A_opt, _, _, _, _, psi_hat, _ = em_kalman_filter(x_window, y_window, F, H_x, Q, x0, P0)

        # Store psi_hat in the full array
        psi_hat_full[refit_start:refit_end] = psi_hat

        # ===== Backtest the dates covered by this refit =====
        for t in range(refit_start + s, refit_end):
            # Quantities that do not change across Monte Carlo samples.
            # Assumes conditional_sample_H does not modify its arguments.
            psi_last = psi_hat_full[t - 1]  # latest precomputed latent state estimate
            scenario_values = np.array([x[t][i] for i in scenario_vars])  # realised scenario at t
            return_map = A_opt @ H_x  # latent state -> predicted returns
            n_pred = return_map.shape[0]
            weights_pred = 1 / n_pred * np.ones(n_pred)  # 1/N rule

            portfolio_values_dmk = np.zeros(n_samples)
            for k in range(n_samples):
                # Sample psi_t+1 | scenario using the last psi_hat before t
                psi_t1 = conditional_sample_H(F, Q, H_x, psi_last, scenario_vars, scenario_values)

                # Predict returns given the sampled latent state
                y_t1 = return_map @ psi_t1
                portfolio_values_dmk[k] = np.dot(weights_pred, y_t1)

            val_dmk = np.mean(portfolio_values_dmk)  # Average over Monte Carlo samples
            dmk_val[step] = val_dmk

            # Actual value
            y_t1_actual = y[t]
            weights = 1 / y_t1_actual.shape[0] * np.ones(y_t1_actual.shape[0])
            val_actual = np.dot(weights, y_t1_actual)
            true_val[step] = val_actual

            # Absolute error DMK
            abs_error_dmk[step] = np.abs(val_actual - val_dmk)

            print(f"Iteration {step}/{n_steps}")
            step += 1

    return abs_error_dmk, dmk_val, true_val