In [17]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import plotly.express as px

In [18]:

# 讀取資料
df = pd.read_csv('0050.TW.csv')
df = df.round(3)
# df.index = list(map(lambda x:datetime.datetime.strptime(x, '%Y-%m-%d'), df.index))

# 设置显示选项
pd.set_option('display.max_colwidth', None)
torch.set_printoptions(profile='full')

df.info()

# # 設定要使用的特徵
# features = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# print(df)
# timeseries = df[features].values.round(3)

# print(timeseries)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243 entries, 0 to 242
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       243 non-null    object 
 1   Open       243 non-null    float64
 2   High       243 non-null    float64
 3   Low        243 non-null    float64
 4   Close      243 non-null    float64
 5   Adj Close  243 non-null    float64
 6   Volume     243 non-null    int64  
dtypes: float64(5), int64(1), object(1)
memory usage: 13.4+ KB


In [26]:
timeseries = df[['Open', 'High', 'Low', 'Adj Close', 'Volume','Close']].values.astype('float32')
input_size = timeseries.shape[-1]
print(input_size)
# 一定要先將資料標準化，不然loss值會過大導致無法收斂，且預測值全部變成同一個數
# scaler
scaler = StandardScaler()
timeseries = scaler.fit_transform(timeseries)

# train-val split for time series
train_size = int(len(timeseries) * 0.67)
test_size = len(timeseries) - train_size
train, val = timeseries[:train_size], timeseries[train_size:]

6


In [27]:
def create_dataset(dataset, lookback):

    X, y = [], []
    for i in range(len(dataset)-lookback):
        feature = dataset[i:i+lookback, :]
        target = dataset[i+1:i+lookback+1][-1][-1]

        # 只使用收盤價作為input
        # feature = dataset[i:i+lookback]
        # target = dataset[i+1:i+lookback+1][-1]

        X.append(feature)
        y.append(target)
    return torch.tensor(X), torch.tensor(y).view(-1, 1)

lookback = 5
X_train, y_train = create_dataset(train, lookback=lookback)
X_val, y_val = create_dataset(val, lookback=lookback)
print(X_train.size(), y_train.size())
print(X_val.size(), y_val.size())

torch.Size([157, 5, 6]) torch.Size([157, 1])
torch.Size([76, 5, 6]) torch.Size([76, 1])


In [28]:
class LSTM_Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(input_size = input_size, hidden_size=64, num_layers=2, batch_first = True)
        self.linear = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.1)
    def forward(self, x):
        x, _ = self.lstm(x)
        x = self.dropout(x)
        x = x[:, -1, :]
        x = self.linear(x)
        return x

In [29]:
model = LSTM_Model()
# optimizer = optim.Adam(model.parameters())
# 使用SDG或Adam演算法的lstm經常會用RMSprop做為優化方向，因為，它收斂的速度會比較快，原因是RMSprop 的學習速率(learning rate)會隨著之前的梯度總和作反向的調整。
optimizer = optim.RMSprop(model.parameters())
loss_fn = nn.MSELoss()
n_epochs = 300

loader = data.DataLoader(data.TensorDataset(X_train, y_train), shuffle=False, batch_size=8)
train_loss_array = []
val_loss_array = []

In [None]:
for epoch in range(n_epochs):
    model.train()
    for X_batch, y_batch in loader:
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # 確認loss值的變化正常
    # if epoch % 100 ==0:
    #     print(y_pred)

    # Validation
    model.eval()
    with torch.no_grad():
        y_pred = model(X_train)
        # train_rmse = np.sqrt(loss_fn(y_pred, y_train))
        train_loss = loss_fn(y_pred, y_train).item()

        y_pred = model(X_val)
        # test_rmse = np.sqrt(loss_fn(y_pred, y_test))
        val_loss = loss_fn(y_pred, y_val).item()

        train_loss_array.append(train_loss)
        val_loss_array.append(val_loss)

    if epoch % 100 == 0:
        print("Epoch %d: train_loss %.4f, val_loss %.4f" % (epoch, train_loss, val_loss))
with torch.no_grad():
    model.eval()
    # shift train predictions for plotting
    # 因為timeseries包含開、高、低、收等資料，因此需要timeseries[:, -1]取得最後一項 收盤價 的欄位長度即可
    train_plot = np.ones_like(timeseries[:, -1]) * np.nan
    y_pred = model(X_train)
    # train_plot.shape == (len(X_train), )為一維矩陣，model(X_train).size == (len(X_train), 1)為二維矩陣，因此需要view()
    train_plot[lookback:train_size] = model(X_train).view(-1)

    # shift test predictions for plotting
    test_plot = np.ones_like(timeseries[:,-1]) * np.nan
    test_plot[train_size+lookback:len(timeseries)] = model(X_val).view(-1)

Epoch 0: train_loss 0.3458, val_loss 1.9592
Epoch 100: train_loss 0.0149, val_loss 3.4015
Epoch 200: train_loss 0.0031, val_loss 2.3523


In [None]:
# plot
# 因為timeseries包含開、高、低、收等資料，因此需要timeseries[:, -1]取得最後一項 收盤價 的欄位資料即可
plt.plot(timeseries[:, -1])
plt.plot(train_plot)
plt.plot(test_plot)
plt.show()