### 🌐 Weekly Macro Indicator Download

This section downloads weekly ETF price, volume, high and low data, followed by weekly data for the key macroeconomic signals that are used as input features for the model:

| Indicator        | Source Symbol | Description |
|------------------|---------------|-------------|
| **VIX**          | `^VIX`        | CBOE Volatility Index (market fear gauge) |
| **10Y Yield**    | `^TNX`        | 10-Year U.S. Treasury yield (interest rate proxy) |
| **USD Index**    | `DX-Y.NYB`    | Strength of the U.S. dollar |
| **Crude Oil**    | `CL=F`        | WTI Crude Oil futures price |

All indicators are:
- Downloaded at **weekly frequency** using Yahoo Finance
- Aligned on the same date index as the ETF data

In addition, the 10-year yield series (`^TNX`) is converted to a % by multiplying it by `0.1`.


In [None]:
import os
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta

YEARS = 10

# ETF universe to download
etf_list = [
    'XLK', 'XLF', 'XLV', 'XLE', 'XLI', 'XLY', 'XLP', 'XLRE', 'XLU', 'XLB', 'XLC',
    'SOXX', 'SH', 'DOG', 'RWM', 'ITA', 'JETS', 'PSQ', 'VNQ', 'SPY'
]

# Look-back window: YEARS * 52 weeks, ending today
lookback = timedelta(weeks=YEARS*52)
end_date = datetime.today().strftime('%Y-%m-%d')
start_date = (datetime.today() - lookback).strftime('%Y-%m-%d')

print(f"📅 Downloading data from {start_date} to {end_date}")

# The output folder is a sibling of the working directory and must already exist
dataset_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset'))
if not os.path.isdir(dataset_path):
    raise FileNotFoundError(f"🚫 'dataset/' folder not found at {dataset_path}")

# One {symbol: single-column frame} mapping per downloaded field
adjclose_data = {}
volume_data = {}
high_data = {}
low_data = {}
field_containers = (
    ('Adj Close', adjclose_data),
    ('Volume', volume_data),
    ('High', high_data),
    ('Low', low_data),
)

# Fetch each ETF; symbols whose download comes back empty are left out
for symbol in etf_list:
    print(f"⬇️ Downloading {symbol}...")
    data = yf.download(symbol, start=start_date, end=end_date, interval='1wk', auto_adjust=False)
    if data.empty:
        continue
    for field, container in field_containers:
        # Relabel the field column with the ticker so frames can be merged later
        container[symbol] = data[[field]].rename(columns={field: symbol})
# Merge and clean
def combine_and_save(data_dict, filename):
    """Merge per-symbol frames side by side, clean them and write a CSV.

    Args:
        data_dict: Mapping whose values are DataFrames to concatenate column-wise.
        filename: Name of the CSV file created inside the module-level
            ``dataset_path`` directory.

    Returns:
        The cleaned DataFrame that was written to disk.
    """
    merged = pd.concat(data_dict.values(), axis=1).apply(pd.to_numeric, errors='coerce')
    merged.index = pd.to_datetime(merged.index, errors='coerce')

    # Keep the first row for every repeated date, then order chronologically
    first_occurrence = ~merged.index.duplicated(keep='first')
    merged = merged[first_occurrence].sort_index()

    # Rows where every column is missing carry no information
    merged = merged.dropna(axis=0, how='all')

    merged.to_csv(os.path.join(dataset_path, filename))
    print(f"✅ Saved: {filename}")
    return merged

# Write one CSV per field (adjusted close, volume, high, low)
price_df = combine_and_save(data_dict=adjclose_data, filename='etf_prices_weekly.csv')
volume_df = combine_and_save(data_dict=volume_data, filename='etf_volume_weekly.csv')
high_df = combine_and_save(data_dict=high_data, filename='etf_high_weekly.csv')
low_df = combine_and_save(data_dict=low_data, filename='etf_low_weekly.csv')

# Quick look at the first rows of the price table
price_df.head()

# Macro indicator tickers on Yahoo Finance
macro_tickers = {
    'VIX': '^VIX',               # Volatility Index
    '10Y_Yield': '^TNX',         # 10-Year Treasury Yield (multiply by 0.1)
    'USD_Index': 'DX-Y.NYB',     # U.S. Dollar Index
    'WTI_Crude': 'CL=F'          # Crude Oil (WTI)
}

# Date range matching your ETF backtest period
end_date = datetime.today().strftime('%Y-%m-%d')
start_date = (datetime.today() - timedelta(weeks=YEARS*52)).strftime('%Y-%m-%d')

# Download weekly closes, one single-column frame per indicator
macro_data = {}
for name, ticker in macro_tickers.items():
    print(f"Downloading {name} ({ticker})...")
    data = yf.download(ticker, start=start_date, end=end_date, interval='1wk', auto_adjust=False)
    if data.empty:
        # Same policy as the ETF loop: leave out symbols that return nothing,
        # but say so instead of failing later on a missing 'Close' column.
        print(f"⚠️ Skipping {name} ({ticker}): no data returned.")
        continue
    close = data[['Close']].copy()
    # Label the single column directly with the indicator name. This works
    # whether the downloaded frame has flat or multi-level column labels and
    # keeps each label tied to the data it actually belongs to.
    close.columns = [name]
    macro_data[name] = close

if not macro_data:
    raise ValueError("🛑 No macro indicator data downloaded.")

# Combine all macro indicators into one DataFrame (columns are already flat)
macro_df = pd.concat(macro_data.values(), axis=1)

# Fix 10Y yield scale
# NOTE(review): confirm that the downloaded ^TNX values really need the 0.1
# factor; the quoting convention is not visible from this file.
if '10Y_Yield' in macro_df.columns:
    macro_df['10Y_Yield'] = macro_df['10Y_Yield'] * 0.1

# Keep only weeks where every indicator is present, then de-duplicate and
# sort BEFORE saving so the CSV matches the cleaned in-memory frame.
macro_df = macro_df.dropna()
macro_df = macro_df[~macro_df.index.duplicated(keep='first')].sort_index()

# Save to CSV
macro_save_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset', 'macro_indicators_weekly.csv'))
macro_df.to_csv(macro_save_path)
print(f"✅ Macro indicators saved to: {macro_save_path}")

# Preview
macro_df.head()


### 🧠 Feature Engineering

This section prepares input features for the machine learning model.

#### 📈 ETF-Specific Features:
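For each ETF, we will compute:
- **1-week return**: Short-term price movement
- **3-week return**: Medium-term trend
- **6-week return**: Momentum across a longer window
- **Streak**: Number of consecutive up weeks (note: not computed by the feature cell below)

The feature cell below additionally computes, from the weekly close and volume series: stochastic %K/%D, Williams %R, CCI, RSI, OBV, MACD (line, signal, difference), 1-week price and volume change, and a custom short-term KST.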

#### 🌐 Macro Indicators:
From the macro_df, we already have:
- **VIX**
- **10Y Treasury Yield**
- **USD Index**
- **Crude Oil Price**

These will be aligned with the ETF data by date and merged in.

#### 📦 Resulting Feature Matrix:
For each ETF on each week:
- One row = a snapshot of that ETF and macro environment
- Target = the **next week's return** for that ETF


In [None]:
import pandas as pd
import os
from ta import momentum, trend, volume

# === Paths ===
# Both input CSVs live in the dataset/ folder next to the working directory
_dataset_dir = os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset'))
price_path = os.path.join(_dataset_dir, 'etf_prices_weekly.csv')
volume_path = os.path.join(_dataset_dir, 'etf_volume_weekly.csv')

# Ticker count of the most recently loaded CSV (updated by load_etf_csv)
num_etf = 0

# === Helper to load ETF CSVs ===
def load_etf_csv(path, name='[unknown]'):
    """Load an ETF CSV that starts with three header rows.

    The ticker symbols are taken from the second row (its first cell is
    skipped); the data rows start after the third row. The first data column
    is parsed as the date index and everything else is coerced to numeric.

    Args:
        path: Location of the CSV file.
        name: Label used only in the progress / error messages.

    Returns:
        DataFrame indexed by ``Date`` with one numeric column per ticker.

    Raises:
        Exception: Any error raised while reading or parsing is reported
            and then re-raised unchanged.

    Side effects:
        Sets the module-level ``num_etf`` to the number of tickers found.
    """
    global num_etf
    try:
        # Row 1 of the header holds the tickers; drop its leading label cell
        tickers = pd.read_csv(path, header=None, nrows=2).iloc[1].tolist()[1:]
        n_tickers = len(tickers)

        # Data rows only, trimmed to the date column plus one column per ticker
        table = pd.read_csv(path, skiprows=3, header=None)
        table = table.iloc[:, :n_tickers + 1]
        table.columns = ['Date'] + tickers

        table['Date'] = pd.to_datetime(table['Date'], errors='coerce')
        table = table.set_index('Date').apply(pd.to_numeric, errors='coerce')

        num_etf = n_tickers
        print(f"✅ Loaded {name} with {len(tickers)} tickers")
        return table
    except Exception as e:
        print(f"❌ Failed to load {name}: {e}")
        raise

# === Load data ===
price_df = load_etf_csv(price_path, name='ETF Prices')
volume_df = load_etf_csv(volume_path, name='ETF Volume')

# === Feature storage ===
features_all = []  # one feature frame per successfully processed symbol
skipped = []       # symbols left out (no volume column, or an error below)

# === Feature generation loop ===
for symbol in price_df.columns:
    if symbol not in volume_df.columns:
        print(f"⚠️ Skipping {symbol}: volume data missing.")
        skipped.append(symbol)
        continue

    # Working frame on the price index; 'close' and 'volume' are inputs only
    df = pd.DataFrame(index=price_df.index)
    df['close'] = price_df[symbol]
    df['volume'] = volume_df[symbol]

    try:
        close = df['close']
        vol = df['volume']  # not named `volume`: that is the imported ta module

        # === Return-based technical indicators ===
        for weeks in (1, 3, 6):
            df[f'{symbol}_ret_{weeks}w'] = close.pct_change(weeks)

        # Stochastic %K/%D and Williams %R over a 14-row window of closes
        high = close.rolling(window=14).max()
        low = close.rolling(window=14).min()
        hl_range = high - low
        df[f'{symbol}_stoch_k'] = 100 * (close - low) / hl_range
        df[f'{symbol}_stoch_d'] = df[f'{symbol}_stoch_k'].rolling(window=3).mean()
        df[f'{symbol}_williams_r'] = -100 * (high - close) / hl_range

        # The close series is passed as high, low and close to the CCI call
        df[f'{symbol}_cci'] = trend.cci(high=close, low=close, close=close, window=20)
        df[f'{symbol}_rsi'] = momentum.rsi(close, window=14)
        df[f'{symbol}_obv'] = volume.on_balance_volume(close, vol)

        df[f'{symbol}_macd'] = trend.macd(close)
        df[f'{symbol}_macd_signal'] = trend.macd_signal(close)
        df[f'{symbol}_macd_diff'] = trend.macd_diff(close)

        # === Price/volume and its variation ===
        df[f'{symbol}_price_change'] = close.pct_change(1)
        df[f'{symbol}_volume_change'] = vol.pct_change(1)

        # === Short-term KST (custom) ===
        # Weighted sum of smoothed rates of change over 10/15/20/30 rows
        roc1 = close.pct_change(10)
        roc2 = close.pct_change(15)
        roc3 = close.pct_change(20)
        roc4 = close.pct_change(30)
        df[f'{symbol}_kst_short'] = (
            roc1.rolling(10).mean() +
            roc2.rolling(10).mean() * 2 +
            roc3.rolling(10).mean() * 3 +
            roc4.rolling(15).mean() * 4
        )

        # Everything except the two input columns is a feature
        derived_cols = df.columns.difference(['close', 'volume'])
        feature_df = df[derived_cols].copy()

        features_all.append(feature_df)
        print(f"📈 {symbol}: {feature_df.dropna(how='all').shape[0]} valid rows")

    except Exception as e:
        print(f"❌ Error processing {symbol}: {e}")
        skipped.append(symbol)

# === Final merge ===
if not features_all:
    raise ValueError("🛑 No valid ETF features generated.")

features_df = pd.concat(features_all, axis=1).sort_index()
features_df = features_df[~features_df.index.duplicated(keep='first')]

print("📀 Final feature shape:", features_df.shape)

# === Save to CSV ===
# The file name carries num_etf, as last set by load_etf_csv
base_dir = os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset'))
fname = os.path.join(base_dir, f'weekly_{num_etf}_etf_tech_features.csv')
features_df.to_csv(fname)
print(f"✅ Saved features to: {fname}")

### 📌 Deep Sector Rotation Strategy with Shock-Aware Early Exit

This strategy builds on the "Deep Sector Rotation" approach proposed in [SSRN-4280640](https://papers.ssrn.com/sol3/papers.cfm?abstract_id=4280640), with the following modifications:

---

#### 🧠 Core Model (MLP)

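- A multi-layer perceptron (MLP) is trained to predict next-week returns for each ETF independently. (Note: the training cell below instantiates `StockPredictionTransformer` from `model_definition` rather than an MLP.)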
- Features include:
  - Past 1w, 3w, 6w returns
  - Volume (log normalized)
  - Macro indicators (VIX, 10Y yield, USD index, oil)
  - Streak up count (3-week up trend)
  - Shock Amplify features:
    - This week
    - 1-week lag
    - 3-week average

---

#### 🔁 Weekly Rotation Rule (baseline)

- Each week (e.g., Monday), predict returns for all ETFs using the MLP.
- Rank the ETFs by predicted return.
- Buy top-N (e.g., 3) ETFs.
- Hold for 1 week (unless overridden by shock rule below).

---

#### ⚡ Shock Amplify Early Exit Rule (custom addition)

- Each day (or evaluation step), check for ETFs in the portfolio with:
  - `Shock_Amplify_3w` > +10% or < -10%
- If triggered:
  - Sell that ETF immediately.
  - Immediately start a new turn (predict again, re-select top-N).

---

#### 💼 Goal

- Combine deep learning-based prediction with handcrafted rules for volatility control.
- Achieve more stable and responsive ETF swing trading performance.


In [None]:
import os
import re
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
from tqdm import trange
from heapq import heappush, heappushpop
from torch.utils.data import DataLoader
from torchmetrics import MeanAbsolutePercentageError
from model_definition import StockDataset, StockPredictionTransformer

def clear_line(n=1):
    """Emit the "cursor up" + "erase line" escape sequences ``n`` times.

    Args:
        n: How many times the escape pair is written to stdout.
    """
    up_then_erase = '\033[1A' + '\x1b[2K'
    for _ in range(n):
        print(up_then_erase, end='')

# Hyperparameters
INPUT_DIM = 32
ETF_EMBEDDING_DIM = 32
TRANSFORMER_DIM = 64
OUTPUT_DIM = 1 # predict next week's return (a single value)
SEQ_LEN = 4
NUM_HEADS = 2
NUM_LAYERS = 2
BATCH_SIZE = 32
LEARNING_RATE = 1E-3
NUM_EPOCHS = 250
SAMPLING_INTERVAL = 1
# 1. Data preparation
TEST_PERCENTAGE = 0.25

etf_list = [
    'XLK', 'XLF', 'XLV', 'XLE', 'XLI', 'XLY', 'XLP', 'XLRE', 'XLU', 'XLB', 'XLC',
    'SOXX', 'SH', 'DOG', 'RWM', 'ITA', 'JETS', 'PSQ', 'VNQ', 'SPY'
]

# ETF feature table: a file named weekly_<N>_etf_tech_features.csv in dataset/.
# Only whole file names are accepted (so e.g. a "...csv.bak" neighbour cannot
# produce a name that does not exist), and the candidates are sorted so the
# choice does not depend on directory listing order.
base_dir = os.path.abspath(os.path.join(os.getcwd(), '..', 'dataset'))
feature_file = sorted(
    f for f in os.listdir(base_dir)
    if re.fullmatch(r'weekly_\d+_etf_tech_features\.csv', f)
)
if not feature_file:
    raise FileNotFoundError(
        f"No 'weekly_<N>_etf_tech_features.csv' file found in {base_dir}"
    )
if len(feature_file) > 1:
    print(f"⚠️ Multiple feature files found {feature_file}; using {feature_file[0]}")
etf_data = pd.read_csv(os.path.join(base_dir, feature_file[0]))

# Macro indicator table, indexed by its first column
macro_data = pd.read_csv(os.path.join(base_dir, 'macro_indicators_weekly.csv'), index_col=0)

# Build the dataset and the data loaders.
# NOTE(review): both loaders wrap the same `dataset` object, so which split
# they iterate is decided by the dataset.train() / dataset.test() call made
# before iterating, not by the call made here.
dataset = StockDataset(etf_list, etf_data, macro_data, 
                       test_percentage=TEST_PERCENTAGE, 
                       sequence_length=SEQ_LEN, 
                       sampling_interval=SAMPLING_INTERVAL)
dataset.train()
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
dataset.train()
test_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False) # same batch size as training, no shuffling

class customMetric():
    """Validation score: equal blend of MAPE and the share of wrong-sign predictions.

    Calling an instance with ``(pred, y)`` returns
    ``0.5 * MAPE(pred, y) + 0.5 * direction_loss``, where ``direction_loss``
    is one minus the mean of ``sign(pred) == sign(y)``. Lower is better.

    The metric is moved to the module-level ``device``, which therefore has to
    be defined before the class is instantiated.
    """

    def __init__(self):
        # Instantiate first, then move: calling `.to` on the class itself
        # would pass `device` in place of `self` and fail.
        self.mape_metric = MeanAbsolutePercentageError().to(device)

    def __call__(self, pred, y):
        """Return the blended score for predictions ``pred`` against targets ``y``."""
        mape = self.mape_metric(pred, y)
        # Fraction of elements whose predicted sign differs from the target sign
        direction_loss = 1 - (torch.sign(pred) == torch.sign(y)).float().mean()
        return 0.5 * mape +  0.5 * direction_loss

# Build the model, loss function and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = StockPredictionTransformer(ETF_EMBEDDING_DIM,
                                   dataset.num_etf_features, 
                                   dataset.num_macros,
                                   INPUT_DIM,
                                   TRANSFORMER_DIM,
                                   SEQ_LEN, 
                                   OUTPUT_DIM,
                                   len(etf_list),
                                   NUM_HEADS, 
                                   NUM_LAYERS).to(device)

model.device = device  # read back as model.device when moving batches
criterion = nn.MSELoss() # mean squared error (regression task)
metrics = customMetric()
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, factor=np.sqrt(np.exp(-1)), patience=20, min_lr=1E-7
)

# Keep the k best checkpoints in a min-heap of
# (-validation metric, epoch, weight snapshot). The heap root is the WORST
# kept entry, which is the one heappushpop evicts. The epoch is a unique
# tie-breaker so that equal metrics never fall through to comparing dicts.
k = 3
best_k_models = []

# Training loop
iterator = trange(NUM_EPOCHS)
best_test_loss_epoch = [1E308, 0]
for epoch in iterator:
    total_loss = 0

    # Train
    model.train() # put the model in training mode
    dataset.train()
    for batch in dataloader:
        etf_features = batch['etf_features'].to(model.device)
        macro_features = batch['macro_features'].to(model.device)
        targets = batch['targets'].to(model.device)
        etf_indices = batch['etf_index'].to(model.device)

        # Forward pass
        outputs = model(etf_features, macro_features, etf_indices)

        # Loss between the model outputs and the batch targets
        loss = criterion(outputs, targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader) 
    scheduler.step(avg_loss)

    # Test
    model.eval()
    dataset.test()
    with torch.no_grad(): # no gradients needed for validation
        valid_metrics = 0  # running sum; must be reset every epoch

        for batch in test_dataloader:
            etf_features = batch['etf_features'].to(model.device)
            macro_features = batch['macro_features'].to(model.device)
            targets = batch['targets'].to(model.device)
            etf_indices = batch['etf_index'].to(model.device)

            # Forward pass
            outputs = model(etf_features, macro_features, etf_indices)
            valid_metrics += metrics(outputs, targets).item()

        # Average over the batches that were actually iterated
        avg_valid_metrics = valid_metrics / len(test_dataloader)

    # state_dict() hands back the live parameter tensors, so take a copy;
    # otherwise every stored "checkpoint" would track the latest weights.
    snapshot = {
        name: tensor.detach().clone()
        for name, tensor in model.state_dict().items()
    }
    entry = (-avg_valid_metrics, epoch, snapshot)
    if len(best_k_models) >= k:
        heappushpop(best_k_models, entry)
    else:
        heappush(best_k_models, entry)

    if avg_valid_metrics < best_test_loss_epoch[0]:
        best_test_loss_epoch = [avg_valid_metrics, epoch + 1]
    msg = f"Epoch {epoch+1} lr={scheduler.get_last_lr()[0]:.2e}, Average Loss: {avg_loss:.2e}" + \
          f", Average Valid Metrics: {avg_valid_metrics:.2e}" + \
          f", Best test loss {best_test_loss_epoch[0]:.2e} at epoch {best_test_loss_epoch[1]} "
    iterator.set_description(msg)

print(f"Metrics of top {k} best models: {[f'{-m[0]:.2e}' for m in sorted(best_k_models, key=lambda x: -x[0])]}")
# Restore the best checkpoint: the entry with the largest -metric, i.e. the
# lowest validation metric (best_k_models[0] would be the worst of the top k).
model.load_state_dict(max(best_k_models, key=lambda m: m[0])[2])
print("Training finished!")

Generated 4627 samples.


Epoch 300 lr=1.00e-03, Average Loss: 1.05e-01, Average Test Loss: 2.52e-01, Best test loss 2.24e-01 at epoch 24 : 100%|██████████| 300/300 [03:23<00:00,  1.47it/s]

Loss of top 3 best models: ['2.24e-01', '2.27e-01', '2.30e-01']
Training finished!





In [6]:
def evaluation(model:nn.Module, dataset:StockDataset, test=True):
    """Run ``model`` over one split of ``dataset`` and group the results per ETF.

    Args:
        model: Network to evaluate; batches are moved to ``model.device``.
        dataset: Dataset whose active split is switched with
            ``dataset.test()`` / ``dataset.train()``.
        test: If true, ``dataset.test()`` is selected, otherwise
            ``dataset.train()``.

    Returns:
        Three dicts keyed by ETF symbol: the predictions, the actual targets,
        and for each sample the text after the last comma of its ``dates``
        entry — each stored as a NumPy array.
    """
    # Inference mode for the model
    model.eval()

    # Pick the split, then iterate it in order with a fixed batch size of 32
    if test:
        dataset.test()
    else:
        dataset.train()
    test_dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

    predictions, actual_targets, date_of_inputs = {}, {}, {}

    with torch.no_grad():  # gradients are not needed here
        for batch in test_dataloader:
            etf_features = batch['etf_features'].to(model.device)
            macro_features = batch['macro_features'].to(model.device)
            targets = batch['targets'].to(model.device)
            dates = batch['dates']
            etf_symbol = batch['etf_symbol']
            etf_indices = batch['etf_index'].to(model.device)

            # Forward pass
            outputs = model(etf_features, macro_features, etf_indices)

            # Append each sample's prediction / target / date to its symbol
            pred_rows = outputs.cpu().numpy()
            real_rows = targets.cpu().numpy()
            for symbol, date_text, pred, real in zip(etf_symbol, dates, pred_rows, real_rows):
                predictions[symbol] = np.hstack([predictions.get(symbol, []), pred])
                actual_targets[symbol] = np.hstack([actual_targets.get(symbol, []), real])
                date_of_inputs.setdefault(symbol, []).append(date_text.split(',')[-1])

    # Normalise every per-symbol collection to a NumPy array
    for symbol in list(predictions):
        predictions[symbol] = np.array(predictions[symbol])
        actual_targets[symbol] = np.array(actual_targets[symbol]).flatten()
        date_of_inputs[symbol] = np.array(date_of_inputs[symbol]).flatten()

    return predictions, actual_targets, date_of_inputs

# Collect predictions with test=False, i.e. on the split selected by
# dataset.train() inside evaluation(), not on the held-out split.
predictions, actual_targets, date_of_inputs = evaluation(model, dataset, test=False)
# Show which ETF symbols have predictions
predictions.keys()

dict_keys(['XLK', 'XLF', 'XLV', 'XLE', 'XLI', 'XLY', 'XLP', 'XLRE', 'XLU', 'XLB', 'XLC', 'SOXX', 'SH', 'DOG', 'RWM', 'ITA', 'JETS', 'PSQ', 'VNQ', 'SPY'])

In [7]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Symbol to plot, with its samples ordered by date string
etf = 'XLE'
sorted_idx = date_of_inputs[etf].argsort()
plot_dates = date_of_inputs[etf][sorted_idx]

# Predicted and actual series share a single subplot
fig = make_subplots(rows=1, cols=1, shared_xaxes=True)
for series, label in ((predictions, 'Predictions'), (actual_targets, 'Real')):
    fig.add_trace(go.Scatter(x=plot_dates, y=series[etf][sorted_idx], name=label), 1, 1)
fig.update_layout(title=f'{etf}')