# Duration Hidden Markov Model for Market Regime Identification

Implementation of the Duration HMM from **Ntantamis (2009)** — *"A Duration Hidden Markov Model for the Identification of Regimes in Stock Market Returns"*.

**Core model:**
- 2-state HMM (bull/bear) with mixture-of-Normals emissions ($M=3$)
- Duration equation: $\log \tau = \kappa'Z + \zeta W$ — log-normal durations with covariates [intercept, T-bill, spread]
- Scaled forward/backward recursions (Devijver & Dekesel 1988) for numerical stability
- MPM state reconstruction via $\gamma_t(i)$
- Dietz-Böhning standard errors via deviance change

---

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Patch
from scipy.stats import norm, jarque_bera
from scipy.optimize import minimize
from statsmodels.tsa.stattools import acf
from hmmlearn.hmm import GaussianHMM
import warnings
warnings.filterwarnings('ignore')

plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({'figure.figsize': (14, 5), 'font.size': 11})

np.random.seed(42)

## 1. Data — S&P 500 Monthly Returns + FRED Rates

We pull:
- **S&P 500** monthly returns (via `yfinance`)
- **3-month T-bill** secondary market rate (FRED: `TB3MS`)
- **10-year Treasury** constant maturity rate (FRED: `GS10`)
- **Interest rate spread** = GS10 − TB3MS

In [None]:
import yfinance as yf
from pandas_datareader import data as pdr

# --- S&P 500 monthly returns ---
sp500 = yf.download('^GSPC', start='1989-12-01', end='2025-12-31', interval='1mo', auto_adjust=True)
sp500 = sp500[['Close']].dropna()
sp500.index = sp500.index.to_period('M').to_timestamp()
sp500 = sp500[~sp500.index.duplicated(keep='last')]
sp500['Return'] = sp500['Close'].pct_change()
sp500 = sp500.dropna()

# --- FRED interest rates ---
try:
    tbill = pdr.DataReader('TB3MS', 'fred', '1990-01-01', '2025-12-31').resample('MS').last()
    gs10 = pdr.DataReader('GS10', 'fred', '1990-01-01', '2025-12-31').resample('MS').last()
except Exception:
    # Fallback: use fred directly via pandas
    tbill = pd.read_csv(
        'https://fred.stlouisfed.org/graph/fredgraph.csv?bgcolor=%23e1e9f0&chart_type=line&drp=0&fo=open%20sans&graph_bgcolor=%23ffffff&height=450&mode=fred&recession_bars=on&txtcolor=%23444444&ts=12&tts=12&width=1168&nt=0&thu=0&trc=0&show_legend=yes&show_axis_titles=yes&show_tooltip=yes&id=TB3MS&scale=left&cosd=1990-01-01&coed=2025-12-31&line_color=%234572a7&link_values=false&line_style=solid&mark_type=none&mw=3&lw=2&ost=-99999&oet=99999&mma=0&fml=a&fq=Monthly&fam=avg&fgst=lin&fgsnd=2020-02-01&line_index=1&transformation=lin&vintage_date=2025-01-01&revision_date=2025-01-01',
        parse_dates=['DATE'], index_col='DATE'
    )
    gs10 = pd.read_csv(
        'https://fred.stlouisfed.org/graph/fredgraph.csv?bgcolor=%23e1e9f0&chart_type=line&drp=0&fo=open%20sans&graph_bgcolor=%23ffffff&height=450&mode=fred&recession_bars=on&txtcolor=%23444444&ts=12&tts=12&width=1168&nt=0&thu=0&trc=0&show_legend=yes&show_axis_titles=yes&show_tooltip=yes&id=GS10&scale=left&cosd=1990-01-01&coed=2025-12-31&line_color=%234572a7&link_values=false&line_style=solid&mark_type=none&mw=3&lw=2&ost=-99999&oet=99999&mma=0&fml=a&fq=Monthly&fam=avg&fgst=lin&fgsnd=2020-02-01&line_index=1&transformation=lin&vintage_date=2025-01-01&revision_date=2025-01-01',
        parse_dates=['DATE'], index_col='DATE'
    )

print(f'S&P 500 returns: {sp500.index[0].strftime("%Y-%m")} to {sp500.index[-1].strftime("%Y-%m")} ({len(sp500)} obs)')
print(f'T-Bill:          {tbill.index[0].strftime("%Y-%m")} to {tbill.index[-1].strftime("%Y-%m")} ({len(tbill)} obs)')
print(f'GS10:            {gs10.index[0].strftime("%Y-%m")} to {gs10.index[-1].strftime("%Y-%m")} ({len(gs10)} obs)')

In [None]:
# --- Merge and align ---
rates = pd.DataFrame({
    'tbill': tbill.iloc[:, 0],
    'gs10': gs10.iloc[:, 0]
})
rates.index = rates.index.to_period('M').to_timestamp()
rates = rates[~rates.index.duplicated(keep='last')]
rates['spread'] = rates['gs10'] - rates['tbill']

df = sp500[['Return']].join(rates, how='inner').dropna()
df.columns = ['return', 'tbill', 'gs10', 'spread']

# --- Train/test split (last 48 months = test) ---
TEST_MONTHS = 48
split_idx = len(df) - TEST_MONTHS
df_train = df.iloc[:split_idx].copy()
df_test = df.iloc[split_idx:].copy()

print(f'Full sample:  {df.index[0].strftime("%Y-%m")} to {df.index[-1].strftime("%Y-%m")} ({len(df)} months)')
print(f'Train:        {df_train.index[0].strftime("%Y-%m")} to {df_train.index[-1].strftime("%Y-%m")} ({len(df_train)} months)')
print(f'Test:         {df_test.index[0].strftime("%Y-%m")} to {df_test.index[-1].strftime("%Y-%m")} ({len(df_test)} months)')
df.head(10)

## 2. Exploratory Data Analysis

In [None]:
# --- Descriptive statistics (in-sample / out-of-sample) ---
def desc_stats(s, name=''):
    return pd.Series({
        'mean': s.mean(),
        'std': s.std(),
        'skewness': s.skew(),
        'kurtosis': s.kurtosis(),
        'min': s.min(),
        'max': s.max(),
        'JB stat': jarque_bera(s)[0],
        'JB p-val': jarque_bera(s)[1],
    }, name=name)

stats_in = pd.DataFrame([desc_stats(df_train[c], c) for c in ['return', 'tbill', 'gs10', 'spread']]).T
stats_out = pd.DataFrame([desc_stats(df_test[c], c) for c in ['return', 'tbill', 'gs10', 'spread']]).T

print('=== In-Sample Descriptive Statistics ===')
display(stats_in.round(5))
print('\n=== Out-of-Sample Descriptive Statistics ===')
display(stats_out.round(5))

In [None]:
# --- Time series plots ---
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

axes[0].plot(df.index, df['return'] * 100, color='firebrick', lw=0.9)
axes[0].axhline(0, color='k', lw=0.5)
axes[0].axvline(df_test.index[0], color='grey', ls='--', lw=1, label='Train/Test split')
axes[0].set_ylabel('Monthly Return (%)')
axes[0].set_title('S&P 500 Monthly Returns')
axes[0].legend()

axes[1].plot(df.index, df['tbill'], label='3M T-Bill', color='steelblue')
axes[1].plot(df.index, df['gs10'], label='10Y Treasury', color='darkorange')
axes[1].axvline(df_test.index[0], color='grey', ls='--', lw=1)
axes[1].set_ylabel('Rate (%)')
axes[1].set_title('Interest Rates')
axes[1].legend()

axes[2].fill_between(df.index, df['spread'], alpha=0.4, color='seagreen')
axes[2].axhline(0, color='k', lw=0.5)
axes[2].axvline(df_test.index[0], color='grey', ls='--', lw=1)
axes[2].set_ylabel('Spread (%)')
axes[2].set_title('Term Spread (10Y − 3M)')

plt.tight_layout()
plt.show()

In [None]:
# --- Return distribution and ACF ---
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# Histogram
ret = df_train['return'].values
axes[0].hist(ret, bins=50, density=True, alpha=0.6, color='steelblue', edgecolor='white')
x_grid = np.linspace(ret.min(), ret.max(), 200)
axes[0].plot(x_grid, norm.pdf(x_grid, ret.mean(), ret.std()), 'r-', lw=2, label='Normal fit')
axes[0].set_title('Return Distribution (In-Sample)')
axes[0].legend()

# QQ-plot
from scipy.stats import probplot
probplot(ret, dist='norm', plot=axes[1])
axes[1].set_title('Normal Q-Q Plot')

# ACF
acf_vals = acf(ret, nlags=20, fft=True)
axes[2].bar(range(21), acf_vals, color='steelblue', width=0.6)
axes[2].axhline(1.96/np.sqrt(len(ret)), color='r', ls='--', lw=0.8)
axes[2].axhline(-1.96/np.sqrt(len(ret)), color='r', ls='--', lw=0.8)
axes[2].set_title('ACF of Monthly Returns')
axes[2].set_xlabel('Lag')

plt.tight_layout()
plt.show()

jb_stat, jb_p = jarque_bera(ret)
print(f'Jarque-Bera test: statistic = {jb_stat:.3f}, p-value = {jb_p:.4f}')
print(f'  => {"Reject" if jb_p < 0.05 else "Fail to reject"} normality at 5% level')

## 3. Baseline HMM — Standard 2-State Gaussian (hmmlearn)

A standard 2-state Gaussian HMM as a benchmark. This model assumes:
- Time-homogeneous transition probabilities
- Single Gaussian emission per state
- No duration modelling, no exogenous covariates

In [None]:
# --- Fit standard 2-state Gaussian HMM ---
Y_train = df_train['return'].values.reshape(-1, 1)

baseline_hmm = GaussianHMM(
    n_components=2, covariance_type='full',
    n_iter=500, tol=1e-6, random_state=42
)
baseline_hmm.fit(Y_train)

# Identify states: bull = higher mean
means = baseline_hmm.means_.flatten()
if means[0] < means[1]:
    label_map = {0: 'Bear', 1: 'Bull'}
else:
    label_map = {0: 'Bull', 1: 'Bear'}

print('=== Baseline HMM Results ===')
print(f'Log-likelihood: {baseline_hmm.score(Y_train):.4f}')
print(f'\nInitial probs (pi): {baseline_hmm.startprob_}')
print(f'\nTransition matrix A:')
print(pd.DataFrame(baseline_hmm.transmat_,
                   index=[label_map[i] for i in range(2)],
                   columns=[label_map[i] for i in range(2)]).round(4))
print(f'\nEmission parameters:')
for i in range(2):
    mu = baseline_hmm.means_[i, 0]
    sigma = np.sqrt(baseline_hmm.covars_[i, 0, 0])
    print(f'  {label_map[i]:4s}: mean = {mu:.6f}, std = {sigma:.6f}')
    print(f'        E[duration] = {1/(1 - baseline_hmm.transmat_[i,i]):.1f} months')

In [None]:
# --- Baseline regime reconstruction ---
baseline_states = baseline_hmm.predict(Y_train)
baseline_probs = baseline_hmm.predict_proba(Y_train)

fig, axes = plt.subplots(2, 1, figsize=(14, 6), sharex=True)

# Determine which column is bull probability
bull_idx = 0 if label_map[0] == 'Bull' else 1
bull_prob = baseline_probs[:, bull_idx]

axes[0].plot(df_train.index, df_train['return'] * 100, color='k', lw=0.7)
axes[0].fill_between(df_train.index, df_train['return'].min()*100,
                     df_train['return'].max()*100,
                     where=bull_prob > 0.5, alpha=0.15, color='blue', label='Bull')
axes[0].fill_between(df_train.index, df_train['return'].min()*100,
                     df_train['return'].max()*100,
                     where=bull_prob <= 0.5, alpha=0.15, color='orange', label='Bear')
axes[0].set_ylabel('Return (%)')
axes[0].set_title('Baseline HMM — Regime Classification')
axes[0].legend()

axes[1].plot(df_train.index, bull_prob, color='steelblue', lw=1)
axes[1].axhline(0.5, color='r', ls='--', lw=0.8)
axes[1].set_ylabel('P(Bull)')
axes[1].set_title('Bull Market Posterior Probability')
axes[1].set_ylim(-0.05, 1.05)

plt.tight_layout()
plt.show()

## 4. Duration HMM — Full Implementation

### Model specification (Ntantamis 2009)

**Emissions:** Mixture of $M=3$ Normals per state $j$:
$$b_j(y_t) = \sum_{m=1}^{M} c_{jm} \, f(y_t; \mu_{jm}, \sigma_{jm}^2)$$

**Duration density:** Log-normal with covariates $Z = [1, \text{tbill}, \text{spread}]$:
$$d_j(\tau; Z) = \frac{1}{\tau} \phi\!\left(\frac{\ln \tau - \kappa_j' Z}{\zeta_j}\right)$$

**Transition matrix:** $A_{ii} = 0$ (no self-transitions; duration is explicit).

**Forward recursion (scaled, Devijver-Dekesel):**
$$\alpha_t(j) = \sum_{\tau \le t} \sum_{i \ne j} \alpha'_{t-\tau}(i) A_{ij} d_j(\tau; z_{t-\tau}) \left[\prod_{\theta=1}^{\tau-1} q_{t-\tau+\theta} b_j(y_{t-\tau+\theta})\right] b_j(y_t)$$

**Backward recursion (scaled):**
$$\beta_t(i) = \sum_{\tau \le T-t} \sum_{j \ne i} A_{ij} d_j(\tau; z_t) \left[\prod_{\theta=1}^{\tau-1} q_{t+\theta} b_j(y_{t+\theta})\right] b_j(y_{t+\tau}) \beta'_{t+\tau}(j)$$

where $q_t = \left[\sum_{j=1}^n \alpha_t(j)\right]^{-1}$ and $\alpha'_t(i) = \alpha_t(i) / \sum_j \alpha_t(j)$, $\beta'_t(i) = \beta_t(i) / \sum_j \alpha_t(j)$.

In [None]:
class DurationHMM:
    """
    Duration Hidden Markov Model (Ntantamis 2009).
    
    - N=2 states (bull/bear)
    - M=3 mixture-of-Normals emissions per state
    - Log-normal duration with exogenous covariates
    - Scaled forward/backward recursions (Devijver-Dekesel)
    """
    
    def __init__(self, n_states=2, n_mix=3, max_duration=None):
        self.N = n_states
        self.M = n_mix
        self.max_dur = max_duration  # set during fit
        
        # Parameters to be estimated
        self.pi = None       # initial state probs (N,)
        self.A = None        # transition matrix (N, N), A_ii = 0
        self.c = None        # mixing probs (N, M)
        self.mu = None       # mixture means (N, M)
        self.sigma2 = None   # mixture variances (N, M)
        self.kappa = None    # duration regression coeffs (N, K) where K = dim(Z)
        self.zeta = None     # duration std dev (N,)
        
    def _init_params(self, Y, Z):
        """Initialize parameters using simple heuristics."""
        T = len(Y)
        K = Z.shape[1]
        
        # Initial state probs
        self.pi = np.array([0.5, 0.5])
        
        # Transition matrix (A_ii = 0 for duration model)
        self.A = np.array([[0.0, 1.0],
                           [1.0, 0.0]])
        
        # Split data roughly in half by return magnitude for init
        median_ret = np.median(Y)
        high_mask = Y >= median_ret
        low_mask = ~high_mask
        
        # Mixture parameters — init with K-means-like split
        self.c = np.ones((self.N, self.M)) / self.M
        self.mu = np.zeros((self.N, self.M))
        self.sigma2 = np.zeros((self.N, self.M))
        
        for j, mask in enumerate([high_mask, low_mask]):
            subset = Y[mask]
            # Split subset into M roughly equal parts
            sorted_sub = np.sort(subset)
            chunk = max(1, len(sorted_sub) // self.M)
            for m in range(self.M):
                start = m * chunk
                end = min((m+1) * chunk, len(sorted_sub))
                if start < len(sorted_sub):
                    self.mu[j, m] = sorted_sub[start:end].mean()
                    self.sigma2[j, m] = max(sorted_sub[start:end].var(), 1e-6)
                else:
                    self.mu[j, m] = subset.mean()
                    self.sigma2[j, m] = max(subset.var(), 1e-6)
        
        # Duration parameters
        self.kappa = np.zeros((self.N, K))
        self.kappa[:, 0] = np.log(12)  # intercept => ~12 months expected duration
        self.zeta = np.ones(self.N) * 1.0
    
    def _emission_prob(self, y, j):
        """Mixture-of-normals emission probability b_j(y)."""
        prob = 0.0
        for m in range(self.M):
            prob += self.c[j, m] * norm.pdf(y, self.mu[j, m], np.sqrt(self.sigma2[j, m]))
        return max(prob, 1e-300)
    
    def _emission_prob_all(self, Y):
        """Compute b_j(y_t) for all t, j. Returns (T, N) array."""
        T = len(Y)
        B = np.zeros((T, self.N))
        for j in range(self.N):
            for m in range(self.M):
                B[:, j] += self.c[j, m] * norm.pdf(Y, self.mu[j, m], np.sqrt(self.sigma2[j, m]))
        B = np.maximum(B, 1e-300)
        return B
    
    def _duration_prob(self, tau, z, j):
        """Log-normal duration density d_j(tau; Z). Eq (7)."""
        if tau < 1:
            return 0.0
        log_tau = np.log(tau)
        mean = self.kappa[j] @ z
        std = self.zeta[j]
        return (1.0 / tau) * norm.pdf((log_tau - mean) / std) / std
    
    def _duration_prob_matrix(self, Z, max_dur):
        """
        Precompute d_j(tau; z_t) for all t, j, tau.
        Returns (T, N, max_dur) where index [t, j, d] = d_j(d+1; z_t).
        """
        T = Z.shape[0]
        D = np.zeros((T, self.N, max_dur))
        taus = np.arange(1, max_dur + 1, dtype=float)
        log_taus = np.log(taus)
        
        for j in range(self.N):
            for t in range(T):
                mean = self.kappa[j] @ Z[t]
                std = self.zeta[j]
                D[t, j, :] = (1.0 / taus) * norm.pdf((log_taus - mean) / std) / std
        
        return D
    
    def _forward_backward(self, Y, Z, B, D, max_dur):
        """
        Scaled forward-backward recursions (Devijver-Dekesel).
        Eqs (28)-(33) from the paper.
        
        Returns:
            alpha_prime: (T, N) scaled forward probs
            beta_prime: (T, N) scaled backward probs  
            q: (T,) scaling factors
            log_lik: total log-likelihood
        """
        T = len(Y)
        N = self.N
        
        alpha = np.zeros((T, N))       # unscaled forward
        alpha_prime = np.zeros((T, N)) # scaled forward = alpha / sum(alpha)
        q = np.zeros(T)                # q_t = 1 / sum_j alpha_t(j)
        log_lik = 0.0
        
        # --- Forward pass ---
        # t=0: alpha_1(j) = pi_j * b_j(y_1) * d_j(1; z_1)
        for j in range(N):
            alpha[0, j] = self.pi[j] * B[0, j] * D[0, j, 0]
        
        s = alpha[0].sum()
        if s > 0:
            q[0] = 1.0 / s
            alpha_prime[0] = alpha[0] / s
            log_lik += np.log(s)
        else:
            q[0] = 1.0
            alpha_prime[0] = 1.0 / N
        
        # t=1..T-1
        for t in range(1, T):
            for j in range(N):
                val = 0.0
                # Sum over durations tau=1..min(t+1, max_dur)
                max_tau = min(t + 1, max_dur)
                for tau in range(1, max_tau + 1):
                    t_start = t - tau  # time of transition
                    
                    # Product of scaled emission probs within the duration
                    # For theta=1..tau-1: q_{t-tau+theta} * b_j(y_{t-tau+theta})
                    # Plus the final b_j(y_t)
                    prod_b = B[t, j]  # the final emission
                    for theta in range(1, tau):
                        idx = t_start + theta
                        prod_b *= q[idx] * B[idx, j]
                    
                    # Duration prob at the start time
                    dur_p = D[max(t_start, 0), j, tau - 1]
                    
                    if t_start < 0:
                        # Comes from initial state
                        val += self.pi[j] * dur_p * prod_b
                    else:
                        # Comes from transition
                        for i in range(N):
                            if i != j:
                                val += alpha_prime[t_start, i] * self.A[i, j] * dur_p * prod_b
                
                alpha[t, j] = val
            
            s = alpha[t].sum()
            if s > 0:
                q[t] = 1.0 / s
                alpha_prime[t] = alpha[t] / s
                log_lik += np.log(s)
            else:
                q[t] = q[t-1]
                alpha_prime[t] = alpha_prime[t-1]
        
        # --- Backward pass ---
        beta = np.zeros((T, N))
        beta_prime = np.zeros((T, N))
        
        # t=T-1: beta_T(j) = 1
        beta[T-1] = 1.0
        s_alpha = alpha[T-1].sum()
        if s_alpha > 0:
            beta_prime[T-1] = beta[T-1] / s_alpha
        else:
            beta_prime[T-1] = 1.0 / N
        
        for t in range(T-2, -1, -1):
            for i in range(N):
                val = 0.0
                max_tau = min(T - t, max_dur)
                for tau in range(1, max_tau):
                    t_end = t + tau
                    for j_next in range(N):
                        if j_next != i:
                            dur_p = D[t, j_next, tau - 1]
                            
                            prod_b = B[t_end, j_next]  # b_j(y_{t+tau})
                            for theta in range(1, tau):
                                idx = t + theta
                                prod_b *= q[idx] * B[idx, j_next]
                            
                            val += self.A[i, j_next] * dur_p * prod_b * beta_prime[t_end, j_next]
                
                beta[t, i] = val
            
            s_alpha = alpha[t].sum()
            if s_alpha > 0:
                beta_prime[t] = beta[t] / s_alpha
            else:
                beta_prime[t] = beta_prime[t+1]
        
        return alpha_prime, beta_prime, q, log_lik
    
    def _compute_gamma(self, alpha_prime, beta_prime, q):
        """Compute gamma_t(i) = P(X_t = i | Y_{1:T}). Eq (13)."""
        T = alpha_prime.shape[0]
        gamma = alpha_prime * beta_prime / q[:, None]
        # Normalize
        row_sums = gamma.sum(axis=1, keepdims=True)
        row_sums = np.where(row_sums > 0, row_sums, 1.0)
        gamma = gamma / row_sums
        return gamma
    
    def _m_step_emissions(self, Y, gamma):
        """Update mixture emission parameters (c, mu, sigma2). Eqs (17)-(19)."""
        T = len(Y)
        
        for j in range(self.N):
            # Compute mixture responsibilities w_{jm}(t)
            w = np.zeros((T, self.M))
            for m in range(self.M):
                w[:, m] = self.c[j, m] * norm.pdf(Y, self.mu[j, m], np.sqrt(self.sigma2[j, m]))
            w_sum = w.sum(axis=1, keepdims=True)
            w_sum = np.where(w_sum > 0, w_sum, 1.0)
            w = w / w_sum
            
            # gamma_t(j, k) = gamma_t(j) * w_{jk}(t)
            gamma_jk = gamma[:, j:j+1] * w  # (T, M)
            
            # Update mixing probs
            gamma_jk_sum = gamma_jk.sum(axis=0)
            total = gamma_jk_sum.sum()
            if total > 0:
                self.c[j] = gamma_jk_sum / total
            
            # Update means and variances
            for m in range(self.M):
                denom = gamma_jk[:, m].sum()
                if denom > 1e-10:
                    self.mu[j, m] = (gamma_jk[:, m] * Y).sum() / denom
                    self.sigma2[j, m] = max(
                        (gamma_jk[:, m] * (Y - self.mu[j, m])**2).sum() / denom,
                        1e-8
                    )
    
    def _m_step_transition(self, alpha_prime, beta_prime, q, B, D, Z, max_dur):
        """Update transition matrix A. Eq (33)."""
        T = len(alpha_prime)
        num = np.zeros((self.N, self.N))
        denom = np.zeros(self.N)
        
        for t in range(T-1):
            for i in range(self.N):
                denom[i] += alpha_prime[t, i] * beta_prime[t, i] / q[t]
                for j in range(self.N):
                    if i != j:
                        # Sum over valid durations
                        max_tau = min(T - t, max_dur)
                        for tau in range(1, max_tau):
                            t_end = t + tau
                            dur_p = D[t, j, tau - 1]
                            
                            prod_b = B[t_end, j]
                            for theta in range(1, tau):
                                prod_b *= q[t + theta] * B[t + theta, j]
                            
                            num[i, j] += alpha_prime[t, i] * self.A[i, j] * dur_p * prod_b * beta_prime[t_end, j]
        
        # Update A (keep A_ii = 0)
        for i in range(self.N):
            for j in range(self.N):
                if i != j and denom[i] > 1e-10:
                    self.A[i, j] = num[i, j] / denom[i]
            # Normalize row (excluding diagonal)
            row_sum = sum(self.A[i, k] for k in range(self.N) if k != i)
            if row_sum > 0:
                for j in range(self.N):
                    if i != j:
                        self.A[i, j] /= row_sum
    
    def _m_step_duration(self, Y, Z, alpha_prime, beta_prime, q, B, D, max_dur):
        """
        MLE for duration parameters (kappa, zeta) via numerical optimization.
        Uses the gradient conditions from Eqs (22)-(25).
        """
        T = len(Y)
        K = Z.shape[1]
        
        for j in range(self.N):
            def neg_dur_loglik(params):
                kap = params[:K]
                zet = max(params[K], 0.01)
                
                ll = 0.0
                for t in range(T-1):
                    max_tau = min(T - t, max_dur)
                    for tau in range(1, max_tau):
                        log_tau = np.log(tau)
                        mean = kap @ Z[t]
                        d_val = (1.0/tau) * norm.pdf((log_tau - mean)/zet) / zet
                        
                        if d_val > 1e-300:
                            # Weight by forward-backward
                            t_end = t + tau
                            if t_end < T:
                                for i in range(self.N):
                                    if i != j:
                                        prod_b = B[t_end, j]
                                        for theta in range(1, tau):
                                            prod_b *= q[t+theta] * B[t+theta, j]
                                        
                                        weight = alpha_prime[t, i] * self.A[i, j] * prod_b * beta_prime[t_end, j]
                                        if weight > 0 and d_val > 0:
                                            ll += weight * np.log(max(d_val, 1e-300))
                
                return -ll
            
            x0 = np.concatenate([self.kappa[j], [self.zeta[j]]])
            bounds = [(None, None)] * K + [(0.01, 5.0)]
            
            try:
                res = minimize(neg_dur_loglik, x0, method='L-BFGS-B',
                             bounds=bounds, options={'maxiter': 50, 'ftol': 1e-6})
                if res.success or res.fun < neg_dur_loglik(x0):
                    self.kappa[j] = res.x[:K]
                    self.zeta[j] = max(res.x[K], 0.01)
            except Exception:
                pass
    
    def fit(self, Y, Z, max_iter=30, tol=1e-4, max_duration=None, verbose=True):
        """
        Fit Duration HMM via EM.
        
        Parameters:
            Y: (T,) observation sequence (returns)
            Z: (T, K) exogenous covariates (should include intercept column)
            max_iter: maximum EM iterations
            tol: convergence tolerance on log-likelihood
            max_duration: cap on duration (for computational tractability)
        """
        T = len(Y)
        if max_duration is None:
            max_duration = min(T // 3, 60)  # cap at 60 months or T/3
        self.max_dur = max_duration
        
        self._init_params(Y, Z)
        
        log_liks = []
        
        for iteration in range(max_iter):
            # --- E step ---
            B = self._emission_prob_all(Y)
            D = self._duration_prob_matrix(Z, self.max_dur)
            alpha_prime, beta_prime, q, log_lik = self._forward_backward(Y, Z, B, D, self.max_dur)
            gamma = self._compute_gamma(alpha_prime, beta_prime, q)
            
            log_liks.append(log_lik)
            
            if verbose:
                print(f'  Iter {iteration+1:3d}: log-lik = {log_lik:.4f}')
            
            # Check convergence
            if iteration > 0 and abs(log_liks[-1] - log_liks[-2]) < tol:
                if verbose:
                    print(f'  Converged at iteration {iteration+1}')
                break
            
            # --- M step ---
            # Update initial probs
            self.pi = gamma[0]
            self.pi = np.maximum(self.pi, 1e-6)
            self.pi /= self.pi.sum()
            
            # Update emissions
            self._m_step_emissions(Y, gamma)
            
            # Update transition matrix
            self._m_step_transition(alpha_prime, beta_prime, q, B, D, Z, self.max_dur)
            
            # Update duration parameters
            self._m_step_duration(Y, Z, alpha_prime, beta_prime, q, B, D, self.max_dur)
        
        # Final E-step for gamma
        B = self._emission_prob_all(Y)
        D = self._duration_prob_matrix(Z, self.max_dur)
        alpha_prime, beta_prime, q, log_lik = self._forward_backward(Y, Z, B, D, self.max_dur)
        self.gamma_ = self._compute_gamma(alpha_prime, beta_prime, q)
        self.log_liks_ = log_liks
        self.log_lik_ = log_lik
        
        return self
    
    def mpm_states(self):
        """Maximum Posterior Mode state sequence. Eq (41)."""
        return np.argmax(self.gamma_, axis=1)
    
    def viterbi(self, Y, Z):
        """Viterbi decoding for MAP state sequence."""
        T = len(Y)
        B = self._emission_prob_all(Y)
        D = self._duration_prob_matrix(Z, self.max_dur)
        
        # log-space Viterbi
        log_delta = np.full((T, self.N), -np.inf)
        psi = np.zeros((T, self.N), dtype=int)
        
        for j in range(self.N):
            if self.pi[j] > 0 and B[0, j] > 0 and D[0, j, 0] > 0:
                log_delta[0, j] = np.log(self.pi[j]) + np.log(B[0, j]) + np.log(D[0, j, 0])
        
        for t in range(1, T):
            for j in range(self.N):
                best_val = -np.inf
                best_i = 0
                max_tau = min(t + 1, self.max_dur)
                
                for tau in range(1, max_tau + 1):
                    t_start = t - tau
                    log_b_prod = np.log(max(B[t, j], 1e-300))
                    for theta in range(1, tau):
                        log_b_prod += np.log(max(B[t_start + theta, j], 1e-300))
                    
                    dur_p = D[max(t_start, 0), j, tau - 1]
                    log_dur = np.log(max(dur_p, 1e-300))
                    
                    if t_start < 0:
                        val = np.log(max(self.pi[j], 1e-300)) + log_dur + log_b_prod
                        if val > best_val:
                            best_val = val
                            best_i = j
                    else:
                        for i in range(self.N):
                            if i != j and self.A[i, j] > 0:
                                val = log_delta[t_start, i] + np.log(self.A[i, j]) + log_dur + log_b_prod
                                if val > best_val:
                                    best_val = val
                                    best_i = i
                
                log_delta[t, j] = best_val
                psi[t, j] = best_i
        
        # Backtrack
        states = np.zeros(T, dtype=int)
        states[T-1] = np.argmax(log_delta[T-1])
        for t in range(T-2, -1, -1):
            states[t] = psi[t+1, states[t+1]]
        
        return states
    
    def expected_duration(self, z, j):
        """Expected duration for state j given covariates z. Eq (39): E(tau) = exp(kappa'Z + zeta^2/2)."""
        return np.exp(self.kappa[j] @ z + self.zeta[j]**2 / 2)
    
    def forecast_paths(self, Z_future, n_paths=5000):
        """
        Simulate forecast paths per Section 2.5.
        
        Parameters:
            Z_future: (H, K) future covariates for H periods ahead
            n_paths: number of Monte Carlo paths
        
        Returns:
            paths: (n_paths, H) simulated returns
        """
        H = Z_future.shape[0]
        paths = np.zeros((n_paths, H))
        
        for v in range(n_paths):
            t = 0
            # Draw initial state
            state = np.random.choice(self.N, p=self.pi)
            
            while t < H:
                # Draw duration from log-normal
                mean = self.kappa[state] @ Z_future[min(t, H-1)]
                log_dur = mean + self.zeta[state] * np.random.randn()
                dur = max(1, int(np.round(np.exp(log_dur))))
                
                # Fill returns for this duration
                # Draw from mixture for this state
                for d in range(dur):
                    if t + d >= H:
                        break
                    # Pick mixture component
                    comp = np.random.choice(self.M, p=self.c[state])
                    paths[v, t + d] = np.random.normal(self.mu[state, comp],
                                                       np.sqrt(self.sigma2[state, comp]))
                
                t += dur
                
                # Transition to next state (A_ii = 0)
                probs = self.A[state].copy()
                probs[state] = 0
                if probs.sum() > 0:
                    probs /= probs.sum()
                    state = np.random.choice(self.N, p=probs)
                else:
                    state = 1 - state  # 2-state: just flip
        
        return paths
    
    def standard_errors_deviance(self, Y, Z):
        """
        Dietz-Böhning standard errors via deviance change. Eq (34).
        s.e.(lambda_i) = |lambda_i| / sqrt(2 * (l_full - l_restricted))
        """
        B = self._emission_prob_all(Y)
        D = self._duration_prob_matrix(Z, self.max_dur)
        _, _, _, ll_full = self._forward_backward(Y, Z, B, D, self.max_dur)
        
        se_dict = {}
        
        # Standard errors for duration parameters
        for j in range(self.N):
            K = Z.shape[1]
            for k in range(K):
                orig_val = self.kappa[j, k]
                self.kappa[j, k] = 0.0
                
                D_r = self._duration_prob_matrix(Z, self.max_dur)
                _, _, _, ll_r = self._forward_backward(Y, Z, B, D_r, self.max_dur)
                
                self.kappa[j, k] = orig_val
                
                dev_change = 2 * (ll_full - ll_r)
                if dev_change > 0:
                    se_dict[f'kappa_{j+1},{k+1}'] = abs(orig_val) / np.sqrt(dev_change)
                else:
                    se_dict[f'kappa_{j+1},{k+1}'] = np.nan
        
        return se_dict

In [None]:
# --- Prepare data ---
Y_tr = df_train['return'].values
Z_tr = np.column_stack([
    np.ones(len(df_train)),       # intercept
    df_train['tbill'].values,     # T-bill rate
    df_train['spread'].values     # interest rate spread
])

print(f'Training: T={len(Y_tr)}, K={Z_tr.shape[1]} covariates [intercept, tbill, spread]')
print(f'Max duration capped at {min(len(Y_tr)//3, 60)} months\n')

# --- Fit Duration HMM ---
dhmm = DurationHMM(n_states=2, n_mix=3)
dhmm.fit(Y_tr, Z_tr, max_iter=30, tol=1e-4, verbose=True)

In [None]:
# --- EM convergence ---
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(range(1, len(dhmm.log_liks_)+1), dhmm.log_liks_, 'o-', color='steelblue')
ax.set_xlabel('EM Iteration')
ax.set_ylabel('Log-Likelihood')
ax.set_title('Duration HMM — EM Convergence')
plt.tight_layout()
plt.show()

In [None]:
# --- Parameter tables ---

# Identify bull/bear: bull = higher mean return (weighted by mixture)
state_means = [(dhmm.c[j] * dhmm.mu[j]).sum() for j in range(2)]
state_vars = [(dhmm.c[j] * (dhmm.sigma2[j] + dhmm.mu[j]**2)).sum() - state_means[j]**2 for j in range(2)]
bull_state = int(np.argmax(state_means))
bear_state = 1 - bull_state
state_labels = {bull_state: 'Bull (State 1)', bear_state: 'Bear (State 2)'}

print('=== Markov Chain Parameters ===')
print(f'Initial probs (pi): Bull={dhmm.pi[bull_state]:.4f}, Bear={dhmm.pi[bear_state]:.4f}')
print(f'\nTransition matrix A (A_ii = 0 by construction):')
A_df = pd.DataFrame(dhmm.A, 
                    index=[state_labels[i] for i in range(2)],
                    columns=[state_labels[i] for i in range(2)])
display(A_df.round(4))

print('\n=== Mixture-of-Normals Emission Parameters ===')
for j in range(2):
    print(f'\n  {state_labels[j]}:')
    em_df = pd.DataFrame({
        'Mixing Prob': dhmm.c[j],
        'Mean': dhmm.mu[j],
        'Variance': dhmm.sigma2[j],
        'Std Dev': np.sqrt(dhmm.sigma2[j])
    }, index=[f'Component {m+1}' for m in range(dhmm.M)])
    display(em_df.round(6))

print(f'\n  Overall state characteristics:')
char_df = pd.DataFrame({
    'Mean': state_means,
    'Variance': state_vars,
    'Variability Coef': [np.sqrt(v)/abs(m) if abs(m) > 1e-8 else np.nan for m, v in zip(state_means, state_vars)]
}, index=[state_labels[i] for i in range(2)])
display(char_df.round(6))

In [None]:
# --- Duration parameter table ---
print('=== Duration Model Parameters ===')
print('log(tau) = kappa_1 + kappa_2 * tbill + kappa_3 * spread + zeta * W\n')

# Compute standard errors
se = dhmm.standard_errors_deviance(Y_tr, Z_tr)

dur_rows = []
param_names = ['kappa_1 (intercept)', 'kappa_2 (T-bill)', 'kappa_3 (spread)', 'zeta']
for j in range(2):
    row = {}
    for k in range(3):
        row[f'Coeff'] = None  # placeholder
    dur_rows.append(row)

for j in range(2):
    print(f'  {state_labels[j]}:')
    data = []
    for k in range(3):
        se_key = f'kappa_{j+1},{k+1}'
        se_val = se.get(se_key, np.nan)
        data.append({
            'Parameter': param_names[k],
            'Coefficient': dhmm.kappa[j, k],
            'Std. Error': se_val
        })
    data.append({
        'Parameter': param_names[3],
        'Coefficient': dhmm.zeta[j],
        'Std. Error': np.nan
    })
    display(pd.DataFrame(data).set_index('Parameter').round(4))

# Expected durations at sample mean covariates
z_mean = Z_tr.mean(axis=0)
print(f'\nExpected durations at sample-mean covariates (tbill={z_mean[1]:.2f}%, spread={z_mean[2]:.2f}%):')
for j in range(2):
    ed = dhmm.expected_duration(z_mean, j)
    print(f'  {state_labels[j]}: E[tau] = {ed:.1f} months')

## 5. Quantitative Analysis — Duration Elasticities & Policy Scenarios

In [None]:
# --- Duration elasticities ---
# E(tau) = exp(kappa'Z + zeta^2/2)
# d E(tau) / d Z_k = kappa_k * E(tau)
# % change in E(tau) for 1% change in Z_k = kappa_k (semi-elasticity)
# For 1 ppt increase in rate: % change = exp(kappa_k * 1) - 1

print('=== Duration Elasticities ===')
print('Effect of 1 percentage point increase in each covariate on expected duration:\n')

z_base = Z_tr.mean(axis=0)

for j in range(2):
    ed_base = dhmm.expected_duration(z_base, j)
    print(f'{state_labels[j]} (baseline E[tau] = {ed_base:.1f} months):')
    
    # T-bill +1%
    z_tbill = z_base.copy()
    z_tbill[1] += 1.0
    ed_tbill = dhmm.expected_duration(z_tbill, j)
    pct_tbill = (ed_tbill / ed_base - 1) * 100
    print(f'  T-bill +1%:   E[tau] = {ed_tbill:.1f} months ({pct_tbill:+.1f}%)')
    
    # Spread +1%
    z_spread = z_base.copy()
    z_spread[2] += 1.0
    ed_spread = dhmm.expected_duration(z_spread, j)
    pct_spread = (ed_spread / ed_base - 1) * 100
    print(f'  Spread +1%:   E[tau] = {ed_spread:.1f} months ({pct_spread:+.1f}%)')
    print()

In [None]:
# --- Policy scenarios: +/-25bp Fed rate change ---
# Assumption: 25bp cut in Fed rate => T-bill drops ~25bp, spread increases ~11bp
# (based on typical pass-through; Ntantamis uses full and partial pass-through)

print('=== Policy Scenarios: Fed Rate Changes ===')
print('Assumption: 25bp Fed cut => T-bill -25bp, spread +11bp\n')

scenarios = [
    ('Fed -25bp (easing)', -0.25, +0.11),
    ('Fed +25bp (tightening)', +0.25, -0.11),
    ('Fed -50bp (aggressive easing)', -0.50, +0.22),
    ('Fed +50bp (aggressive tightening)', +0.50, -0.22),
]

z_now = Z_tr[-1].copy()  # latest covariates
print(f'Current rates: T-bill = {z_now[1]:.2f}%, spread = {z_now[2]:.2f}%\n')

results = []
for name, d_tbill, d_spread in scenarios:
    row = {'Scenario': name}
    z_scen = z_now.copy()
    z_scen[1] += d_tbill
    z_scen[2] += d_spread
    
    for j in range(2):
        ed_base = dhmm.expected_duration(z_now, j)
        ed_scen = dhmm.expected_duration(z_scen, j)
        pct = (ed_scen / ed_base - 1) * 100
        row[f'{state_labels[j]} E[tau]'] = f'{ed_scen:.1f}m ({pct:+.1f}%)'
    results.append(row)

display(pd.DataFrame(results).set_index('Scenario'))

In [None]:
# --- Rate sensitivity surface ---
tbill_grid = np.linspace(0, 8, 50)
spread_grid = np.linspace(-1, 4, 50)
TB, SP = np.meshgrid(tbill_grid, spread_grid)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for idx, j in enumerate([bull_state, bear_state]):
    ED = np.zeros_like(TB)
    for ii in range(50):
        for jj in range(50):
            z = np.array([1.0, TB[ii, jj], SP[ii, jj]])
            ED[ii, jj] = dhmm.expected_duration(z, j)
    
    # Clip for visualization
    ED = np.clip(ED, 0, 120)
    
    cs = axes[idx].contourf(TB, SP, ED, levels=20, cmap='RdYlGn' if j == bull_state else 'RdYlGn_r')
    axes[idx].set_xlabel('T-Bill Rate (%)')
    axes[idx].set_ylabel('Spread (%)')
    axes[idx].set_title(f'{state_labels[j]} — Expected Duration (months)')
    plt.colorbar(cs, ax=axes[idx])
    
    # Mark current position
    axes[idx].plot(z_now[1], z_now[2], 'k*', markersize=15, label='Current')
    axes[idx].legend()

plt.tight_layout()
plt.show()

## 6. Regime Reconstruction — MPM + Viterbi

In [None]:
# --- MPM state reconstruction ---
mpm_states = dhmm.mpm_states()
bull_prob_dhmm = dhmm.gamma_[:, bull_state]

# --- Viterbi ---
viterbi_states = dhmm.viterbi(Y_tr, Z_tr)

# --- Known crisis events for annotation ---
crises = [
    ('1990-07', '1991-03', 'Gulf War\nRecession'),
    ('1997-07', '1998-10', 'Asian/LTCM\nCrisis'),
    ('2000-03', '2002-10', 'Dot-com\nBust'),
    ('2007-12', '2009-06', 'GFC'),
    ('2020-02', '2020-04', 'COVID'),
    ('2022-01', '2022-10', 'Rate Hike\nSelloff'),
]

fig, axes = plt.subplots(3, 1, figsize=(16, 10), sharex=True)

# Panel 1: Returns with regime shading (MPM)
ax = axes[0]
ax.plot(df_train.index, df_train['return'] * 100, color='k', lw=0.6)
is_bull = mpm_states == bull_state
ymin, ymax = df_train['return'].min() * 100, df_train['return'].max() * 100
ax.fill_between(df_train.index, ymin, ymax, where=is_bull, alpha=0.12, color='blue', label='Bull')
ax.fill_between(df_train.index, ymin, ymax, where=~is_bull, alpha=0.12, color='red', label='Bear')
ax.axhline(0, color='grey', lw=0.5)
ax.set_ylabel('Monthly Return (%)')
ax.set_title('Duration HMM — MPM Regime Reconstruction')
ax.legend(loc='upper left')

# Add crisis annotations
for start, end, label in crises:
    try:
        s = pd.Timestamp(start)
        e = pd.Timestamp(end)
        if s >= df_train.index[0] and s <= df_train.index[-1]:
            ax.axvspan(s, e, alpha=0.08, color='grey')
            ax.annotate(label, xy=(s + (e-s)/2, ymax*0.85),
                       fontsize=7, ha='center', va='top', color='grey')
    except Exception:
        pass

# Panel 2: Bull probability
ax = axes[1]
ax.fill_between(df_train.index, 0, bull_prob_dhmm, alpha=0.4, color='steelblue')
ax.axhline(0.5, color='r', ls='--', lw=0.8)
ax.set_ylabel('P(Bull)')
ax.set_title('Posterior Bull Probability — γ_t(bull)')
ax.set_ylim(-0.05, 1.05)

# Panel 3: Viterbi vs MPM comparison
ax = axes[2]
viterbi_bull = (viterbi_states == bull_state).astype(int)
mpm_bull = (mpm_states == bull_state).astype(int)
ax.step(df_train.index, mpm_bull, where='mid', color='steelblue', lw=1.2, label='MPM', alpha=0.8)
ax.step(df_train.index, viterbi_bull * 0.95, where='mid', color='darkorange', lw=1.2, label='Viterbi', alpha=0.8)
ax.set_yticks([0, 1])
ax.set_yticklabels(['Bear', 'Bull'])
ax.set_title('MPM vs Viterbi State Sequence')
ax.legend()

plt.tight_layout()
plt.show()

# Agreement
agree = (mpm_bull == viterbi_bull).mean()
print(f'MPM / Viterbi agreement: {agree:.1%}')
print(f'Bull months (MPM): {mpm_bull.sum()} / {len(mpm_bull)} ({mpm_bull.mean():.1%})')
print(f'Bear months (MPM): {(1-mpm_bull).sum()} / {len(mpm_bull)} ({(1-mpm_bull).mean():.1%})')

## 7. Forecasting — Path Simulation vs Benchmarks

In [None]:
# --- Benchmark 1: AR(1)-GARCH(1,1) ---
from arch import arch_model

# Model 1: AR(1)-GARCH(1,1)
y_pct_train = df_train['return'].values * 100  # in percent
y_pct_test = df_test['return'].values * 100

am1 = arch_model(y_pct_train, mean='AR', lags=1, vol='GARCH', p=1, q=1)
res1 = am1.fit(disp='off')
print('=== Model 1: AR(1)-GARCH(1,1) ===')
print(res1.summary().tables[1])

# Model 2: Rate-regression GARCH
# r_t = c0 + c1*tbill_{t-1} + c2*spread_{t-1} + eps_t, eps_t ~ GARCH
exog_train = df_train[['tbill', 'spread']].shift(1).dropna().values
y_m2_train = y_pct_train[1:]  # align

am2 = arch_model(y_m2_train, x=exog_train, mean='ARX', lags=0, vol='GARCH', p=1, q=1)
res2 = am2.fit(disp='off')
print('\n=== Model 2: Rate-Regression GARCH ===')
print(res2.summary().tables[1])

In [None]:
# --- Duration HMM forecasts via path simulation (V=5000) ---
H = len(df_test)
Z_test = np.column_stack([
    np.ones(H),
    df_test['tbill'].values,
    df_test['spread'].values
])

print(f'Simulating {5000} forecast paths for {H} months...')
paths = dhmm.forecast_paths(Z_test, n_paths=5000)
dhmm_forecast = paths.mean(axis=0)  # mean forecast
dhmm_forecast_std = paths.std(axis=0)

# --- Benchmark forecasts (rolling 1-step) ---
# AR(1)-GARCH: use unconditional mean as multi-step forecast
ar1_forecast = np.full(H, res1.params['Const'] / (1 - res1.params.get('y[1]', 0))) / 100

# Rate-regression: use actual rates as if known (same assumption as in paper)
exog_test = df_test[['tbill', 'spread']].values
reg_forecast = np.full(H, res2.params['Const'])
if 'x1' in res2.params:
    reg_forecast += res2.params['x1'] * exog_test[:, 0]
if 'x2' in res2.params:
    reg_forecast += res2.params['x2'] * exog_test[:, 1]
reg_forecast /= 100

actual = df_test['return'].values

# --- RMSE at various horizons ---
def rolling_rmse(actual, forecast, horizon):
    """RMSE for rolling h-step ahead forecasts."""
    n = len(actual) - horizon + 1
    if n <= 0:
        return np.nan
    errors = []
    for i in range(n):
        errors.append((actual[i:i+horizon].mean() - forecast[i:i+horizon].mean())**2)
    return np.sqrt(np.mean(errors))

horizons = [1, 3, 6, 12]
rmse_table = []
for h in horizons:
    rmse_table.append({
        'Horizon': f'+{h}m',
        'AR(1)-GARCH': rolling_rmse(actual, ar1_forecast, h),
        'Rate-Reg GARCH': rolling_rmse(actual, reg_forecast, h),
        'Duration HMM': rolling_rmse(actual, dhmm_forecast, h),
    })

print('\n=== Forecasting RMSE Comparison ===')
display(pd.DataFrame(rmse_table).set_index('Horizon').round(6))

In [None]:
# --- Forecast fan chart ---
fig, ax = plt.subplots(figsize=(14, 5))

ax.plot(df_test.index, actual * 100, 'k-', lw=1.2, label='Actual')
ax.plot(df_test.index, dhmm_forecast * 100, 'b-', lw=1, label='DHMM Mean Forecast')
ax.fill_between(df_test.index,
                (dhmm_forecast - 2*dhmm_forecast_std) * 100,
                (dhmm_forecast + 2*dhmm_forecast_std) * 100,
                alpha=0.15, color='blue', label='DHMM ±2σ')
ax.plot(df_test.index, ar1_forecast * 100, 'r--', lw=0.8, label='AR(1)-GARCH')
ax.plot(df_test.index, reg_forecast * 100, 'g--', lw=0.8, label='Rate-Reg GARCH')
ax.axhline(0, color='grey', lw=0.5)
ax.set_ylabel('Monthly Return (%)')
ax.set_title('Out-of-Sample Forecast Comparison')
ax.legend()
plt.tight_layout()
plt.show()

## 8. VaR Overlay — Regime-Conditional Value at Risk

In [None]:
# --- Regime-conditional VaR ---
# For each state j, VaR from the mixture distribution
def mixture_var(c, mu, sigma2, alpha):
    """VaR at level alpha from mixture of normals via Monte Carlo."""
    n_sim = 100000
    samples = np.zeros(n_sim)
    for m in range(len(c)):
        mask = np.random.rand(n_sim) < c[m] if m == 0 else (
            np.random.rand(n_sim) < c[m] / (1 - sum(c[:m]))
        )
    # Simpler: just sample properly
    comps = np.random.choice(len(c), size=n_sim, p=c)
    for m in range(len(c)):
        mask = comps == m
        samples[mask] = np.random.normal(mu[m], np.sqrt(sigma2[m]), mask.sum())
    return np.percentile(samples, alpha * 100)

print('=== Regime-Conditional VaR ===')
var_results = []
for j in range(2):
    var_95 = mixture_var(dhmm.c[j], dhmm.mu[j], dhmm.sigma2[j], 0.05)
    var_99 = mixture_var(dhmm.c[j], dhmm.mu[j], dhmm.sigma2[j], 0.01)
    var_results.append({
        'State': state_labels[j],
        'VaR 95%': f'{var_95*100:.2f}%',
        'VaR 99%': f'{var_99*100:.2f}%',
        'Mean Return': f'{state_means[j]*100:.2f}%',
    })
    
display(pd.DataFrame(var_results).set_index('State'))

In [None]:
# --- Time-varying VaR using regime probabilities ---
# VaR_t = gamma_t(bull) * VaR_bull + gamma_t(bear) * VaR_bear
# (blended using posterior probabilities)

n_sim = 50000

def regime_blended_var(gamma_bull, dhmm, bull_state, bear_state, alpha, n_sim=50000):
    """Compute time-varying VaR blended by regime probability."""
    T = len(gamma_bull)
    var_series = np.zeros(T)
    
    for t in range(T):
        p_bull = gamma_bull[t]
        samples = np.zeros(n_sim)
        
        # For each simulation draw, first pick regime, then mixture component
        regime = np.random.binomial(1, p_bull, n_sim)  # 1=bull, 0=bear
        
        for j, state in [(1, bull_state), (0, bear_state)]:
            mask = regime == j
            n_j = mask.sum()
            if n_j > 0:
                comps = np.random.choice(dhmm.M, size=n_j, p=dhmm.c[state])
                for m in range(dhmm.M):
                    m_mask = comps == m
                    n_m = m_mask.sum()
                    if n_m > 0:
                        samples[mask][m_mask] = np.random.normal(
                            dhmm.mu[state, m], np.sqrt(dhmm.sigma2[state, m]), n_m
                        )
        
        var_series[t] = np.percentile(samples, alpha * 100)
    
    return var_series

var_95 = regime_blended_var(bull_prob_dhmm, dhmm, bull_state, bear_state, 0.05, n_sim=20000)
var_99 = regime_blended_var(bull_prob_dhmm, dhmm, bull_state, bear_state, 0.01, n_sim=20000)

fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(df_train.index, df_train['return'] * 100, 'k-', lw=0.6, label='Actual Return')
ax.plot(df_train.index, var_95 * 100, 'r-', lw=1, label='VaR 95%')
ax.plot(df_train.index, var_99 * 100, 'darkred', lw=1, ls='--', label='VaR 99%')
ax.fill_between(df_train.index, var_99 * 100, var_95 * 100, alpha=0.15, color='red')
ax.set_ylabel('Monthly Return (%)')
ax.set_title('Regime-Conditional Time-Varying VaR')
ax.legend()
plt.tight_layout()
plt.show()

# VaR breach rates
breach_95 = (df_train['return'].values < var_95).mean()
breach_99 = (df_train['return'].values < var_99).mean()
print(f'VaR 95% breach rate: {breach_95:.1%} (expected: 5.0%)')
print(f'VaR 99% breach rate: {breach_99:.1%} (expected: 1.0%)')

## 9. Current Regime Assessment

In [None]:
# --- Run model on full sample to get current regime ---
Y_full = df['return'].values
Z_full = np.column_stack([
    np.ones(len(df)),
    df['tbill'].values,
    df['spread'].values
])

# Refit on full sample
dhmm_full = DurationHMM(n_states=2, n_mix=3)
dhmm_full.fit(Y_full, Z_full, max_iter=30, tol=1e-4, verbose=False)

# Identify states
full_means = [(dhmm_full.c[j] * dhmm_full.mu[j]).sum() for j in range(2)]
full_bull = int(np.argmax(full_means))
full_bear = 1 - full_bull

gamma_full = dhmm_full.gamma_
current_bull_prob = gamma_full[-1, full_bull]
current_bear_prob = gamma_full[-1, full_bear]

print('=' * 60)
print('CURRENT REGIME ASSESSMENT')
print('=' * 60)
print(f'Date:              {df.index[-1].strftime("%Y-%m")}')
print(f'Latest return:     {df["return"].iloc[-1]*100:.2f}%')
print(f'T-Bill rate:       {df["tbill"].iloc[-1]:.2f}%')
print(f'10Y rate:          {df["gs10"].iloc[-1]:.2f}%')
print(f'Spread:            {df["spread"].iloc[-1]:.2f}%')
print(f'\nRegime probabilities:')
print(f'  P(Bull) = {current_bull_prob:.4f}')
print(f'  P(Bear) = {current_bear_prob:.4f}')
print(f'  => Current regime: {"BULL" if current_bull_prob > 0.5 else "BEAR"}')

# Expected durations given current rates
z_current = Z_full[-1]
ed_bull = dhmm_full.expected_duration(z_current, full_bull)
ed_bear = dhmm_full.expected_duration(z_current, full_bear)
print(f'\nExpected regime durations at current rates:')
print(f'  Bull: {ed_bull:.1f} months')
print(f'  Bear: {ed_bear:.1f} months')

In [None]:
# --- Full-sample regime timeline ---
full_mpm = np.argmax(gamma_full, axis=1)
full_bull_prob = gamma_full[:, full_bull]

fig, axes = plt.subplots(2, 1, figsize=(16, 7), sharex=True)

ax = axes[0]
ax.plot(df.index, df['return'] * 100, 'k-', lw=0.6)
is_bull_full = full_mpm == full_bull
ymin, ymax = df['return'].min() * 100, df['return'].max() * 100
ax.fill_between(df.index, ymin, ymax, where=is_bull_full, alpha=0.12, color='blue', label='Bull')
ax.fill_between(df.index, ymin, ymax, where=~is_bull_full, alpha=0.12, color='red', label='Bear')

for start, end, label in crises:
    try:
        s, e = pd.Timestamp(start), pd.Timestamp(end)
        ax.axvspan(s, e, alpha=0.08, color='grey')
        ax.annotate(label, xy=(s + (e-s)/2, ymax*0.85),
                   fontsize=7, ha='center', va='top', color='grey')
    except Exception:
        pass

ax.axhline(0, color='grey', lw=0.5)
ax.set_ylabel('Monthly Return (%)')
ax.set_title('Duration HMM — Full Sample Regime Reconstruction')
ax.legend(loc='upper left')

ax = axes[1]
ax.fill_between(df.index, 0, full_bull_prob, alpha=0.5, color='steelblue')
ax.axhline(0.5, color='r', ls='--', lw=0.8)
ax.set_ylabel('P(Bull)')
ax.set_title('Bull Market Posterior Probability (Full Sample)')
ax.set_ylim(-0.05, 1.05)

plt.tight_layout()
plt.show()

In [None]:
# --- Summary dashboard ---
print('=' * 70)
print('DURATION HMM — MODEL SUMMARY')
print('=' * 70)
print(f'Sample:          {df.index[0].strftime("%Y-%m")} to {df.index[-1].strftime("%Y-%m")} ({len(df)} months)')
print(f'States:          {dhmm.N} (bull/bear)')
print(f'Mixture comps:   {dhmm.M} per state')
print(f'Duration covars: intercept, T-bill, spread')
print(f'Max duration:    {dhmm.max_dur} months')
print(f'Log-likelihood:  {dhmm.log_lik_:.4f}')
print(f'\n--- Current Assessment ({df.index[-1].strftime("%Y-%m")}) ---')
print(f'Regime:          {"BULL" if current_bull_prob > 0.5 else "BEAR"} (P={max(current_bull_prob, current_bear_prob):.2%})')
print(f'Expected bull duration: {ed_bull:.0f} months at current rates')
print(f'Expected bear duration: {ed_bear:.0f} months at current rates')
print(f'\n--- Key Findings ---')
print(f'Bull: higher T-bill => shorter duration (kappa_2 = {dhmm_full.kappa[full_bull, 1]:.4f})')
print(f'Bull: higher spread => longer duration  (kappa_3 = {dhmm_full.kappa[full_bull, 2]:.4f})')
print(f'Bear: higher T-bill => longer duration  (kappa_2 = {dhmm_full.kappa[full_bear, 1]:.4f})')
print(f'Bear: higher spread => shorter duration (kappa_3 = {dhmm_full.kappa[full_bear, 2]:.4f})')