<a href="https://colab.research.google.com/github/jhenningsen/Equity_Analysis/blob/main/SPY_HMM_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install hmmlearn

Collecting hmmlearn
  Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (165 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m166.0/166.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: hmmlearn
Successfully installed hmmlearn-0.3.3


In [23]:
import numpy as np
import pandas as pd
import yfinance as yf
from hmmlearn import hmm
from sklearn.preprocessing import StandardScaler


## SPY Only


In [76]:
# --- Configuration ---
# Tickers
TICKERS = ['SPY']
# Date Range (Adjust as needed)
START_DATE = '2022-01-01'
END_DATE = '2025-10-25'
# HMM Parameters
N_COMPONENTS = 3 # Number of hidden states (regimes)

# 1. Download Data
# Download the data and select the 'Close' price.
data_series = yf.download(TICKERS, start=START_DATE, end=END_DATE)['Close']

# --- THE FIX ---
# Explicitly convert the resulting Series (or DataFrame) into a new DataFrame.
data = pd.DataFrame(data_series)

# Rename the column (which may be named 'SPY' or 'Close') to a consistent 'SPY'.
data.columns = ['SPY']

# 2. Feature Engineering
# SPY returns are the ONLY feature now
data['SPY_Returns'] = data['SPY'].pct_change() * 100

# Drop the first row which contains NaN from the pct_change calculation
data.dropna(inplace=True)

# 3. Prepare Observation Data (X)
# We use only SPY_Returns as the feature
X = data[['SPY_Returns']].values

print(f"Data shape (Samples, Features): {X.shape}")


  data_series = yf.download(TICKERS, start=START_DATE, end=END_DATE)['Close']
[*********************100%***********************]  1 of 1 completed

Data shape (Samples, Features): (956, 1)





In [72]:
# 4. Initialize and Train the HMM
# GaussianHMM is suitable for continuous data
model = hmm.GaussianHMM(
    n_components=N_COMPONENTS,
    # Must change to 'diag' or 'spherical' since you have only 1 feature.
    # 'full' expects a 2x2 or larger covariance matrix.
    covariance_type="diag",
    n_iter=100
)

# Fit the model to the observation data
print("\nTraining HMM...")
model.fit(X)
print("Training complete.")

# 5. Predict the Hidden States
# Predict the most likely sequence of hidden states (regimes)
hidden_states = model.predict(X)

# Add the states back to the original DataFrame for analysis
data['Regime'] = hidden_states


Training HMM...
Training complete.


### Manual Parameter Initialization

In [77]:
# --- Apply Scaling ---
# 3. Prepare and Scale Observation Data (X)
X = data[['SPY_Returns']].values # Shape (N, 1)

# Standard Scaling is essential for HMMs
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f"Scaled Data shape (Samples, Features): {X_scaled.shape}")

# 4. Initialize and Train the HMM with Manual Parameters
model = hmm.GaussianHMM(
    n_components=N_COMPONENTS,
    covariance_type="diag",  # Correct for single feature
    n_iter=200,             # Increased iterations
    init_params=""          # CRITICAL: Disable random initialization
)

# --- Manual Parameter Initialization to Force Regime Separation ---

# Initialize Means (Shape: [3, 1])
# We are forcing the model to start with clear directional expectations:
model.means_ = np.array([
    [ 1.0],  # Regime 0 (Bull): High positive mean on scaled data
    [ 0.0],  # Regime 1 (Neutral): Mean near zero
    [-1.0]   # Regime 2 (Bear): Low negative mean
])

# Initialize Covariances (Shape: [3, 1])
# We are forcing the model to start with clear volatility expectations (variance = std_dev^2):
model.covars_ = np.array([
    [0.2],  # Regime 0 (Bull): LOW Volatility
    [0.5],  # Regime 1 (Neutral): MEDIUM Volatility
    [3.0]   # Regime 2 (Bear): HIGH Volatility
])

# Initialize Transition and Start Probabilities (Optional)
model.transmat_ = np.full((N_COMPONENTS, N_COMPONENTS), 1.0 / N_COMPONENTS)
model.startprob_ = np.full(N_COMPONENTS, 1.0 / N_COMPONENTS)


# Fit the model to the SCALED observation data
print("\nTraining HMM with initial parameters...")
model.fit(X_scaled)
print("Training complete.")

# 5. Predict the Hidden States
hidden_states = model.predict(X_scaled)
data['Regime'] = hidden_states



Scaled Data shape (Samples, Features): (956, 1)

Training HMM with initial parameters...
Training complete.


In [78]:
print("### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###")
# We select the Date (Index), SPY_Returns, and the predicted Regime
daily_comparison = data[['SPY_Returns', 'Regime']].tail(30)
print(daily_comparison)

# Optional: To save this to a file for external analysis
# daily_comparison.to_csv('daily_hmm_regime_vs_spy_return.csv')

### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###
            SPY_Returns  Regime
Date                           
2025-09-15     0.532388       0
2025-09-16    -0.137687       0
2025-09-17    -0.124236       0
2025-09-18     0.467245       0
2025-09-19     0.495284       0
2025-09-22     0.473108       0
2025-09-23    -0.544359       0
2025-09-24    -0.318157       0
2025-09-25    -0.461350       0
2025-09-26     0.572908       0
2025-09-29     0.281041       0
2025-09-30     0.376688       0
2025-10-01     0.340752       0
2025-10-02     0.115186       0
2025-10-03    -0.001487       0
2025-10-06     0.358626       0
2025-10-07    -0.370749       0
2025-10-08     0.596304       0
2025-10-09    -0.289702       0
2025-10-10    -2.702776       1
2025-10-13     1.534403       1
2025-10-14    -0.122164       1
2025-10-15     0.443955       1
2025-10-16    -0.681024       1
2025-10-17     0.567631       1
2025-10-20     1.040048       1
2025-10-21    -0.001491       1
2025-10-22

In [79]:
# --- Final Analysis ---
print("\n### 📊 Regime Analysis (Mean and Volatility of SPY Returns) ###")
regime_stats = data.groupby('Regime')['SPY_Returns'].agg(
    Count='size',
    Mean='mean',
    Std_Dev='std'
)
print(regime_stats.round(4))



### 📊 Regime Analysis (Mean and Volatility of SPY Returns) ###
        Count    Mean  Std_Dev
Regime                        
0         337  0.1709   0.5154
1         338  0.0392   0.9928
2         281 -0.0867   1.7231


## SPY and VIX


In [53]:
# --- Configuration ---
# Tickers
TICKERS = ['SPY', '^VIX']
# Date Range (Adjust as needed)
START_DATE = '2015-01-01'
END_DATE = '2025-10-01'
# HMM Parameters
N_COMPONENTS = 3 # Number of hidden states (regimes)

# 1. Download Data
data = yf.download(TICKERS, start=START_DATE, end=END_DATE)['Close']
data.columns = ['SPY', 'VIX']

# 2. Feature Engineering
# SPY returns are key for direction/magnitude of movement
data['SPY_Returns'] = data['SPY'].pct_change() * 100

# VIX change is a good measure of increasing/decreasing fear/volatility
data['VIX_Change'] = data['VIX'].pct_change() * 100

# Drop the first row which contains NaN from the pct_change calculation
data.dropna(inplace=True)

# 3. Prepare Observation Data (X)
# We combine the two features: SPY Returns and VIX Change
# The HMM input (X) must be a 2D numpy array
X = data[['SPY_Returns', 'VIX_Change']].values

print(f"Data shape (Samples, Features): {X.shape}")

  data = yf.download(TICKERS, start=START_DATE, end=END_DATE)['Close']
[                       0%                       ][*********************100%***********************]  2 of 2 completed

Data shape (Samples, Features): (2701, 2)





### Feature Scaling

In [54]:
# 3. Prepare Observation Data (X) - BEFORE SCALING
X = data[['SPY_Returns', 'VIX_Change']].values

# --- NEW STEP: Apply Scaling ---
scaler = StandardScaler()
# Fit the scaler to the data and transform it
X_scaled = scaler.fit_transform(X)
# The HMM will now be trained on X_scaled

In [37]:
# ... (Previous code for downloading data and creating 'data' DataFrame) ...

# 3. Prepare Observation Data (X) - BEFORE SCALING
X = data[['SPY_Returns', 'VIX_Change']].values

# --- NEW STEP: Apply Scaling ---
scaler = StandardScaler()
# Fit the scaler to the data and transform it
X_scaled = scaler.fit_transform(X)
# The HMM will now be trained on X_scaled

# 4. Initialize and Train the HMM using the SCALED data
model = hmm.GaussianHMM(
    n_components=N_COMPONENTS,
    covariance_type="full",
    n_iter=100,
    # Add n_init for better results (see section 2)
    #n_init=10
)

# Fit the model to the SCALED observation data
print("\nTraining HMM on scaled data...")
model.fit(X_scaled)
print("Training complete.")

# 5. Predict the Hidden States
hidden_states = model.predict(X_scaled) # Predict using the scaled data
data['Regime'] = hidden_states # Add states to the original data frame

# Now run your analysis again (Regime Analysis, Daily Comparison)
# The output should show multiple regimes.


Training HMM on scaled data...
Training complete.


In [48]:
def train_best_hmm(X_scaled, n_components, n_attempts=10, n_iter=100):
    """
    Trains the GaussianHMM multiple times and returns the model
    with the highest log-likelihood score.
    """
    import numpy as np
    from hmmlearn import hmm

    best_score = -np.inf
    best_model = None

    print(f"Attempting HMM fit {n_attempts} times...")
    for i in range(n_attempts):
        # Initialize a new model for each attempt
        current_model = hmm.GaussianHMM(
            n_components=n_components,
            covariance_type="full",
            n_iter=n_iter
        )

        try:
            # Fit the model to the scaled data
            current_model.fit(X_scaled)
            current_score = current_model.score(X_scaled)

            if current_score > best_score:
                best_score = current_score
                best_model = current_model
                # print(f"  Attempt {i+1}: New Best Score = {best_score:.2f}")

        except Exception as e:
            # Handle potential training failures (e.g., singular matrix)
            # print(f"  Attempt {i+1}: Failed with error: {e}")
            continue

    if best_model is None:
        raise RuntimeError("HMM training failed for all attempts. Check data for issues.")

    print(f"Training complete. Best score: {best_score:.2f}")
    return best_model

# --- REPLACE YOUR ORIGINAL TRAINING BLOCK WITH THIS ---
# X_scaled must be available from your previous scaling step
model = train_best_hmm(X_scaled, n_components=N_COMPONENTS, n_attempts=20, n_iter=200)

# 5. Predict the Hidden States using the best model
hidden_states = model.predict(X_scaled)
data['Regime'] = hidden_states
# Now run your regime analysis and daily comparison

Attempting HMM fit 20 times...
Training complete. Best score: -5154.93


In [50]:
### Assuming your 'data' DataFrame looks like this after training:
# data['SPY_Returns'] = data['SPY'].pct_change() * 100
# data['Regime'] = model.predict(X)

print("### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###")
# We select the Date (Index), SPY_Returns, and the predicted Regime
daily_comparison = data[['SPY_Returns', 'VIX_Change', 'Regime']].tail(30)
print(daily_comparison)

# Optional: To save this to a file for external analysis
# daily_comparison.to_csv('daily_hmm_regime_vs_spy_return.csv')

### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###
            SPY_Returns  VIX_Change  Regime
Date                                       
2025-08-19    -0.542514    3.869246       1
2025-08-20    -0.265706    0.770712       1
2025-08-21    -0.401184    5.799878       1
2025-08-22     1.535680  -14.337350       1
2025-08-25    -0.440109    4.008437       1
2025-08-26     0.418705   -1.149426       1
2025-08-27     0.227851    1.573191       1
2025-08-28     0.354138   -2.828283       1
2025-08-29    -0.596368    6.444902       1
2025-09-02    -0.741028   11.783857       1
2025-09-03     0.541956   -4.775770       1
2025-09-04     0.835739   -6.422019       1
2025-09-05    -0.289625   -0.784313       1
2025-09-08     0.245663   -0.461137       1
2025-09-09     0.231185   -0.463267       1
2025-09-10     0.289086    2.061173       1
2025-09-11     0.831023   -4.169383       1
2025-09-12    -0.033458    0.339906       1
2025-09-15     0.532388    6.300809       1
2025-09-16    

In [51]:
# 6. Analyze the Regimes
print("\n--- Regime Analysis (Mean of Features) ---")
regime_analysis = data.groupby('Regime')[['SPY_Returns', 'VIX_Change']].mean()
print(regime_analysis)

# 7. Directional Prediction Insight
# Identify the most "Bullish" and "Bearish" regimes
# Sort by SPY_Returns to find the order
sorted_regimes = regime_analysis.sort_values(by='SPY_Returns', ascending=False)

print("\n### 📊 Comprehensive Regime Analysis (Count, Mean, and Volatility) ###")

# 1. Group the data by the 'Regime' column
regime_groups = data.groupby('Regime')

# 2. Calculate Count, Mean, and Standard Deviation for SPY_Returns and VIX_Change
regime_stats = regime_groups[['SPY_Returns', 'VIX_Change']].agg(
    # Count of samples (days) in the regime
    Count=('SPY_Returns', 'size'),
    # Mean of SPY Returns for directional insight
    SPY_Returns_Mean=('SPY_Returns', 'mean'),
    # Standard Deviation of SPY Returns for volatility insight
    SPY_Returns_Std_Dev=('SPY_Returns', 'std'),
    # Mean of VIX Change (for confirming volatility behavior)
    VIX_Change_Mean=('VIX_Change', 'mean')
)

# 3. Print the resulting statistics
print(regime_stats.round(4))

print("\n--- Directional Insight ---")
print(f"The most **Bullish Regime** (highest positive SPY Return) is Regime **{sorted_regimes.index[0]}**")
print(f"The most **Bearish Regime** (highest negative SPY Return) is Regime **{sorted_regimes.index[-1]}**")

# 8. Making a Forward Prediction (The core step)
# To predict the next day's regime, you use the HMM's transition matrix and the current state.
# Let's get the state of the *last* day in the dataset
current_regime = data['Regime'].iloc[-1]
transition_matrix = model.transmat_

# Get the transition probabilities from the current regime to all others
next_state_probs = transition_matrix[current_regime]

print("\n--- Next Day Regime Probability ---")
print(f"Current Regime (Today's Close): **Regime {current_regime}**")
print("Probability of transitioning to each regime tomorrow:")
for i in range(N_COMPONENTS):
    # Lookup the mean SPY return for this predicted state
    mean_return = regime_analysis.loc[i, 'SPY_Returns']
    print(f"  - Regime {i}: {next_state_probs[i]:.2f} (Avg SPY Return: {mean_return:.2f}%)")

# The regime with the highest probability is your prediction
predicted_next_regime = np.argmax(next_state_probs)
predicted_return = regime_analysis.loc[predicted_next_regime, 'SPY_Returns']

print(f"\n**Prediction:** The most likely regime for tomorrow is **Regime {predicted_next_regime}** with an expected average SPY return of **{predicted_return:.2f}%**.")


--- Regime Analysis (Mean of Features) ---
        SPY_Returns  VIX_Change
Regime                         
0          0.065975   -0.464485
1          0.110717   -0.114986
2         -1.047688   14.619184

### 📊 Comprehensive Regime Analysis (Count, Mean, and Volatility) ###
        Count  SPY_Returns_Mean  SPY_Returns_Std_Dev  VIX_Change_Mean
Regime                                                               
0         697            0.0660               1.3927          -0.4645
1        1905            0.1107               0.6316          -0.1150
2          99           -1.0477               3.4944          14.6192

--- Directional Insight ---
The most **Bullish Regime** (highest positive SPY Return) is Regime **1**
The most **Bearish Regime** (highest negative SPY Return) is Regime **2**

--- Next Day Regime Probability ---
Current Regime (Today's Close): **Regime 1**
Probability of transitioning to each regime tomorrow:
  - Regime 0: 0.00 (Avg SPY Return: 0.07%)
  - Regime 1: 0.97 

---
## Initialize the Variance Manually
---


In [55]:
def train_hmm_with_initial_volatility(X_scaled, n_components, stable_volatility=0.2, crash_volatility=2.0):
    """Initializes HMM with distinct volatility parameters to force separation."""
    from hmmlearn import hmm
    import numpy as np

    # Initialize the model, but tell it NOT to randomize parameters (init_params="")
    model = hmm.GaussianHMM(
        n_components=n_components,
        covariance_type="full",
        n_iter=200,
        init_params="" # Important: Prevents default random initialization
    )

    # 1. Initialize Covariance Matrix (Variance)
    # This matrix is [n_components, n_features, n_features]
    # We assume Regime 0 (Bull) is low vol, Regime 2 (Bear) is high vol.
    covars_init = np.zeros((n_components, X_scaled.shape[1], X_scaled.shape[1]))

    # Set initial low variance for the 'stable' regimes (e.g., Regime 0 and 1)
    # We use stable_volatility for the diagonal of the covariance matrix
    # (since the data is scaled, 0.2 and 2.0 are relative values)
    stable_cov_val = stable_volatility ** 2
    for i in range(n_components - 1): # Apply to regimes 0 and 1
        np.fill_diagonal(covars_init[i], stable_cov_val)

    # Set initial high variance for the 'crisis' regime (e.g., Regime 2)
    crash_cov_val = crash_volatility ** 2
    np.fill_diagonal(covars_init[n_components - 1], crash_cov_val)

    model.covars_ = covars_init

    # 2. Initialize Transition and Start Probabilities (optional, but good practice)
    # You can leave these random, but explicitly setting them is safer.
    model.transmat_ = np.full((n_components, n_components), 1.0 / n_components)
    model.startprob_ = np.full(n_components, 1.0 / n_components)

    # 3. Fit the model
    # Note: Mean (means_) is still randomly initialized and learned in the fit process
    model.fit(X_scaled)

    return model

# --- RERUN YOUR TRAINING WITH THE NEW FUNCTION ---
# You can try a new set of volatile starting values if the default 0.2/2.0 fails
model = train_hmm_with_initial_volatility(
    X_scaled,
    n_components=N_COMPONENTS,
    stable_volatility=0.2,
    crash_volatility=2.0
)

# Predict and check results...
hidden_states = model.predict(X_scaled)
data['Regime'] = hidden_states

In [56]:
### Assuming your 'data' DataFrame looks like this after training:
# data['SPY_Returns'] = data['SPY'].pct_change() * 100
# data['Regime'] = model.predict(X)

print("### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###")
# We select the Date (Index), SPY_Returns, and the predicted Regime
daily_comparison = data[['SPY_Returns', 'VIX_Change', 'Regime']].tail(30)
print(daily_comparison)

# Optional: To save this to a file for external analysis
# daily_comparison.to_csv('daily_hmm_regime_vs_spy_return.csv')

### Daily Regime vs. SPY Return Comparison (Last 30 Days) ###
            SPY_Returns  VIX_Change  Regime
Date                                       
2025-08-19    -0.542514    3.869246       0
2025-08-20    -0.265706    0.770712       0
2025-08-21    -0.401184    5.799878       0
2025-08-22     1.535680  -14.337350       0
2025-08-25    -0.440109    4.008437       0
2025-08-26     0.418705   -1.149426       0
2025-08-27     0.227851    1.573191       0
2025-08-28     0.354138   -2.828283       0
2025-08-29    -0.596368    6.444902       0
2025-09-02    -0.741028   11.783857       0
2025-09-03     0.541956   -4.775770       0
2025-09-04     0.835739   -6.422019       0
2025-09-05    -0.289625   -0.784313       0
2025-09-08     0.245663   -0.461137       0
2025-09-09     0.231185   -0.463267       0
2025-09-10     0.289086    2.061173       0
2025-09-11     0.831023   -4.169383       0
2025-09-12    -0.033458    0.339906       0
2025-09-15     0.532388    6.300809       0
2025-09-16    