<a href="https://colab.research.google.com/github/ryotaiizuka2027-eng/tex/blob/main/Portfolio_Analysis_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

def compare_quantile_portfolios(
    df,
    score_cols,
    target_quantile=None,
    n_quantiles=5,
    date_col='date',
    dateym_col='dateym',
    ticker_col='ticker',
    close_col='close',
    start_ym=None,
    end_ym=None
):
    """
    複数のスコアモデルについて、指定された分位ポートフォリオのパフォーマンスを一括計算して比較表を作成する関数。

    Parameters:
    -----------
    df : pd.DataFrame
        入力データフレーム（月次データ）。
    score_cols : list of str
        分析したいスコア列名のリスト (例: ['model_a_score', 'model_b_score'])
    target_quantile : int or None
        計算対象とする分位のインデックス (0始まり)。
        - 5分位の場合: 0=スコア最低層, 4=スコア最高層
        - Noneの場合: 自動的に一番スコアが高い層 (n_quantiles - 1) を選択
    n_quantiles : int
        分位数（例: 5なら5分位）
    start_ym, end_ym : int
        分析期間 (YYYYMM)

    Returns:
    --------
    summary_df : pd.DataFrame
        各モデルのパフォーマンス指標をまとめた比較表
    histories : dict
        各モデルの月次リターン推移DFを格納した辞書 { 'score_col_name': df_history }
    """

    # 単一の文字列が渡された場合はリストに変換
    if isinstance(score_cols, str):
        score_cols = [score_cols]

    # ターゲット分位のデフォルト設定（指定がなければ一番高い層）
    if target_quantile is None:
        target_quantile = n_quantiles - 1

    print(f"--- Analysis Configuration ---")
    print(f"Target Quantile Index: {target_quantile} (0=Lowest, {n_quantiles-1}=Highest)")
    print(f"Number of Quantiles: {n_quantiles}")
    print(f"Models to Compare: {score_cols}")

    # 元データをコピー
    work_df = df.copy()

    # 1. 前処理：リターン計算と期間フィルタリング（全モデル共通）
    work_df = work_df.sort_values([ticker_col, date_col])

    # 既にreturn列がなければ計算
    if 'return' not in work_df.columns:
        work_df['return'] = work_df.groupby(ticker_col)[close_col].pct_change()

    # リターン欠損行（初月）を除去
    work_df = work_df.dropna(subset=['return'])

    # 期間フィルタリング
    if start_ym is not None:
        work_df = work_df[work_df[dateym_col].astype(int) >= int(start_ym)]
    if end_ym is not None:
        work_df = work_df[work_df[dateym_col].astype(int) <= int(end_ym)]

    if work_df.empty:
        print("Warning: 指定された期間のデータが存在しません。")
        return pd.DataFrame(), {}

    # 結果格納用リスト
    summary_results = []
    history_results = {}

    # 2. 各スコアモデルごとにループ処理
    for s_col in score_cols:
        if s_col not in work_df.columns:
            print(f"Skipping '{s_col}': Column not found in DataFrame.")
            continue

        # 月ごとに分位計算を行い、ターゲット層のリターンを抽出する関数
        def get_target_layer_return(group):
            # スコアとリターンが有効なデータのみ
            valid_group = group.dropna(subset=[s_col, 'return'])

            if len(valid_group) < n_quantiles:
                return np.nan

            try:
                # 分位計算 (0 ~ n-1 のラベルが付く)
                valid_group['quantile_rank'] = pd.qcut(
                    valid_group[s_col].rank(method='first'),
                    q=n_quantiles,
                    labels=False
                )
            except ValueError:
                return np.nan

            # 指定された分位層を抽出
            target_layer = valid_group[valid_group['quantile_rank'] == target_quantile]

            if target_layer.empty:
                return np.nan

            # 等ウェイト平均リターン
            return target_layer['return'].mean()

        # グループ化適用
        portfolio_returns = work_df.groupby(date_col).apply(get_target_layer_return)
        portfolio_returns = portfolio_returns.dropna()

        if portfolio_returns.empty:
            print(f"Warning: '{s_col}' - ポートフォリオを構築できませんでした。")
            continue

        # --- 指標計算 ---
        # 累積リターン
        cum_return = (1 + portfolio_returns).prod() - 1
        # 累和リターン
        sum_return = portfolio_returns.sum()
        # リスク (年率)
        risk_annualized = portfolio_returns.std() * np.sqrt(12)
        # 年率平均リターン (リターン/リスク計算用)
        mean_return_annualized = portfolio_returns.mean() * 12
        # リターン/リスク
        rr_ratio = mean_return_annualized / risk_annualized if risk_annualized != 0 else 0

        # 結果を保存
        summary_results.append({
            'Model Name': s_col,
            'Cumulative Return': cum_return,
            'Sum Return': sum_return,
            'Risk (Ann. Std)': risk_annualized,
            'Risk-Return Ratio': rr_ratio,
            'Target Quantile': f"{target_quantile + 1}/{n_quantiles}" # 表示用(1始まり)
        })

        # 月次推移も保存（後でグラフにする場合などに使用）
        history_df = portfolio_returns.to_frame(name='portfolio_return')
        history_df['cumulative_return'] = (1 + history_df['portfolio_return']).cumprod() - 1
        history_results[s_col] = history_df

    # 3. 集計結果のDF化
    if not summary_results:
        return pd.DataFrame(), {}

    summary_df = pd.DataFrame(summary_results)

    # 表示順序の調整（モデル名をインデックスにするかはお好みで）
    summary_df = summary_df.set_index('Model Name')

    return summary_df, history_results

# ==========================================
# 実行部分 (dfは既に存在すると想定)
# ==========================================

# ------------------------------------------------
# 【ユーザー設定エリア】
# 比較したいスコア列のリストを指定してください
SCORE_COLUMNS = ['score_model_A', 'score_model_B', 'score_traditional']

# 分析期間設定
USER_START_YM  = 201501
USER_END_YM    = 202312

# 分位設定
# 5分位(20%刻み)で、一番高い層(4)を見たい場合: 4
# 5分位で、一番低い層(0)を見たい場合: 0
USER_N_QUANTILES = 5
USER_TARGET_RANK = 4  # Noneにすると自動で最高層(4)になります
# ------------------------------------------------

# 変数 `df` が存在するかチェックしてから実行
if 'df' in locals() and isinstance(df, pd.DataFrame):

    # --- データフレームにダミーのスコア列がない場合の安全策 ---
    # (実際のデータを使う場合はこのifブロックは不要です)
    for col in SCORE_COLUMNS:
        if col not in df.columns:
            print(f"Creating dummy column for '{col}' (Test purpose only)")
            df[col] = np.random.rand(len(df))
    # -------------------------------------------------------

    print(f"\n>>> Starting Multi-Model Portfolio Analysis <<<")

    metrics_summary, histories = compare_quantile_portfolios(
        df,
        score_cols=SCORE_COLUMNS,
        target_quantile=USER_TARGET_RANK,
        n_quantiles=USER_N_QUANTILES,
        date_col='date',
        dateym_col='dateym',
        ticker_col='ticker',
        close_col='close',
        start_ym=USER_START_YM,
        end_ym=USER_END_YM
    )

    if not metrics_summary.empty:
        print("\n[Comparison of Portfolio Performance]")
        # 見やすいように数値を丸めて表示
        print(metrics_summary.round(4))

        # 例: 一番リターン/リスクが高いモデルを表示
        best_model = metrics_summary['Risk-Return Ratio'].idxmax()
        print(f"\nBest Model by Risk-Return: {best_model}")

    else:
        print("計算結果が得られませんでした。")

else:
    print("エラー: 変数 'df' が見つかりません。")