# 環境設定

In [None]:
# Install HappyML
# Clone the HappyML repository into the working directory unless a directory
# with that name is already present (so re-running the cell is a no-op).
import os

happyml_dir = "HappyML"
already_cloned = os.path.isdir(happyml_dir)

if not already_cloned:
    os.system("git clone https://github.com/cnchi/HappyML.git")

In [None]:
# Download Dataset
# Fetch the stock-ID lookup table once; later runs reuse the local copy.
import os

Dataset_File = 'TaiwanStockID.csv'

if not os.path.isfile(Dataset_File):
    exit_status = os.system('wget https://raw.githubusercontent.com/cnchi/datasets/master/' + Dataset_File)
    # os.system does not raise on failure, so check the exit status here
    # instead of letting a later read of the missing file fail confusingly.
    if exit_status != 0:
        raise RuntimeError(
            "Downloading {} failed (wget exit status {}).".format(Dataset_File, exit_status))

In [None]:
# Install Yahoo Finance package
!pip install yfinance



In [None]:
# Chinese Font Settings for Chart Plotting (Colab)
from matplotlib.font_manager import FontProperties
import matplotlib.pyplot as plt

# 用這個方法可以檢查系統有沒有中文字體（空空如也=沒有）
!fc-list :lang=zh family

# 下載台北思源黑體，並命名taipei_sans_tc_beta.ttf
!wget -O taipei_sans_tc_beta.ttf https://drive.google.com/uc?id=1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_&export=download
# 移至指定路徑
!mv taipei_sans_tc_beta.ttf /usr/local/lib/python3.10/dist-packages/matplotlib/mpl-data/fonts/ttf

# 自定義字體變數
myfont = FontProperties(fname=r'/usr/local/lib/python3.10/dist-packages/matplotlib/mpl-data/fonts/ttf/taipei_sans_tc_beta.ttf')

# 後續在相關函式中增加fontproperties=myfont屬性即可。如：plt.xlabel("時間", fontproperties=myfont)

--2023-11-28 07:22:59--  https://drive.google.com/uc?id=1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_
Resolving drive.google.com (drive.google.com)... 142.251.163.139, 142.251.163.101, 142.251.163.100, ...
Connecting to drive.google.com (drive.google.com)|142.251.163.139|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://doc-0k-9o-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/hlrm4t9erufa5a4u4l1hi5lkcqe08e32/1701156150000/02847987870453524430/*/1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_?uuid=6e8a1214-9a73-4e49-9c55-e1bdbc6113bc [following]
--2023-11-28 07:23:02--  https://doc-0k-9o-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/hlrm4t9erufa5a4u4l1hi5lkcqe08e32/1701156150000/02847987870453524430/*/1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_?uuid=6e8a1214-9a73-4e49-9c55-e1bdbc6113bc
Resolving doc-0k-9o-docs.googleusercontent.com (doc-0k-9o-docs.googleusercontent.com)... 172.253.122.132, 2607:f8b0:4004:c09::84
Connecting to d

In [None]:
# Customizable Constants

# Fractions of the series used for the training and validation splits;
# the remainder after both cuts becomes the test split.
train_size, val_size = 0.6, 0.2

# Rows per batch (TimeseriesGenerator batch_size).
data_batch = 5

# Sliding-window settings passed to TimeseriesGenerator.
win_size = 30      # length
sample_step = 1    # sampling_rate
win_moving = 1     # stride

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import dateutil.parser as psr

import yfinance as yf

# 讀入股票漲跌資訊

In [None]:
# Read Stock ID
# Resolve the user's input (stock name or numeric ID) to both stockID and
# stockName using the lookup table, then read the query date range.
stockIDMap = pd.read_csv("TaiwanStockID.csv")

# Surrounding whitespace would otherwise make an exact-match lookup fail.
stockInput = input("請輸入台灣股票名稱、或代號：").strip()
if stockInput.isdigit():
    # NOTE(review): int() drops leading zeros (e.g. an ID typed as "0050");
    # confirm how such IDs are stored in the CSV.
    condition = stockIDMap["StockID"] == int(stockInput)
else:
    condition = stockIDMap["StockName"] == stockInput

matches = stockIDMap[condition]
if matches.empty:
    # Fail with an actionable message instead of an IndexError from .iloc[0].
    raise ValueError("找不到股票「{}」，請確認名稱或代號是否正確。".format(stockInput))

stockID = matches.iloc[0]["StockID"]
stockName = matches.iloc[0]["StockName"]

startDate = psr.parse(input("請輸入查詢起始日期："))
endDate = psr.parse(input("請輸入查詢截止日期："))
if startDate >= endDate:
    # A reversed or empty range cannot yield any price data.
    raise ValueError("查詢起始日期必須早於截止日期。")

請輸入台灣股票名稱、或代號：台積電
請輸入查詢起始日期：6/20
請輸入查詢截止日期：9/20


In [None]:
# Download the Stock Data
# Query Yahoo Finance for the selected ticker ("<id>.TW") over the date range.
stockQuery = "{}.TW".format(stockID)
dataset = yf.download(stockQuery, start=startDate.strftime("%Y-%m-%d"), end=endDate.strftime("%Y-%m-%d"))

# Stop here with a clear message rather than failing later in preprocessing.
if dataset.empty:
    raise ValueError("No price data returned for {} in the requested date range.".format(stockQuery))

# Newer yfinance releases may return (field, ticker) MultiIndex columns even
# for a single ticker; keep only the field level so that lookups such as
# dataset["Close"] yield one plain column as the rest of the notebook expects.
if isinstance(dataset.columns, pd.MultiIndex):
    dataset.columns = dataset.columns.get_level_values(0)

[*********************100%%**********************]  1 of 1 completed


# 資料前處理

In [None]:
# Preprocessing: Decomposition
# X holds the five input features; Y holds the closing price to predict
# (kept as a one-column DataFrame).
feature_columns = ["High", "Low", "Open", "Close", "Volume"]
target_columns = ["Close"]

X = dataset.loc[:, feature_columns]
Y = dataset.loc[:, target_columns]

In [None]:
# Preprocessing: Feature Scaling (Normalization) with MinMaxScaler
from sklearn.preprocessing import MinMaxScaler

# Fit the scalers on the training portion only (same cut as the train split),
# so value ranges from the validation/test periods do not leak into training.
# Values outside the training range will therefore scale outside [0, 1].
fit_rows = int(train_size * len(X))

# Separate scalers for features and target: refitting a single scaler on Y
# would silently discard the feature fit.
x_scaler = MinMaxScaler(feature_range=(0, 1)).fit(X.iloc[:fit_rows])
y_scaler = MinMaxScaler(feature_range=(0, 1)).fit(Y.iloc[:fit_rows])

X_scale = x_scaler.transform(X)
Y_scale = y_scaler.transform(Y)

# Later cells call scaler.inverse_transform on predicted closing prices,
# so `scaler` keeps referring to the target scaler.
scaler = y_scaler

In [None]:
# Preprocessing: Split Training & Testing Data
def _cut_points(n_rows):
    """Return the row indices where the train and validation splits end."""
    train_end = int(train_size * n_rows)
    val_end = int((train_size + val_size) * n_rows)
    return [train_end, val_end]

# Chronological three-way split (no shuffling): train | validation | test.
X_train, X_val, X_test = np.split(X_scale, _cut_points(len(X_scale)))
Y_train, Y_val, Y_test = np.split(Y_scale, _cut_points(len(Y_scale)))

In [None]:
# Preprocessing: Generate Recurrent Data
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator


def _make_window_generator(data, targets, split_name):
    """Build the sliding-window generator for one split.

    Parameters
    ----------
    data, targets : array-like
        Scaled features and scaled targets of the split (same number of rows).
    split_name : str
        Human-readable split name, used only in the error message.

    Raises
    ------
    ValueError
        If the split has win_size rows or fewer, in which case no complete
        window followed by a target row exists.
    """
    if len(data) <= win_size:
        raise ValueError(
            "The {} split has {} rows, but win_size={} needs at least {} rows. "
            "Query a longer date range or reduce win_size.".format(
                split_name, len(data), win_size, win_size + 1))
    return TimeseriesGenerator(
        data=data,
        targets=targets,
        length=win_size,
        sampling_rate=sample_step,
        stride=win_moving,
        batch_size=data_batch)


# Identical window settings for all three splits.
train_set = _make_window_generator(X_train, Y_train, "training")
val_set = _make_window_generator(X_val, Y_val, "validation")
test_set = _make_window_generator(X_test, Y_test, "test")

ValueError: ignored

# 模型建立

In [None]:
# Create Model
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import RMSprop

# Three stacked LSTM layers, each followed by dropout. The first two return
# the full sequence so the next LSTM receives one vector per time step; the
# last returns only its final state, which the Dense layer maps to the target.
n_features = X.shape[1]
n_targets = Y.shape[1]

model = Sequential([
    layers.LSTM(units=50, return_sequences=True, input_shape=(win_size, n_features)),
    layers.Dropout(0.2),
    layers.LSTM(units=50, return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(units=50),
    layers.Dropout(0.2),
    layers.Dense(n_targets, activation="linear"),
])

model.compile(
    optimizer=RMSprop(learning_rate=1e-5),
    loss="mse",
    metrics=["mse"],
)

# 模型校正

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

# Create TensorBoard log directory
import os
from datetime import datetime
from tensorflow.keras.callbacks import TensorBoard

logdir = os.path.join("logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = TensorBoard(logdir, histogram_freq=1)

In [None]:
# Start the TensorBoard
# Points TensorBoard at the parent "logs" directory, which is where the
# timestamped run directory for the training callback is created.
%tensorboard --logdir logs

In [None]:
# Train Model
# Long exploratory run; the TensorBoard callback records per-epoch metrics
# for the training and validation sets.
model.fit(
    train_set,
    validation_data=val_set,
    epochs=100,
    callbacks=[tensorboard_callback],
)

# 模型訓練

In [None]:
# Train Model (After Correction)
from tensorflow.keras.models import clone_model
from tensorflow.keras.optimizers import RMSprop

# Retrain from scratch with the corrected epoch count. Calling fit() on the
# existing model would continue from the weights of the 100-epoch run above,
# so the result would not reflect a 12-epoch training.
# clone_model rebuilds the same architecture with newly initialised weights;
# the compile settings must stay in sync with the model-creation cell.
model = clone_model(model)
model.compile(optimizer=RMSprop(learning_rate=1e-5), loss="mse", metrics=["mse"])

model.fit(train_set, validation_data=val_set, epochs=12)

# 模型評估

In [None]:
# Model Evaluation
# evaluate() returns the loss followed by the compiled metrics (here: mse).
test_loss, test_mse = model.evaluate(test_set)

for caption, score in (("Loss of Test Set:", test_loss), ("MSE of Test Set:", test_mse)):
    print(caption, score)

# 模型預測

In [None]:
# Predict on the test windows and compare with the real closing prices
import matplotlib.ticker as ticker

# Get the Predict Price (with date)
Y_pred = model.predict(test_set)
n_pred = len(Y_pred)

# The predictions are labelled with the last n_pred dates of the dataset.
date_labels = dataset.index[-n_pred:].strftime("%Y-%m-%d").tolist()

Y_pred_price = pd.DataFrame(
    data=scaler.inverse_transform(Y_pred),  # back from scaled values to prices
    index=date_labels,
    columns=["收盤價"],
)

# Get the Real Price (with date)
Y_real_price = dataset["Close"].iloc[-n_pred:].to_frame()
Y_real_price.columns = ["收盤價"]
Y_real_price.index = list(date_labels)

# Plot the predict vs. real price
fig, ax = plt.subplots(1, 1)

ax.plot(Y_pred_price, color="blue", label="Predict")
ax.plot(Y_real_price, color="red", label="Real")

# One tick label every `tick_spacing` rows, slanted so the dates stay readable.
tick_spacing = 5
ax.xaxis.set_major_locator(ticker.MultipleLocator(tick_spacing))
fig.autofmt_xdate()

# Chinese text needs the custom font; the legend labels are ASCII.
ax.set_title("{} 收盤價盲測結果".format(stockName), fontproperties=myfont)
ax.set_xlabel("日期", fontproperties=myfont)
ax.set_ylabel("收盤價", fontproperties=myfont)
ax.legend(loc="best")
plt.show()

# 預測明日股價

In [None]:
# Get Trade Day
# Imported under an alias so the `datetime` class bound by the TensorBoard
# cell is not rebound to the module by this cell.
import datetime as dt

this_trade_day = dataset.index[-1].to_pydatetime()
next_trade_day = this_trade_day + dt.timedelta(days=1)
# Skip the weekend: Saturday (6) and Sunday (7) both roll forward to Monday.
# Public holidays are not known here; see the empty-result check below.
if next_trade_day.isoweekday() in (6, 7):
    next_trade_day += dt.timedelta(days=8 - next_trade_day.isoweekday())

# Show Predict Price
# The model input is the last win_size scaled rows, shaped (1, win_size, n_features).
lookback_data = X_scale[-win_size:].reshape(1, win_size, X_scale.shape[1])
tomorrow_pred = scaler.inverse_transform(model.predict(lookback_data))

# Positional .iloc[row, col] access; Series[0] on a string-labelled Series
# relies on a deprecated positional fallback.
last_pred_close = Y_pred_price.iloc[-1, 0]
next_pred_close = tomorrow_pred[0][0]

print()
print("{}預測收盤價 ----------".format(stockName))
print("最後一日（{}）：{:.2f}".format(this_trade_day.strftime("%Y/%m/%d"), last_pred_close))
print("次交易日（{}）：{:.2f}".format(next_trade_day.strftime("%Y/%m/%d"), next_pred_close))
print("預測漲跌：{:+.2f}".format(next_pred_close - last_pred_close))

# Show Real Price
last_real_close = dataset.iloc[-1]["Close"]

print()
print("{}真實收盤價 ----------".format(stockName))
print("最後一日（{}）：{:.2f}".format(this_trade_day.strftime("%Y/%m/%d"), last_real_close))

# The real next-day price can only be looked up once that day is in the past.
if next_trade_day.date() < dt.date.today():
    next_trade_day_end = next_trade_day + dt.timedelta(days=1)
    tomorrow_real = yf.download(stockQuery,
                  start=next_trade_day.date().strftime("%Y-%m-%d"),
                  end=next_trade_day_end.date().strftime("%Y-%m-%d"))
    if tomorrow_real.empty:
        # No rows for that date (e.g. a market holiday): report it instead of
        # failing on .iloc[0].
        print("次交易日（{}）：查無交易資料（可能為休市日）".format(next_trade_day.strftime("%Y/%m/%d")))
    else:
        next_real_close = tomorrow_real.iloc[0]["Close"]
        print("次交易日（{}）：{:.2f}".format(next_trade_day.strftime("%Y/%m/%d"), next_real_close))
        print("真實漲跌：{:+.2f}".format(next_real_close - last_real_close))