In [None]:
# Imports
import pandas as pd
import yfinance as yf


def load_close_prices(tickers, period="6mo", interval="1d"):
    """
    從 yfinance 下載多檔/單檔，回傳 DataFrame：
    index=日期，columns=各股票的 Close 或 Adj Close
    """
    df = yf.download(
        tickers,
        period=period,
        interval=interval,
        auto_adjust=True,  # 自動用 Adj Close 調整
        progress=False,
        group_by="column",  # 多檔時給 MultiIndex: (field, ticker)
    )

    # --- 多檔股票：df.columns 是 MultiIndex -> 第一層是 field（'Close' 或 'Adj Close'）
    if isinstance(df.columns, pd.MultiIndex):
        lvl0 = df.columns.get_level_values(0)
        field = "Adj Close" if "Adj Close" in lvl0 else "Close"
        if field not in lvl0:
            raise KeyError("未找到 Close/Adj Close 多層欄位")
        close = df[field]

        # 只保留需要的 tickers 順序（避免順序不同或缺失造成 KeyError）
        close = close.reindex(columns=[t for t in tickers if t in close.columns])

    # --- 單檔股票：df.columns 是一般 Index
    else:
        if "Adj Close" in df.columns:
            close = df["Adj Close"].to_frame(tickers[0])
        elif "Close" in df.columns:
            close = df["Close"].to_frame(tickers[0])
        else:
            raise KeyError("未找到 Close/Adj Close 欄位(單檔)")

    # 最終回傳（這一行要在函式層級，**不要**縮到 else 裡）
    return close.dropna(how="all")

    # 移動平均線def generate_signals(prices):


def perf_stats(eq, daily_ret):
    stats = {
        "CumReturn": (eq.iloc[-1] / eq.iloc[0]) - 1,
        "CAGR": (eq.iloc[-1] / eq.iloc[0]) ** (252 / len(eq)) - 1,
        "Sharpe": daily_ret.mean() / daily_ret.std() * (252**0.5),
        "MaxDD": ((eq / eq.cummax()) - 1).min(),
        "HitRate": (daily_ret > 0).sum() / len(daily_ret),
    }
    return stats


def backtest(close, ma_short=20, ma_long=50):
    """
    簡單的 Moving Average (MA) 策略回測：
    - 當短均線 > 長均線 -> 持有
    - 當短均線 < 長均線 -> 空倉
    """
    daily_ret = close.pct_change().fillna(0)

    ma_s = close.rolling(ma_short).mean()
    ma_l = close.rolling(ma_long).mean()

    signal = (ma_s > ma_l).astype(int)  # 短均線高於長均線 = 1，否則 0
    pos = signal.shift(1).fillna(0)  # 用昨天信號決定今天持倉

    strat_ret = pos * daily_ret
    equity = (1 + strat_ret).cumprod()

    bt = pd.DataFrame(
        {
            "Close": close,
            "MA_Short": ma_s,
            "MA_Long": ma_l,
            "Signal": signal,
            "Position": pos,
            "DailyRet": strat_ret,
            "Equity": equity,
        }
    )
    return bt

In [None]:
def generate_signals(prices):
    import pandas as pd

    signals = pd.DataFrame(index=prices.index)

    for ticker in prices.columns:
        s = prices[ticker]

        # MA20 / MA50
        ma20 = s.rolling(20).mean()
        ma50 = s.rolling(50).mean()

        # RSI(14)
        delta = s.diff()
        gain = delta.clip(lower=0).rolling(14).mean()
        loss = (-delta.clip(upper=0)).rolling(14).mean()
        rs = gain / loss
        rsi14 = 100 - (100 / (1 + rs))

        # 規則：MA20>MA50 做多，MA20<MA50 做空（可再加上 RSI 濾網）
        sig = pd.Series(0, index=s.index)
        sig[ma20 > ma50] = 1
        sig[ma20 < ma50] = -1
        # 例如加 RSI 濾網（可選）：
        # sig[(rsi14 > 70)] = -1
        # sig[(rsi14 < 30)] = 1

        signals[ticker] = sig

    return signals

In [None]:
import inspect

print(inspect.signature(generate_signals))

In [None]:
def perf_stats(eq, daily_ret):
    n = len(eq)
    # 總報酬
    cum_return = eq.iloc[-1] / eq.iloc[0] - 1.0
    # 年化報酬
    cagr = eq.iloc[-1] ** (252.0 / n) - 1.0
    # 年化波動
    vol = daily_ret.std() * (252**0.5)
    # Sharpe Ratio
    sharpe = daily_ret.mean() / daily_ret.std() * (252**0.5)
    # 勝率
    hit_rate = (daily_ret > 0).sum() / n

    return {
        "CumReturn": cum_return,
        "CAGR": cagr,
        "Volatility": vol,
        "Sharpe": sharpe,
        "HitRate": hit_rate,
    }

In [None]:
TICKERS = ["AAPL", "MSFT", "GOOG", "TSLA"]
PERIOD = "6mo"
INTERVAL = "1d"

prices = load_close_prices(TICKERS, PERIOD, INTERVAL)
print("shape:", prices.shape)
print("columns:", list(prices.columns))
display(prices.tail())

In [None]:
pick = "AAPL"
s = prices[pick]
ma20 = s.rolling(20).mean()
ma50 = s.rolling(50).mean()

import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(s.index, s, label=f"{pick} Close")
plt.plot(ma20.index, ma20, label="MA20")
plt.plot(ma50.index, ma50, label="MA50")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Imports


def load_close_prices(tickers, period="6mo", interval="1d"):
    """
    從 yfinance 下載多檔/單檔，回傳 DataFrame：
    index=日期，columns=各股票的 Close 或 Adj Close
    """
    df = yf.download(
        tickers,
        period=period,
        interval=interval,
        auto_adjust=True,  # 自動用 Adj Close 調整
        progress=False,
        group_by="column",  # 多檔時給 MultiIndex: (field, ticker)
    )

    # --- 多檔股票：df.columns 是 MultiIndex -> 第一層是 field（'Close' 或 'Adj Close'）
    if isinstance(df.columns, pd.MultiIndex):
        lvl0 = df.columns.get_level_values(0)
        field = "Adj Close" if "Adj Close" in lvl0 else "Close"
        if field not in lvl0:
            raise KeyError("未找到 Close/Adj Close 多層欄位")
        close = df[field]

        # 只保留需要的 tickers 順序（避免順序不同或缺失造成 KeyError）
        close = close.reindex(columns=[t for t in tickers if t in close.columns])

    # --- 單檔股票：df.columns 是一般 Index
    else:
        if "Adj Close" in df.columns:
            close = df["Adj Close"].to_frame(tickers[0])
        elif "Close" in df.columns:
            close = df["Close"].to_frame(tickers[0])
        else:
            raise KeyError("未找到 Close/Adj Close 欄位(單檔)")

    # 最終回傳（這一行要在函式層級，**不要**縮到 else 裡）
    return close.dropna(how="all")

In [None]:
# ============================================================
# Hybrid-AI Aggressive Trading － 一貼即跑的端到端小型管線
# 依賴套件：yfinance、pandas、numpy、matplotlib
# ============================================================

import warnings

warnings.filterwarnings("ignore")
import numpy as np
import matplotlib.pyplot as plt

# -----------------------------
# 0) 參數
# -----------------------------
TICKERS = ["AAPL", "MSFT", "GOOG", "TSLA"]  # 想測誰就改
PERIOD = "6mo"
INTERVAL = "1d"
INIT_CAPITAL = 1.0

# 成本/槓桿參數
TX_COST_BPS = 5  # 單邊交易成本 5 bps = 0.05%
BORROW_APR = 0.06  # 槓桿借貸年化成本(示意)
LEV_BASE = 1.0  # 基本不加槓桿
LEV_STRONG = 2.0  # 訊號較強時的槓桿
LEV_SUPER = 3.0  # 訊號最強時的槓桿
DD_CUTOFF = 0.15  # 最近60日回撤 > 15% 時，強制降回 1x

DAILY_BORROW = BORROW_APR / 252.0
TX_COST = TX_COST_BPS / 10000.0

# -----------------------------
# 1) 下載並抽出 Close/Adj Close（回傳: columns = TICKERS）
# -----------------------------


def load_close_prices(tickers, period="6mo", interval="1d"):
    df = yf.download(
        tickers,
        period=period,
        interval=interval,
        auto_adjust=True,
        progress=False,
        group_by="column",
    )

    # 多檔股票：yfinance 會回 MultiIndex 欄位 -> (field, ticker)
    if isinstance(df.columns, pd.MultiIndex):
        lvl0 = df.columns.get_level_values(0)
        field = "Adj Close" if "Adj Close" in lvl0 else "Close"
        if field not in lvl0:
            raise KeyError("未找到 Close/Adj Close 多層欄位")
        close = df[field]
        # 只保留你要求的 tickers 順序（避免順序或缺券造成 KeyError）
        close = close.reindex(columns=[t for t in tickers if t in close.columns])

    # 單檔股票：欄位是單層
    else:
        if "Adj Close" in df.columns:
            close = df["Adj Close"].to_frame(tickers[0])
        elif "Close" in df.columns:
            close = df["Close"].to_frame(tickers[0])
        else:
            raise KeyError("未找到 Close/Adj Close 欄位（單檔）")

    # ←← 很關鍵：這個 return 要與 if/else 同一層（函式層級），不要縮在 else 裡
    return close.dropna(how="all")


# -----------------------------
# 2) 技術指標：MA20 / MA50 / RSI(14)
# -----------------------------
def rsi(series, period=14):
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(period, min_periods=period).mean()
    loss = (-delta.clip(upper=0)).rolling(period, min_periods=period).mean()
    rs = gain / loss
    return 100.0 - (100.0 / (1.0 + rs))


def make_indicators(close_df):
    ma20 = close_df.rolling(20, min_periods=1).mean()
    ma50 = close_df.rolling(50, min_periods=1).mean()
    rsi14 = close_df.apply(rsi, raw=False)
    return ma20, ma50, rsi14


# -----------------------------
# 3) 規則→權重（未正規化）
# 強多：Close>MA20 且 MA20>MA50 且 RSI>60 → 權重 1.0
# 超強：強多 且 MA20 斜率>0 且 RSI>65 → 額外 +0.5 (總 1.5)
# 中性：Close>MA50 且 RSI>50 且 不是強多 → 0.5
# 其他：0
# -----------------------------
def raw_weights(close, ma20, ma50, rsi14):
    slope20 = ma20.diff()  # 斜率近似
    strong = (close.gt(ma20)) & (ma20.gt(ma50)) & (rsi14.gt(60))
    super_ = strong & (slope20.gt(0)) & (rsi14.gt(65))
    neutral = (close.gt(ma50)) & (rsi14.gt(50)) & (~strong)

    w_raw = (
        neutral.astype(float) * 0.5
        + strong.astype(float) * 1.0
        + super_.astype(float) * 0.5
    )
    # 全部0時：給每檔同權重（避免空倉影響比較）
    row_sum = w_raw.sum(axis=1)
    w = w_raw.div(row_sum.replace(0, np.nan), axis=0).fillna(1.0 / len(w_raw.columns))
    return w


# -----------------------------
# 4) 風險開關 & 槓桿
# 最近60日回撤>DD_CUTOFF → 槓桿=1
# 其餘依整體訊號強度決定：平均權重>0.9 → 3x；>0.7 → 2x；否則 1x
# 借貸成本按 (lev-1) * DAILY_BORROW
# -----------------------------
def max_drawdown(series):
    roll_max = series.cummax()
    dd = series / roll_max - 1.0
    return dd.min()


def choose_leverage(eq_curve, avg_weight_row):
    # 近60日回撤檢查
    if len(eq_curve) >= 60:
        dd_60 = max_drawdown(eq_curve.tail(60))
        if dd_60 < -DD_CUTOFF:
            return LEV_BASE
    # 依訊號強度
    s = avg_weight_row
    if s > 0.9:
        return LEV_SUPER
    elif s > 0.7:
        return LEV_STRONG
    else:
        return LEV_BASE


# -----------------------------
# 5) 回測主函數（含成本/槓桿）
# -----------------------------
def backtest(close):
    rets = close.pct_change().fillna(0.0)

    # 指標與權重
    MA20, MA50, RSI14 = make_indicators(close)
    W = raw_weights(close, MA20, MA50, RSI14)

    # 交易成本（以權重變化估轉手率）
    W_shift = W.shift(1).fillna(0.0)
    turnover = (W - W_shift).abs().sum(axis=1)  # 當天調倉的總變動
    tx_cost = turnover * TX_COST

    # 槓桿 & 借貸成本
    port_eq = pd.Series(INIT_CAPITAL, index=close.index, dtype=float)
    daily_ret = pd.Series(0.0, index=close.index, dtype=float)

    for i, dt in enumerate(close.index):
        if i == 0:
            continue
        # 槓桿決策（用到前一日的權重平均 & 權益曲線）
        avg_w = W.iloc[i - 1].mean()
        lev = choose_leverage(port_eq.iloc[:i], avg_w)

        # 當日投資組合（用前一日權重 * 當日資產報酬）
        r_gross = (W.iloc[i - 1] * rets.iloc[i]).sum()

        # 成本：交易 + 借貸
        borrow_cost = (lev - 1.0) * DAILY_BORROW
        r_net = lev * r_gross - tx_cost.iloc[i] - borrow_cost

        daily_ret.iloc[i] = r_net
        port_eq.iloc[i] = port_eq.iloc[i - 1] * (1.0 + r_net)

    return {
        "Equity": port_eq,
        "DailyRet": daily_ret,
        "Weights": W,
        "Turnover": turnover,
    }


# -----------------------------
# 6) 績效統計
# -----------------------------
def perf_stats(eq, daily_ret):
    n = len(eq)
    if condition:
        pass
    cagr = eq.iloc[-1] ** (252.0 / n) - 1.0
    rollmax = eq.cummax()
    maxdd = (eq / rollmax - 1.0).min()
    annvol = daily_ret.std() * np.sqrt(252.0)
    annret = daily_ret.mean() * 252.0
    sharpe = 0.0 if annvol == 0 else annret / annvol
    hitrate = (daily_ret > 0).sum() / max(1, (daily_ret != 0).sum())
    return {
        "CumReturn": eq.iloc[-1] - 1.0,
        "CAGR": cagr,
        "MaxDD": maxdd,
        "AnnVol": annvol,
        "Sharpe": sharpe,
        "HitRate": hitrate,
    }


# -----------------------------
# 7) 一次執行：下載→回測→表格→圖
# -----------------------------
close = load_close_prices(TICKERS, PERIOD, INTERVAL)
assert not close.empty, "下載不到價格資料，請檢查網路或 ticker 代碼"

bt = backtest(close)


# 績效表
def perf_stats(eq, daily_ret):
    n = len(eq)
    if n == 0:
        return {
            "CumReturn": 0.0,
            "CAGR": 0.0,
            "Volatility": 0.0,
            "Sharpe": 0.0,
            "HitRate": 0.0,
        }

    # 累積 / 年化報酬
    cum_return = eq.iloc[-1] / eq.iloc[0] - 1.0
    cagr = (eq.iloc[-1] / eq.iloc[0]) ** (252.0 / n) - 1.0

    # 年化波動與 Sharpe（無風險利率近似 0）
    vol = float(daily_ret.std()) * (252.0**0.5)
    mean = float(daily_ret.mean())
    sharpe = 0.0 if vol == 0 else (mean / vol) * (252.0**0.5)

    # 勝率
    hit_rate = float((daily_ret > 0).sum()) / n

    return {
        "CumReturn": cum_return,
        "CAGR": cagr,
        "Volatility": vol,
        "Sharpe": sharpe,
        "HitRate": hit_rate,
    }


# 圖：權益曲線 + 單檔價格(含MA)
fig, ax = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
ax[0].plot(
    bt["Equity"].index, bt["Equity"].values, label="Equity (with dynamic leverage)"
)
ax[0].set_title("Portfolio Equity")
ax[0].legend()

pick = close.columns[0]
ax[1].plot(close.index, close[pick], label=f"{pick} Close")
ax[1].plot(close.index, close[pick].rolling(20).mean(), label="MA20", linestyle="--")
ax[1].plot(close.index, close[pick].rolling(50).mean(), label="MA50", linestyle=":")
ax[1].set_title(f"{pick} with MA20/MA50")
ax[1].legend()
plt.tight_layout()
plt.show()

In [None]:
prices = load_close_prices(TICKERS, PERIOD, INTERVAL)
print(prices.shape)
print(list(prices.columns)[:5])  # 應該看到你的股票代碼
prices.tail()  # 眼檢最後幾筆

In [None]:
# ===== Backtest: simple MA20/MA50 + RSI filter, equal-weight =====
import matplotlib.pyplot as plt

TX_COST_BPS = 5  # 5 bps per trade (0.05%)
REBAL_FREQ = "D"  # daily rebalance; change to "W" or "M" if you want


def generate_signals(close, ma20, ma50, rsi14):
    """Long = MA20>MA50 & RSI>55; Neutral = RSI 45–55; Flat otherwise."""
    strong = (close.gt(ma20)) & (ma20.gt(ma50)) & (rsi14.gt(55))
    neutral = (close.gt(ma50)) & (rsi14.between(45, 55))
    # raw weights: 1.0 for strong, 0.5 for neutral, 0 for others
    w_raw = strong.astype(float) * 1.0 + neutral.astype(float) * 0.5
    # normalize to equal-weight across selected names each day
    w = w_raw.div(w_raw.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0)
    return w


def backtest(close, weights, tx_cost_bps=5, rebalance="D"):
    # resample weights to the chosen frequency (e.g., weekly) and forward-fill
    if rebalance != "D":
        weights = (
            weights.resample(rebalance).last().reindex(close.index).ffill().fillna(0.0)
        )

    # returns and positions (trade at next bar open ~ here we use close-to-close)
    ret = close.pct_change().fillna(0.0)

    # apply 1-bar lag so today’s signal executes next day
    w_lag = weights.shift(1).fillna(0.0)

    # portfolio daily return before costs
    port_ret_gross = (w_lag * ret).sum(axis=1)

    # turnover & costs (sum of abs change in weights)
    turnover = w_lag.diff().abs().sum(axis=1).fillna(0.0)
    cost = turnover * (tx_cost_bps / 10000.0)

    port_ret_net = port_ret_gross - cost

    equity = (1.0 + port_ret_net).cumprod()
    out = pd.DataFrame(
        {"DailyRet": port_ret_net, "Turnover": turnover, "Equity": equity},
        index=close.index,
    )
    return out


# 1) build signals/weights
signals = generate_signals(prices)

# 2) run backtest
bt = backtest(prices, W, tx_cost_bps=TX_COST_BPS, rebalance=REBAL_FREQ)

# 3) show quick stats + plot
stats = perf_stats(bt["Equity"], bt["DailyRet"])
summary = pd.Series(stats, name="Portfolio").to_frame().T
display(
    summary.style.format(
        {
            "CumReturn": "{:.2%}",
            "CAGR": "{:.2%}",
            "MaxDD": "{:.2%}",
            "AnnVol": "{:.2%}",
            "Sharpe": "{:.2f}",
            "HitRate": "{:.1%}",
        }
    )
)

fig, ax = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
ax[0].plot(bt.index, bt["Equity"])
ax[0].set_title("Portfolio Equity")
ax[0].grid(True)
ax[1].plot(bt.index, bt["DailyRet"].rolling(21).sum())  # 1m rolling sum
ax[1].set_title("Rolling 21D Return")
ax[1].grid(True)
plt.tight_layout()
plt.show()