In [12]:
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import numpy as np
from compute_pointer import get_df

In [13]:
df = get_df(flag=0)
df

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2014-01-03,103.0,103.0,102.0,102.5,75.244667,38599000
2014-01-06,102.0,103.0,102.0,102.5,75.244667,23706000
2014-01-07,102.5,103.0,102.0,102.0,74.877609,15836000
2014-01-08,103.0,104.5,102.5,104.0,76.345802,30218000
2014-01-09,102.5,103.0,100.5,101.0,74.143509,63998000
...,...,...,...,...,...,...
2023-12-25,582.0,585.0,580.0,581.0,573.515503,9548143
2023-12-26,583.0,586.0,582.0,586.0,578.451111,16094308
2023-12-27,587.0,592.0,586.0,592.0,584.373840,33401336
2023-12-28,592.0,593.0,589.0,593.0,585.360901,25514849


In [14]:
from sklearn.preprocessing import MinMaxScaler
def preprocess(data_trend, train_ratio, n_past, predict_day):
    scaler = MinMaxScaler()  
    data_trend = scaler.fit_transform(data_trend)
    
    train_ind = int(len(data_trend) * train_ratio)
    train_data = data_trend[:train_ind]
    test_data = data_trend[train_ind:]

    # 產生序列資料
    X_train_full, Y_train_full, Y_train_direction = create_sequences(train_data, n_past, predict_day)
    X_test, Y_test, Y_test_direction = create_sequences(test_data, n_past, predict_day)

    # 訓練和驗證集切分
    val_ind = int(len(X_train_full) * test_val_ratio)
    X_train, X_val = X_train_full[:val_ind], X_train_full[val_ind:]
    Y_train, Y_val = Y_train_full[:val_ind], Y_train_full[val_ind:]
    Y_train_direction, Y_val_direction = Y_train_direction[:val_ind], Y_train_direction[val_ind:]

    return X_train, Y_train, Y_train_direction, X_val, Y_val, Y_val_direction, X_test, Y_test, Y_test_direction, scaler

def create_sequences(data, n_past, predict_day):
    X, Y, Y_direction = [], [], []
    L = len(data)
    for i in range(L - (n_past + predict_day - 1)):
        # 過去 n_past 天的資料
        X.append(data[i:i + n_past])
        
        # 預測未來 predict_day 天的收盤價
        Y.append(data[i + n_past:i + n_past + predict_day, 3]) 
        
        # 計算方向性標籤
        y_dir = []
        for j in range(predict_day):
            if data[i + n_past + j, 3] > data[i + n_past + j - 1, 3]:
                y_dir.append(1)  # 1 表示上漲
            else:
                y_dir.append(0)  # 0 表示下跌
        
        # 將方向性標籤加入
        Y_direction.append(y_dir)
    
    # 將序列轉為 NumPy 並轉換為 PyTorch 張量
    return (
        torch.tensor(np.array(X), dtype=torch.float32), 
        torch.tensor(np.array(Y), dtype=torch.float32), 
        torch.tensor(np.array(Y_direction), dtype=torch.long)
    )

# 使用更新後的 preprocess 函數
data = df[[c for c in df.columns if c not in ['Date','Adj Close']]].values
train_ratio = 0.9
test_val_ratio = 0.9
n_past = 40     # 移動視窗大小
predict_day = 10 # 預測後面幾天

X_train, Y_train, Y_train_direction, X_val, Y_val, Y_val_direction, X_test, Y_test, Y_test_direction, scaler = preprocess(data, train_ratio, n_past, predict_day)

batch_size = 32

# 訓練數據加載
train_set = torch.utils.data.TensorDataset(X_train, Y_train, Y_train_direction)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)

val_set = torch.utils.data.TensorDataset(X_val, Y_val, Y_val_direction)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False)

test_set = torch.utils.data.TensorDataset(X_test, Y_test, Y_test_direction)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)



In [15]:
print(f"X_train: {X_train.shape}, Y_train: {Y_train.shape}, Y_train_direction: {Y_train_direction.shape}")
print(f"X_val: {X_val.shape}, Y_val: {Y_val.shape}, Y_val_direction: {Y_val_direction.shape}")
print(f"X_test: {X_test.shape}, Y_test: {Y_test.shape}, Y_test_direction: {Y_test_direction.shape}")


X_train: torch.Size([1931, 40, 5]), Y_train: torch.Size([1931, 10]), Y_train_direction: torch.Size([1931, 10])
X_val: torch.Size([215, 40, 5]), Y_val: torch.Size([215, 10]), Y_val_direction: torch.Size([215, 10])
X_test: torch.Size([195, 40, 5]), Y_test: torch.Size([195, 10]), Y_test_direction: torch.Size([195, 10])


In [16]:
Y_train[1]

tensor([0.0215, 0.0275, 0.0249, 0.0232, 0.0240, 0.0240, 0.0197, 0.0206, 0.0206,
        0.0223])

In [17]:
Y_train_direction[1]

tensor([0, 1, 0, 0, 1, 0, 0, 1, 0, 1])

In [18]:

import torch.nn as nn

class LSTM(nn.Module):
    def __init__(self, n_features, hidden_dim, predict_day):
        super(LSTM, self).__init__()
        self.n_features = n_features
        self.hidden_dim = hidden_dim
        self.predict_day = predict_day
        
        # LSTM 層
        self.lstm = nn.LSTM(input_size=self.n_features, hidden_size=self.hidden_dim, 
                            num_layers=2, bidirectional=False, batch_first=True, dropout=0.2)
        
        # 回歸層 (預測價格)
        self.regression_layer = nn.Linear(self.hidden_dim, self.predict_day)
        
        # 分類層 (預測漲跌)
        self.classification_layer = nn.Linear(self.hidden_dim, self.predict_day * 2) # 二分類
        
    def forward(self, x): 
        x, _ = self.lstm(x)
        x_last = x[:, -1, :]  # 取最後一個時間步的輸出
        
        # 回歸輸出 (價格)
        regression_output = self.regression_layer(x_last)
        
        # 分類輸出 (漲跌)
        classification_output = self.classification_layer(x_last)
        classification_output = classification_output.view(-1, self.predict_day, 2) # 每一天都有兩個類別
        
        return regression_output, classification_output



In [19]:
def combined_loss(regression_output, classification_output, 
                  price_target, trend_target, lambda_reg=1.0, lambda_cls=1.0):
    """
    :param regression_output: 回歸輸出 (價格預測)
    :param classification_output: 分類輸出 (漲跌預測)
    :param price_target: 實際價格
    :param trend_target: 實際趨勢標籤 (0 或 1，0 表示下跌，1 表示上漲)
    :param lambda_reg: 回歸損失權重
    :param lambda_cls: 分類損失權重
    """
    # 回歸損失
    regression_loss = nn.MSELoss()(regression_output, price_target)
    
    # 分類損失
    classification_loss = nn.CrossEntropyLoss()(classification_output.view(-1, 2), trend_target.view(-1))
    
    # 組合損失
    total_loss = lambda_reg * regression_loss + lambda_cls * classification_loss
    return total_loss





In [23]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
# 檢查是否有可用的GPU
if torch.cuda.is_available():
    print("GPU")
    device = torch.device("cuda")  # 使用GPU
else:
    print("CPU")
    device = torch.device("cpu")  # 使用CPU
# 設置訓練參數
num_epochs = 100
learning_rate = 0.0001

# 初始化模型、損失函數和優化器
model = LSTM(n_features=5, hidden_dim=128, predict_day=10)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

GPU


In [24]:
lambda_reg = 1.0
lambda_cls = 1.0

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    
    for X_batch, Y_batch, Y_direction_batch in train_loader:
        X_batch = X_batch.to(device)
        Y_batch = Y_batch.to(device)
        Y_direction_batch = Y_direction_batch.to(device)
        optimizer.zero_grad()
        
        # 前向傳播
        regression_output, classification_output = model(X_batch)
        
        # 計算損失
        loss = combined_loss(regression_output, classification_output, 
                             Y_batch, Y_direction_batch, 
                             lambda_reg=lambda_reg, lambda_cls=lambda_cls)
        total_loss += loss.item()
        
        # 反向傳播與優化
        loss.backward()
        optimizer.step()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader)}")


Epoch 1/100, Loss: 0.8230518503267257
Epoch 2/100, Loss: 0.718282377133604
Epoch 3/100, Loss: 0.6970505548305199
Epoch 4/100, Loss: 0.6956614246133899
Epoch 5/100, Loss: 0.6951965865541677
Epoch 6/100, Loss: 0.6948110685973871
Epoch 7/100, Loss: 0.6946319038750696
Epoch 8/100, Loss: 0.694363356613722
Epoch 9/100, Loss: 0.6941572437520886
Epoch 10/100, Loss: 0.6941303004983996
Epoch 11/100, Loss: 0.6940303249437301
Epoch 12/100, Loss: 0.6938791597475771
Epoch 13/100, Loss: 0.6937873431893645
Epoch 14/100, Loss: 0.6938923841617146
Epoch 15/100, Loss: 0.6936994791030884
Epoch 16/100, Loss: 0.6936037892200908
Epoch 17/100, Loss: 0.6936241495804708
Epoch 18/100, Loss: 0.6934449643385215
Epoch 19/100, Loss: 0.6935040941003894
Epoch 20/100, Loss: 0.6935014578162647
Epoch 21/100, Loss: 0.6933458125005003
Epoch 22/100, Loss: 0.693357277111929
Epoch 23/100, Loss: 0.6932673483598427
Epoch 24/100, Loss: 0.6933327639689211
Epoch 25/100, Loss: 0.6932498627021665
Epoch 26/100, Loss: 0.693168449597280

In [25]:
def evaluate_model(model, data_loader):
    model.eval()
    total_regression_loss = 0.0
    total_correct = 0
    total_samples = 0
    
    with torch.no_grad():
        for X_batch, Y_batch, Y_direction_batch in data_loader:
            X_batch = X_batch.to(device)
            Y_batch = Y_batch.to(device)
            Y_direction_batch = Y_direction_batch.to(device)
            regression_output, classification_output = model(X_batch)
            
            # 回歸損失
            regression_loss = nn.MSELoss()(regression_output, Y_batch)
            total_regression_loss += regression_loss.item()
            
            # 分類準確率
            predicted_trends = torch.argmax(classification_output, dim=-1)
            total_correct += (predicted_trends == Y_direction_batch).sum().item()
            total_samples += Y_direction_batch.numel()
    
    avg_regression_loss = total_regression_loss / len(data_loader)
    accuracy = total_correct / total_samples
    return avg_regression_loss, accuracy

# 驗證集評估
val_reg_loss, val_accuracy = evaluate_model(model, val_loader)
print(f"Validation Loss: {val_reg_loss:.4f}, Accuracy: {val_accuracy:.4f}")

# 測試集評估
test_reg_loss, test_accuracy = evaluate_model(model, test_loader)
print(f"Test Loss: {test_reg_loss:.4f}, Accuracy: {test_accuracy:.4f}")


Validation Loss: 0.0059, Accuracy: 0.5079
Test Loss: 0.0030, Accuracy: 0.5015


In [28]:
model.eval()
with torch.no_grad():
    for X_batch, Y_batch, Y_direction_batch in test_loader:
        X_batch = X_batch.to(device)
        Y_batch = Y_batch.to(device)
        Y_direction_batch = Y_direction_batch.to(device)
        regression_output, classification_output = model(X_batch)
        
        # 回歸預測
        print("Predicted Prices:", regression_output[0].cpu().numpy())
        print("Actual Prices:", Y_batch[0].cpu().numpy())
        
        # 分類預測
        predicted_trends = torch.argmax(classification_output, dim=-1)
        print("Predicted Trends:", predicted_trends[0].cpu().numpy())
        print("Actual Trends:", Y_direction_batch[0].cpu().numpy())
        break  # 只顯示第一個批次


Predicted Prices: [0.7008664  0.7160211  0.7083208  0.7198194  0.75074613 0.70767623
 0.72966987 0.75136524 0.76371187 0.7549444 ]
Actual Prices: [0.7270386  0.7218884  0.72360516 0.7081545  0.7133047  0.7030043
 0.70472103 0.6944206  0.7167382  0.70643777]
Predicted Trends: [1 1 1 1 1 1 1 1 1 1]
Actual Trends: [1 0 1 0 1 0 1 0 1 0]
