In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

In [30]:
class StockDataset(Dataset):
    def __init__(self, image_dir, label_dir, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.image_names = os.listdir(image_dir)

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        image_name = self.image_names[idx]
        image_path = os.path.join(self.image_dir, image_name)
        label_path = os.path.join(self.label_dir, image_name.replace(".png", ".txt"))

        # 讀取影像
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # 讀取標籤並轉換為 int 型態
        with open(label_path, "r") as f:
            label = int(f.read().strip())  # 確保標籤為 int 類型

        return image, label

# 影像變換
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # 調整影像大小
    transforms.ToTensor(),        # 轉換為 Tensor 格式
])

# 建立資料集與資料加載器
train_dataset = StockDataset(image_dir="./train_data/image", label_dir="./train_data/label", transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)


In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [31]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.dropout = nn.Dropout(0.3)
        
        # 自適應池化層將輸出調整為固定大小，這裡設置為 3x3
        self.adaptive_pool = nn.AdaptiveAvgPool2d((3, 3))
        
        # 全連接層
        self.fc1 = nn.Linear(128 * 3 * 3, 256)
        self.fc2 = nn.Linear(256, 2)  # 最後輸出 1 個數字，不再使用 Sigmoid

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x  # 直接返回線性輸出，不使用 Sigmoid

In [32]:
# 設置設備（如果有 GPU 就用 GPU）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 建立模型、損失函數和優化器
model = SimpleCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# 訓練模型
num_epochs = 500
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # 初始化梯度
        optimizer.zero_grad()

        # 前向傳播
        outputs = model(images)
        loss = criterion(outputs, labels)

        # 反向傳播與優化
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

Epoch 1/500, Loss: 0.7005655765533447
Epoch 2/500, Loss: 0.692652336188725
Epoch 3/500, Loss: 0.6749545591218131
Epoch 4/500, Loss: 0.6495191965784345
Epoch 5/500, Loss: 0.6044888070651463
Epoch 6/500, Loss: 0.5476020957742419
Epoch 7/500, Loss: 0.4929038882255554
Epoch 8/500, Loss: 0.43646055459976196
Epoch 9/500, Loss: 0.4291723498276302
Epoch 10/500, Loss: 0.38941396134240286
Epoch 11/500, Loss: 0.3442677514893668
Epoch 12/500, Loss: 0.33065531722136904
Epoch 13/500, Loss: 0.28832243170057026
Epoch 14/500, Loss: 0.2551618835755757
Epoch 15/500, Loss: 0.23496026545763016
Epoch 16/500, Loss: 0.2222244686314038
Epoch 17/500, Loss: 0.2092806931052889
Epoch 18/500, Loss: 0.17648611749921525
Epoch 19/500, Loss: 0.1890683929835047
Epoch 20/500, Loss: 0.16747991421392985
Epoch 21/500, Loss: 0.14871336200407573
Epoch 22/500, Loss: 0.13454450986215047
Epoch 23/500, Loss: 0.13347047248056956
Epoch 24/500, Loss: 0.11593397387436458
Epoch 25/500, Loss: 0.10286049172282219
Epoch 26/500, Loss: 0.1

In [33]:
from datetime import datetime, timedelta

### 自動生成 24 個月份的列表，從 2021 年 1 月開始

In [34]:
def generate_date_list(start_date, months):
    date_list = []
    start = datetime.strptime(start_date, "%Y%m%d")
    for i in range(months):
        year_month = start + timedelta(days=30 * i)  # 每次增加一個月
        date_str = year_month.strftime("%Y%m01")  # 每個月的第一天
        date_list.append(date_str)
    return date_list

In [35]:
import yfinance as yf
import mplfinance as mpf
import matplotlib.pyplot as plt
import pandas as pd

In [36]:
# 將 Matplotlib 圖像 fig 轉換為 PIL Image
def fig_to_image(fig):
    # 將圖像渲染為二進位 RGB 數據
    fig.canvas.draw()
    img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    
    # 將 numpy array 轉換為 PIL Image
    pil_image = Image.fromarray(img)
    return pil_image

# 將 PIL Image 轉換為 PyTorch 張量
def image_to_tensor(pil_image):
    transform = transforms.Compose([
        transforms.Resize((64, 64)),  # 確保圖像大小與模型輸入匹配
        transforms.ToTensor(),
    ])
    tensor_image = transform(pil_image)
    return tensor_image.unsqueeze(0)  # 增加批次維度 [1, C, H, W]

In [45]:
# 股票代碼
stock_id = "2330.TW"

# 過去十年資料，加上四天來計算五日移動平均
start_date = "2024-01-01"  # 提前四天
end_date = "2024-09-01"
interval_days = 4  # 每5天截圖一次

In [48]:
# 下載股票資料並移除缺失值
ticker = yf.Ticker(stock_id)
stock_data = ticker.history(start=start_date, end=end_date, interval="1d")
stock_data = stock_data.apply(pd.to_numeric, errors='coerce').dropna().astype(float)

# 計算五日均線
stock_data['SMA_5'] = stock_data['Close'].rolling(window=5).mean()


for i in range(0, len(stock_data), interval_days):
    idx = stock_data.index[i]
    plot_data = stock_data.loc[idx - pd.Timedelta(days=4):idx]  # 繪製該日期前的完整數據

    # 檢查是否有足夠的數據
    if len(plot_data) < 5:  # 確保5天數據充足
        continue

    # 繪製 K 線圖，不顯示 SMA
    fig, ax = mpf.plot(
        plot_data,
        type='candle',
        style='charles',
        ylabel="",
        volume=False,
        xrotation=0,
        returnfig=True
    )
    # 移除 X, Y 軸的刻度與標籤
    ax[0].set_xticks([])
    ax[0].set_yticks([])
    ax[0].set_xlabel("")
    ax[0].set_ylabel("")
    ax[0].set_title("")

    pil_image = fig_to_image(fig)
    tensor_image = image_to_tensor(pil_image).to(device)
    plt.close(fig) 

    model.eval()
    with torch.no_grad():
        output = model(tensor_image)
        prediction = torch.argmax(output, dim=1).item()

    row = plot_data.iloc[-1]  # 取當前最後一天的數據
    trend_label = 1 if row['Close'] > row['SMA_5'] else 0
    print(f"{(idx - pd.Timedelta(days=4)).strftime('%Y-%m-%d')}~{idx.strftime('%Y-%m-%d')} Prediction:{'漲' if prediction==1 else '跌'}")
    print(f"{(idx - pd.Timedelta(days=4)).strftime('%Y-%m-%d')}~{idx.strftime('%Y-%m-%d')} Truth：{'漲' if trend_label==1 else '跌'}")

2024-01-08~2024-01-12 Prediction:漲
2024-01-08~2024-01-12 Truth：跌
2024-03-04~2024-03-08 Prediction:漲
2024-03-04~2024-03-08 Truth：漲
2024-04-15~2024-04-19 Prediction:跌
2024-04-15~2024-04-19 Truth：跌
2024-05-20~2024-05-24 Prediction:漲
2024-05-20~2024-05-24 Truth：漲


  img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
  img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
  img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
  img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
