設計一個多模態模型，採用(a)早期融合、(b)晚期融合或(c)中期融合的方式進行數據整合（擇一實現）。多模態資料來源可包括以下組合之一：
    
    1. 新聞情緒指標 + 股價資料
    2. K 線圖 + 股價資料
模型目標可針對分類任務（如股價漲跌預測）或回歸任務（如股價變動幅度預測）。

In [1]:
!pip install mplfinance



In [2]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import yfinance as yf
import datetime as dt
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# 設定設備
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 設定股票代號與時間範圍
stock_id = '2330.TW'
end = dt.date.today()
start = end - dt.timedelta(days=3650)
df_stat = yf.download(stock_id, start=start, end=end)

# 新增移動平均線和其他技術指標
df_stat['SMA_5'] = df_stat['Close'].rolling(window=5).mean()
df_stat['SMA_20'] = df_stat['Close'].rolling(window=20).mean()
df_stat['RSI'] = 100 - (100 / (1 + df_stat['Close'].diff().gt(0).rolling(14).sum() /
                                 df_stat['Close'].diff().lt(0).rolling(14).sum()))
df_stat = df_stat.dropna()

# 添加標籤 (簡單比較 SMA_5 與 SMA_20)
df_stat['Label'] = (df_stat['SMA_5'] > df_stat['SMA_20']).astype(int)

[*********************100%***********************]  1 of 1 completed


In [3]:
# 生成 K 線圖並保存
def generate_candlestick_images(df, save_dir, window_size=5):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    for i in range(window_size, len(df)):
        window_data = df.iloc[i-window_size:i]
        label = int(df['Label'].iloc[i])  # 確保標籤為整數
        filepath = os.path.join(save_dir, f"{i}_label_{label}.png")

        # 繪製 K 線圖
        fig, ax = plt.subplots(figsize=(6, 4))
        ax.plot(window_data.index, window_data['Close'], label='Close', color='black', lw=2)
        ax.plot(window_data.index, window_data['SMA_5'], label='SMA_5', color='blue', linestyle='--')
        ax.plot(window_data.index, window_data['SMA_20'], label='SMA_20', color='red', linestyle='--')
        ax.legend()
        plt.tight_layout()
        plt.savefig(filepath)
        plt.close(fig)

generate_candlestick_images(df_stat, save_dir="./candlestick_images", window_size=5)

# 特徵縮放
features = df_stat[['Close', 'SMA_5', 'SMA_20', 'RSI']].values
labels = df_stat['Label'].values
feature_scaler = MinMaxScaler()
scaled_features = feature_scaler.fit_transform(features)

# 構建序列數據
N = 5  # 窗口大小
X, y = [], []
for i in range(N, len(scaled_features)):
    X.append(scaled_features[i-N:i])
    y.append(labels[i])
X, y = np.array(X), np.array(y)

# 分割數據集
train_size = int(len(X) * 0.7)
train_features, test_features = X[:train_size], X[train_size:]
train_labels, test_labels = y[:train_size], y[train_size:]

  label = int(df.iloc[i]['Label'])  # 確保標籤為整數


In [4]:
# 檢查缺失影像並補充
def check_and_generate_missing_images(indices, features, labels, df, save_dir, window_size):
    for idx in indices:
        if idx < window_size or idx >= len(df):
            continue  # 超出範圍的索引

        # 確保影像存在
        label = labels[idx - window_size]  # 測試數據的標籤
        filepath = os.path.join(save_dir, f"{idx}_label_{label}.png")
        if not os.path.exists(filepath):
            print(f"Missing file detected, generating: {filepath}")
            window_data = df.iloc[idx-window_size:idx]
            fig, ax = plt.subplots(figsize=(6, 4))
            ax.plot(window_data.index, window_data['Close'], label='Close', color='black', lw=2)
            ax.plot(window_data.index, window_data['SMA_5'], label='SMA_5', color='blue', linestyle='--')
            ax.plot(window_data.index, window_data['SMA_20'], label='SMA_20', color='red', linestyle='--')
            ax.legend()
            plt.tight_layout()
            plt.savefig(filepath)
            plt.close(fig)

test_indices = list(range(len(test_features)))
check_and_generate_missing_images(
    indices=[idx + N for idx in test_indices],
    features=test_features,
    labels=test_labels,
    df=df_stat,
    save_dir="./candlestick_images",
    window_size=N
)

In [5]:


# 自定義 Dataset
class MultimodalStockDataset(Dataset):
    def __init__(self, features, labels, image_dir, indices, transform=None):
        self.features = features
        self.labels = labels
        self.image_dir = image_dir
        self.indices = indices
        self.transform = transform

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        seq_features = torch.tensor(self.features[self.indices[idx]], dtype=torch.float32)
        label = int(self.labels[self.indices[idx]])  # 確保標籤為整數

        # 加載對應的 K 線圖
        image_path = os.path.join(self.image_dir, f"{self.indices[idx]+N}_label_{label}.png")
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, seq_features, torch.tensor(label, dtype=torch.long)

# 定義圖像變換
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# 構建 Dataset 和 DataLoader
train_indices = list(range(len(train_features)))
test_indices = list(range(len(test_features)))

train_dataset = MultimodalStockDataset(train_features, train_labels, "./candlestick_images", train_indices, transform)
test_dataset = MultimodalStockDataset(test_features, test_labels, "./candlestick_images", test_indices, transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [6]:
# 定義多模態模型
class MultimodalModel(nn.Module):
    def __init__(self, seq_input_dim, hidden_dim):
        super(MultimodalModel, self).__init__()
        self.cnn = models.resnet18(pretrained=True)
        self.cnn.fc = nn.Linear(self.cnn.fc.in_features, 128)
        self.rnn = nn.LSTM(input_size=seq_input_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc_seq = nn.Linear(hidden_dim, 128)
        self.fc_combined = nn.Linear(256, 2)  # 二分類

    def forward(self, image, sequence_features):
        image_features = self.cnn(image)
        _, (hidden, _) = self.rnn(sequence_features)
        seq_features = self.fc_seq(hidden[-1])
        combined = torch.cat((image_features, seq_features), dim=1)
        output = self.fc_combined(combined)
        return output

In [23]:
# 初始化模型
seq_input_dim = train_features.shape[2]
hidden_dim = 32
model = MultimodalModel(seq_input_dim, hidden_dim).to(device)

# 訓練模型
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)

def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, seq_features, labels in train_loader:
            images, seq_features, labels = images.to(device), seq_features.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images, seq_features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

train_model(model, train_loader, criterion, optimizer)



Epoch [1/10], Loss: 0.1816
Epoch [2/10], Loss: 0.1114
Epoch [3/10], Loss: 0.0818
Epoch [4/10], Loss: 0.0507
Epoch [5/10], Loss: 0.0777
Epoch [6/10], Loss: 0.0649
Epoch [7/10], Loss: 0.0370
Epoch [8/10], Loss: 0.0281
Epoch [9/10], Loss: 0.0631
Epoch [10/10], Loss: 0.0320


In [24]:
# 模型評估並打印實際值與預測值 (只打印前五個)
def evaluate_and_print_predictions(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for images, seq_features, labels in test_loader:
            images, seq_features, labels = images.to(device), seq_features.to(device), labels.to(device)
            outputs = model(images, seq_features)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    accuracy = correct / total
    print(f"\nAccuracy: {accuracy:.4f}")

    # 打印前五個實際值與預測值
    print("\nReal Labels vs Predictions (Top 5):")
    for real, pred in zip(all_labels[:5], all_preds[:5]):
        print(f"Real: {real}, Predicted: {pred}")

    return accuracy, all_labels, all_preds

# 執行模型評估
accuracy, all_labels, all_preds = evaluate_and_print_predictions(model, test_loader, device)


Accuracy: 0.6196

Real Labels vs Predictions (Top 5):
Real: 0, Predicted: 0
Real: 0, Predicted: 0
Real: 0, Predicted: 0
Real: 0, Predicted: 0
Real: 1, Predicted: 0
