多模態模型，採用早期融合的方式進行數據整合。多模態資料來源採用以下組合 : k線圖 + 股價資料

In [3]:
pip install yfinance torch torchvision numpy pandas scikit-learn mplfinance

Collecting mplfinance
  Downloading mplfinance-0.12.10b0-py3-none-any.whl.metadata (19 kB)
Downloading mplfinance-0.12.10b0-py3-none-any.whl (75 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.0/75.0 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mplfinance
Successfully installed mplfinance-0.12.10b0


In [27]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import yfinance as yf
import matplotlib.pyplot as plt
import mplfinance as mpf
import os

# 步驟 1: 下載台積電股價資料
def download_stock_data(ticker, start_date, end_date):
    # 使用 yfinance 下載指定日期範圍的股票數據
    data = yf.download(ticker, start=start_date, end=end_date)
    return data

# 步驟 2: 生成 K 線圖影像
def generate_kline_images(data, output_dir, window_size=20):
    # 如果輸出資料夾不存在，則創建
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 確保數據索引為日期格式
    data.index = pd.to_datetime(data.index)

    # 以滑動窗口的方式生成多個 K 線圖
    for i in range(len(data) - window_size+1):
        window_data = data.iloc[i:i + window_size]
        save_path = os.path.join(output_dir, f"kline_{i}.png")
        try:
            mpf.plot(window_data, type='candle', style='charles', savefig=save_path)
        except Exception as e:
            print(f"生成圖像失敗，索引: {i}, 錯誤: {e}")

# 步驟 3: 定義數據集類別
class MultimodalStockDataset(Dataset):
    def __init__(self, kline_image_paths, stock_data, labels):
        # 初始化 K 線圖路徑、結構化股價數據和標籤
        self.kline_image_paths = kline_image_paths
        self.stock_data = stock_data
        self.labels = labels

    def __len__(self):
        # 返回數據集大小
        return len(self.labels)

    def __getitem__(self, idx):
        # 加載圖像並處理
        kline_image = plt.imread(self.kline_image_paths[idx])
        if kline_image.shape[-1] == 4:  # 如果有 Alpha 通道，僅保留 RGB
            kline_image = kline_image[..., :3]
        kline_image = torch.tensor(kline_image, dtype=torch.float32).permute(2, 0, 1) / 255.0
        # 加載結構化數據和標籤
        stock_data = torch.tensor(self.stock_data[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return {'kline_image': kline_image, 'stock_data': stock_data, 'label': label}

# 步驟 4: 定義多模態模型
class MultimodalModel(nn.Module):
    def __init__(self):
        super(MultimodalModel, self).__init__()
        # 定義 CNN 模塊來處理 K 線圖特徵
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((5, 5))  # 強制將特徵圖尺寸縮小到固定大小
        )
        # 定義全連接層來處理結構化股價數據
        self.fc_stock = nn.Sequential(
            nn.Linear(5, 64),  # 5 個特徵: Open, High, Low, Close, Volume
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        # 定義融合層，結合 CNN 和全連接層的輸出
        self.fc_fusion = nn.Sequential(
            nn.Linear(64 * 5 * 5 + 32, 64),  # 根據 CNN 輸出形狀調整
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, kline_image, stock_data):
        # CNN 模塊處理 K 線圖
        cnn_out = self.cnn(kline_image)
        print(f"展平前的 CNN 輸出形狀: {cnn_out.shape}")
        cnn_out = cnn_out.view(cnn_out.size(0), -1)  # 展平 CNN 輸出
        print(f"展平後的 CNN 輸出形狀: {cnn_out.shape}")
        # 全連接層處理結構化數據
        stock_out = self.fc_stock(stock_data)
        # 融合兩部分輸出
        combined = torch.cat((cnn_out, stock_out), dim=1)
        # 通過融合層得到最終預測
        output = self.fc_fusion(combined)
        return output


In [30]:
# 步驟 5: 準備數據
ticker = "2330.TW"
start_date = "2020-01-01"
end_date = "2023-01-01"
data = download_stock_data(ticker, start_date, end_date)

# 如果存在 MultiIndex，降為單層索引
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.droplevel(1)  # 移除 'Ticker' 層

# 確認欄位名稱
print("欄位名稱:", data.columns)

# 選擇必要的欄位
required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
if not all(col in data.columns for col in required_columns):
    raise KeyError(f"缺少必要的欄位: {set(required_columns) - set(data.columns)}")

# 清理數據，移除空值並轉為數字
data = data[required_columns]
data = data.dropna()  # 移除空值
for col in required_columns:
    data[col] = pd.to_numeric(data[col], errors='coerce')  # 轉換為數字
data = data.dropna()  # 再次移除 NaN

# 添加標籤
data['Label'] = data['Close'].pct_change().shift(-1)
data = data.dropna()  # 移除因標籤計算產生的空值

# 生成 K 線圖影像
output_dir = "kline_images"
generate_kline_images(data, output_dir)

# 確認生成的 K 線圖文件數量
kline_images_dir = "kline_images"
kline_image_paths = [os.path.join(kline_images_dir, f) for f in sorted(os.listdir(kline_images_dir)) if f.endswith(".png")]
print(f"生成的 K 線圖文件數量: {len(kline_image_paths)}")

# 提取結構化股價數據並標準化
stock_features = data[['Open', 'High', 'Low', 'Close', 'Volume']].values
scaler = StandardScaler()
stock_features = scaler.fit_transform(stock_features)
labels = data['Label'].values

# 確保數據對齊
stock_features = stock_features[:len(kline_image_paths)]
labels = labels[:len(kline_image_paths)]

# 檢查缺失文件並補齊
missing_files = [path for path in kline_image_paths if not os.path.exists(path)]
if missing_files:
    print(f"缺失的文件數量: {len(missing_files)}")
    for missing_file in missing_files:
        idx = int(missing_file.split('_')[-1].split('.')[0])
        window_data = data.iloc[idx:idx + 20]
        try:
            mpf.plot(window_data, type='candle', style='charles', savefig=missing_file)
        except Exception as e:
            print(f"補齊圖像失敗，索引: {idx}, 錯誤: {e}")

# 切分數據
train_images, test_images, train_stock, test_stock, train_labels, test_labels = train_test_split(
    kline_image_paths, stock_features, labels, test_size=0.2, random_state=42
)

train_dataset = MultimodalStockDataset(train_images, train_stock, train_labels)
test_dataset = MultimodalStockDataset(test_images, test_stock, test_labels)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 步驟 6: 訓練模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultimodalModel().to(device)
criterion = nn.MSELoss()  # 使用均方誤差作為損失函數
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # 使用 Adam 優化器

num_epochs = 1
for epoch in range(num_epochs):
    model.train()
    for batch in train_loader:
        kline_image = batch['kline_image'].to(device)
        stock_data = batch['stock_data'].to(device)
        label = batch['label'].to(device)

        optimizer.zero_grad()
        output = model(kline_image, stock_data)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# 步驟 7: 回測
def backtest(model, test_loader):
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for batch in test_loader:
            kline_image = batch['kline_image'].to(device)
            stock_data = batch['stock_data'].to(device)
            label = batch['label'].to(device)
            output = model(kline_image, stock_data)

            # 確保每個批次的輸出被展平
            predictions.append(output.cpu().numpy().flatten())
            actuals.append(label.cpu().numpy().flatten())

    # 合併所有批次
    predictions = np.concatenate(predictions)
    actuals = np.concatenate(actuals)

    backtest_results = pd.DataFrame({
        'Predicted': predictions,
        'Actual': actuals
    })
    return backtest_results


results = backtest(model, test_loader)
print(results.head())

[*********************100%***********************]  1 of 1 completed


欄位名稱: Index(['Adj Close', 'Close', 'High', 'Low', 'Open', 'Volume'], dtype='object', name='Price')
生成的 K 線圖文件數量: 714
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])


  return F.mse_loss(input, target, reduction=self.reduction)


展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5]

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/1], Loss: 0.0002
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([32, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([32, 1600])
展平前的 CNN 輸出形狀: torch.Size([15, 64, 5, 5])
展平後的 CNN 輸出形狀: torch.Size([15, 1600])
   Predicted    Actual
0  -0.003142  0.007386
1   0.004692 -0.008741
2  -0.010572  0.004910
3  -0.001564 -0.001736
4   0.009724  0.024211
