In [18]:
import numpy as np
from sklearn.cluster import KMeans
import pandas as pd
!pip install hmmlearn
from hmmlearn.hmm import GaussianHMM
import plotly.graph_objects as go
from plotly.graph_objs.scatter.marker import Line
from plotly.subplots import make_subplots
import plotly.express as px
from sklearn.cluster import AgglomerativeClustering
from sklearn.mixture import GaussianMixture
import warnings
import math

warnings.filterwarnings('ignore')



In [19]:
def prepare_data_for_model_input(prices, ma):
    '''
        Input:
        prices (df) - Dataframe of close prices
        ma (int) - legth of the moveing average

        Output:
        prices(df) - An enhanced prices dataframe, with moving averages and log return columns
        prices_array(nd.array) - an array of log returns
    '''

    intrument = prices.columns.name
    prices[f'{intrument}_ma'] = prices.rolling(ma).mean()
    prices[f'{intrument}_log_return'] = np.log(prices[f'{intrument}_ma']/prices[f'{intrument}_ma'].shift(1)).dropna()

    prices.dropna(inplace = True)
    prices_array = np.array([[q] for q in prices[f'{intrument}_log_return'].values])

    return prices, prices_array

In [20]:
import yfinance as yf
import numpy as np
import pandas as pd

# Fetch historical data for S&P 500 E-mini futures
data = yf.download('ES=F', start='2000-01-01', end='2024-03-30')

# Assuming 'Close' prices are what you're interested in
prices = data[['Close']]
prices.columns = ['TRDPRC_1']  # Renaming to match your original data structure

# Now you can call your existing function with this data
prices, prices_array = prepare_data_for_model_input(prices, 7)
print(prices)

[*********************100%%**********************]  1 of 1 completed

            TRDPRC_1      None_ma  None_log_return
Date                                              
2000-09-27   1446.75  1462.392857        -0.002025
2000-09-28   1476.00  1462.035714        -0.000244
2000-09-29   1454.00  1459.821429        -0.001516
2000-10-02   1456.25  1457.928571        -0.001297
2000-10-03   1441.50  1454.071429        -0.002649
...              ...          ...              ...
2024-03-22   5293.25  5227.774275         0.003429
2024-03-25   5278.25  5245.559989         0.003396
2024-03-26   5265.25  5268.928571         0.004445
2024-03-27   5308.25  5282.285714         0.002532
2024-03-28   5308.50  5291.821429         0.001804

[5936 rows x 3 columns]





In [21]:
class RegimeDetection:

    def get_regimes_hmm(self, input_data, params):
        hmm_model = self.initialise_model(GaussianHMM(), params).fit(input_data)
        return hmm_model

    def initialise_model(self, model, params):
        for parameter, value in params.items():
            setattr(model, parameter, value)
        return model


In [22]:
import plotly.graph_objects as go
import numpy as np

def plot_hidden_states(hidden_states, prices_df, column_name):
    '''
    Plot the hidden states on top of the price data using Plotly.

    Parameters:
    hidden_states (numpy.ndarray): The array of predicted hidden states.
    prices_df (pd.DataFrame): The dataframe containing the price data.
    column_name (str): The name of the column in prices_df to plot.
    '''
    colors = ['blue', 'green']  # You can extend this list if you have more than two states.
    n_components = len(np.unique(hidden_states))
    fig = go.Figure()

    for i in range(n_components):
        mask = hidden_states == i
        print('Number of observations for State ', i, ":", sum(mask))

        fig.add_trace(go.Scatter(x=prices_df.index[mask], y=prices_df[column_name][mask],
                                 mode='markers', name='State ' + str(i),
                                 marker=dict(size=4, color=colors[i])))

    fig.update_layout(title='Hidden States Visualization',title_x=0.5,
                      xaxis_title='Date',
                      yaxis_title='Price',
                      height=400, width=900,
                      legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
                      margin=dict(l=20, r=20, t=40, b=20))
    fig.show()


In [23]:
regime_detection = RegimeDetection()

In [24]:
params = {'n_components':2, 'covariance_type':"full", 'random_state':100}

hmm_model = regime_detection.get_regimes_hmm(prices_array, params)
hmm_states = hmm_model.predict(prices_array)
plot_hidden_states(np.array(hmm_states), prices, 'TRDPRC_1')

Number of observations for State  0 : 767
Number of observations for State  1 : 5169


In [25]:
import numpy as np
import math

def feed_forward_training(model, params, prices, split_index, retrain_step):
    '''
    Train and predict model states in a feed-forward manner using arrays.

    Parameters:
    model (callable): Model function to train (e.g., GMM or HMM).
    params (dict): Parameters for the model.
    prices (numpy.ndarray): Array containing the processed data for model training and prediction.
    split_index (int): Index to split initial training dataset and out-of-sample testing set.
    retrain_step (int): Number of observations after which the model is retrained.

    Returns:
    numpy.ndarray: Array of predicted states.
    '''
    # Handle input as a numpy array directly
    init_train_data = prices[:split_index]
    test_data = prices[split_index:]

    # Initialize the model with the initial training data
    rd_model = model(init_train_data, params)

    states_pred = []
    for i in range(len(test_data)):
        current_index = split_index + i
        # Predict using the current slice of data
        preds = rd_model.predict(prices[:current_index + 1]).tolist()
        states_pred.append(preds[-1])

        # Retrain the model at specified intervals, excluding the first iteration
        if i % retrain_step == 0 and i != 0:
            rd_model = model(prices[:current_index + 1], params)

    return np.array(states_pred)


In [26]:
# Ensure `split_index` calculation is based on the DataFrame's index
split_index = np.where(prices.index > '2018-01-01')[0][0]

# Your model function and parameters
model_hmm = regime_detection.get_regimes_hmm
params = {'n_components': 2, 'covariance_type': 'full', 'random_state': 100}

# Training and prediction
states_pred_hmm = feed_forward_training(model_hmm, params, prices_array, split_index, 20)

# Adjust the plot_hidden_states function call
# Note: You might need to adjust prices DataFrame to match the dates in states_pred_hmm
plot_hidden_states(states_pred_hmm, prices.iloc[split_index:], 'TRDPRC_1')

Number of observations for State  0 : 181
Number of observations for State  1 : 1390


In [27]:
current_state = hmm_model.predict(prices_array)[-1]
transition_matrix = hmm_model.transmat_

# Probabilities of transitioning to any state from the current state
next_state_probabilities = transition_matrix[current_state]

print("Current State:", current_state)
print("Next State Probabilities:", next_state_probabilities)

#the most likely state for the next period
most_likely_next_state = np.argmax(next_state_probabilities)
print("Most Likely Next State:", most_likely_next_state)

Current State: 1
Next State Probabilities: [0.01400197 0.98599803]
Most Likely Next State: 1


In [28]:
probability_matrix = hmm_model.transmat_
print("Transition Probability Matrix:")
print(probability_matrix)

Transition Probability Matrix:
[[0.91579396 0.08420604]
 [0.01400197 0.98599803]]


In [29]:
import numpy as np

def predict_future_states(model, initial_state, days=30):
    """
    Predict future states using a Hidden Markov Model over a specified number of days.

    Parameters:
    model (GaussianHMM): The trained HMM model.
    initial_state (int): The most recent known state.
    days (int): Number of future days to predict states for.

    Returns:
    list: A list of predicted states for the next 'days' days.
    """
    current_state = initial_state
    future_states = []

    for _ in range(days):
        state_probabilities = model.transmat_[current_state]
        next_state = np.argmax(model.transmat_[current_state])
        future_states.append(next_state)
        current_state = next_state

    return future_states

days_to_predict = 30
current_state = hmm_model.predict(prices_array)[-1]
future_predicted_states = predict_future_states(hmm_model, current_state, days=days_to_predict)

print("Future Predicted States for the Next 30 Days:", future_predicted_states)

Future Predicted States for the Next 30 Days: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [30]:
state_labels = ['State 0', 'State 1']
for i in range(len(probability_matrix)):
    for j in range(len(probability_matrix[i])):
        transition_probability = probability_matrix[i, j]
        print(f"Transition probability from {state_labels[i]} to {state_labels[j]}: {transition_probability}")

Transition probability from State 0 to State 0: 0.915793955485658
Transition probability from State 0 to State 1: 0.08420604451434197
Transition probability from State 1 to State 0: 0.014001971127488366
Transition probability from State 1 to State 1: 0.9859980288725115


In [31]:
# Print emission parameters
means = hmm_model.means_
covars = hmm_model.covars_
state_labels = ['State 1', 'State 0']

for i in range(len(means)):
    mean_value = means[i][0]
    covar_value = covars[i][0][0]
    print(f"Parameters for Log Return in {state_labels[i]}: Mean={mean_value}, Covariance={covar_value}")

Parameters for Log Return in State 1: Mean=-0.0036052757524554385, Covariance=7.061877202353521e-05
Parameters for Log Return in State 0: Mean=0.0008516066600415242, Covariance=9.176323139823505e-06


In [32]:
emission_probs = hmm_model.predict_proba(prices_array)

for i in range(emission_probs.shape[1]):
    print(f"Emission probabilities for State {i}: {emission_probs[:, i].mean()}")

Emission probabilities for State 0: 0.14612160569380664
Emission probabilities for State 1: 0.853878394306177
