#### Hidden Markov Model

In [144]:
import pandas as pd
import numpy as np

from hmmlearn.hmm import GaussianHMM
from pandas_datareader.data import DataReader
import finpy_tse as fpy
import talib as ta
import hvplot.pandas

# library for save and load scikit-learn models
import pickle

## Data

In [122]:
def get_price(ticker: str, start_date, end_date):
    price_data = fpy.Get_Price_History(
        stock=ticker,
        start_date=start_date,
        end_date=end_date,
        ignore_date=False,
        adjust_price=True,
        show_weekday=False,
        double_date=True).rename(columns=str.lower).rename(columns={"adj final": "adj_final"}).reset_index()
    price_data = price_data[["date", "open", "high", "low", "adj_final"]]
    return price_data

In [123]:
# Add Returns and Range

df = get_price(ticker="فملی", start_date="1392-01-01",end_date="1402-01-16")
#df = data.copy()
df.set_index("date",inplace=True)
df["Returns"] = np.log(df["adj_final"]).diff()
df["Range"] = df.high.div(df.low) - 1

In [124]:
# Add Moving Average
short_ema = 13
long_ema = 21
df[f"EMA_{short_ema}"] = ta.EMA(df["adj_final"], short_ema)
df[f"EMA_{long_ema}"] = ta.EMA(df["adj_final"], long_ema)
df.dropna(inplace=True)

In [125]:
 # Structure data
train_pct = 0.6
X_train = df[["Returns", "Range"]].iloc[:int(len(df) * train_pct)]
X_test = df[["Returns", "Range"]].iloc[int(len(df) * train_pct):]

### HMM Learning

In [126]:
# Train Model
hmm_model = GaussianHMM(n_components=4, covariance_type="full", n_iter=100, random_state=100).fit(X_train)
print("Model Score:", hmm_model.score(X_train))



Model Score: 7423.570703414704


In [147]:
# file name, I'm using *.pickle as a file extension
filename = "HMM.pickle"

# save model
pickle.dump(hmm_model, open(filename, "wb"))

# load model
loaded_model = pickle.load(open(filename, "rb"))

# you can use loaded model to compute predictions
hidden_states_ = loaded_model.predict(X_train)
pd.DataFrame(hidden_states_).value_counts()

2    746
1    482
3    110
0      3
Name: count, dtype: int64

In [127]:
# Check results
hidden_states = hmm_model.predict(X_train)
pd.DataFrame(hidden_states).value_counts()

2    746
1    482
3    110
0      3
Name: count, dtype: int64

In [128]:
hmm_model.means_

array([[ 0.04220359,  0.04515633],
       [ 0.00617699,  0.03889976],
       [-0.00207115,  0.02288311],
       [ 0.00024153,  0.0413828 ]])

In [129]:
# Make Prediction on Test Date
df_main = df.iloc[int(len(df) * train_pct):].copy()

hmm_results = hmm_model.predict(X_test)
df_main["HMM"] = hmm_results

In [130]:
pd.DataFrame(hmm_results).value_counts()

1    474
2    356
3     58
0      6
Name: count, dtype: int64

In [131]:
df_main.groupby(by="HMM")["Returns"].mean()

HMM
0    0.057940
1    0.006827
2   -0.002428
3    0.003099
Name: Returns, dtype: float64

### Run Backtest

In [132]:
# Add EMA Signals
df_main["EMA_signal"] = df.apply(lambda x: 1 if x[f"EMA_{short_ema}"] > x[f"EMA_{long_ema}"] else 0, axis=1)

In [133]:
# Add HMM Signals
favourable_states_hmm = list(
    label if returns > 0 else np.nan for label, returns in enumerate(hmm_model.means_[:, 0]))
df_main["HMM_signal"] = df_main["HMM"].apply(lambda x: 1 if x in favourable_states_hmm else 0)

In [134]:
# Add Combined Signal
df_main["main_signal"] = df_main["EMA_signal"].mul(df_main["HMM_signal"])
df_main["main_signal"] = df_main["main_signal"].shift(1)

In [135]:
# Benchmark Return
df_main["bench_prod_exp"] = np.exp(df_main.Returns.cumsum()) - 1

In [136]:
# Strategy Return
df_main["strat_prod_exp"] = np.exp(df_main.Returns.mul(df_main.main_signal).cumsum()) - 1
df_main["EMA_prod_exp"] = np.exp(df_main.Returns.mul(df_main.EMA_signal.shift(1)).cumsum()) - 1
df_main["HMM_prod_exp"] = np.exp(df_main.Returns.mul(df_main.HMM_signal.shift(1)).cumsum()) - 1

In [137]:
df_main[["bench_prod_exp", "strat_prod_exp", "EMA_prod_exp", "HMM_prod_exp"]].hvplot.line(value_label='Return',
                                                                                          legend='top', height=520,
                                                                                          width=1000)

In [138]:
 # Structure prices for chart plotting
i = 0
labels_dict = {"labels_0": [],
               "labels_1": [],
               "labels_2": [],
               "labels_3": [],
               }
prices = df["adj_final"].iloc[:int(len(df) * 0.8)].values.astype(float)
#print("Correct Number of rows: ", len(prices) == len(hidden_states))
for s in hidden_states:
    if s == 0:
        labels_dict["labels_0"].append(prices[i])
        labels_dict["labels_1"].append(float('nan'))
        labels_dict["labels_2"].append(float('nan'))
        labels_dict["labels_3"].append(float('nan'))
    if s == 1:
        labels_dict["labels_0"].append(float('nan'))
        labels_dict["labels_1"].append(prices[i])
        labels_dict["labels_2"].append(float('nan'))
        labels_dict["labels_3"].append(float('nan'))
    if s == 2:
        labels_dict["labels_0"].append(float('nan'))
        labels_dict["labels_1"].append(float('nan'))
        labels_dict["labels_2"].append(prices[i])
        labels_dict["labels_3"].append(float('nan'))
    if s == 3:
        labels_dict["labels_0"].append(float('nan'))
        labels_dict["labels_1"].append(float('nan'))
        labels_dict["labels_2"].append(float('nan'))
        labels_dict["labels_3"].append(prices[i])

    i += 1

In [139]:
df_labels = pd.DataFrame(labels_dict)

In [140]:
df_labels.hvplot.step(value_label='Price', legend='top', height=520, width=1000, logy=True)

### Calculate Metrics

In [141]:
# Sharpe Ratio Function
def sharpe_ratio(returns, rf):
    N = 255
    r = returns.mean() * N
    sigma = returns.std() * np.sqrt(N)
    sharp_ratio_ = round((r - rf) / sigma , 2)

    return sharp_ratio_

In [142]:
# Metrics
bench_return = round(df_main["bench_prod_exp"].values[-1], 2)
strat_return= round(df_main["strat_prod_exp"].values[-1], 2)

bench_sharpe = sharpe_ratio(df_main.Returns, 0.02)
strat_sharpe = sharpe_ratio(df_main.Returns.mul(df_main.main_signal), 0.02)

In [143]:
print(f"Returns Benchmark: {bench_return}\n",f"Returns Strategy: {strat_return}")
print(f"Sharpe Benchmark: {bench_sharpe}\n",f"Sharpe Strategy: {strat_sharpe}")

Returns Benchmark: 17.15
 Returns Strategy: 16.89
Sharpe Benchmark: 1.92
 Sharpe Strategy: 2.45
