In [28]:
import yfinance as yf
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.dates as mdates

In [29]:
# 1. 抓取股價資料
def get_stock_data(stock_id, start_date, end_date):
    df = yf.download(stock_id, start=start_date, end=end_date)
    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
    df.columns = ['開盤價', '最高價', '最低價', '收盤價', '成交量']
    return df

stock_data = get_stock_data('2330.TW', '2020-01-01', '2023-01-01')
stock_data.head(10)

[*********************100%***********************]  1 of 1 completed


Unnamed: 0_level_0,開盤價,最高價,最低價,收盤價,成交量
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-01-02,332.5,339.0,332.5,339.0,31754120
2020-01-03,344.0,345.0,335.5,339.5,41811268
2020-01-06,333.0,334.5,332.0,332.0,45343057
2020-01-07,332.5,333.0,326.5,329.5,50879181
2020-01-08,325.0,333.0,325.0,329.5,37567748
2020-01-09,335.0,337.5,333.5,337.5,31481504
2020-01-10,340.5,341.0,336.0,339.5,27032115
2020-01-13,342.0,342.0,339.0,341.5,30663332
2020-01-14,345.5,346.0,344.5,346.0,30223993
2020-01-15,345.0,345.0,337.5,340.0,47434274


In [30]:
# 2. 標記漲跌標籤
def create_labels(df):
    df['5_SMA'] = df['收盤價'].rolling(window=5).mean()
    df['10_SMA'] = df['收盤價'].rolling(window=10).mean()
    df['Label'] = np.where(df['5_SMA'] > df['10_SMA'], 1, 0)  # 漲為1，跌為0
    df.dropna(inplace=True)  # 移除 NaN
    return df

df = create_labels(stock_data)
df.head(10)

Unnamed: 0_level_0,開盤價,最高價,最低價,收盤價,成交量,5_SMA,10_SMA,Label
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2020-01-15,345.0,345.0,337.5,340.0,47434274,340.9,337.4,1
2020-01-16,330.0,336.5,330.0,334.5,55211420,340.3,336.95,1
2020-01-17,334.0,335.5,332.0,333.0,52060097,339.0,336.3,1
2020-01-20,334.0,335.5,333.0,333.0,32658203,337.3,336.4,1
2020-01-30,326.0,326.0,316.5,316.5,125451049,331.4,335.1,0
2020-01-31,323.0,323.5,319.0,320.0,62526055,327.4,334.15,0
2020-02-03,315.0,316.5,312.0,315.0,59560849,323.5,331.9,0
2020-02-04,336.5,337.0,325.0,325.0,51890496,321.9,330.45,0
2020-02-05,329.0,329.5,324.5,327.5,36601453,320.8,329.05,0
2020-02-06,329.5,332.5,329.0,332.5,29267094,324.0,327.7,0


In [31]:
# 3. 保存 K 線圖
def save_candlestick_images(df, output_dir='candlestick_images'):
    os.makedirs(output_dir, exist_ok=True)

    for i in range(20, len(df)):
        subset = df.iloc[i-20:i]
        label = df.iloc[i]['Label']
        label_str = 'up' if label == 1 else 'down'
        save_path = os.path.join(output_dir, f"candlestick_{i}_{label_str}.png")  # 文件名
        try:
            fig, ax = plt.subplots(figsize=(6, 4))
            for idx, row in subset.iterrows():
                date = mdates.date2num(idx)
                open_price, high, low, close = row['開盤價'], row['最高價'], row['最低價'], row['收盤價']
                color = 'green' if close >= open_price else 'red'

                rect = plt.Rectangle((date - 0.2, min(open_price, close)), 0.4, abs(close - open_price), color=color)
                ax.add_patch(rect)
                plt.plot([date, date], [low, high], color=color)

            ax.xaxis_date()
            ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
            plt.xticks(rotation=45)
            plt.axis('off')
            plt.grid(False)

            plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
            plt.close(fig)
        except Exception as e:
            print(f"Failed to save candlestick image for index {i}: {e}")

# 呼叫函數保存 K 線圖
save_candlestick_images(df)

In [33]:
# 4. 定義 Dataset 和 DataLoader
class KLineDataset(Dataset):
    def __init__(self, df, img_dir, transform=None):
        self.df = df
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.df) - 20  # 減去偏移量 20，確保索引範圍內

    def __getitem__(self, idx):
        if idx + 20 >= len(self.df):  # 確保索引不超出範圍
            raise IndexError(f"Index {idx} out of range for dataset of size {len(self.df)}")

        row = self.df.iloc[idx]
        label = row['Label']
        img_path = os.path.join(self.img_dir, f"candlestick_{idx+20}_{'up' if label == 1 else 'down'}.png")

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")

        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)

        numeric_features = torch.tensor([row['開盤價'], row['最高價'], row['最低價'], row['成交量']], dtype=torch.float32)
        return img, numeric_features, torch.tensor(label, dtype=torch.long)

transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor()
])
# 分割數據集
dataset = KLineDataset(df, img_dir='candlestick_images', transform=transform)
dataset_length = len(dataset)  # 獲取修正後的數據集長度
train_size = int(0.8 * dataset_length)
test_size = dataset_length - train_size

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [34]:
# 5. 定義多模態模型 (早期融合)
class MultiModalModel(nn.Module):
    def __init__(self):
        super(MultiModalModel, self).__init__()
        # CNN 模型
        self.cnn = models.resnet18(pretrained=True)
        self.cnn.fc = nn.Identity()  # 去掉全連接層

        # 數值特徵
        self.fc_numeric = nn.Sequential(
            nn.Linear(4, 32),
            nn.ReLU()
        )

        # 融合層
        self.fc_combined = nn.Sequential(
            nn.Linear(512 + 32, 64),
            nn.ReLU(),
            nn.Linear(64, 2)  # 2分類
        )

    def forward(self, img, numeric_features):
        img_features = self.cnn(img)
        numeric_features = self.fc_numeric(numeric_features)
        combined = torch.cat((img_features, numeric_features), dim=1)
        output = self.fc_combined(combined)
        return output

In [35]:
# 6. 模型訓練與測試
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultiModalModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
print(model)

MultiModalModel(
  (cnn): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track



In [36]:
def train_model(model, train_loader, test_loader, epochs=10):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for imgs, numeric_features, labels in train_loader:
            imgs, numeric_features, labels = imgs.to(device), numeric_features.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(imgs, numeric_features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {train_loss:.4f}")

        # 評估模型
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for imgs, numeric_features, labels in test_loader:
                imgs, numeric_features, labels = imgs.to(device), numeric_features.to(device), labels.to(device)
                outputs = model(imgs, numeric_features)
                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        accuracy = accuracy_score(all_labels, all_preds)
        print(f"Test Accuracy: {accuracy:.4f}")

train_model(model, train_loader, test_loader, epochs=10)

FileNotFoundError: Image file not found: candlestick_images\candlestick_54_down.png