In [None]:
!pip install patool

Collecting patool
  Downloading patool-2.0.0-py2.py3-none-any.whl (93 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.7/93.7 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: patool
Successfully installed patool-2.0.0


In [None]:
#extract zip file

import patoolib

file='stock price.zip'

patoolib.extract_archive(file)

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Activation
from datetime import datetime as dt

import tensorflow as tf

import xgboost as xgb


In [None]:
%cd '/content/drive/MyDrive/Colab Notebooks/Data/stock_price_data'

/content/drive/MyDrive/Colab Notebooks/Data/stock_price_data


In [None]:
def metrics_summary(actual, predicted):
    cm = confusion_matrix(actual, predicted)
    TN, FP, FN, TP = cm.ravel()

    # 計算準確度 (Accuracy = (TP + TN) / (TP + TN + FP + FN))
    accuracy = (TP + TN) / (TP + TN + FP + FN)

    # 計算精確度 (Precision = TP / (TP + FP))
    precision = TP / (TP + FP)

    # 計算召回率 (Recall = TP / (TP + FN))
    recall = TP / (TP + FN)

    # 計算F1分數 (F1 Score = 2 * (Precision * Recall) / (Precision + Recall))
    f1_score = 2 * (precision * recall) / (precision + recall)

    # 計算特異性 (Specificity = TN / (TN + FP))
    specificity = TN / (TN + FP)

    # 整理成字典並回傳
    metrics_summary = {
        "真陽性 (True Positive)": TP,
        "假陽性 (False Positive)": FP,
        "真陰性 (True Negative)": TN,
        "假陰性 (False Negative)": FN,
        "準確度 (Accuracy)": accuracy,
        "精確度 (Precision)": precision,
        "召回率 (Recall)": recall,
        "F1分數 (F1 Score)": f1_score,
        "特異性 (Specificity)": specificity
    }

    return metrics_summary

In [None]:
def sds(stock_id):

    df = pd.read_excel(f'{stock_id}.xlsx')
    df = df.dropna()

    # 訓練、驗證、測試集的比例
    train_rate = 0.7
    validate_rate = 0.2

    # 資料數輛
    data_num = df.shape[0]

    # 切割資料點
    validate_split = data_num * train_rate # 訓練、驗證集的切割點
    test_split = data_num * (train_rate + validate_rate) # 驗證、測試集的切割點
    validate_split, test_split = int(validate_split), int(test_split) # 讓切割點變成整數

    # 切割資料
    train_df = df.iloc[ : validate_split]
    validate_df = df.iloc[ validate_split : test_split ]
    test_df = df.iloc[ test_split : ]

    # 將資料的標籤(y)取出來
    train_y = train_df["sign"]
    validate_y = validate_df["sign"]
    test_y = test_df["sign"]

    # 將資料的特徵(x)取出來
    train_x = train_df.drop(["日期", "sign"], axis = 1)
    validate_x = validate_df.drop(["日期", "sign"], axis = 1)
    test_x = test_df.drop(["日期", "sign"], axis = 1)

    # 正規化到0與1之間
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(train_x.values)

    # 將數值正規化
    scaler_train_x = scaler.transform(train_x.values)
    scaler_validate_x = scaler.transform(validate_x.values)
    scaler_test_x = scaler.transform(test_x.values)

    # 將正規化後的數值轉回DataFrame
    train_x = pd.DataFrame(scaler_train_x, columns = train_x.columns)
    validate_x = pd.DataFrame(scaler_validate_x, columns = validate_x.columns)
    test_x = pd.DataFrame(scaler_test_x, columns = test_x.columns)

    return train_x, validate_x, test_x, train_y, validate_y, test_y, train_df, validate_df, test_df

#for XGBoost, if we change y-value as t-price/(t-2)-price, then the accuracy rate will increase

In [None]:
SEED = 42

save_path = "prediction result/" # 預測結果儲存位置

threshold = 0.5


def lstm(stock_id, epochs, batch_size):
     #Set the random seed for TensorFlow
    tf.random.set_seed(SEED)

    # Set the random seed for NumPy
    np.random.seed(SEED)

    train_x, validate_x, test_x, train_y, validate_y, test_y, train_df, validate_df, test_df = sds(stock_id)

    # window比例
    window = 60 # LSTM每次訓練的資料筆數，每次輸入1筆時間序列資料，常用為30、60
    print(epochs, batch_size, window )


    # 以將index用rolling window處置
    def rolling_with_gap(df_x, df_y, window):
        n = len(df_x)
        result = []

        for i in range(0, n - window + 1):
            result.append(np.arange(i, i + window))

        index_result = np.array(result)

        rolling_x = []
        rolling_y = []
        for i in index_result:
            rolling_x.append(df_x.iloc[i].to_numpy())
            rolling_y.append(df_y.iloc[i].to_numpy())
        rolling_x = np.array(rolling_x)
        rolling_y = np.array(rolling_y)

        return rolling_x, rolling_y

    # 將X、Y的以rolling window的形式，變成三維的numpy資料
    rolling_train_x, rolling_train_y = rolling_with_gap( train_x, train_y, window = window)
    rolling_validate_x, rolling_validate_y = rolling_with_gap( validate_x, validate_y, window = window)
    rolling_test_x, rolling_test_y = rolling_with_gap( test_x, test_y, window = window)

    # 定義LSTM輸入的形狀，分別代表資料長度與時間步長(即這個window的形狀)
    input_shape = rolling_train_x.shape[1], rolling_train_x.shape[2]

    model = Sequential() # 建構依序模型

    # 定義輸入層神經元的數量
    model.add( LSTM(512, input_shape = input_shape, return_sequences=True) ) # 建構輸入層，LSTM本身就有啟動函數，所以不用加
    input_list = [256]# 隱藏層層數與個別的神經元的數量

    # 建構隱藏層
    for i in input_list:
        model.add( LSTM(i, return_sequences=True) )


    model.add( Dense(1) ) # 輸出層為1個神經元
    model.add( Activation('sigmoid')) # 啟動函數為sigmoid，輸出的值會在0到1之間

    # 編譯模型
    model.compile("Adam", loss='binary_crossentropy', metrics=['mse',"accuracy"])

    # 開始時間
    start_time = dt.now()
    print(start_time)

    # 訓練模型
    model.fit(rolling_train_x,
              rolling_train_y,
              validation_data = (rolling_validate_x, rolling_validate_y),
              epochs = epochs,
              batch_size = batch_size,
              verbose=0)

    validate_pred_y = model.predict(rolling_validate_x) # 預測驗證集
    validate_pred_y = validate_pred_y > threshold
    # 找出每一個window的最後一筆資料
    validate_pred_y = validate_pred_y[:,-1].flatten().astype(int)

    # 將預測結果放入驗證集
    validate_df.loc[:, "LSTM預測"] = pd.NA # 新增一個空欄位
    validate_df.loc[validate_df.index[window - 1:], "LSTM預測"] = validate_pred_y # 放入預測結果

    test_pred_y = model.predict(rolling_test_x) # 預測測試集
    test_pred_y = test_pred_y > threshold
    # 找出每一個window的最後一筆資料
    test_pred_y = test_pred_y[:,-1].flatten().astype(int)

    # 將預測結果放入測試集
    test_df.loc[:, "LSTM預測"] = pd.NA # 新增一個空欄位
    test_df.loc[test_df.index[window - 1:], "LSTM預測"] = test_pred_y # 放入預測結果

    # 結束時間
    end_time = dt.now()
    print(end_time)

    validate_summary = metrics_summary(validate_pred_y, rolling_validate_y[:,-1].flatten())
    test_summary = metrics_summary(test_pred_y, rolling_test_y[:,-1].flatten())

    print("耗費時間", end_time - start_time)

    print(test_summary)

    #validate_df.to_excel(f"{save_path}{stock_id}_驗證.xlsx")
    #test_df.to_excel(f"{save_path}{stock_id}_測試.xlsx")

In [None]:
def xgboost(stock_id):
    # Set the random seed for NumPy
    np.random.seed(SEED)

    train_x, validate_x, test_x, train_y, validate_y, test_y, train_df, validate_df, test_df = sds(stock_id)

    # 設定XGBoost的參數
    params = {
        'objective': 'binary:logistic',
        'max_depth':10,
        'alpha': 0.01,
        'learning_rate': 0.1,
        'n_estimators': 100
    }

    # 創建XGBoost分類器
    model = xgb.XGBClassifier(**params)


    # 訓練模型
    model.fit(train_x, train_y, eval_set=[(validate_x, validate_y)], verbose = False)

    # 開始時間
    start_time = dt.now()
    print(start_time)


    validate_pred_y = model.predict(validate_x) # 預測驗證集
    test_pred_y = model.predict(test_x) # 預測測試集

    validate_summary = metrics_summary(validate_pred_y, validate_y)
    test_summary = metrics_summary(test_pred_y, test_y)

    print("驗證集概要", validate_summary)
    print("-------------------")
    print("測試集概要", test_summary)
    print("-------------------")
    print("驗證集準確度", validate_summary["準確度 (Accuracy)"])
    print("測試集準確度", test_summary["準確度 (Accuracy)"])
    # 結束時間
    end_time = dt.now()
    print(end_time)
    print("耗費時間", end_time - start_time)

    #validate_df.to_excel(f"{save_path}{stock_id}_驗證.xlsx")
    #test_df.to_excel(f"{save_path}{stock_id}_測試.xlsx")

In [None]:
stock_id = 2330
epochs = 256
batch_size = 1000

In [None]:
lstm(stock_id, epochs, batch_size)

256 1000 60
2023-12-28 23:54:23.742720
2023-12-28 23:56:36.235324
耗費時間 0:02:12.492604
{'真陽性 (True Positive)': 70, '假陽性 (False Positive)': 17, '真陰性 (True Negative)': 15, '假陰性 (False Negative)': 82, '準確度 (Accuracy)': 0.46195652173913043, '精確度 (Precision)': 0.8045977011494253, '召回率 (Recall)': 0.4605263157894737, 'F1分數 (F1 Score)': 0.5857740585774059, '特異性 (Specificity)': 0.46875}


In [None]:
xgboost(stock_id)

2023-12-28 23:50:09.500990
驗證集概要 {'真陽性 (True Positive)': 30, '假陽性 (False Positive)': 203, '真陰性 (True Negative)': 229, '假陰性 (False Negative)': 22, '準確度 (Accuracy)': 0.5351239669421488, '精確度 (Precision)': 0.12875536480686695, '召回率 (Recall)': 0.5769230769230769, 'F1分數 (F1 Score)': 0.21052631578947367, '特異性 (Specificity)': 0.5300925925925926}
-------------------
測試集概要 {'真陽性 (True Positive)': 28, '假陽性 (False Positive)': 77, '真陰性 (True Negative)': 119, '假陰性 (False Negative)': 19, '準確度 (Accuracy)': 0.6049382716049383, '精確度 (Precision)': 0.26666666666666666, '召回率 (Recall)': 0.5957446808510638, 'F1分數 (F1 Score)': 0.3684210526315789, '特異性 (Specificity)': 0.6071428571428571}
-------------------
驗證集準確度 0.5351239669421488
測試集準確度 0.6049382716049383
2023-12-28 23:50:09.526111
耗費時間 0:00:00.025121
