# 雙動能交易策略 (Dual Momentum) 回測專案

In [1]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import xlsxwriter

# 1. Data loading and preprocessing
# The first sheet row supplies the column names and the first column the index.
df = pd.read_excel('資料2.xlsx', index_col=0, header=0)

# First non-missing index label of every column, taken from the raw table
# (before any gap filling) so it reflects where each column's data begins.
start_dates = df.apply(lambda column: column.first_valid_index())

# Forward-fill gaps first, then back-fill whatever is still missing at the top.
forward_filled = df.ffill()
df_clean = forward_filled.bfill()

# 2. 回測引擎實作
def calculate_momentum(prices, periods):
    """Return the average percentage change of ``prices`` over several look-backs.

    Args:
        prices: pandas Series of prices.
        periods: iterable of look-back lengths; entries that are not
            strictly positive are ignored.

    Returns:
        A Series aligned to ``prices.index`` holding the row-wise mean of
        ``prices.pct_change(p)`` for every accepted ``p``. When no period is
        accepted, a Series of zeros on the same index is returned.
    """
    lookback_returns = []
    for lookback in periods:
        if lookback > 0:
            lookback_returns.append(prices.pct_change(lookback))

    # Nothing usable to average: fall back to a flat zero signal.
    if not lookback_returns:
        return pd.Series(0, index=prices.index)

    stacked = pd.concat(lookback_returns, axis=1)
    return stacked.mean(axis=1)

def calculate_metrics(equity):
    """Summarise an equity curve as (CAGR, max drawdown, Calmar ratio, win rate).

    Args:
        equity: pandas Series of account equity. Its index entries are
            subtracted from each other and read through ``.days``, so they
            must be timestamp-like.

    Returns:
        Tuple ``(cagr, mdd, calmar, win)``:
          * ``cagr``   - compound annual growth rate; ``-1`` when the total
            return is ``-100%`` or worse.
          * ``mdd``    - largest peak-to-trough decline as a fraction (<= 0).
          * ``calmar`` - ``cagr / abs(mdd)``; equals ``cagr`` when there was
            no drawdown at all.
          * ``win``    - fraction of periods whose return was positive.
        ``(0, 0, 0, 0)`` is returned when fewer than two points are supplied.
    """
    if len(equity) < 2:
        return 0, 0, 0, 0

    ret = equity.iloc[-1] / equity.iloc[0] - 1
    # Floor the horizon at 0.1 years so a very short sample cannot blow up
    # the annualisation exponent.
    yrs = max((equity.index[-1] - equity.index[0]).days / 365.25, 0.1)
    cagr = (1 + ret) ** (1 / yrs) - 1 if ret > -1 else -1

    running_peak = equity.cummax()
    mdd = ((equity - running_peak) / running_peak).min()
    calmar = cagr / abs(mdd) if mdd < 0 else cagr

    # The first pct_change entry is always NaN (there is no prior period).
    # Drop it so the win rate is measured over the n-1 real periods instead
    # of silently counting that placeholder as a losing period.
    period_returns = equity.pct_change().iloc[1:]
    win = (period_returns > 0).mean()
    return cagr, mdd, calmar, win

def backtest(prices, asset_params, initial_capital=10000000.0):
    """Simulate the top-two positive-momentum rotation over a price table.

    For every row of ``prices`` the current equity is marked to market, the
    assets whose start date has been reached are ranked by momentum, and up
    to two with strictly positive momentum are selected. From the second row
    onward, holdings that are no longer selected are sold and newly selected
    assets are bought with at most half of the equity each (limited by the
    cash on hand). Positions that stay selected are left untouched.

    Args:
        prices: DataFrame of prices, one column per asset.
        asset_params: mapping from asset name to the look-back periods passed
            to ``calculate_momentum`` for that asset.
        initial_capital: starting cash.

    Returns:
        DataFrame indexed by ``Date`` with columns ``Cash``, ``Equity``,
        ``Holdings`` (asset -> shares), ``Selected`` and ``Moments``
        (asset -> momentum value on that date).

    Note:
        Availability is decided with the module-level ``start_dates``
        mapping, which is read from the enclosing scope rather than passed in.
    """
    assets = prices.columns

    momentum_by_asset = {}
    for name in assets:
        momentum_by_asset[name] = calculate_momentum(prices[name], asset_params[name])
    mom_df = pd.DataFrame(momentum_by_asset, index=prices.index)

    cash = initial_capital
    portfolio = {}
    history = []

    for step, date in enumerate(prices.index):
        curr_prices = prices.loc[date]

        # Mark existing positions to market before any trading happens.
        portfolio_value = sum(
            position['shares'] * curr_prices[name]
            for name, position in portfolio.items()
        )
        total_equity = portfolio_value + cash

        # Only assets whose start date has been reached can be ranked.
        available = [name for name in assets if date >= start_dates[name]]
        scores = mom_df.loc[date, available]
        positive_scores = scores[scores > 0]
        ranked = positive_scores.sort_values(ascending=False)
        selected = ranked.head(2).index.tolist()

        # No trades are placed on the very first row.
        if step > 0:
            # Rebalance: liquidate everything that dropped out of the selection.
            dropped = [name for name in list(portfolio.keys()) if name not in selected]
            for name in dropped:
                cash += portfolio.pop(name)['shares'] * curr_prices[name]

            # Each new position is sized at half of the equity, capped by cash.
            target = total_equity / 2.0
            additions = [name for name in selected if name not in portfolio]
            for name in additions:
                buy_cash = min(cash, target)
                if buy_cash > 0:
                    portfolio[name] = {'shares': buy_cash / curr_prices[name]}
                    cash -= buy_cash

        snapshot = {
            'Date': date,
            'Cash': cash,
            'Equity': total_equity,
            'Holdings': {name: position['shares'] for name, position in portfolio.items()},
            'Selected': selected,
            'Moments': mom_df.loc[date].to_dict(),
        }
        history.append(snapshot)

    return pd.DataFrame(history).set_index('Date')

# 3. 最佳化 (ACO)
class ACO:
    """Ant-colony-style search for three momentum look-back periods.

    Three pheromone trails (one per slot) are kept over the candidate periods
    in ``self.range`` (2 through 52). Each ant draws one period per trail with
    probability proportional to the squared pheromone level, the draw is
    sorted, the same parameter list is applied to every asset, and the Calmar
    ratio of the resulting backtest is used as the score.
    """

    def __init__(self, assets, prices):
        """Store the asset names, the price table and the candidate periods."""
        self.assets, self.prices, self.range = assets, prices, list(range(2, 53))

    def optimize(self, n_iterations=15, n_ants=12, evaporation=0.8):
        """Run the search and return ``(best_params, best_score)``.

        Args:
            n_iterations: number of colony iterations.
            n_ants: number of parameter sets sampled per iteration.
            evaporation: factor every pheromone trail is multiplied by at the
                end of each iteration.

        Returns:
            The sorted three-period list with the highest Calmar ratio seen
            and that ratio. If no sampled set produced a comparable score
            (for example when nothing was sampled), ``([12, 12, 12], -1)``
            is returned as a fallback.
        """
        ph = [np.ones(len(self.range)) for _ in range(3)]
        # Start from -inf rather than a fixed threshold so that a genuinely
        # evaluated candidate always replaces the unevaluated fallback, even
        # when every Calmar ratio is below -1.
        best_p, best_c = None, -np.inf
        # The backtest is deterministic for a given parameter set, so each
        # distinct set only needs to be evaluated once.
        score_cache = {}

        for _iteration in range(n_iterations):
            for _ant in range(n_ants):
                p = sorted([
                    int(np.random.choice(self.range, p=trail**2 / (trail**2).sum()))
                    for trail in ph
                ])
                key = tuple(p)
                if key not in score_cache:
                    h = backtest(self.prices, {a: p for a in self.assets})
                    _, _, score_cache[key], _ = calculate_metrics(h['Equity'])
                c = score_cache[key]

                if c > best_c:
                    best_c, best_p = c, p
                # Only non-negative scores reinforce the trails.
                for j in range(3):
                    ph[j][self.range.index(p[j])] += max(0, c)

            # Evaporate in place once per iteration.
            for trail in ph:
                trail *= evaporation

        if best_p is None:
            return [12, 12, 12], -1
        return best_p, best_c

# Search for the best look-back periods, then replay the strategy with them.
aco = ACO(df_clean.columns, df_clean)
print("Running optimization...")
best_params, _ = aco.optimize()

# Every asset shares the same (best) parameter list.
params_by_asset = dict.fromkeys(df_clean.columns, best_params)
history = backtest(df_clean, params_by_asset)

equity_curve = history['Equity']
cagr, mdd, calmar, win = calculate_metrics(equity_curve)

# 4. Export results to an Excel workbook
# All tables are prepared first and written inside a single ``with`` block so
# the workbook is always closed, even if one of the writes raises.

# Per-period equity returns, computed once instead of once per row.
period_returns = history['Equity'].pct_change()

# Sheet: Trades - one row per asset that was kept, bought or sold on a date.
trades_data = []
prev_h = {}
for date, row in history.iterrows():
    curr_h = row['Holdings']
    selected = row['Selected']
    moments = row['Moments']
    period_ret = period_returns.loc[date]
    ret_pct = "0.00%" if pd.isna(period_ret) else f"{(period_ret*100):.2f}%"

    # Sort the names so the row order is reproducible between runs
    # (plain set iteration order is not stable for strings).
    for a in sorted(set(prev_h) | set(curr_h) | set(selected), key=str):
        if a in prev_h and a in curr_h:
            status = "每周平衡時保留倉與上一期相同之商品"
        elif a in curr_h:
            status = "買進新持有商品"
        elif a in prev_h:
            status = "賣出剃除商品"
        else:
            # Selected but never actually held: nothing to report.
            continue

        trades_data.append({
            '買進日期': date, '標的名稱': a, '價格': df_clean.loc[date, a],
            '股數': curr_h.get(a, prev_h.get(a, 0)), '狀態': status,
            '每期報酬表現': ret_pct, '最佳參數': str(best_params),
            '動能值': f"{moments.get(a, 0):.4f}", '選取原因': "相對動能前 2 且為正"
        })
    prev_h = curr_h

# Sheet: Equity_Hold - number of holdings and their names per date.
hold_summary = history['Holdings'].apply(lambda holdings: ",".join(holdings.keys()))
hold_df = pd.DataFrame({'持股檔數': history['Holdings'].apply(len), '明細': hold_summary})

# Sheet: Summary - headline metrics.
sum_data = [['CAGR', f'{cagr*100:.2f}%'], ['MaxDD', f'{abs(mdd)*100:.2f}%'], ['Calmar Ratio', f'{calmar:.2f}'], ['Win Rate', f'{win*100:.2f}%'], ['最佳參數', str(best_params)]]
summary_df = pd.DataFrame(sum_data, columns=['項目', '數值'])

# Yearly returns from year-end equity. The first year has no previous
# year-end, so it is measured against the first recorded equity value
# (the starting capital, since no trade happens on the first row).
year_end_equity = history['Equity'].resample('YE').last()
y_ret = year_end_equity.pct_change()
first_val = year_end_equity.iloc[0] / history['Equity'].iloc[0] - 1
y_df = pd.DataFrame({'年度': y_ret.index.year, '年化報酬率': y_ret.values})
y_df.iloc[0, 1] = first_val
y_df['年化報酬率'] = y_df['年化報酬率'].apply(lambda x: f"{x*100:.2f}%")

with pd.ExcelWriter('strategy_results.xlsx', engine='xlsxwriter') as writer:
    pd.DataFrame(trades_data).to_excel(writer, sheet_name='Trades', index=False)
    history[['Equity']].to_excel(writer, sheet_name='Equity_Curve')
    hold_df.to_excel(writer, sheet_name='Equity_Hold')
    summary_df.to_excel(writer, sheet_name='Summary', index=False)
    # The yearly table goes below the metrics, separated by a blank row.
    y_df.to_excel(writer, sheet_name='Summary', startrow=len(sum_data)+2, index=False)

# Parameter plateau table: metrics for a handful of fixed look-back sets.
plateau = []
candidate_sets = [[12], [26], [5, 15], [10, 20], [4, 10, 20]]
for p in candidate_sets:
    # Pad every candidate to three periods: a single period is repeated three
    # times, a pair repeats its second entry, a triple is used as given.
    if len(p) == 1:
        fp = p * 3
    elif len(p) == 2:
        fp = p + [p[1]]
    else:
        fp = p

    h = backtest(df_clean, dict.fromkeys(df_clean.columns, fp))
    c, m, cl, _ = calculate_metrics(h['Equity'])

    plateau_row = {
        '參數組合': f'{p} 周',
        'CAGR': f'{c*100:.1f}%',
        'MaxDD': f'{abs(m)*100:.1f}%',
        'Calmar': f'{cl:.2f}',
    }
    plateau.append(plateau_row)

plateau_df = pd.DataFrame(plateau)
plateau_df.to_csv('plateau_table.csv', index=False)
print("Backtest Completed and Files Generated.")


Running optimization...


Backtest Completed and Files Generated.
