In [1]:
import yfinance as yf
import numpy as np
import pandas as pd
import cvxpy as cp
from yahooquery import Ticker
import matplotlib.pyplot as plt

In [None]:
# Assets
tickers = [
    'IBM', 'GE', 'XOM', 'JNJ', 'KO', 'PG', 'MMM', 'BA', 'CAT', 'CVX',
    'DIS', 'HON', 'MCD', 'MRK', 'PFE', 'TRV', 'WMT', 'PEP', 'AXP', 'BK',
    'CL', 'DE', 'EMR', 'GD', 'LMT', 'MO', 'SYY', 'TXN', 'UNP', 'USB',
    'WFC', 'XEL', 'F', 'DD', 'ECL', 'AA', 'ED', 'TGT',
    'HPQ', 'XRX', 'AVT',
    'AIG', 'BAC', 'C', 'MMC'
]
n = len(tickers)

t = Ticker(tickers)
df_raw = t.history(start='1980-01-01', end='2024-12-31', interval='1d')

# 重設 index，把 'symbol' 拿出來變成欄位
df_raw = df_raw.reset_index()

# 抽出「調整後收盤價」
df_adjclose = df_raw.pivot(index='date', columns='symbol', values='adjclose')
df = df_adjclose.copy()
df

In [None]:
# ------------------------------- Global variable --------------------------------
C_TARGET       = np.log(1.5) / 252          # LDP target c
UB             = 0.10                       # weight upper bound
TRAIN_YEARS    = 20                         # rolling window
BACKTEST_START = "2000-01-01"               # start point of test data
# -----------------------------------------------------------------------

# ------------------------- 1. 日報酬資料 ---------------------------------
simple_return = df.pct_change().dropna()
simple_return.index = pd.to_datetime(simple_return.index)

train = simple_return.loc[:'1999-12-31']       # estimate μ、Σ
test  = simple_return.loc[BACKTEST_START:]     # backtest

tickers, n = list(df.columns), df.shape[1]

# ------------------------- 2. 取得權重函式 --------------------------------
def get_weights(ret: pd.DataFrame, method: str,
                c=C_TARGET, ub=UB) -> np.ndarray:
    """
    method ∈ {'mv', 'ldp', 'eq'}
    """
    n_   = ret.shape[1]
    mu   = ret.mean().values                
    Sigma = ret.cov().values

    if method == 'eq':
        return np.full(n_, 1/n_)

    if method == 'mv':
        w = cp.Variable(n_)

        objective_mv = cp.Maximize(w @ mu - 0.5*cp.quad_form(w, Sigma))
        constraints_mv = [
            cp.sum(w) == 1, 
            w >= 0, 
            w <= ub
        ]

        problem_mv = cp.Problem(
            objective_mv, 
            constraints_mv
        )
        problem_mv.solve()

        return w.value

    if method == 'ldp':
        v = np.linalg.inv(Sigma) @ mu
        d = mu @ v
        w_star = np.sqrt(2*c/d) * v         # closed form solution
        w_var  = cp.Variable(n_)
        cp.Problem(cp.Minimize(cp.sum_squares(w_var - w_star)),
                   [cp.sum(w_var) == 1, w_var >= 0, w_var <= ub]
        ).solve()

        return w_var.value


# ------------------------- 3. 投組 NAV 工具 ------------------------------
def apply_portfolio(ret_df, w, start_nav=1.0):
    """weighted cumulative NAV"""
    return pd.Series(start_nav * (1 + ret_df.values @ w).cumprod(), index=ret_df.index)

def max_drawdown(nav: pd.Series) -> float:
    peak = nav.cummax()
    return float((nav/peak - 1).min())

# ------------------------- 4. 年度重平衡迴圈 ------------------------------
years = sorted(test.index.year.unique())    # 2000, 2001, …

# (a) 初始權重（用 1999-12-31 前資料估計）
w_ldp_cur = get_weights(simple_return.loc[:'1999-12-31'], 'ldp')
w_mv_cur  = get_weights(simple_return.loc[:'1999-12-31'], 'mv')
w_eq_cur  = np.full(n, 1/n)

nav_ldp = nav_mv = nav_eq = 1.0
wealth_ldp, wealth_mv, wealth_eq = [], [], []
turn_ldp, turn_mv, turn_eq = [], [], []

results_lst = []

for TRAIN_YEARS in range(10, 21):          # 10,11,…,20
    # -------- (A) 重新初始化一切狀態 ----------------------------------
    nav_ldp = nav_mv = nav_eq = 1.0

    w_ldp_cur = get_weights(simple_return.loc[:'1999-12-31'], 'ldp')
    w_mv_cur  = get_weights(simple_return.loc[:'1999-12-31'], 'mv')
    w_eq_cur  = np.full(n, 1/n)

    wealth_ldp, wealth_mv, wealth_eq = [], [], []
    turn_ldp,  turn_mv,  turn_eq  = [], [], []

    # -------- (B) 年度回圈 -------------------------------------------
    for yr in years:
        yr_start = pd.Timestamp(f"{yr}-01-01")
        yr_end   = pd.Timestamp(f"{yr}-12-31")

        # (1) rolling 訓練集
        train_ret = simple_return.loc[
            yr_start - pd.DateOffset(years=TRAIN_YEARS):
            yr_start - pd.DateOffset(days=1)
        ]

        # (2) 新權重
        w_mv  = get_weights(train_ret, 'mv')
        w_ldp = get_weights(train_ret, 'ldp')
        w_eq  = get_weights(train_ret, 'eq')

        # (3) 當年 NAV
        yr_ret = test.loc[str(yr)]
        wealth_mv .append(apply_portfolio(yr_ret, w_mv , nav_mv ))
        wealth_ldp.append(apply_portfolio(yr_ret, w_ldp, nav_ldp))
        wealth_eq .append(apply_portfolio(yr_ret, w_eq , nav_eq ))

        # (4) turnover
        grow = (1 + yr_ret).prod().values
        w_mv_end  = (w_mv_cur  * grow) / (w_mv_cur  * grow).sum()
        w_ldp_end = (w_ldp_cur * grow) / (w_ldp_cur * grow).sum()
        w_eq_end  = (w_eq_cur  * grow) / (w_eq_cur  * grow).sum()

        turn_mv .append(np.abs(w_mv_end  - w_mv ).sum())
        turn_ldp.append(np.abs(w_ldp_end - w_ldp).sum())
        turn_eq .append(np.abs(w_eq_end  - w_eq ).sum())

        # (5) 更新狀態
        nav_mv , nav_ldp, nav_eq = (wealth_mv [-1].iloc[-1],
                                    wealth_ldp[-1].iloc[-1],
                                    wealth_eq [-1].iloc[-1])
        w_mv_cur, w_ldp_cur, w_eq_cur = w_mv, w_ldp, w_eq

    # -------- (C) 彙整 --------------------------------------
    wealth = pd.concat([
        pd.concat(wealth_ldp).rename("LDP-rebalanced"),
        pd.concat(wealth_mv ).rename("MV-rebalanced"),
        pd.concat(wealth_eq ).rename("Equal-rebalanced")
    ], axis=1)



    # -------- (D) 績效指標 ------------------------------------------
    def perf(nav, to):
        ret = nav.pct_change().dropna()
        ann_r = (1+ret).prod()**(252/len(ret)) - 1
        ann_v = ret.std()*np.sqrt(252)
        return {
            "Ann_Return":   ann_r,
            "Ann_Vol":      ann_v,
            "Sharpe":       ann_r/ann_v if ann_v>0 else np.nan,
            "Max_Drawdown": max_drawdown(nav),
            "Turnover":     np.mean(to)
        }

    summary = pd.DataFrame({
        "LDP-rebalanced"  : perf(wealth["LDP-rebalanced"],  turn_ldp),
        "MV-rebalanced"   : perf(wealth["MV-rebalanced"],   turn_mv ),
        "Equal-rebalanced": perf(wealth["Equal-rebalanced"],turn_eq)
    }).T.round(4)

    print(f"\n===== Performance Summary ({TRAIN_YEARS}y rolling) =====")
    print(summary)
    results_lst.append(summary)

    # --------- 畫圖 ----------------
    fig, ax = plt.subplots(figsize=(18, 6))
    wealth.plot(ax=ax)
    ax.set_title(f"{TRAIN_YEARS}-Year Rolling Window, Annual Rebalance")
    ax.set_ylabel("Wealth")
    ax.legend(); plt.tight_layout(); plt.show(); plt.close(fig)