## I. Cài đặt + Khai báo thư viện

In [1]:
!pip install pmdarima

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pmdarima
  Downloading pmdarima-2.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pmdarima
Successfully installed pmdarima-2.0.3


In [None]:
from google.colab import drive
drive.mount('/content/drive')
base_path = 'drive/MyDrive/Build_stock_model'

import itertools 
import numpy as np
import pandas as pd
import seaborn as sns
from numpy import argmin
from datetime import datetime
import matplotlib.pyplot as plt
import scipy.integrate as integrate
from time import sleep
from datetime import datetime, date, timedelta

# Turn off warning
import warnings
warnings.filterwarnings("error")
    
    
import statsmodels.api as sm

# statsmodel
# from statsmodels.tsa.arima_model import ARIMA
from pmdarima.arima import auto_arima
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# II. Function

In [None]:
# Đọc dữ liệu
#/content/drive/MyDrive/Build_model_stock/SHFE_max_volume_week_month__Mar12.xlsx
def _read_data(time, name_stock):   # time: w-week, m-month, 6m-6month; name_stock: tên loại cổ phiếu;
  if time =='w':
    df = pd.read_excel(f'{base_path}/SHFE_max_volume_week_month__Mar12.xlsx',sheet_name = f'Fri{name_stock}')[['Date','CLOSEPRICE']]
  elif time =='m':
    df = pd.read_excel(f'{base_path}/SHFE_max_volume_week_month__Mar12.xlsx',sheet_name = f'EOM{name_stock}')[['Date','CLOSEPRICE']]
  elif time =='6m':
    df_m = pd.read_excel(f'{base_path}/SHFE_max_volume_week_month__Mar12.xlsx',sheet_name = f'EOM{name_stock}')[['Date','CLOSEPRICE']]
    list_date_6m = []
    for i in np.arange(-1,-len(df_m),-6): 
      list_date_6m.append(df_m.Date.values[i])
    df = df_m[df_m['Date'].isin(list_date_6m)].copy()
  else:
    print('Không tồn tại data bạn đã đọc')
  df.columns = ['date','close_price']
  df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
  df = df.sort_values(by='date',ascending=True).reset_index(drop=True)
  return df


# Tách dữ liệu
def _split_data(df):    
  train_val_data = df[:int(len(df)*0.8)].copy()
  test_data = df[int(len(df)*0.8):].copy()
  train_data = train_val_data[:int(len(train_val_data)*0.8)].copy()
  val_data = train_val_data[int(len(train_val_data)*0.8):].copy()
  return train_data, val_data, test_data


# Kiểm tra tính dừng của data
def _check_stationary(train_df):
  df = train_df['close_price'].copy()
  for d in range(0, 4):
    if d > 0:
      df = df.diff()[1:]
  # check stationary
    if adfuller(df.values)[1] <= 0.05:
      break
  print('Dữ liệu có tính dừng tại phân sai:',d)
  
  #Visualization acf, pacf
  fig, ax = plt.subplots(1,2,figsize=(15,4))
  plot_acf(df, lags=15, ax=ax[0])
  ax[0].set_title(f'ACF sai phân bậc {d}')
  plot_pacf(df, lags=10, ax=ax[1],method="ols")
  ax[1].set_title(f'PACF sai phân bậc {d}') 
  plt.show()  
  return d


# Tạo ra các bộ params ARIMA - SARIMA
## arima
def _create_orders_arima(potential_p, d, potential_q):
  # p: autoregressive lags => dynamic
  # d: difference in the order => static
  # q: moving average lags => dynamic
  orders = []
  for p in range(0, potential_p+1, 1):
      for q in range(0, potential_q+1, 1):
          order = (p, d, q)
          orders += [order]
  return orders
## sarima
def _create_order_sarima(p = range(1, 7),
                         q = range(1, 4),
                         d =1,
                         P = range(1,4),
                         Q = range(1,3),
                         D = 0,
                         m=range(1,13,1)):
    
  pdq = list(itertools.product(p, q))
  orders = [(x[0], d, x[1]) for x in list(itertools.product(p,q))]
  seasonal_orders=[]
  for i in m:
    seasonal_orders+=([(x[0], 0, x[1], i) for x in list(itertools.product(P,Q))])
  return orders,seasonal_orders


# Tìm model ARIMA top
def _arima_fit(order_s, df):
  result_models = pd.DataFrame(columns=['order', 'aic', 'bic'])
  fail_parameters = []
  for order in order_s:
    try:
      model = ARIMA(df['close_price'].values, order=order).fit() 
      row = {'order': order, 'aic': model.aic, 'bic': model.bic}
      result_models = pd.concat([result_models, pd.DataFrame([row])], ignore_index=True)
        
    except:
      fail_parameters.append(order)
    
  result_models = result_models.sort_values(by=['aic'], ascending=True).reset_index(drop=True)
  return result_models,fail_parameters

# Tìm model SARIMA top
def _sarima_fit(order_s,seasonal_order_s, df):
  fail_parameters = []
  parameters = []
  k=1
  for order in order_s:
    for seasonal_order in seasonal_order_s:
      try:
        model = ARIMA(df['close_price'].values,
                                        order=order,
                                        seasonal_order=seasonal_order)
        results = model.fit()
        aic = results.aic
        # print(order,seasonal_order,aic)
        parameters.append([order,seasonal_order,aic])
      except:
        fail_parameters.append([order, seasonal_order])
    k+=1
    print(f'Percentage: {k/len(order_s)}')
              
  result_models = pd.DataFrame(parameters)
  result_models.columns = ['order','seasonal_order','aic']
  # sorting in ascending order, the lower AIC is - the better
  result_models = result_models.sort_values(by='aic', ascending=True).reset_index(drop=True)
  return result_models,fail_parameters


# Dự đoán giá tương lai
def _predict(start_point,
             end_point,
             df,
             best_order,
             best_sesonal_order):  
  data_results = pd.DataFrame()
  for j in np.arange(start_point, end_point, 1):
  # fit model
    model_arima = ARIMA(df['close_price'][:j].values, order = best_order, seasonal_order= best_sesonal_order)
    model_fit = model_arima.fit()

  # get results
    predict = model_fit.forecast(1)[0]
    date = df.iloc[j]['date']
    truth = df.iloc[j]['close_price']
    row = {'date': date, 'truth': truth, 'predict': predict}
    data_results = pd.concat([data_results, pd.DataFrame([row])], ignore_index=True)
    # print(j)

  # calculate errors
  data_results['error'] = data_results['predict'] - data_results['truth']
  performance = np.mean(100*(abs(data_results['error']/data_results['truth'])))
  data_results['performance'] = performance
  return data_results

# III. Chạy tìm best params cho model

## 1. Khai báo biến: 

In [None]:
name_time = 'w'  # w, m, 6m
name_data = '6'   # 1, 3, 4, 5, 6, 7, 8, 9, 10, 11 , MaxVol

In [None]:
data = _read_data(name_time, name_data) 
train_data, val_data, test_data = _split_data(data)
d = _check_stationary(train_data)

## 2. Chạy

In [None]:
%%time
# ARIMA
multiple_model_arima,fail_parameters_arima  = _arima_fit(_create_orders_arima(10 ,d, 10),train_data)
multiple_model_arima.head(2)

# SARIMA
order_sarima_s, seasonal_sarima_s = _create_order_sarima()
print('Done generate params')
multiple_model_sarima, fail_parameters_sarima = _sarima_fit(order_sarima_s, seasonal_sarima_s, train_data)
multiple_model_sarima.head(2)

# Lấy ra top các params
top_arima = multiple_model_arima.head(3).order.values
top_order_sarima = multiple_model_sarima.head(3).order.values
top_seasonal_sarima = multiple_model_sarima.head(3).seasonal_order.values

performance_val = []
params = []
for i in range(0,6):
  print(i)
  try:
    value = _predict(len(train_data),len(train_data)+len(val_data),data, top_arima[i], (0,0,0,0)).performance[0]
    print(top_arima[i])
    performance_val.append(round(value,4))
    params.append(top_arima[i])
  except:
    print('Not fit')

for i in range(0,6):
  print(i)
  try:
    value = _predict(len(train_data),len(train_data)+len(val_data),data, top_order_sarima[i], top_seasonal_sarima[i]).performance[0]
    print(top_order_sarima[i], top_seasonal_sarima[i])
    performance_val.append(round(value,4))
    params.append(f'{top_order_sarima[i]}-{top_seasonal_sarima[i]}')
  except:
    print('Not fit')

print(params[argmin(performance_val)])

# IV. Dự đoán và tính sai số trên tập test

In [None]:
data = _read_data('m',5) # time: w, m, 6m and name_stock: 1, 3, 4, 5, 6, 7, 8, 9, 10, 11
train_data, val_data, test_data = _split_data(data)
best_order_model = (3,1,2)
best_seasonal_order_model = (0,0,0,0)

In [None]:
predict_test_by_best_model = _predict(len(train_data)+len(val_data),
                                      len(data),
                                      data,
                                      best_order_model,
                                      best_seasonal_order_model)

print(predict_test_by_best_model['performance'][0])

# V. Phân loại mức độ dao động

In [None]:
class ProbaError:
    def __init__(self, 
                 price_actual_yesterday=97.555,
                 price_predict_today=98.555,
                 params_normal=(0.2, 0.005), 
                 range_change_rate=[-0.05, 0.03]):
    
    # assign params
        self.price_actual_yesterday = price_actual_yesterday
        self.price_predict_today = price_predict_today
        self.mean_error = params_normal[0]
        self.sigma_error = params_normal[1]
        self.lower_change_rate = range_change_rate[0]
        self.upper_change_rate = range_change_rate[1]

  # build distribute normal
    def _buil_normal(self, x, mu, sigma):
        return np.exp(-(x-mu)**2 / (2*sigma**2))/(sigma*np.sqrt(2*np.pi))

  # find range error: (price_predict_today) with (price_actual_yesterday in range_change_rate)
    def _find_range_error(self, price_actual_yesterday, price_predict_today, lower_change_rate, upper_change_rate):
    # interval price
        thresold_price_today_1 = price_actual_yesterday * (1 + lower_change_rate)
        thresold_price_today_2 = price_actual_yesterday * (1 + upper_change_rate)

    # interval error:
        thresold_error_today_1 = price_predict_today - thresold_price_today_1
        thresold_error_today_2 = price_predict_today - thresold_price_today_2

        if thresold_error_today_1 < thresold_error_today_2:
            return thresold_error_today_1, thresold_error_today_2

        return thresold_error_today_2, thresold_error_today_1


  #  get density probability of distribute normal
    def density_proba_normal(self):
    
        lower_error_today, upper_error_today = self._find_range_error(self.price_actual_yesterday, self.price_predict_today, self.lower_change_rate, self.upper_change_rate)
        val, err = integrate.quad(lambda x : self._buil_normal(x, self.mean_error, self.sigma_error),
                                  lower_error_today , upper_error_today)
        return round(val,3)

In [None]:
class PredictPrice:
    def __init__(self,
                 data, 
                params_model={'order': (4, 1, 1),
                             'seasonal_order': (1,1,0,7)}, 
                description_model="Dự báo tuần và sai số trên tập test là 3%",
                index_start_error=1
               ):
        # params
        self.data = data
        self.params_model = params_model
        self.description_model = description_model
        self.index_start_error = index_start_error

  # fit_model for data
    def fit_model(self, column_name='close_price'):
        model = ARIMA(self.data[column_name].values, order=self.params_model['order'], seasonal_order=self.params_model['seasonal_order'])
        model_fit = model.fit()

        return model_fit

  # get params for error distribute
    def get_params_error(self, model_fit, column_name='close_price'):
        # get price
        truth_price_s = self.data[self.index_start_error:][column_name].values
        predict_price_s = model_fit.predict(self.index_start_error, len(self.data)-1)

        # calculate errors
        error_s = predict_price_s - truth_price_s

        # get params error_s
        mean_error = np.mean(error_s)
        sigma_error = np.sqrt(sum((error_s - mean_error)**2)/len(error_s))

        # return
        return mean_error, sigma_error

  # get pair price
    def get_pair_price(self, model_fit, column_name='close_price'):
        price_actual_yesterday = self.data[column_name].values[-1]
        price_predict_today = model_fit.forecast(1)[0]

        return price_actual_yesterday, price_predict_today

  # run class
    def run(self):
        model_fit = self.fit_model()
        mean_error, sigma_error = self.get_params_error(model_fit)
        price_actual_yesterday, price_predict_today = self.get_pair_price(model_fit)

        dict_range_error_s = {
            'Nhỏ hơn -5%': [-np.inf, -0.05],
            'Từ -5% đến -3%': [-0.05, -0.03],
            'Từ -3% đến 0%': [-0.03, 0],
            'Từ 0% đến 3%': [0, 0.03],
            'Từ 3% đến 5%': [0.03, 0.05],
            'Lớn hơn 5%': [0.05, np.inf]
            }

        result = {
            'Ghi chú': self.description_model
            }
        for description, range_change_rate in dict_range_error_s.items():
            density_proba = ProbaError(price_actual_yesterday,
                                     price_predict_today,
                                     params_normal=(mean_error, sigma_error), 
                                     range_change_rate=range_change_rate).density_proba_normal()
            row = {description: round(density_proba*100,2)}
            result.update(row)

        return pd.DataFrame(result, index=[0])

In [None]:
classifi_df = pd.DataFrame()
for i in range(-13,0,1):
  split_data = data[:i]
  if i <-1:
    date = data[:i+1].date.max()
  else:
    date = data.date.max()
#        date = data[:i].date.max()+ timedelta(weeks=4)
  classifi_point = PredictPrice(split_data,
                                params_model={'order': best_order_model,
                                              'seasonal_order': best_seasonal_order_model},
                                description_model = f'Dự báo tuần và sai số trên tập test là {round(1*100,2)}%',
                                index_start_error=1).run()
  classifi_point['Date'] = date
  classifi_df = classifi_df.append(classifi_point)
  classifi_df = classifi_df[['Ghi chú', 'Nhỏ hơn -5%', 'Từ -5% đến -3%', 'Từ -3% đến 0%','Từ 0% đến 3%', 'Từ 3% đến 5%', 'Lớn hơn 5%', 'Date']]
  # print(i)

In [None]:
classifi_df['Increase'] = classifi_df[['Từ 0% đến 3%', 'Từ 3% đến 5%', 'Lớn hơn 5%',]].sum(axis=1)
classifi_df['Decrease'] = classifi_df[['Nhỏ hơn -5%', 'Từ -5% đến -3%', 'Từ -3% đến 0%']].sum(axis=1)

In [None]:
classifi_df[['Date', 'Nhỏ hơn -5%', 'Từ -5% đến -3%', 'Từ -3% đến 0%','Từ 0% đến 3%', 'Từ 3% đến 5%', 'Lớn hơn 5%','Increase','Decrease']].tail(11)