<a href="https://colab.research.google.com/github/yy0750/Stock-Price-Prediction-using-HMM/blob/main/GMM_HMM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Appendix A.1. Preparation for Model Construction/Modeling

#### Appendix A.1.1. Importing Necessary Modeules

##### install hmmlearn

In [None]:
# hmmlearn 패키지 설치
!pip install hmmlearn

##### Import library

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn.metrics as metrics
import yfinance as yf
from hmmlearn import hmm
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

In [None]:
# 시작 날짜와 종료 날짜 설정
start_date = '2021-12-10'
end_date = '2023-11-17'

# 주식 코드 설정 (삼성전자 - '005930.KS', Sk하이닉스 - '00660.KS')
stock_code = '005930.KS'

# Yahoo Finance에서 주식 데이터 다운
df = yf.download(stock_code, start=start_date, end=end_date)

# 데이터프레임 인덱스 재설정
df.reset_index(inplace=True)

# 'ID' 열을 데이터프레임의 첫 번째 열로 추가
df.insert(0, 'ID', df.index)

# 'Adj Close' 열 삭제
df.drop('Adj Close', axis=1, inplace=True)

### Appendix A.1.3 Data formatting and Normalization

In [None]:
# 'Avg' 열 추가
df['Avg'] = pd.Series([((row.Open + row.High + row.Low + row.Close) / 4) for index, row in df.iterrows()])

# 'Date' 열의 데이터 형식을 날짜로 변환
df['Date'] = pd.to_datetime(df['Date'], format='%m/%d/%Y')

# 데이터프레임에서 'ID'와 'Avg' 열을 선택
X = df.values[:, [0, 7]]

# NaN을 0으로 대체
X = np.nan_to_num(X)

# 데이터를 표준화
X = StandardScaler().fit_transform(X)

#### Appendix A.1.4 Clustering
###### K-Means Clustering
###### Preliminary Clustering

In [None]:
# 클러스터 개수 설정
clusterNum = 3

# K-평균 클러스터링, n_clusters=clusterNum 클러스터 개수를 3으로 설정, n_init=12: 12번의 다른 초기화를 실행
k_means = KMeans(init='k-means++', n_clusters=clusterNum, n_init=12)
k_means.fit(X)

# 각 데이터 포인트의 클러스터 레이블을 labels 변수에 저장
labels = k_means.labels_

# 데이터프레임에 'State' 열을 추가
df['State'] = labels

# 클러스터링 결과를 산점도로 시각화
plt.scatter(X[:, 0], X[:, 1], c=labels.astype(np.float64), alpha=0.5)
plt.xlabel('S.no')  # x축 레이블 설정
plt.ylabel('Price')  # y축 레이블 설정


##### Calculates  the mean and covariance for each cluster


In [None]:
pre_means = []
pre_covars = []

for i in [0, 1, 2]:
    tdf = df[df.State == i]

    # 평균 계산
    pre_means.append(tdf['Avg'].sum() / tdf['Avg'].count())

    # 분산 계산
    pre_covars.append(tdf['Avg'].std() ** 2)

In [None]:
# # 'Avg' 열의 데이터를 numpy 배열로 변환하고 NaN 값을 제거한 후 1열 형태로 변경
# data = np.array(df['Avg'].dropna()).reshape(-1, 1)

# subclusters = {}

# # 고유한 클러스터 각각에 대해 반복문 실행
# for cluster_id in np.unique(labels):
#     # 해당 클러스터에 속하는 데이터를 선택
#     cluster_data = data[labels == cluster_id]

#     # 클러스터 내 데이터의 평균을 계산
#     cluster_mean = np.mean(cluster_data, axis=0)

#     # 클러스터 내 데이터의 공분산을 계산
#     cluster_covariance = np.cov(cluster_data, rowvar=False)

#     # 클러스터 평균과 공분산을 딕셔너리에 저장
#     subclusters[cluster_id] = {
#         'means': cluster_mean,
#         'covariances': cluster_covariance
#     }

# # 각 클러스터의 평균과 공분산을 저장
# pre_means = np.empty(3)
# pre_means[0] = subclusters[0]['means']
# pre_means[1] = subclusters[1]['means']
# pre_means[2] = subclusters[2]['means']

# pre_covars = np.empty(3)
# pre_covars[0] = subclusters[0]['covariances']
# pre_covars[1] = subclusters[1]['covariances']
# pre_covars[2] = subclusters[2]['covariances']


##### Clustering the log-likelihoods


In [None]:
# 'Avg' 열을 리스트로 변환
pre_prices = df['Avg'].tolist()

# 로그 우도를 저장할 리스트를 초기화
pre_training_ll = [np.nan] * len(pre_prices)


for i in range(3, len(pre_prices)):
    print('Datapoint: ', i)

    # GMM-HMM 모델 초기화
    pre_model = hmm.GMMHMM(n_components=3, n_mix=1, n_iter=100, covariance_type='diag', init_params='')

    # 모델의 초기 파라미터 값을 설정
    pre_model.startprob_ = np.array([1/3, 1/3, 1/3])
    pre_model.transmat_ = np.array([[1/3, 1/3, 1/3], [1/3, 1/3, 1/3], [1/3, 1/3, 1/3]])
    pre_model.means_ = np.array([pre_means[0], pre_means[1], pre_means[2]]).reshape(3, 1)
    pre_model.covars_ = np.array([pre_covars[0], pre_covars[1], pre_covars[2]]).reshape(3, 1)
    pre_model.weights_ = np.array([1, 1, 1]).reshape(3, 1)

    # 최근 4일치 주가 데이터를 가져와서 배열 형태로 변환
    X = np.array(pre_prices[max(i-4, 0):i]).reshape(i - max(i-4, 0), 1)

    # NaN을 0으로 대체
    X = np.nan_to_num(X)

    # 모델 훈련
    try:
        pre_model.fit(X)
    except ValueError:
        continue

    # 훈련 로그 우도를 리스트에 저장합니다.
    pre_training_ll[i] = pre_model.monitor_.history[0]

# 데이터프레임에 'pre_ll' 열을 추가하여 로그 우도를 저장
df['pre_ll'] = np.nan_to_num(pre_training_ll)


##### Refining the clusters (Clustering the log_likelihoods):

In [None]:
# 데이터프레임에서 'ID'와 'Avg' 열을 선택
X = df.values[:, [0, 9]]

# NaN을 0으로 대체
X = np.nan_to_num(X)

# 표준화
X = StandardScaler().fit_transform(X)


# 클러스터의 개수 설정
clusterNum = 3

# K-평균 클러스터링, n_clusters=clusterNum 클러스터 개수를 3으로 설정, n_init=12: 12번의 다른 초기화를 실행
k_means = KMeans(init='k-means++', n_clusters=clusterNum, n_init=12)
k_means.fit(X)

# 각 데이터 포인트의 클러스터 레이블을 labels 변수에 저장
labels = k_means.labels_

# 데이터프레임에 'State' 열을 추가
df['State'] = labels


# 클러스터링 결과를 산점도로 시각화
plt.scatter(df['ID'], df['Avg'], c=df['State'].astype(np.float64), alpha=0.5)
plt.xlabel('S.no')  # x축 레이블 설정
plt.ylabel('Price')  # y축 레이블 설정


## Appendix A.2. Model Construction/Modeling


#### Appendix A.2.1 Mixture Model
##### Low Economy

In [None]:
# 'State' 열 값이 0인 행들로 이루어진 데이터프레임을 생성
low_df = df[df.State == 0]


# 데이터프레임의 인덱스를 재설정
low_df.reset_index(inplace=True)

# 'ID' 열을 삭제
low_df.drop(['ID'], axis=1, inplace=True)

# low_df의 인덱스 값을 할당
low_df['ID'] = low_df.index


# 클러스터링에 사용할 데이터를 선택
X = low_df.values[:, [10, 7]]

# NaN을 0으로 대체
X = np.nan_to_num(X)

# 표준화
X = StandardScaler().fit_transform(X)


# 클러스터의 개수 설정
clusterNum = 2

low_k_means = KMeans(init='k-means++', n_clusters=clusterNum, n_init=12)
low_k_means.fit(X)

labels = low_k_means.labels_

# 데이터프레임에 'Mix' 열을 추가하여 클러스터 레이블을 저장
low_df['Mix'] = labels


# 클러스터링 결과를 산점도로 시각화
plt.scatter(X[:, 0], X[:, 1], c=labels.astype(np.float64), alpha=0.5)
plt.xlabel('S.no')  # x축 레이블 설정
plt.ylabel('Price')  # y축 레이블 설정


# 각 클러스터에 대한 평균, 분산, 가중치를 계산
low_means = []
low_covars = []
low_weights = []

for i in [0, 1]:
    tdf = low_df[low_df.Mix == i]

    # 평균 계산
    low_means.append(tdf['Avg'].sum() / tdf['Avg'].count())

    # 분산 계산
    low_covars.append(tdf['Avg'].std() ** 2)

    # 가중치 계산
    tdf = low_df[low_df.Mix == 0]
    low_weights.append(tdf['Avg'].count() / low_df['Avg'].count())
    low_weights.append(1 - low_weights[0])


In [None]:
# print("Low Economy")
# print(low_means)
# print(low_covars)
# print(low_weights, end='\n\n')

# print("Medium Economy")
# print(medium_means)
# print(medium_covars)
# print(medium_weights, end='\n\n')

# print("High Economy")
# print(high_means)
# print(high_covars)
# print(high_weights)



##### Medium Economy

In [None]:
medium_df = df[df.State == 2]

medium_df.reset_index(inplace=True)
medium_df.drop(['ID'], axis=1, inplace=True)
medium_df['ID'] = medium_df.index

X = medium_df.values[:, [10, 7]]
X = np.nan_to_num(X)
X = StandardScaler().fit_transform(X)

clusterNum = 2
medium_k_means = KMeans(init='k-means++', n_clusters=clusterNum, n_init=12)
medium_k_means.fit(X)
labels = medium_k_means.labels_
medium_df['Mix'] = labels

plt.scatter(X[:, 0], X[:, 1], c=labels.astype(np.float64), alpha=0.5)
plt.xlabel('S.no')
plt.ylabel('Price')

medium_means = []
medium_covars = []
medium_weights = []

for i in [0, 1]:
    tdf = medium_df[medium_df.Mix == i]

    #Mean
    medium_means.append(tdf['Avg'].sum() / tdf['Avg'].count())

    #Variance
    medium_covars.append(tdf['Avg'].std() ** 2)

    #Weights
    tdf = medium_df[medium_df.Mix == 0]
    medium_weights.append(tdf['Avg'].count() / medium_df['Avg'].count())
    medium_weights.append(1 - medium_weights[0])

##### High Economy

In [None]:
# 'State' 열 값이 1인 행들로 이루어진 데이터프레임을 생성
high_df = df[df.State == 1]


# 데이터프레임의 인덱스를 재설정
high_df.reset_index(inplace=True)

# 'ID' 열을 삭제
high_df.drop(['ID'], axis=1, inplace=True)

# high_df의 인덱스 값을 할당
high_df['ID'] = high_df.index


# 클러스터링에 사용할 데이터를 선택
X = high_df.values[:, [10, 7]]

# NaN을 0으로 대체
X = np.nan_to_num(X)

# 표준화
X = StandardScaler().fit_transform(X)


# 클러스터의 개수를 설정
clusterNum = 2

# K-Means 클러스터링 모델을 초기화하고 데이터에 적합
high_K_means = KMeans(init='k-means++', n_clusters=clusterNum, n_init=12)
high_K_means.fit(X)

labels = high_K_means.labels_

high_df['Mix'] = labels


# 클러스터링 결과를 산점도로 시각화
plt.scatter(X[:, 0], X[:, 1], c=labels.astype(np.float64), alpha=0.5)
plt.xlabel('S.no')  # x축 레이블 설정
plt.ylabel('Price')  # y축 레이블 설정


# 각 클러스터에 대한 평균, 분산, 가중치 계산
high_means = []
high_covars = []
high_weights = []

for i in [0, 1]:
    tdf = high_df[high_df.Mix == i]

    # 평균 계산
    high_means.append(tdf['Avg'].sum() / tdf['Avg'].count())

    # 분산 계산
    high_covars.append(tdf['Avg'].std() ** 2)

    # 가중치 계산
    tdf = high_df[high_df.Mix == 0]
    high_weights.append(tdf['Avg'].count() / high_df['Avg'].count())
    high_weights.append(1 - high_weights[0])


#### Appendix A.2.2 HMM Training and Testing

In [None]:
# 'Close' 열을 리스트로 변환
prices = df['Close'].tolist()

# prices의 길이와 동일한 길이로 초기화
training_ll = [None] * len(prices)
lls = [None] * len(prices)

# 미래의 가격 예측을 위해 prices의 길이보다 하나 더 긴 길이로 초기화
predicted_prices = [None] * (len(prices) + 1)

# 엡실론 값과 현재 로그 우도 값 설정
epsilon = 0.2
curr_ll = 0

for i in range(3, len(prices)):
    print('Datapoint: ', i)

    # GMM-HMM 모델 초기화
    model = hmm.GMMHMM(n_components=3, n_mix=2, n_iter=100, covariance_type='diag', init_params='')

    # 모델의 초기 파라미터 값을 설정
    model.startprob_ = np.array([1/3, 1/3, 1/3])
    model.transmat_ = np.array([[1/3, 1/3, 1/3], [1/3, 1/3, 1/3], [1/3, 1/3, 1/3]])

    # 클러스터별 평균, 분산, 가중치 값을 설정
    model.means_ = np.array([[medium_means[0], medium_means[1]], [high_means[0], high_means[1]], [low_means[0], low_means[1]]]).reshape(3, 2, 1)
    model.covars_ = np.array([[medium_covars[0], medium_covars[1]], [high_covars[0], high_covars[1]], [low_covars[0], low_covars[1]]]).reshape(3, 2, 1)
    model.weights_ = np.array([[medium_weights[0], medium_weights[1]], [high_weights[0], high_weights[1]], [low_weights[0], low_weights[1]]])

    # 최근 4일치 종가 데이터를 가져와서 배열 형태로 변환
    X = np.array(prices[max(i-4, 0):i]).reshape(i - max(i-4, 0), 1)

    # NaN을 0으로 대체
    X = np.nan_to_num(X)

    # 모델 훈련
    try:
        model.fit(X)

    # 값 오류가 발생하면 예외 처리
    except ValueError:
        pass

    # 로그 우도의 배열을 생성
    ll_arr = np.array(model.monitor_.history)
    # NaN 값 제거
    ll_arr = ll_arr[~np.isnan(ll_arr)]

    # 로그 우도 계산
    try:
        #curr_ll = ll_arr[0]  # 첫 번째 반복
        #curr_ll = np.median(ll_arr)  # 중간 반복
        curr_ll = ll_arr[-1]  # 마지막 반복

    # 값 오류가 발생하면 예외 처리
    except ValueError:
        pass

    lls[i] = curr_ll

    # # i가 427 지점 이후부터 수행 예측 수행
    if i > 427:

        # 차이 값 변수 생성
        diff = []

        #  3부터 i까지의 범위에 대해 반복
        for j in range(3, i):

            #lls 배열의 j번째 요소가 None이 아닐 때만 내부의 코드를 실행
            if lls[j] is not None:

                # lls[j] 값이 curr_ll +- 엡실론 범위일 경우
                if lls[j] < (curr_ll + epsilon) and lls[j] > (curr_ll - epsilon):

                    # prices 배열의 연속된 값의 차이를 계산하여 diff 리스트에 추가
                    diff.append(prices[j+1] - prices[j])

        if len(diff) > 0:
            min_diff = diff[np.argmin(np.abs(diff))]
            predicted_prices[i+1] = prices[i] + min_diff

# 데이터프레임에 'PredictedPrices' 열을 추가
df['PredictedPrices'] = pd.Series(predicted_prices)


#### Appendix A.2.3 Model Performance

In [None]:
print()

In [None]:
# 주가 예측 시각화
plt.plot(df[df['Date'] > '31-08-2023']['Close'], label='Actual Prices', c='b')  # 'Actual Prices'라는 레이블과 함께 파란색으로 실제 주가 데이터를 그림
plt.plot(df['PredictedPrices'], label='GMM-HMM Prices', c='r')  # 'GMM-HMM Prices'라는 레이블과 함께 빨간색으로 GMM-HMM 모델로 예측한 주가 데이터를 그림
plt.title('Stock Price Prediction with GMM-HMM')  # 그래프 제목
plt.ylabel('Prices')  # y축 레이블
plt.legend()  # 두 레이블('Actual Prices'와 'GMM-HMM Prices')이 그래프에 범례로 표시됨
plt.show()  # 그래프를 화면에 출력

# NaN이 아닌 값들에서 'Close', 'PredictedPrices' 열의 값을 리스트 형태로 변환
y_true = df[~np.isnan(df['PredictedPrices'])]['Close'].tolist()  # 실제 주가
y_pred = df[~np.isnan(df['PredictedPrices'])]['PredictedPrices'].tolist()  # GMM-HMM 모델로 예측한 주가

# 평균 절대 백분율 오차(Mean Absolute Percentage Error) 계산
print('MAE%: ', metrics.mean_absolute_percentage_error(y_true, y_pred))


In [None]:
df0 = df[df.State == 0]

In [None]:
df0 = df[df.State == 0]
df1 = df[df.State == 1]
df2 = df[df.State == 2]

In [None]:
import matplotlib.pyplot as plt

bins = 50

edgecolor = 'black'

plt.hist(df0['Close'], bins=bins, edgecolor=edgecolor, alpha=0.5, label='df0')
plt.hist(df1['Close'], bins=bins, edgecolor=edgecolor, alpha=0.5, label='df1')
plt.hist(df2['Close'], bins=bins, edgecolor=edgecolor, alpha=0.5, label='df2')

plt.xlabel('Stock Price')
plt.ylabel('Frequency')

plt.title('Close Prices Comparison')

plt.xticks(rotation=45)

plt.legend()

plt.show()


In [None]:
import matplotlib.pyplot as plt

bins = 25

edgecolor = 'black'

fig, axs = plt.subplots(2, 2, figsize=(10, 8))

axs[0, 0].hist(df0['Close'], bins=bins, edgecolor=edgecolor)
axs[0, 0].set_title('df0 Close')
axs[0, 0].set_xlabel('Stock Price')
axs[0, 0].set_ylabel('Frequency')
axs[0, 0].tick_params(axis='x', rotation=45)

axs[0, 1].hist(df1['Close'], bins=bins, edgecolor=edgecolor)
axs[0, 1].set_title('df1 Close')
axs[0, 1].set_xlabel('Stock Price')
axs[0, 1].set_ylabel('Frequency')
axs[0, 1].tick_params(axis='x', rotation=45)

axs[1, 0].hist(df2['Close'], bins=bins, edgecolor=edgecolor)
axs[1, 0].set_title('df2 Close')
axs[1, 0].set_xlabel('Stock Price')
axs[1, 0].set_ylabel('Frequency')
axs[1, 0].tick_params(axis='x', rotation=45)

axs[1, 1].axis('off')

plt.tight_layout()

plt.show()
