In [26]:
import datetime
import importlib
import warnings
warnings.simplefilter('ignore')

from typing import Dict, Any, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas_datareader.data as web
import yfinance as yf

In [27]:
# NOTE: this cell is intentionally inert. The download code below is wrapped in a
# string literal, so running the cell fetches nothing and only echoes the string.
# NOTE(review): presumably this snippet is how the CSV files read by later cells
# were originally obtained, but nothing here writes them to disk — confirm and
# record the data provenance.
"""
start_date = "2000-01-01"
end_date = datetime.datetime.now().strftime("%Y-%m-%d")

ticker = [
    "XLB", "XLE", "XLF", "XLI", "XLK", "XLP", "XLU", "XLV", "XLY"
] 
df: pd.Series = yf.download(ticker, start=start_date, end=end_date)["Close"]

df_cpi = web.DataReader("CPIAUCSL", "fred", start_date, end_date)

df_sp = yf.download(["SPY"], start=start_date, end=end_date)["Close"]
"""

'\nstart_date = "2000-01-01"\nend_date = datetime.datetime.now().strftime("%Y-%m-%d")\n\nticker = [\n    "XLB", "XLE", "XLF", "XLI", "XLK", "XLP", "XLU", "XLV", "XLY"\n] \ndf: pd.Series = yf.download(ticker, start=start_date, end=end_date)["Close"]\n\ndf_cpi = web.DataReader("CPIAUCSL", "fred", start_date, end_date)\n\ndf_sp = yf.download(["SPY"], start=start_date, end=end_date)["Close"]\n'

In [28]:
# Locations of the three CSV inputs passed to prepare_monthly_panel().
cpi_path, prices_path, spy_path = (
    "cpi.csv",     # CPI series
    "prices.csv",  # sector ETF prices
    "SPY.csv",     # SPY prices
)

In [29]:
def prepare_monthly_panel(
    cpi_path: str = "cpi.csv",
    prices_path: str = "prices.csv",
    spy_path: str = "SPY.csv",
) -> pd.DataFrame:
    """
    Build a monthly panel of inflation and log returns from three CSV files.

    Parameters
    ----------
    cpi_path : str
        CSV with a ``DATE`` column and a ``CPIAUCSL`` level column.
    prices_path : str
        CSV with a ``Date`` column plus one price column per sector ETF.
    spy_path : str
        CSV with a ``Date`` column plus the SPY price column. The column named
        ``SPY`` is used when present, otherwise the first data column.

    Returns
    -------
    pd.DataFrame
        Indexed by a monthly ``PeriodIndex`` named ``period`` and sorted by it.
        Columns are ``infl`` (log difference of CPI), one log-return column per
        sector, and ``SPY_ret``. Only months present in all three sources are
        kept (inner join).

    Raises
    ------
    ValueError
        If a sector column is already called ``infl`` or ``SPY_ret`` (the join
        refuses overlapping column names).
    """

    def _month_end_levels(path: str) -> pd.DataFrame:
        # Last available observation of each calendar month.
        raw = pd.read_csv(path)
        raw["Date"] = pd.to_datetime(raw["Date"])
        raw = raw.set_index("Date").sort_index()
        # Group on monthly periods instead of resample("M"): the "M" *offset*
        # alias is deprecated in pandas 2.2 (replaced by "ME"), whereas the
        # "M" *period* alias is accepted by old and new versions alike.
        month_end = raw.groupby(raw.index.to_period("M")).last()
        if len(month_end) > 1:
            # Re-insert months without any rows as NaN so that a data gap does
            # not turn into a multi-month return labelled as a monthly one.
            full_range = pd.period_range(
                month_end.index[0], month_end.index[-1], freq="M"
            )
            month_end = month_end.reindex(full_range)
        return month_end

    def _log_returns(levels: pd.DataFrame) -> pd.DataFrame:
        # Month-over-month log returns; rows with any missing value are dropped.
        return np.log(levels).diff().dropna()

    # --- CPI: the series is used as given, one row per month ---
    cpi = pd.read_csv(cpi_path)
    cpi["DATE"] = pd.to_datetime(cpi["DATE"])
    cpi = cpi.set_index("DATE").sort_index()
    infl = np.log(cpi["CPIAUCSL"]).diff().dropna().to_frame("infl")
    infl.index = infl.index.to_period("M")

    # --- Sector ETFs: month-end prices -> monthly log returns ---
    sector_rets = _log_returns(_month_end_levels(prices_path))

    # --- SPY: keep only the price column so that stray columns in the CSV
    #     cannot end up in the panel (downstream code treats every column
    #     other than "infl"/"SPY_ret" as a sector) ---
    spy_levels = _month_end_levels(spy_path)
    spy_col = "SPY" if "SPY" in spy_levels.columns else spy_levels.columns[0]
    spy_rets = _log_returns(spy_levels[[spy_col]]).rename(
        columns={spy_col: "SPY_ret"}
    )

    # --- Inner join on the monthly period ---
    panel = infl.join(sector_rets, how="inner").join(spy_rets, how="inner")
    panel.index.name = "period"
    return panel.sort_index()

In [30]:
# Build the monthly panel from the three CSV inputs and display it.
df = prepare_monthly_panel(cpi_path, prices_path, spy_path)
df

Unnamed: 0_level_0,infl,XLB,XLE,XLF,XLI,XLK,XLP,XLU,XLV,XLY,SPY_ret
period,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2000-02,0.004126,-0.105882,-0.043256,-0.113220,-0.056752,0.099907,-0.124446,-0.129582,-0.067622,-0.057327,-0.015343
2000-03,0.005865,0.096122,0.117270,0.167736,0.130332,0.080559,0.036904,0.103672,0.086132,0.132358,0.092502
2000-04,-0.000585,-0.033785,-0.015038,0.009612,0.014343,-0.096343,0.051661,0.063406,-0.011786,-0.021042,-0.035752
2000-05,0.001754,-0.032065,0.111026,0.022076,-0.004228,-0.109789,0.068899,-0.002197,-0.027170,-0.055492,-0.015847
2000-06,0.005824,-0.089390,-0.056588,-0.049717,-0.040126,0.094907,0.055068,-0.034558,0.002116,-0.055620,0.019491
...,...,...,...,...,...,...,...,...,...,...,...
2025-05,0.000808,0.028787,0.012714,0.044131,0.084710,0.095063,0.012146,0.037562,-0.057355,0.080518,0.060949
2025-06,0.002866,0.021964,0.047544,0.030714,0.035418,0.093926,-0.015875,0.003740,0.020842,0.018500,0.050110
2025-07,0.001964,-0.000911,0.027906,0.000000,0.029982,0.036867,-0.014806,0.047938,-0.032881,0.018690,0.022770
2025-08,0.003817,0.050563,0.035815,0.030465,0.000000,-0.001104,0.012456,-0.015884,0.052278,0.045510,0.020312


In [33]:
def ols_ssr(X: np.ndarray, y: np.ndarray):
    """
    Fit ``y`` on ``X`` by least squares.

    Returns
    -------
    (ssr, beta)
        ``ssr`` is the residual sum of squares as a Python float and ``beta``
        the coefficient vector returned by ``np.linalg.lstsq``.
    """
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    resid = y - X @ beta
    ssr = float(resid @ resid)
    return ssr, beta

def one_break_partial_inflation(
    df: pd.DataFrame,
    sec: str,
    eps: float = 0.15,
) -> Dict[str, Any]:
    """
    One-break partial structural-change regression for sector ``sec``
    in which only the inflation coefficient is allowed to break.

        r_t^sec = γ0 + γ1 * r_SPY_t + γ2 * r_sec_{t-1}
                  + Σ_{m=2..12} γ_m * D_{m,t}
                  + δ_j * π_t + u_t ,   t ∈ regime j (j=1,2)

    Implementation:
    - Z: regressors without a break (constant, SPY_ret, lag_ret, month dummies
      for February..December; January is the base month)
    - W: the regressor whose coefficient breaks (infl)
    - no-break model:  [Z, W]
    - one-break model: [Z, W * D1, W * D2]  (D1: t <= tb, D2: t > tb)
    - supF = max over admissible tb of
      F(tb) = ((SSR0 - SSR1) / df1) / (SSR1 / df2)

    Parameters
    ----------
    df : pd.DataFrame
        Panel with columns ``sec``, ``"infl"`` and ``"SPY_ret"``; its index
        must expose ``.month`` (e.g. a monthly PeriodIndex).
    sec : str
        Name of the sector return column.
    eps : float
        Trimming fraction. Every regime must contain at least
        ``h = max(5, int(eps * T))`` observations.

    Returns
    -------
    dict
        Always contains ``sec``, ``T`` and ``has_break``. When no candidate
        break date could be evaluated, ``has_break`` is False and a
        ``message`` is included. Otherwise ``has_break`` is True and the dict
        also holds ``supF``, ``break_idx`` (last position of regime 1 in the
        estimation sample), ``break_period``, ``SSR0``, ``SSR1``,
        ``beta_infl_pre``, ``beta_infl_post``, ``eps`` and
        ``min_segment_length``.

        Note that ``has_break`` only says that a candidate was evaluated; no
        significance test is applied to ``supF`` here.
    """

    # Sector return together with inflation and the market return.
    tmp = df[[sec, "infl", "SPY_ret"]].copy()

    # One-period lag of the sector return.
    tmp["lag_ret"] = tmp[sec].shift(1)

    # Calendar month for the seasonal dummies.
    tmp["month"] = tmp.index.month

    # Month dummies for February..December; January is the base month.
    for m in range(2, 13):
        tmp[f"m{m}"] = (tmp["month"] == m).astype(float)

    # The lag leaves a NaN in the first row; drop incomplete rows.
    tmp = tmp.dropna()

    # Dependent variable.
    y = tmp[sec].to_numpy()
    Tsec = len(y)

    # Z: constant, SPY_ret, lag_ret and the 11 month dummies (14 columns).
    Z_cols = ["SPY_ret", "lag_ret"] + [f"m{m}" for m in range(2, 13)]
    Z = np.column_stack([np.ones(Tsec), tmp[Z_cols].to_numpy()])

    # W: the inflation regressor whose coefficient may break.
    W = tmp["infl"].to_numpy().reshape(-1, 1)

    # --- Baseline model without a break (common inflation coefficient) ---
    X0 = np.hstack([Z, W])
    SSR0, _ = ols_ssr(X0, y)
    p0 = X0.shape[1]

    # --- Admissible break dates (trimming eps) ---
    h = max(5, int(eps * Tsec))     # minimum number of observations per regime
    positions = np.arange(Tsec)

    def _break_design(tb: int) -> np.ndarray:
        # [Z, infl * D1, infl * D2] with regime 1 = positions 0..tb.
        D1 = (positions <= tb).astype(float).reshape(-1, 1)
        return np.hstack([Z, W * D1, W * (1.0 - D1)])

    # Regime 1 holds tb + 1 observations and regime 2 holds Tsec - tb - 1, so
    # "at least h in each regime" means h - 1 <= tb <= Tsec - h - 1. Starting
    # the scan at h would wrongly demand h + 1 observations in regime 1.
    best = None  # (F, tb, SSR1, beta1) of the best candidate so far
    for tb in range(h - 1, Tsec - h):
        X1 = _break_design(tb)
        SSR1, beta1 = ols_ssr(X1, y)
        p1 = X1.shape[1]

        df1 = p1 - p0               # number of additional parameters (1 here)
        df2 = Tsec - p1             # residual degrees of freedom

        if SSR1 <= 0 or df2 <= 0:
            continue

        F = ((SSR0 - SSR1) / df1) / (SSR1 / df2)
        # Strict inequality keeps the earliest date when several tie.
        if best is None or F > best[0]:
            best = (F, tb, SSR1, beta1)

    if best is None:
        # No candidate could be evaluated (sample too short or eps too large).
        return {
            "sec": sec,
            "T": Tsec,
            "has_break": False,
            "message": "No admissible breakpoints (sample too short or eps too large).",
        }

    supF, tb_hat, SSR1_hat, beta1_hat = best

    # The last two coefficients belong to infl * D1 and infl * D2, i.e. the
    # inflation coefficient before and after the break.
    beta_infl_pre = float(beta1_hat[-2])
    beta_infl_post = float(beta1_hat[-1])

    # Index label of the last observation in regime 1.
    break_period = tmp.index[tb_hat]

    return {
        "sec": sec,
        "T": Tsec,
        "has_break": True,
        "supF": float(supF),
        "break_idx": int(tb_hat),
        "break_period": str(break_period),
        "SSR0": float(SSR0),
        "SSR1": float(SSR1_hat),
        "beta_infl_pre": beta_infl_pre,
        "beta_infl_post": beta_infl_post,
        "eps": eps,
        "min_segment_length": h,
    }


In [39]:

# Sectors to test. Only XLI is active; add other tickers such as "XLE", "XLF",
# "XLK", "XLP", "XLU", "XLV" or "XLY" to this list to analyse them as well.
sectors = ["XLI"]
eps = 0.15  # trimming fraction handed to the break search

results: List[Dict[str, Any]] = []
for sec in sectors:
    if sec in df.columns:
        res = one_break_partial_inflation(df, sec, eps=eps)
        results.append(res)
    else:
        # Warn and move on when the panel has no column for this sector.
        print(f"[WARN] sector {sec} が df に見つかりません。スキップします。")

In [40]:
# Show the per-sector break-test result dictionaries collected above.
results

[{'sec': 'XLI',
  'T': 307,
  'has_break': True,
  'supF': 0.880111623732849,
  'break_idx': 260,
  'break_period': '2021-11',
  'SSR0': 0.14618574583836572,
  'SSR1': 0.14574494919271327,
  'beta_infl_pre': -0.39306231681644255,
  'beta_infl_post': 0.43396765324683406,
  'eps': 0.15,
  'min_segment_length': 46}]

In [36]:
# Inspect the XLB column of the monthly panel.
df["XLB"]

period
2000-02   -0.105882
2000-03    0.096122
2000-04   -0.033785
2000-05   -0.032065
2000-06   -0.089390
             ...   
2025-05    0.028787
2025-06    0.021964
2025-07   -0.000911
2025-08    0.050563
2025-09   -0.024515
Freq: M, Name: XLB, Length: 308, dtype: float64

In [38]:
import numpy as np
import pandas as pd
import math
from typing import Dict, Any, List


# ============================
# 1. 月次パネルの構築
# ============================

def prepare_monthly_panel(
    cpi_path: str = "cpi.csv",
    prices_path: str = "prices.csv",
    spy_path: str = "SPY.csv",
) -> pd.DataFrame:
    """
    Build a monthly panel of inflation and log returns from three CSV files.

    Parameters
    ----------
    cpi_path : str
        CSV with a ``DATE`` column and a ``CPIAUCSL`` level column.
    prices_path : str
        CSV with a ``Date`` column plus one price column per sector ETF.
    spy_path : str
        CSV with a ``Date`` column plus the SPY price column. The column named
        ``SPY`` is used when present, otherwise the first data column.

    Returns
    -------
    pd.DataFrame
        Indexed by a monthly ``PeriodIndex`` named ``period`` and sorted by it.
        Columns are ``infl`` (log difference of CPI), one log-return column per
        sector, and ``SPY_ret``. Only months present in all three sources are
        kept (inner join).

    Raises
    ------
    ValueError
        If a sector column is already called ``infl`` or ``SPY_ret`` (the join
        refuses overlapping column names).
    """

    def _month_end_levels(path: str) -> pd.DataFrame:
        # Last available observation of each calendar month.
        raw = pd.read_csv(path)
        raw["Date"] = pd.to_datetime(raw["Date"])
        raw = raw.set_index("Date").sort_index()
        # Group on monthly periods instead of resample("M"): the "M" *offset*
        # alias is deprecated in pandas 2.2 (replaced by "ME"), whereas the
        # "M" *period* alias is accepted by old and new versions alike.
        month_end = raw.groupby(raw.index.to_period("M")).last()
        if len(month_end) > 1:
            # Re-insert months without any rows as NaN so that a data gap does
            # not turn into a multi-month return labelled as a monthly one.
            full_range = pd.period_range(
                month_end.index[0], month_end.index[-1], freq="M"
            )
            month_end = month_end.reindex(full_range)
        return month_end

    def _log_returns(levels: pd.DataFrame) -> pd.DataFrame:
        # Month-over-month log returns; rows with any missing value are dropped.
        return np.log(levels).diff().dropna()

    # --- CPI ---
    cpi = pd.read_csv(cpi_path)
    cpi["DATE"] = pd.to_datetime(cpi["DATE"])
    cpi = cpi.set_index("DATE").sort_index()
    infl = np.log(cpi["CPIAUCSL"]).diff().dropna().to_frame("infl")
    infl.index = infl.index.to_period("M")

    # --- Sector ETF prices ---
    sector_rets = _log_returns(_month_end_levels(prices_path))

    # --- SPY: keep only the price column so that stray columns in the CSV
    #     cannot end up in the panel (downstream code treats every column
    #     other than "infl"/"SPY_ret" as a sector) ---
    spy_levels = _month_end_levels(spy_path)
    spy_col = "SPY" if "SPY" in spy_levels.columns else spy_levels.columns[0]
    spy_rets = _log_returns(spy_levels[[spy_col]]).rename(
        columns={spy_col: "SPY_ret"}
    )

    # --- Inner join on the monthly period ---
    panel = infl.join(sector_rets, how="inner").join(spy_rets, how="inner")
    panel.index.name = "period"
    return panel.sort_index()


# ============================
# 2. 部分構造変化モデル（インフレ係数のみブレーク）
# ============================

def ols_ssr(X: np.ndarray, y: np.ndarray):
    """
    Fit ``y`` on ``X`` by least squares and return ``(ssr, beta)``: the
    residual sum of squares as a float and the coefficient vector.
    """
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    resid = y - X @ beta
    return float(resid @ resid), beta


def estimate_partial_model_current_beta(df_train: pd.DataFrame, sec: str, eps: float = 0.15) -> float:
    """
    Estimate the partial structural-change model for sector ``sec`` on
    ``df_train`` and return the inflation coefficient of the latest regime.

    One break in the inflation coefficient is allowed:
        r_t = Z_t * gamma + infl_t * delta_j + u_t, j=1,2
    where Z holds a constant, ``SPY_ret``, the lagged sector return and month
    dummies for February..December.

    Parameters
    ----------
    df_train : pd.DataFrame
        Training panel with columns ``sec``, ``"infl"`` and ``"SPY_ret"``; its
        index must expose ``.month`` (e.g. a monthly PeriodIndex).
    sec : str
        Name of the sector return column.
    eps : float
        Trimming fraction; every regime must hold at least
        ``max(5, int(eps * T))`` observations.

    Returns
    -------
    float
        The coefficient on inflation after the break date with the largest F
        statistic; the no-break coefficient when no candidate date could be
        evaluated; ``nan`` when fewer than 20 usable observations remain.
        The break is used regardless of the size of its F statistic.
    """
    tmp = df_train[[sec, "infl", "SPY_ret"]].copy()
    tmp["lag_ret"] = tmp[sec].shift(1)
    tmp["month"] = tmp.index.month
    for m in range(2, 13):
        tmp[f"m{m}"] = (tmp["month"] == m).astype(float)
    tmp = tmp.dropna()  # the lag leaves a NaN in the first row

    y = tmp[sec].to_numpy()
    Tsec = len(y)
    if Tsec < 20:
        return float("nan")  # not enough observations

    Z_cols = ["SPY_ret", "lag_ret"] + [f"m{m}" for m in range(2, 13)]
    Z = np.column_stack([np.ones(Tsec), tmp[Z_cols].to_numpy()])
    W = tmp["infl"].to_numpy().reshape(-1, 1)

    # --- no-break model ---
    X0 = np.hstack([Z, W])
    SSR0, beta0 = ols_ssr(X0, y)
    p0 = X0.shape[1]

    # --- one-break supF search ---
    h = max(5, int(eps * Tsec))  # minimum number of observations per regime
    positions = np.arange(Tsec)
    best_F = -np.inf
    best_beta = None

    # Regime 1 holds tb + 1 observations and regime 2 holds Tsec - tb - 1, so
    # "at least h in each regime" means h - 1 <= tb <= Tsec - h - 1. Starting
    # the scan at h would wrongly demand h + 1 observations in regime 1.
    for tb in range(h - 1, Tsec - h):
        D1 = (positions <= tb).astype(float).reshape(-1, 1)
        D2 = 1.0 - D1
        X1 = np.hstack([Z, W * D1, W * D2])
        SSR1, beta1 = ols_ssr(X1, y)
        p1 = X1.shape[1]
        df1 = p1 - p0
        df2 = Tsec - p1
        if SSR1 <= 0 or df2 <= 0:
            continue
        F = ((SSR0 - SSR1) / df1) / (SSR1 / df2)
        if F > best_F:
            best_F = F
            best_beta = beta1  # keep the fit so no re-estimation is needed

    if best_beta is None:
        # No break candidate: the inflation coefficient is the last entry of beta0.
        beta_pi = beta0[-1]
    else:
        # Last entry is the coefficient on infl * D2, i.e. the final regime.
        beta_pi = best_beta[-1]

    return float(beta_pi)


# ============================
# 3. 年1回ロールフォワード戦略バックテスト
# ============================

def backtest_yearly_rollforward_beta(
    df: pd.DataFrame,
    min_train_months: int = 60,
    eps: float = 0.15,
):
    """
    Yearly roll-forward backtest of the inflation-beta sector rotation.

    For each calendar year in ``df.index`` the inflation coefficient of every
    sector is estimated on all rows strictly before that year's first month.
    Sectors with a positive coefficient share 70% of the portfolio and the
    remaining ones 30% (100% if only one group is non-empty), equally weighted
    within each group. The weights are applied to every month of the year.
    Years with fewer than ``min_train_months`` training rows, or with no
    usable coefficient, use the equal-weight sector return instead.

    Returns ``(strat_ret, ew_ret, spy_simple)``: monthly simple returns of the
    strategy, of the equal-weight sector portfolio and of SPY.
    """
    idx = df.index
    sector_cols = [col for col in df.columns if col not in ("infl", "SPY_ret")]

    # Log returns -> simple returns.
    sec_simple = np.exp(df[sector_cols]) - 1.0
    spy_simple = np.exp(df["SPY_ret"]) - 1.0
    ew_ret = sec_simple.mean(axis=1)

    strat_ret = pd.Series(index=idx, dtype=float)

    for year in sorted(set(idx.year)):
        months_in_year = idx[idx.year == year]
        if len(months_in_year) == 0:
            continue

        # Training window: everything before the first month of this year.
        df_train = df[idx < months_in_year[0]]

        weights: Dict[str, float] = {}
        if len(df_train) >= min_train_months:
            betas = {
                sec: estimate_partial_model_current_beta(df_train, sec, eps=eps)
                for sec in sector_cols
            }
            usable = {sec: b for sec, b in betas.items() if not math.isnan(b)}
            pos = [sec for sec, b in usable.items() if b > 0]
            neg = [sec for sec, b in usable.items() if b <= 0]

            # 70/30 split when both groups exist, otherwise all-in on the one present.
            pos_budget, neg_budget = (0.7, 0.3) if (pos and neg) else (1.0, 1.0)
            weights = {sec: pos_budget / len(pos) for sec in pos}
            weights.update({sec: neg_budget / len(neg) for sec in neg})

        if not weights:
            # Short training window or no usable coefficient: equal weight.
            for t in months_in_year:
                strat_ret[t] = ew_ret.loc[t]
            continue

        # Weighted sector return for every month of the year.
        for t in months_in_year:
            strat_ret[t] = sum(
                (w * sec_simple.loc[t, sec] for sec, w in weights.items()), 0.0
            )

    return strat_ret, ew_ret, spy_simple


# ============================
# 4. パフォーマンス指標
# ============================

def performance_stats(returns: pd.Series, freq: int = 12) -> Dict[str, float]:
    """
    Summary statistics for a series of periodic simple returns.

    Parameters
    ----------
    returns : pd.Series
        Simple returns per period; NaN entries are ignored.
    freq : int
        Number of periods per year (12 for monthly data).

    Returns
    -------
    dict with keys
        ``CAGR``   annualised compound growth rate,
        ``Vol``    annualised standard deviation of the returns,
        ``Sharpe`` CAGR divided by Vol (no risk-free rate is subtracted),
        ``MaxDD``  largest peak-to-trough loss, measured from the initial
                   capital of 1.0 onwards,
        ``Total``  terminal wealth per unit invested (gross, i.e. 1 + return).
        All values are NaN when ``returns`` has no non-NaN entry.
    """
    r = returns.dropna()
    if len(r) == 0:
        return {"CAGR": np.nan, "Vol": np.nan, "Sharpe": np.nan, "MaxDD": np.nan, "Total": np.nan}

    n = len(r)
    total_ret = (1.0 + r).prod()
    cagr = total_ret ** (freq / n) - 1.0
    vol = r.std() * math.sqrt(freq)
    sharpe = cagr / vol if vol > 0 else float("nan")

    wealth = (1.0 + r).cumprod()
    # The running peak starts at the initial capital of 1.0. Taking it from the
    # first wealth value would ignore a loss in the first period entirely.
    peak = wealth.cummax().clip(lower=1.0)
    dd = wealth / peak - 1.0
    max_dd = dd.min()

    return {"CAGR": cagr, "Vol": vol, "Sharpe": sharpe, "MaxDD": max_dd, "Total": total_ret}


# ============================
# 5. メイン
# ============================

def main():
    """
    Run the full pipeline: build the monthly panel from the CSV files, run the
    yearly roll-forward backtest and print a table of performance statistics
    for the strategy, the equal-weight portfolio and SPY.
    """
    panel = prepare_monthly_panel("cpi.csv", "prices.csv", "SPY.csv")
    strat_ret, ew_ret, spy_ret = backtest_yearly_rollforward_beta(
        panel, min_train_months=60, eps=0.15
    )

    # One column of statistics per return series, in display order.
    series_by_label = {
        "BetaPi_Rotation": strat_ret,
        "EqualWeight": ew_ret,
        "SPY": spy_ret,
    }
    stats_df = pd.DataFrame(
        {label: performance_stats(series) for label, series in series_by_label.items()}
    )

    print("=== Performance (2000-02〜終端, 月次, freq=12) ===")
    print(stats_df)


# Run the pipeline only when executed as a script / notebook cell.
if __name__ == "__main__":
    main()


=== Performance (2000-02〜終端, 月次, freq=12) ===
        BetaPi_Rotation  EqualWeight       SPY
CAGR           0.097296     0.088137  0.081975
Vol            0.155605     0.148328  0.151956
Sharpe         0.625272     0.594204  0.539467
MaxDD         -0.470466    -0.491246 -0.507848
Total         10.838637     8.740731  7.555123
