In [26]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Input
from sklearn.model_selection import train_test_split


df = pd.read_csv('pca_residual_test_supervised.csv')


X = df[[f'lag_{i}' for i in range(1, 15)]].values
y = df[['target_day_1', 'target_day_2', 'target_day_3']].values


X = X.reshape((X.shape[0], 14, 1))


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [27]:
from tensorflow.keras.layers import GRU, Dense, Input, Dropout
from tensorflow.keras.optimizers import Adam

# 建立模型
model = Sequential()
model.add(Input(shape=(14, 1)))        # 輸入為 14 天 × 1 維特徵
model.add(GRU(32, activation='tanh', return_sequences=False))
model.add(Dropout(0.06))                # 防止過擬合
model.add(Dense(3))                    # 預測未來三天殘差

# 編譯模型
model.compile(optimizer=Adam(0.001), loss='mse', metrics=['mae'])

# 訓練模型
history = model.fit(X_train, y_train, epochs=5, batch_size=32, validation_split=0.1)

Epoch 1/5
[1m56720/56720[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m191s[0m 3ms/step - loss: 5.0328e-04 - mae: 0.0122 - val_loss: 5.0372e-04 - val_mae: 0.0121
Epoch 2/5
[1m56720/56720[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m166s[0m 3ms/step - loss: 4.6397e-04 - mae: 0.0122 - val_loss: 5.0336e-04 - val_mae: 0.0123
Epoch 3/5
[1m56720/56720[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m169s[0m 3ms/step - loss: 4.2987e-04 - mae: 0.0122 - val_loss: 5.0226e-04 - val_mae: 0.0122
Epoch 4/5
[1m56720/56720[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m179s[0m 3ms/step - loss: 4.2828e-04 - mae: 0.0122 - val_loss: 5.0257e-04 - val_mae: 0.0121
Epoch 5/5
[1m56720/56720[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m165s[0m 3ms/step - loss: 4.3778e-04 - mae: 0.0122 - val_loss: 5.0329e-04 - val_mae: 0.0121


In [38]:
from sklearn.metrics import mean_absolute_error

y_pred = model.predict(X_test)
diff = y_pred - y_test
result_df = pd.DataFrame({
    "True_day1": y_test[:, 0], "Pred_day1": y_pred[:, 0], "Diff_day1": diff[:, 0],
    "True_day2": y_test[:, 1], "Pred_day2": y_pred[:, 1], "Diff_day2": diff[:, 1],
    "True_day3": y_test[:, 2], "Pred_day3": y_pred[:, 2], "Diff_day3": diff[:, 2],
})


[1m15756/15756[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 1ms/step


In [39]:
result_df

Unnamed: 0,True_day1,Pred_day1,Diff_day1,True_day2,Pred_day2,Diff_day2,True_day3,Pred_day3,Diff_day3
0,-0.021507,-0.000979,0.020528,0.002347,0.001957,-0.000390,-0.017759,0.000479,0.018237
1,-0.001965,-0.003209,-0.001244,-0.007294,-0.000459,0.006835,-0.004214,-0.001767,0.002447
2,0.002162,-0.000858,-0.003020,0.013109,0.001768,-0.011340,-0.023958,0.000515,0.024472
3,0.002812,-0.003665,-0.006476,-0.000975,-0.000899,0.000076,-0.012281,-0.002235,0.010046
4,-0.013255,-0.001463,0.011792,0.001902,0.001296,-0.000606,-0.014956,-0.000064,0.014891
...,...,...,...,...,...,...,...,...,...
504167,-0.005153,-0.000251,0.004901,-0.038328,0.002510,0.040837,0.007608,0.001181,-0.006427
504168,-0.000628,-0.002110,-0.001482,-0.003174,0.000540,0.003715,0.000497,-0.000724,-0.001221
504169,0.001172,-0.002038,-0.003210,-0.004235,0.000705,0.004940,0.015068,-0.000626,-0.015694
504170,0.013183,-0.001858,-0.015041,-0.003128,0.000746,0.003874,0.005225,-0.000474,-0.005699


In [30]:
result_df = pd.DataFrame({
    "True_day1": y_test[:, 0],
    "Pred_day1": y_pred[:, 0],
    "Diff_day1": diff[:, 0],
    "True_day2": y_test[:, 1],
    "Pred_day2": y_pred[:, 1],
    "Diff_day2": diff[:, 1],
    "True_day3": y_test[:, 2],
    "Pred_day3": y_pred[:, 2],
    "Diff_day3": diff[:, 2],
})


# 指標分析
for day in [1, 2, 3]:
    true_col = f"True_day{day}"
    pred_col = f"Pred_day{day}"
    mae = mean_absolute_error(result_df[true_col], result_df[pred_col])
    mse = mean_squared_error(result_df[true_col], result_df[pred_col])
    acc = np.mean((result_df[true_col] > 0) == (result_df[pred_col] > 0))
    print(f"📅 Day {day}")
    print(f"MAE: {mae:.4f}")
    print(f"MSE: {mse:.4f}")
    print(f"方向預測準確率: {acc * 100:.2f}%")
    print("-" * 30)

[1m15756/15756[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 1ms/step
📅 Day 1
MAE: 0.0120
MSE: 0.0004
方向預測準確率: 56.68%
------------------------------
📅 Day 2
MAE: 0.0121
MSE: 0.0004
方向預測準確率: 48.08%
------------------------------
📅 Day 3
MAE: 0.0120
MSE: 0.0005
方向預測準確率: 53.70%
------------------------------


In [43]:
# 預測總和作為 Signal
result_df["Signal"] = result_df["Pred_day1"] + result_df["Pred_day2"] + result_df["Pred_day3"]

# 加入門檻判斷進場
threshold = 0
def signal_to_position(signal):
    if signal > threshold:
        return 1  # 做多
    elif signal < -threshold:
        return -1  # 做空
    else:
        return 0  # 不進場

result_df["Position"] = result_df["Signal"].apply(signal_to_position)

# 隔日進場，持有三天
result_df["Strategy_Return"] = (
    result_df["Position"].shift(1) * (
        result_df["True_day1"] + result_df["True_day2"]+ result_df["True_day3"]
    )
)

result_df = result_df.dropna(subset=["Strategy_Return"])

# 計算累積報酬與績效
result_df["Cumulative_Strategy_Return"] = result_df["Strategy_Return"].cumsum()
result_df["Cumulative_Benchmark"] = (
    result_df["True_day1"] + result_df["True_day2"]+ result_df["True_day3"]
).cumsum()

# 總報酬與勝率
total_return = result_df["Cumulative_Strategy_Return"].iloc[-1]
win_rate = np.mean(result_df["Strategy_Return"] > 0)

# 年化報酬與夏普值
daily_return = result_df["Strategy_Return"]
annual_return = daily_return.mean() * 252
annual_volatility = daily_return.std() * np.sqrt(252)
sharpe = annual_return / annual_volatility if annual_volatility > 0 else 0

print(f"總報酬（3日持有策略）: {total_return:.4f}")
print(f"勝率: {win_rate * 100:.2f}%")
print(f"年化報酬率: {annual_return:.4f}")
print(f"年化波動率: {annual_volatility:.4f}")
print(f"夏普比率: {sharpe:.4f}")

✅ 總報酬（3日持有策略）: 29.8335
✅ 勝率: 49.77%
✅ 年化報酬率: 0.0149
✅ 年化波動率: 0.5983
✅ 夏普比率: 0.0249


In [14]:
# 計算三天預測殘差總和作為決策指標
result_df["Signal"] = result_df["Pred_day1"] + result_df["Pred_day2"] + result_df["Pred_day3"]

# 訊號判斷：正做多、負做空
result_df["Position"] = np.where(result_df["Signal"] > 0, 1, -1)

# 策略報酬（用 True_day1 近似）
result_df["Strategy_Return"] = result_df["Position"] * result_df["True_day1"]

# 累積策略報酬 vs Benchmark
result_df["Cumulative_Strategy_Return"] = result_df["Strategy_Return"].cumsum()
result_df["Cumulative_Benchmark"] = result_df["True_day1"].cumsum()

# 總報酬與勝率
total_return = result_df["Cumulative_Strategy_Return"].iloc[-1]
win_rate = np.mean(result_df["Strategy_Return"] > 0)

print(f"✅ 殘差套利策略總報酬（以三天預測合計判斷）：{total_return:.4f}")
print(f"✅ 勝率：{win_rate * 100:.2f}%")

✅ 殘差套利策略總報酬（以三天預測合計判斷）：17651.8735
✅ 勝率：50.31%


In [19]:
# 計算 Signal
result_df["Signal"] = result_df["Pred_day1"] + result_df["Pred_day2"] + result_df["Pred_day3"]

# 加入門檻進場條件
def signal_to_position(signal, threshold=0.5):
    if signal > threshold:
        return 1  # 做多
    elif signal < -threshold:
        return -1  # 做空
    else:
        return 0   # 觀望

result_df["Position"] = result_df["Signal"].apply(signal_to_position)

# 策略報酬：Position × True_day1
result_df["Strategy_Return"] = result_df["Position"] * result_df["True_day1"]
result_df["Cumulative_Strategy_Return"] = result_df["Strategy_Return"].cumsum()

# Benchmark（不動作）
result_df["Cumulative_Benchmark"] = result_df["True_day1"].cumsum()

# 每日報酬資訊
daily_returns = result_df["Strategy_Return"]

# 報酬統計：年化報酬、波動、夏普
days_per_year = 252
mean_daily_return = daily_returns.mean()
std_daily_return = daily_returns.std()

annual_return = mean_daily_return * days_per_year
annual_volatility = std_daily_return * np.sqrt(days_per_year)
sharpe_ratio = annual_return / annual_volatility if annual_volatility > 0 else 0

# 顯示結果
print(f"✅ 年化報酬率: {annual_return:.4f}")
print(f"✅ 年化波動率: {annual_volatility:.4f}")
print(f"✅ 夏普比率: {sharpe_ratio:.4f}")
print(f"✅ 觀察次數: {len(result_df)}，進場次數: {(result_df['Position'] != 0).sum()}")

✅ 年化報酬率: 2.9993
✅ 年化波動率: 3.7110
✅ 夏普比率: 0.8082
✅ 觀察次數: 504172，進場次數: 14269
