# Markov Switching Dynamic Nelson-Siegel Model
### Implementation based on Hevia et al. (2012)
### Estimating and Forecasting the Yield Curve Using a Markov Switching Dynamic Nelson and Siegel Model

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize, dual_annealing
from scipy.stats import multivariate_normal
from scipy.linalg import cholesky, solve
import warnings
warnings.filterwarnings('ignore')


# Markov Switching Dynamic Nelson-Siegel Model

## Core Equations

**Observation Equation:**
$$y_t = \Lambda_{x_t} f_t + \varepsilon_{x_t,t}$$

**State Equation:**
$$f_t = \mu_{x_t} + A_{x_t} f_{t-1} + \eta_{x_t,t}$$


## Expanded Nelson-Siegel Form

**Observation Equation in Matrix Form:**

$$\begin{bmatrix} R_t(\tau_1) \\ R_t(\tau_2) \\ \vdots \\ R_t(\tau_m) \end{bmatrix} = \begin{bmatrix} 1 & L_1^{(j)} & C_1^{(j)} \\ 1 & L_2^{(j)} & C_2^{(j)} \\ \vdots & \vdots & \vdots \\ 1 & L_m^{(j)} & C_m^{(j)} \end{bmatrix} \begin{bmatrix} \beta_{1t} \\ \beta_{2t} \\ \beta_{3t} \end{bmatrix} + \begin{bmatrix} \varepsilon_{jt}^1 \\ \varepsilon_{jt}^2 \\ \vdots \\ \varepsilon_{jt}^m \end{bmatrix}$$

Where:
- $L_i^{(j)} = \frac{1-e^{-\lambda_j \tau_i}}{\lambda_j \tau_i}$ (Slope loading)
- $C_i^{(j)} = \frac{1-e^{-\lambda_j \tau_i}}{\lambda_j \tau_i} - e^{-\lambda_j \tau_i}$ (Curvature loading)


## State Equation in Matrix Form

$$\begin{bmatrix} \beta_{1t} \\ \beta_{2t} \\ \beta_{3t} \end{bmatrix} = \begin{bmatrix} \mu_j^1 \\ \mu_j^2 \\ \mu_j^3 \end{bmatrix} + \begin{bmatrix} A_j^{11} & A_j^{12} & A_j^{13} \\ A_j^{21} & A_j^{22} & A_j^{23} \\ A_j^{31} & A_j^{32} & A_j^{33} \end{bmatrix} \begin{bmatrix} \beta_{1,t-1} \\ \beta_{2,t-1} \\ \beta_{3,t-1} \end{bmatrix} + \begin{bmatrix} \eta_{jt}^1 \\ \eta_{jt}^2 \\ \eta_{jt}^3 \end{bmatrix}$$

## Factor Interpretation

- $\beta_{1t}$: **Level factor** (long-term yield)
- $\beta_{2t}$: **Slope factor** (short-long spread) 
- $\beta_{3t}$: **Curvature factor** (medium-term hump)


#### Key Methods

- **nelson_siegel_loadings:** Computes the Nelson-Siegel factor loading matrix for any regime.
- **_unpack_parameters:** Converts the parameter vector into model components (λ, μ, A, H, Q, transition probabilities).
- **approximate_filter:** Implements the Section 4.1 approximate filtering algorithm, collapsing regime mixtures to a single Gaussian at each step.
- **_initialize_parameters:** Generates smart starting values via OLS and randomization.
- **_get_parameter_bounds:** Sets economically and numerically justified parameter bounds.
- **fit:** Multi-stage optimization (Simulated Annealing → Nelder-Mead → BFGS) for robust parameter estimation.
- **forecast:** Monte Carlo simulation for regime-uncertain forecasts.
- **summary:** Prints a comprehensive summary of estimated parameters and regime statistics.

---

## Example Usage



In [2]:
class MSDNS:
    """
    The model extends the three-factor Dynamic Nelson-Siegel model with a fourth latent variable
    that evolves according to a two-state Markov chain, capturing business cycle conditions.
    """
    
    def __init__(self, n_regimes=2):
        """
        Initialize the MSDNS model
        
        Parameters:
        -----------
        n_regimes : int, default=2
            Number of regimes (K+1 in the paper notation)
        """
        self.K = n_regimes - 1  # Following paper notation
        self.is_fitted = False
        
        # Model parameters
        self.lambda_params = None      # Decay parameters λ_j
        self.mu_params = None          # Drift parameters μ_j
        self.A_params = None           # VAR matrices A_j
        self.H_params = None           # State covariance H_j
        self.Q_params = None           # Observation covariance Q_j
        self.transition_probs = None   # Transition matrix P
        
        # Filtering results
        self.filtered_states = None
        self.filtered_probs = None
        self.filtered_covariances = None
        self.log_likelihood = None
        
    def nelson_siegel_loadings(self, maturities, lambda_param):
        """
        Compute Nelson-Siegel factor loadings matrix Λ_j
        
        From equation (6) in the paper:
        Loading matrix for regime j with decay parameter λ_j
        """
        tau = np.array(maturities)
        m = len(tau)
        Lambda = np.zeros((m, 3))
        
        # Level factor (β₁): always 1
        Lambda[:, 0] = 1.0
        
        # Slope factor (β₂): (1-e^(-λτ))/(λτ)
        Lambda[:, 1] = (1 - np.exp(-lambda_param * tau)) / (lambda_param * tau)
        
        # Curvature factor (β₃): (1-e^(-λτ))/(λτ) - e^(-λτ)
        Lambda[:, 2] = ((1 - np.exp(-lambda_param * tau)) / (lambda_param * tau)) - np.exp(-lambda_param * tau)
        
        return Lambda
    
    def _initialize_parameters(self, yields, maturities):
        """Initialize parameters using reasonable starting values"""
        T, m = yields.shape
        n_regimes = self.K + 1
        
        # Lambda parameters
        lambda_init = np.array([0.06, 0.12]) if n_regimes == 2 else np.linspace(0.05, 0.15, n_regimes)
        
        # Mu parameters - estimate from simple OLS
        mu_init = np.zeros((n_regimes, 3))
        lambda_mid = np.mean(lambda_init)
        Lambda_mid = self.nelson_siegel_loadings(maturities, lambda_mid)
        
        try:
            factors_ols = np.linalg.lstsq(Lambda_mid, yields.T, rcond=None)[0].T
            for j in range(n_regimes):
                mu_init[j] = np.mean(factors_ols, axis=0)
                mu_init[j] += np.random.normal(0, 0.1, 3)
        except:
            mu_init = np.random.normal(0, 1, (n_regimes, 3))
        
        # A parameters - near unit root
        A_init = np.zeros((n_regimes, 3, 3))
        for j in range(n_regimes):
            A_init[j] = np.eye(3) * 0.95
            A_init[j] += np.random.normal(0, 0.05, (3, 3)) * (1 - np.eye(3))
        
        # H and Q parameters
        # Q: Observation error covariance 
        # H: State change error covariance 
        H_diag_init = np.ones((n_regimes, 3)) * 0.1 # initial variance for the latent variables 
        Q_diag_init = np.ones(n_regimes) * 0.01 # same across maturities 
        # off-diagonal covariances initially set to 0 
        #Assumption : No correlation between H and Q
        
        # Transition probabilities
        trans_logits_init = np.zeros((n_regimes, n_regimes - 1))
        for i in range(n_regimes):
            trans_logits_init[i] = np.random.normal(-1, 0.1, n_regimes - 1) #logits
        
        return np.concatenate([
            lambda_init,
            mu_init.flatten(), 
            A_init.flatten(),
            H_diag_init.flatten(),
            Q_diag_init,
            trans_logits_init.flatten()
        ])
    
    def _get_parameter_bounds(self, n_params):
        """Define parameter bounds for optimization"""
        n_regimes = self.K + 1
        bounds = []
        
        # Lambda bounds
        bounds.extend([(0.01, 0.5)] * n_regimes)
        # Mu bounds
        bounds.extend([(-5, 5)] * (3 * n_regimes))
        # A bounds
        bounds.extend([(-1, 1)] * (9 * n_regimes))
        # H bounds
        bounds.extend([(0.001, 5)] * (3 * n_regimes))
        # Q bounds
        bounds.extend([(0.001, 1)] * n_regimes)
        # Transition logit bounds
        bounds.extend([(-5, 5)] * (n_regimes * (n_regimes - 1)))
        
        return bounds
    def _unpack_parameters(self, params):
        """Unpack parameter vector into model components"""
        n_regimes = self.K + 1
        idx = 0
        
        # Lambda parameters (decay)
        lambda_params = params[idx:idx + n_regimes]
        idx += n_regimes
        
        # Mu parameters (drift)
        mu_params = params[idx:idx + 3 * n_regimes].reshape(n_regimes, 3)
        idx += 3 * n_regimes
        
        # A parameters (VAR matrix)
        A_params = params[idx:idx + 9 * n_regimes].reshape(n_regimes, 3, 3)
        idx += 9 * n_regimes
        
        # H parameters (state covariance)
        H_diag = params[idx:idx + 3 * n_regimes].reshape(n_regimes, 3)
        H_params = np.array([np.diag(h) for h in H_diag])
        idx += 3 * n_regimes
        
        # Q parameters (observation covariance)
        Q_diag = params[idx:idx + n_regimes]
        Q_params = np.array([np.eye(len(self.maturities)) * q for q in Q_diag])
        idx += n_regimes
        
        # Transition probabilities
        trans_logits = params[idx:idx + n_regimes * (n_regimes - 1)].reshape(n_regimes, n_regimes - 1)
        transition_probs = np.zeros((n_regimes, n_regimes))
        for i in range(n_regimes):
            exp_logits = np.exp(trans_logits[i])
            transition_probs[i, :-1] = exp_logits / (1 + np.sum(exp_logits))
            transition_probs[i, -1] = 1 / (1 + np.sum(exp_logits))
        
        return {
            'lambda_params': lambda_params,
            'mu_params': mu_params,
            'A_params': A_params,
            'H_params': H_params,
            'Q_params': Q_params,
            'transition_probs': transition_probs
        }
    
    def approximate_filter(self, yields, maturities, params):
        """
        Implement the approximate filtering algorithm from Section 4.1
        
        Key innovation: collapse growing mixture distributions to single Gaussian
        """
        param_dict = self._unpack_parameters(params)
        self.maturities = maturities
        
        T = yields.shape[0]
        m = yields.shape[1]
        n_regimes = self.K + 1
        
        # Initialize storage
        log_likelihood = 0.0
        filtered_states = np.zeros((T, 3))
        filtered_covariances = np.zeros((T, 3, 3))
        regime_probs = np.zeros((T, n_regimes))
        
        # Initialize with steady-state values
        beta_init = np.mean(param_dict['mu_params'], axis=0)
        V_init = np.eye(3) * 0.1
        regime_prob_init = np.ones(n_regimes) / n_regimes
        
        filtered_states[0] = beta_init
        filtered_covariances[0] = V_init
        regime_probs[0] = regime_prob_init
        
        for t in range(1, T):
            # Prediction step for each regime
            pred_states = np.zeros((n_regimes, 3))
            pred_covariances = np.zeros((n_regimes, 3, 3))
            pred_regime_probs = np.zeros(n_regimes)
            
            # Compute regime prediction probabilities
            for j in range(n_regimes):
                pred_regime_probs[j] = np.sum(
                    param_dict['transition_probs'][:, j] * regime_probs[t-1]
                )
            
            # Predict states for each regime
            for j in range(n_regimes):
                pred_states[j] = (param_dict['mu_params'][j] + 
                                param_dict['A_params'][j] @ filtered_states[t-1])
                pred_covariances[j] = (param_dict['A_params'][j] @ filtered_covariances[t-1] @ 
                                     param_dict['A_params'][j].T + param_dict['H_params'][j])
            
            # Update step
            likelihood_contributions = np.zeros(n_regimes)
            updated_states = np.zeros((n_regimes, 3))
            updated_covariances = np.zeros((n_regimes, 3, 3))
            
            for j in range(n_regimes):
                # Factor loadings for regime j
                Lambda_j = self.nelson_siegel_loadings(maturities, param_dict['lambda_params'][j])
                
                # Predicted observation
                y_pred = Lambda_j @ pred_states[j]
                
                # Innovation covariance
                S_j = Lambda_j @ pred_covariances[j] @ Lambda_j.T + param_dict['Q_params'][j]
                
                # Innovation
                innovation = yields[t][1::] - y_pred
                
                # Likelihood contribution
                try:
                    likelihood_contributions[j] = multivariate_normal.pdf(
                        innovation, mean=np.zeros(m), cov=S_j
                    )
                except:
                    likelihood_contributions[j] = 1e-10
                
                # Kalman update
                try:
                    K_j = pred_covariances[j] @ Lambda_j.T @ np.linalg.inv(S_j)
                except:
                    K_j = np.zeros((3, m))
                
                updated_states[j] = pred_states[j] + K_j @ innovation
                updated_covariances[j] = pred_covariances[j] - K_j @ Lambda_j @ pred_covariances[j]
            
            # Update regime probabilities
            unnormalized_probs = pred_regime_probs * likelihood_contributions
            total_likelihood = np.sum(unnormalized_probs)
            
            if total_likelihood > 0:
                regime_probs[t] = unnormalized_probs / total_likelihood
                log_likelihood += np.log(total_likelihood)
            else:
                regime_probs[t] = regime_probs[t-1]
                log_likelihood += -1e10
            
            # Collapse mixture to single Gaussian (key approximation)
            filtered_states[t] = np.sum(regime_probs[t][:, None] * updated_states, axis=0)
            
            filtered_covariances[t] = np.zeros((3, 3))
            for j in range(n_regimes):
                diff = updated_states[j] - filtered_states[t]
                filtered_covariances[t] += regime_probs[t][j] * (
                    updated_covariances[j] + np.outer(diff, diff)
                )
        
        return log_likelihood, filtered_states, filtered_covariances, regime_probs
    
    def negative_log_likelihood(self, params, yields, maturities):
        """Objective function for optimization"""
        try:
            log_likelihood, _, _, _ = self.approximate_filter(yields, maturities, params)
            return -log_likelihood
        except:
            return 1e10
    

    
    def fit(self, yields, maturities, optimization_method='multi_stage', max_iterations=1000):
        """
        Fit the MSDNS model using multi-stage optimization approach from the paper
        """
        yields = np.array(yields)
        maturities = np.array(maturities)
        self.maturities = maturities
        
        print(f"Fitting MSDNS model: {yields.shape[0]} periods, {yields.shape[1]} maturities")
        
        params_init = self._initialize_parameters(yields, maturities)
        n_params = len(params_init)
        bounds = self._get_parameter_bounds(n_params)
        
        if optimization_method == 'multi_stage':
            # Stage 1: Simulated annealing
            print("Stage 1: Global optimization...")
            result_annealing = dual_annealing(
                self.negative_log_likelihood,
                bounds=bounds,
                args=(yields, maturities),
                maxiter=max_iterations // 3,
                seed=42
            )
            
            # Stage 2: Nelder-Mead
            print("Stage 2: Local refinement...")
            result_nelder = minimize(
                self.negative_log_likelihood,
                x0=result_annealing.x,
                args=(yields, maturities),
                method='Nelder-Mead',
                options={'maxiter': max_iterations // 3}
            )
            
            # Stage 3: BFGS
            print("Stage 3: Final convergence...")
            result_final = minimize(
                self.negative_log_likelihood,
                x0=result_nelder.x,
                args=(yields, maturities),
                method='BFGS',
                options={'maxiter': max_iterations // 3}
            )
            
            best_params = result_final.x
            
        else:  # Single stage
            result_final = minimize(
                self.negative_log_likelihood,
                x0=params_init,
                args=(yields, maturities),
                method='BFGS',
                options={'maxiter': max_iterations}
            )
            best_params = result_final.x
        
        # Store parameters
        param_dict = self._unpack_parameters(best_params)
        self.lambda_params = param_dict['lambda_params']
        self.mu_params = param_dict['mu_params']
        self.A_params = param_dict['A_params']
        self.H_params = param_dict['H_params']
        self.Q_params = param_dict['Q_params']
        self.transition_probs = param_dict['transition_probs']
        
        # Final filter
        self.log_likelihood, self.filtered_states, self.filtered_covariances, self.filtered_probs = \
            self.approximate_filter(yields, maturities, best_params)
        
        self.is_fitted = True
        
        print(f"✓ Fitting completed! Log-likelihood: {self.log_likelihood:.6f}")
        print(f"AIC: {2 * n_params - 2 * self.log_likelihood:.6f}")
        print(f"BIC: {n_params * np.log(yields.shape[0]) - 2 * self.log_likelihood:.6f}")
    
    def forecast(self, forecast_horizon=12, n_simulations=1000):
        """
        Generate forecasts with regime uncertainty
        Implementation follows Section 4.2 of the paper
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before forecasting")
        
        print(f"Generating {forecast_horizon}-period forecasts...")
        
        T = len(self.filtered_states)
        n_regimes = self.K + 1
        m = len(self.maturities)
        
        current_state = self.filtered_states[-1]
        current_covariance = self.filtered_covariances[-1]
        current_regime_probs = self.filtered_probs[-1]
        
        # Storage
        forecast_yields = np.zeros((forecast_horizon, m, n_simulations))
        forecast_states = np.zeros((forecast_horizon,n_simulations,3))
        regime_sequences = np.zeros((forecast_horizon, n_simulations), dtype=int)
        
        # Monte Carlo simulation
        for sim in range(n_simulations):
            state = np.random.multivariate_normal(current_state, current_covariance)
            regime_prob = current_regime_probs.copy()
            
            for h in range(forecast_horizon):
                # Sample regime
                regime = np.random.choice(n_regimes, p=regime_prob)
                regime_sequences[h, sim] = regime
                
                # Evolve state
                state_mean = self.mu_params[regime] + self.A_params[regime] @ state
                state = np.random.multivariate_normal(state_mean, self.H_params[regime])
                forecast_states[h, sim] = state
                
                # Generate yields
                Lambda = self.nelson_siegel_loadings(self.maturities, self.lambda_params[regime])
                yield_mean = Lambda @ state
                yields = np.random.multivariate_normal(yield_mean, self.Q_params[regime])
                forecast_yields[h, :, sim] = yields
                
                # Update regime probabilities
                regime_prob = self.transition_probs[regime]
        
        # Compute statistics
        forecast_mean = np.mean(forecast_yields, axis=2)
        forecast_std = np.std(forecast_yields, axis=2)
        forecast_percentiles = np.percentile(forecast_yields, [5, 25, 75, 95], axis=2)
        
        regime_probs_forecast = np.zeros((forecast_horizon, n_regimes))
        for h in range(forecast_horizon):
            for j in range(n_regimes):
                regime_probs_forecast[h, j] = np.mean(regime_sequences[h] == j)
        
        return {
            'forecast_mean': forecast_mean,
            'forecast_std': forecast_std,
            'forecast_percentiles': forecast_percentiles,
            'regime_probs_forecast': regime_probs_forecast,
            'raw_forecasts': forecast_yields
        }
    
    def summary(self):
        """Print comprehensive model summary"""
        if not self.is_fitted:
            print("Model has not been fitted.")
            return
        
        n_regimes = self.K + 1
        n_obs = len(self.filtered_states)
        
        print("="*60)
        print("MARKOV SWITCHING DYNAMIC NELSON-SIEGEL MODEL")
        print("="*60)
        print(f"Observations: {n_obs}")
        print(f"Regimes: {n_regimes}")
        print(f"Factors: {3}")
        print(f"Maturities: {len(self.maturities)}")
        print(f"Log-likelihood: {self.log_likelihood:.6f}")
        
        print("\nREGIME PARAMETERS:")
        for j in range(n_regimes):
            print(f"\nRegime {j}:")
            print(f"  λ (decay): {self.lambda_params[j]:.6f}")
            print(f"  μ (drift): {self.mu_params[j]}")
            print(f"  A diagonal: {np.diag(self.A_params[j])}")
        
        print("\nTRANSITION PROBABILITIES:")
        for i in range(n_regimes):
            print(f"From regime {i}: {self.transition_probs[i]}")
        
        # Expected durations
        print("\nEXPECTED DURATIONS:")
        for j in range(n_regimes):
            duration = 1 / (1 - self.transition_probs[j, j])
            print(f"Regime {j}: {duration:.2f} periods")




In [None]:
# Usage Example:
"""
# Load yield curve data
yields = pd.read_csv('yield_data.csv').values  # T x m matrix
maturities = np.array([3, 6, 12, 24, 60, 120])  # months

# Fit model
model = MSDNS(n_regimes=2, n_factors=3)
model.fit(yields, maturities, optimization_method='multi_stage')

# Analyze results
model.summary()

# Generate forecasts
forecasts = model.forecast(forecast_horizon=12)
"""

yields= pd.read_csv('eom_data.csv') 
yields.drop(columns=['1 Mo','2 Mo','4 Mo','30 Yr'], inplace=True)
yields=yields[0:325].values
model = MSDNS(n_regimes=2)
maturities = np.array([3, 6, 12, 24, 36, 60, 84, 120, 240])  
model.fit(yields, maturities, optimization_method='multi_stage')
model.summary()
forecasts = model.forecast(forecast_horizon=12)
print(forecasts)

Fitting MSDNS model: 325 periods, 10 maturities
Stage 1: Global optimization...
