In [5]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Set the device to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [None]:
# Define the Markov Switching Model using PyTorch
class MarkovSwitchingModel(nn.Module):
    def __init__(self, num_states=2):
        super(MarkovSwitchingModel, self).__init__()
        self.num_states = num_states
        # Unconstrained means
        self.mu = nn.Parameter(torch.zeros(num_states, device=device))
        # Unconstrained log variances
        self.log_sigma2 = nn.Parameter(torch.zeros(num_states, device=device))
        # Unconstrained transition logits
        self.transition_logits = nn.Parameter(torch.zeros(num_states, num_states, device=device))
        # Initial state logits
        self.initial_logits = nn.Parameter(torch.zeros(num_states, device=device))
    
    def forward(self, y):
        T = y.shape[0]
        mu = self.mu  # Shape: [num_states]
        sigma2 = torch.exp(self.log_sigma2)  # Ensure variances are positive
        
        # Compute log emission probabilities
        emission_log_probs = -0.5 * (torch.log(2 * torch.pi * sigma2[:, None]) + ((y - mu[:, None]) ** 2) / sigma2[:, None])
        
        # Compute log transition probabilities
        log_transition_probs = F.log_softmax(self.transition_logits, dim=1)  # Shape: [num_states, num_states]
        # Initial state log probabilities
        log_initial_probs = F.log_softmax(self.initial_logits, dim=0)  # Shape: [num_states]
        
        # Forward algorithm in log-space
        log_alpha = log_initial_probs + emission_log_probs[:, 0]  # Shape: [num_states]
        for t in range(1, T):
            log_alpha = emission_log_probs[:, t] + torch.logsumexp(log_alpha[:, None] + log_transition_probs, dim=0)
        # Compute log-likelihood
        log_likelihood = torch.logsumexp(log_alpha, dim=0)
        return -log_likelihood  # Negative log-likelihood

In [None]:
# Function to compute smoothed state probabilities
def compute_forward_backward(model, y):
    T = y.shape[0]
    num_states = model.num_states
    mu = model.mu
    sigma2 = torch.exp(model.log_sigma2)
    
    emission_log_probs = -0.5 * (torch.log(2 * torch.pi * sigma2[:, None]) + ((y - mu[:, None]) ** 2) / sigma2[:, None])
    log_transition_probs = F.log_softmax(model.transition_logits, dim=1)
    log_initial_probs = F.log_softmax(model.initial_logits, dim=0)
    
    # Forward pass
    log_alpha = log_initial_probs + emission_log_probs[:, 0]
    log_alpha_list = [log_alpha]
    for t in range(1, T):
        log_alpha = emission_log_probs[:, t] + torch.logsumexp(log_alpha[:, None] + log_transition_probs, dim=0)
        log_alpha_list.append(log_alpha)
    log_alpha_tensor = torch.stack(log_alpha_list, dim=1)
    
    # Backward pass
    log_beta = torch.zeros(num_states, device=device)
    log_beta_list = [log_beta]
    for t in reversed(range(T - 1)):
        log_beta = torch.logsumexp(log_transition_probs + emission_log_probs[:, t + 1] + log_beta[:, None], dim=0)
        log_beta_list.insert(0, log_beta)
    log_beta_tensor = torch.stack(log_beta_list, dim=1)
    
    # Compute smoothed probabilities
    log_gamma = log_alpha_tensor + log_beta_tensor
    log_gamma -= torch.logsumexp(log_gamma, dim=0, keepdim=True)
    gamma = torch.exp(log_gamma)
    return gamma

# Function to fit the model to the data
def fit_model(y, num_states=2, num_epochs=100, lr=0.01):
    model = MarkovSwitchingModel(num_states=num_states).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    y_tensor = torch.tensor(y, dtype=torch.float32, device=device)
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        nll = model(y_tensor)
        nll.backward()
        optimizer.step()
    return model

In [None]:
# Function to process each date
def process_date(args):
    i, date = args
    # Expand the training data to include up to the current date
    recursive_train = data.loc[data.index <= date, 'Index_Returns'].values
    y = torch.tensor(recursive_train, dtype=torch.float32, device=device)
    
    # Fit the Markov Switching Model on the expanded training data
    try:
        model = fit_model(y, num_states=2, num_epochs=100, lr=0.01)
    except Exception as e:
        print(f"Model failed to converge at date {date}: {e}")
        return None  # Skip this date if the model fails to fit
    
    # Compute smoothed probabilities
    gamma = compute_forward_backward(model, y)  # Shape: [num_states, T]
    # Get last known state probabilities
    last_probs = gamma[:, -1].cpu().detach().numpy()
    
    # Extract transition probabilities from the model parameters
    with torch.no_grad():
        transition_probs = F.softmax(model.transition_logits, dim=1).cpu().numpy()
        sigma2 = torch.exp(model.log_sigma2).cpu().numpy()
    
    # Update state probabilities to predict the next day's regime
    state_probs = last_probs @ transition_probs  # Shape: [num_states]
    
    # Determine the most likely regime at t+1
    regime_labels = list(range(model.num_states))  # Should be [0, 1]
    most_likely_regime = regime_labels[np.argmax(state_probs)]
    
    # Get the next date for prediction
    if i + 1 < len(test.index):
        next_date = test.index[i + 1]
    else:
        # Predicting beyond the available data; estimate next date
        next_date = date + pd.Timedelta(days=1)
        # Add next_date to the DataFrame if it doesn't exist
        if next_date not in data.index:
            data.loc[next_date] = np.nan  # Initialize with NaNs
    
    # Determine which regime corresponds to low and high volatility
    if sigma2[0] < sigma2[1]:
        regime_mapping = {0: 'Low Volatility', 1: 'High Volatility'}
    else:
        regime_mapping = {0: 'High Volatility', 1: 'Low Volatility'}
    
    # Map the predicted regime to labels
    predicted_label = regime_mapping[most_likely_regime]
    
    # Return the results
    return next_date, most_likely_regime, predicted_label


In [None]:
# Prepare the arguments for processing
test_dates = test.index
args_list = [(i, date) for i, date in enumerate(test_dates)]

In [None]:
# Process the dates and collect results
# ****THIS TAKES FOREVER TO RUN ***
results = []
for args in args_list:
    res = process_date(args)
    if res is not None:
        results.append(res)

# Update the data DataFrame with the results
for next_date, most_likely_regime, predicted_label in results:
    data.at[next_date, 'Recursive_Predictions'] = most_likely_regime
    data.at[next_date, 'Recursive_Predicted_Regime_Label'] = predicted_label

# Display the recursive predictions
print(data.loc[test.index, ['Index_Returns', 'Recursive_Predictions', 'Recursive_Predicted_Regime_Label']].head())