# Hidden Markov Model

### Imports

In [579]:
import pandas as pd
import numpy as np

from hmmlearn.hmm import GaussianHMM
from pandas_datareader.data import DataReader
import finpy_tse as fpy
import talib as ta
import hvplot.pandas

### Data

In [580]:
# Data Extraction
# Download daily AAPL bars through pandas-datareader, lower-case the column
# names, rename the adjusted close, and keep only the fields used downstream.
data = (
    DataReader(
        name="AAPL",
        data_source="yahoo",
        start="2016-01-01",
        end="2022-11-01",
    )
    .rename(columns=str.lower)
    .rename(columns={"adj close": "adj_final"})
    .loc[:, ["open", "high", "low", "adj_final", "volume"]]
)

In [581]:
def get_price(ticker: str, start_date, end_date):
    """Fetch adjusted price history for ``ticker`` through ``finpy_tse``.

    Args:
        ticker: Symbol forwarded to ``fpy.Get_Price_History`` as ``stock``.
        start_date: Forwarded unchanged as ``start_date``.
            NOTE(review): presumably a Jalali date string, judging by the
            commented-out call elsewhere in this notebook -- confirm.
        end_date: Forwarded unchanged as ``end_date``.

    Returns:
        The fetched frame with lower-cased column names, ``"adj final"``
        renamed to ``"adj_final"``, the index moved into a column, and only
        the ``date``, ``open``, ``high``, ``low`` and ``adj_final`` columns kept.
    """
    history = fpy.Get_Price_History(
        stock=ticker,
        start_date=start_date,
        end_date=end_date,
        ignore_date=False,
        adjust_price=True,
        show_weekday=False,
        double_date=True,
    )
    # Lower-case first so the "adj final" key in the next rename matches.
    history = history.rename(columns=str.lower)
    history = history.rename(columns={"adj final": "adj_final"})
    # The index is moved into a column only after the renames, so its name is
    # not lower-cased by the step above.
    history = history.reset_index()
    return history[["date", "open", "high", "low", "adj_final"]]

In [582]:
# Add Returns and Range
# Alternative data source via get_price, kept for reference:
#df = get_price(ticker="زاگرس", start_date="1399-01-01",end_date="1401-09-30")
#df.set_index("date",inplace=True)
df = data.assign(
    # Daily log return of the adjusted close; the first row has no predecessor.
    Returns=np.log(data["adj_final"]).diff(),
    # Intraday range expressed as a fraction of the low: high / low - 1.
    Range=data["high"] / data["low"] - 1,
)

In [583]:
# Add Moving Average
# One exponential moving average of the adjusted close per span.
short_ema = 13
long_ema = 21
for ema_span in (short_ema, long_ema):
    df[f"EMA_{ema_span}"] = ta.EMA(df["adj_final"], ema_span)
# Discard every row that still has a missing value in any column.
df = df.dropna()

In [584]:
# Structure data: chronological train/test split of the model features
train_pct = 0.6
# Rows before split_at train the model; the remaining rows are held out.
split_at = int(len(df) * train_pct)
model_features = df[["Returns", "Range"]]
X_train = model_features.iloc[:split_at]
X_test = model_features.iloc[split_at:]

### HMM Learning

In [585]:
# Train Model
# Fit a 4-state Gaussian HMM on the training features (Returns, Range).
# The fit starts from a randomised initialisation, so the seed is pinned:
# without it the state numbering, and every later cell that refers to states
# by index, can change from one run to the next.
hmm_model = GaussianHMM(
    n_components=4,
    covariance_type="full",
    n_iter=100,
    random_state=42,
).fit(X_train)
print("Model Score:", hmm_model.score(X_train))

Model Score: 6644.646206016538


In [586]:
# Check results
# Decode the most likely hidden-state sequence for the training window, then
# count how many training observations were assigned to each state.
hidden_states = hmm_model.predict(X_train)
pd.DataFrame(hidden_states).value_counts()

0    771
2    180
1     49
3     20
dtype: int64

In [587]:
# Fitted per-state means: one row per hidden state, one column per feature in
# the order the model was trained on (Returns, Range).
hmm_model.means_

array([[ 0.0030496 ,  0.0127434 ],
       [ 0.02293334,  0.02496161],
       [-0.00780141,  0.02457044],
       [-0.02801413,  0.04332938]])

In [588]:
# Make Prediction on Test Data
# NOTE(review): predict() decodes the whole test sequence in one pass, so the
# state assigned to a given day may depend on later observations -- confirm
# that this look-ahead is acceptable for the backtest that follows.
test_rows = df.iloc[int(len(df) * train_pct):]
hmm_results = hmm_model.predict(X_test)
# assign() returns a new frame, so df itself is left untouched.
df_main = test_rows.assign(HMM=hmm_results)

In [589]:
# Number of test-window observations assigned to each hidden state.
pd.DataFrame(hmm_results).value_counts()

2    289
0    260
1     90
3     42
dtype: int64

In [590]:
# Mean daily log return of the test window, grouped by decoded hidden state.
df_main.groupby(by="HMM")["Returns"].mean()

HMM
0    0.004679
1    0.034553
2   -0.008025
3   -0.032344
Name: Returns, dtype: float64

### Run Backtest

In [591]:
# Add EMA Signals
# 1 while the short EMA is above the long EMA, otherwise 0.
# Computed directly on df_main with a vectorised comparison: the previous
# row-wise apply ran over the whole of df, and the rows outside the test
# window were thrown away when the result was aligned onto df_main.
short_col, long_col = f"EMA_{short_ema}", f"EMA_{long_ema}"
df_main["EMA_signal"] = (df_main[short_col] > df_main[long_col]).astype(int)

In [592]:
# Add HMM Signals
# A state counts as favourable when its fitted mean return (column 0 of
# means_, i.e. the Returns feature) is above this threshold.
min_state_return = -0.002
# Keep only the qualifying state labels; no placeholder entries are needed
# for the states that fail the test.
favourable_states_hmm = [
    label
    for label, mean_return in enumerate(hmm_model.means_[:, 0])
    if mean_return > min_state_return
]
# 1 on days decoded into a favourable state, otherwise 0.
df_main["HMM_signal"] = df_main["HMM"].isin(favourable_states_hmm).astype(int)

In [593]:
# Add Combined Signal
# In the market only when the EMA and HMM signals are both on. The product is
# lagged by one row so a signal is applied to the following row's return.
both_signals_on = df_main["EMA_signal"] * df_main["HMM_signal"]
df_main["main_signal"] = both_signals_on.shift(1)

In [594]:
# Benchmark Return
# Buy-and-hold: summed log returns converted back to a cumulative simple return.
cumulative_log_return = df_main["Returns"].cumsum()
df_main["bench_prod_exp"] = np.exp(cumulative_log_return) - 1

In [595]:
# Strategy Return
# Each output column is the cumulative simple return earned by holding the
# asset only on rows where the paired (already lagged) signal is 1.
# main_signal was lagged when it was built; the other two are lagged here.
lagged_positions = {
    "strat_prod_exp": df_main["main_signal"],
    "EMA_prod_exp": df_main["EMA_signal"].shift(1),
    "HMM_prod_exp": df_main["HMM_signal"].shift(1),
}
for curve_name, position in lagged_positions.items():
    df_main[curve_name] = np.exp((df_main["Returns"] * position).cumsum()) - 1


In [596]:
# Compare the benchmark with the combined, EMA-only and HMM-only strategies.
return_curves = ["bench_prod_exp", "strat_prod_exp", "EMA_prod_exp", "HMM_prod_exp"]
df_main[return_curves].hvplot.line(
    value_label='Return',
    legend='top',
    height=620,
    width=1200,
)

In [597]:
# Inspect the backtest frame: prices, features, decoded state, signals and
# cumulative return columns for the test window.
df_main

Unnamed: 0_level_0,open,high,low,adj_final,volume,Returns,Range,EMA_13,EMA_21,HMM,EMA_signal,HMM_signal,main_signal,bench_prod_exp,strat_prod_exp,EMA_prod_exp,HMM_prod_exp
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
2020-02-21,79.654999,80.112503,77.625000,76.853973,129554000.0,-0.022895,0.032045,78.479039,77.965611,2,1,0,,-0.022635,,,
2020-02-24,74.315002,76.044998,72.307503,73.203369,222195200.0,-0.048666,0.051689,77.725372,77.532680,3,1,0,0.0,-0.069060,0.000000,-0.047501,0.000000
2020-02-25,75.237503,75.632500,71.532501,70.723839,230673600.0,-0.034459,0.057317,76.725153,76.913694,3,0,0,0.0,-0.100593,0.000000,-0.079763,0.000000
2020-02-26,71.632500,74.470001,71.625000,71.845764,198054800.0,0.015739,0.039721,76.028097,76.452973,2,0,0,0.0,-0.086325,0.000000,-0.079763,0.000000
2020-02-27,70.275002,71.500000,68.239998,67.149330,320605600.0,-0.067603,0.047773,74.759702,75.607188,3,0,0,0.0,-0.146051,0.000000,-0.079763,0.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-10-26,150.960007,151.990005,148.039993,149.102661,88194300.0,-0.019822,0.026682,145.962382,146.217507,2,0,0,0.0,0.896164,0.526651,0.780512,0.390749
2022-10-27,148.070007,149.050003,144.130005,144.560196,109180200.0,-0.030939,0.034136,145.762070,146.066843,2,0,0,0.0,0.838396,0.526651,0.780512,0.390749
2022-10-28,148.199997,157.500000,147.820007,155.482086,164762400.0,0.072835,0.065485,147.150644,146.922774,1,1,1,0.0,0.977292,0.526651,0.780512,0.390749
2022-10-31,153.160004,154.240005,151.919998,153.086044,97943200.0,-0.015530,0.015271,147.998558,147.483071,2,1,0,1.0,0.946821,0.503124,0.753074,0.369317


In [598]:
# Structure prices for chart plotting: one price series per hidden state
# hidden_states holds one decoded state per training row, so take exactly
# that many leading prices. The earlier hard-coded 80% slice disagreed with
# train_pct and only worked because it happened to be the longer of the two.
prices = df["adj_final"].iloc[:len(hidden_states)].to_numpy(dtype=float)
assert len(prices) == len(hidden_states), "price/state length mismatch"

# One entry per model state: the price on rows decoded into that state and
# NaN everywhere else, so each state can be drawn as its own line.
labels_dict = {
    f"labels_{state}": np.where(hidden_states == state, prices, np.nan).tolist()
    for state in range(hmm_model.n_components)
}

In [599]:
# One row per decoded training observation, one column per hidden state.
df_labels = pd.DataFrame(labels_dict)

In [600]:
# Step plot with one line per state column; logy=True puts price on a log axis.
df_labels.hvplot.step(value_label='Price', legend='top', height=620, width=1200, logy=True)