<a href="https://colab.research.google.com/github/rgcatarino/DGU_2009_Replication/blob/main/DGU_2009_Replication_FF_Research_Data_Factors.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DGU (2009) — Geral

Notebook para reproduzir os testes dos modelos de alocação de **DeMiguel, Garlappi & Uppal (2009)** usando janela móvel (M=120), custos de **50 bps**, e resumo no estilo do artigo.



#Imports

In [10]:
!pip -q install cvxpy


import os, sys, importlib.util, pandas as pd, numpy as np
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
from functools import lru_cache
import json
import math
from math import erf, sqrt, isfinite
import importlib.util, sys


In [2]:
PATH_PY   = "/content/dgu_backtest_fffactors_v0.py"

PATH_SHARPE = "/content/dgu_table3_sharpe.csv"
PATH_CEQ = "/content/table4_ceq_returns.csv"
PATH_TURNOVER = "/content/table5_panelA_turnover.csv"
PATH_TURNOVER_REL = PATH_TURNOVER




assert Path(PATH_PY).exists(), "Arquivo .py não encontrado."


BASE_DGU_TYPE = "SMB_HML"

# Importa módulo
spec = importlib.util.spec_from_file_location("dgu_backtest", PATH_PY)
dgu  = importlib.util.module_from_spec(spec)
sys.modules["dgu_backtest"] = dgu
spec.loader.exec_module(dgu)

print("Módulo importado com sucesso. Funções:", [x for x in dir(dgu) if not x.startswith("_")])

Módulo importado com sucesso. Funções: ['Dict', 'FACTOR_COLS', 'FF3_ALIASES', 'List', 'Optional', 'Result', 'Tuple', 'dataclass', 'erf', 'isfinite', 'np', 'pd', 'rolling_backtest', 'sqrt']


In [3]:
def load_tbill90_csv(path_csv: str) -> pd.DataFrame:
    """
    Lê um CSV com colunas DATE e RF e retorna:
      - índice DATE no fim de mês (aaaa-mm-dd)
      - coluna RF como float em DECIMAL MENSAL

    Regras de escala:
      - Se RF parecer % anual (e a coluna se chamar 'TB3MS' em outro arquivo), converte para mensal equivalente.
      - Se RF parecer % mensal, divide por 100.
      - Caso contrário, assume que já está em decimal mensal.
    """
    df = pd.read_csv(path_csv)

    # garante nomes canônicos
    # (se viesse 'TB3MS' ao invés de 'RF', renomeie conforme necessidade)
    if "DATE" not in df.columns:
        raise ValueError("CSV precisa ter coluna DATE.")
    if "RF" not in df.columns:
        # se vier TB3MS como nome da coluna de taxa:
        if "TB3MS" in df.columns:
            df = df.rename(columns={"TB3MS": "RF"})
        else:
            raise ValueError("CSV precisa ter coluna RF (ou TB3MS).")

    # datas -> fim de mês
    dt = pd.to_datetime(df["DATE"], errors="coerce")
    dt = dt.dt.to_period("M").dt.to_timestamp("M")

    rf_raw = pd.to_numeric(df["RF"], errors="coerce")

    # Heurística de escala
    med = rf_raw.dropna().abs().median()
    if pd.isna(med):
        med = 0.0

    def monthly_from_tb3ms_percent(tb3ms_percent: pd.Series) -> pd.Series:
        y = tb3ms_percent / 100.0
        r3m = 1.0 + y * (90.0/360.0)     # HPR 3m (discount basis aprox.)
        return (r3m ** (1.0/3.0)) - 1.0  # mensal equivalente (decimal)

    # Se os valores forem grandes (>1), tratamos como % (anual ou mensal)
    if med > 1.0:
        # Para um CSV chamado TB3MS.csv, normalmente isso é % anual -> use conversão 90d
        # Se já for % mensal (FF-style), você pode trocar a linha abaixo por (rf_raw/100.0)
        rf = monthly_from_tb3ms_percent(rf_raw)
    else:
        # valores pequenos: ou já estão em decimal ou são % mensal muito baixos
        # Se mediana entre 0.2% e 10% (~0.002–0.10), é % mensal => /100
        if 0.002 <= med <= 0.10:
            rf = rf_raw / 100.0
        else:
            rf = rf_raw  # já decimal

    out = (
        pd.DataFrame({"DATE": dt, "RF": rf})
        .dropna()
        .drop_duplicates(subset=["DATE"])
        .set_index("DATE")
        .sort_index()
    )
    out["RF"] = out["RF"].astype(float)
    return out

In [4]:
def _load_ff(path_csv: str) -> pd.DataFrame:
    raw = pd.read_csv(path_csv, dtype=str)
    first = raw.columns[0]
    mask  = raw[first].astype(str).str.match(r"^\d{6}$").fillna(False)
    df    = raw.loc[mask].copy()
    df["DATE"] = pd.to_datetime(df[first].astype(str)+"01", format="%Y%m%d") + pd.offsets.MonthEnd(0)
    for c in df.columns:
        if c not in [first, "DATE"]:
            df[c] = pd.to_numeric(df[c].str.strip(), errors="coerce")/100.0  # % -> dec
    df = df.set_index("DATE").sort_index()
    ren = {}
    for c in df.columns:
        lc = c.lower()
        if lc in ("mkt-rf","mktrf","mkt_rf","mkt - rf"): ren[c] = "MktRF"
        elif lc == "rf": ren[c] = "RF"
        elif lc == "smb": ren[c] = "SMB"
        elif lc == "hml": ren[c] = "HML"
        elif lc in ("umd","mom"): ren[c] = "UMD"
    if ren: df = df.rename(columns=ren)
    return df



In [5]:
def excess_returns(rets, rf, date_col="DATE", rf_col="RF"):
    # --- rets: DataFrame (colunas = ativos). Aceita DATE como coluna ou índice
    R = rets.copy() if isinstance(rets, pd.DataFrame) else rets.to_frame()
    if date_col in getattr(R, "columns", []):
        R = R.set_index(date_col)
    R.index = pd.to_datetime(R.index, errors="coerce").to_period("M").to_timestamp("M")
    R = R[~R.index.duplicated(keep="last")].sort_index()

    # --- rf: Series OU DataFrame; tenta usar a coluna RF (ou a 1ª numérica)
    if isinstance(rf, pd.Series):
        s = rf.copy()
    else:
        if rf_col in rf.columns:
            s = rf[rf_col].copy()
        else:
            s = rf.select_dtypes("number").iloc[:, 0].copy()
        if date_col in rf.columns:
            s.index = rf[date_col]
    s.index = pd.to_datetime(s.index, errors="coerce").to_period("M").to_timestamp("M")
    s = s[~s.index.duplicated(keep="last")].sort_index()

    # Heurística: se RF parece estar em %, converte para decimal
    if s.dropna().abs().median() > 0.2:
        s = s / 100.0

    # --- alinha e subtrai mês a mês
    s = s.reindex(R.index)
    R_excess = R.sub(s, axis=0)

    return R_excess

In [6]:

@lru_cache(None)
def _load_table(path: str = "/content/dgu_table3_sharpe.csv") -> pd.DataFrame:
    """
    Lê e normaliza a tabela de Sharpe do DGU.
    """
    df = pd.read_csv(path)
    # Normaliza nomes de colunas
    df.columns = [str(c).strip() for c in df.columns]
    if "Strategy" not in df.columns:
        raise ValueError(
            "Arquivo não contém a coluna obrigatória 'Strategy'. "
            f"Colunas encontradas: {list(df.columns)}"
        )
    # Coluna auxiliar normalizada para busca robusta
    df["__Strategy_norm__"] = df["Strategy"].astype(str).str.strip().str.casefold()
    return df

def get_value_by_strategy(strategy: str, column: str,
                          df: None):
    """
    Retorna o valor na célula (Strategy == strategy, Coluna == column).

    Parâmetros
    ----------
    strategy : str
        Nome da estratégia exatamente como aparece na tabela (case-insensitive).
    column : str
        Nome da outra coluna desejada (case-insensitive).
    path : str, opcional
        Caminho do CSV. Padrão: '/content/dgu_table3_sharpe.csv'.

    Retorna
    -------
    float ou objeto original:
        Tenta converter para float; se não for possível, retorna o valor original.
    """


    # Mapeia colunas ignorando caixa e espaços
    colmap = {c.casefold(): c for c in df.columns}
    colkey = str(column).strip().casefold()
    if colkey not in colmap:
        # Sugestão de colunas “não-auxiliares”
        cols_ok = [c for c in df.columns if c != "__Strategy_norm__"]
        raise ValueError(
            f"Coluna '{column}' não encontrada. Disponíveis: {cols_ok}"
        )
    col_real = colmap[colkey]

    # Filtra a linha da estratégia (robusto a caixa e espaços)
    strat_key = str(strategy).strip().casefold()
    row = df.loc[df["__Strategy_norm__"] == strat_key]
    if row.empty:
        strats = sorted(df["Strategy"].dropna().unique().tolist())
        raise ValueError(
            f"Estratégia '{strategy}' não encontrada. Disponíveis: {strats}"
        )

    val = row.iloc[0][col_real]
    # Tenta converter para float, mas devolve o original se não der
    try:
        return float(val)
    except (TypeError, ValueError):
        return val

# -------------------------
# Exemplo de uso (ajuste os nomes conforme seu CSV):
# v = get_value_by_strategy("EW", "Industry")
# print(v)


In [7]:
df_sharpe = _load_table(PATH_SHARPE)
df_ceq = _load_table(PATH_CEQ)
df_turnover = _load_table(PATH_TURNOVER)
df_turnover_rel = _load_table(PATH_TURNOVER_REL)

Carrega base de dados

In [11]:
if BASE_DGU_TYPE == "SMB_HML":
  PATH_PANEL ="/content/F-F_Research_Data_Factors.txt" # F-F_Research_Data_Factors.txt para teste com base original e F-F_Research_Data_Factors_2025.txt para dados atualizados
  START = "1963-07-31"
  END   = "2024-11-30"
  sel_cols = ["MktRF","SMB","HML"]
  # 1) Carrega fatores (seu CSV: DATE, RF, SMB, HML, Mkt-RF)
  base_data = pd.read_csv(PATH_PANEL)
  base_data["DATE"] = pd.to_datetime(base_data["DATE"].astype(str).str.slice(0,6), format="%Y%m") + pd.offsets.MonthEnd(0)
  base_data = base_data.set_index("DATE").sort_index()
  base_data = base_data.loc[START:END]
  # 2) Prepara entradas — 3 séries (excesso já embutido)
  rets = base_data[sel_cols].copy()
  #rets = rets.rename(columns={'MktRF': 'MKT'})
  rf   = base_data["RF"].copy()
  mkt_vm = base_data["MktRF"].copy()
  RETURNS_EXCESS = True,
  vl_drop_mkt_from_universe=False



assert Path(PATH_PANEL).exists(), "Txt da base não encontrado."


Rolling Backtest

In [12]:
# --- Alinha período DGU (1963-07 a 2004-11) ---

rets = rets.loc[(rets.index >= START) & (rets.index <= END)]

# --- Backtest (janela M=120, custo 50 bps, NET) ---
M = 120; COST_BPS = 50.0

results = dgu.rolling_backtest(
    rets_m=rets,
    rf_m=rf, # Pass the 'RF' column as a Series
    M=M,returns_are_excess=RETURNS_EXCESS,
    vw_mkt_m=mkt_vm,
    ignore_col=None,
    drop_mkt_from_universe=vl_drop_mkt_from_universe

)

print("Período aplicado:", rets.index.min().date(), "→", rets.index.max().date(), "| #meses:", len(rets))
print("Estratégias:", sorted(list(results.keys())))

Período aplicado: 1963-07-31 → 2004-11-30 | #meses: 497
Estratégias: ['BS', 'BS-C', 'DM', 'EW', 'EW-MIN', 'G-MIN-C', 'MIN', 'MIN-C', 'MP', 'MV', 'MV (in sample)', 'MV-C', 'MV-MIN', 'VW']


In [13]:
# ---------- helpers ----------

def _ann_factors(freq: str = "M"):
    f = freq.upper()
    if f in ("M", "ME"):   k = 12
    elif f in ("W", "WEEK"): k = 52
    elif f in ("D", "B", "BD"): k = 252
    else: k = 12
    return k, np.sqrt(k)

def _as_series(x) -> pd.Series:
    if x is None:
        return pd.Series(dtype=float)
    if isinstance(x, pd.Series):
        return x
    try:
        return pd.Series(x)
    except Exception:
        return pd.Series(dtype=float)

def _safe_mean(s: pd.Series) -> float:
    s = _as_series(s).dropna()
    return float(s.mean()) if len(s) else np.nan

def _ensure_decimal_returns(R: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
    """Se R vier em %, converte para decimal; senão retorna como veio."""
    if R is None or not isinstance(R, pd.DataFrame) or R.empty:
        return R
    try:
        q95 = np.nanpercentile(np.abs(R.values.astype(float)), 95)
        return (R / 100.0) if q95 > 0.5 else R
    except Exception:
        return R

def _normalize_risky_only(W: pd.DataFrame, mode: str = "sum") -> pd.DataFrame:
    """
    mode='sum'    -> simplex (soma=1)  [paper]
    mode='abssum' -> divide por |sum(w)| (estilo PS_RO)
    """
    s = W.sum(axis=1)
    if mode == "abssum":
        s = s.abs()
    s = s.replace(0, np.nan)
    return W.div(s, axis=0)

def _compute_turnover_paper_from_weights(
    weights: Optional[pd.DataFrame],
    asset_rets: Optional[pd.DataFrame],
    mode: str = "sum"  # "sum" (paper) ou "abssum" (PS_RO)
) -> pd.Series:
    """
    TO_t = 0.5 * || w_t - w_t^+ ||_1, com w_t^+ = normalize( w_{t-1} * (1 + R_{t-1}) ).
    Usa DRIFT pelo retorno do mês anterior (t-1) — como no paper/.MAT.
    """
    if weights is None or not isinstance(weights, pd.DataFrame) or weights.shape[0] < 2:
        return pd.Series(dtype=float)
    if asset_rets is None or not isinstance(asset_rets, pd.DataFrame) or asset_rets.empty:
        return pd.Series(dtype=float)

    W = weights.copy().astype(float)
    R = _ensure_decimal_returns(asset_rets.copy())

    # alinhar e preencher retornos faltantes com 0.0
    R = R.reindex(index=W.index, columns=W.columns).fillna(0.0)

    # normaliza o alvo do mês t e aplica DRIFT do mês t-1
    Wn   = _normalize_risky_only(W, mode=mode)
    Wlag = Wn.shift(1)
    Rlag = R.shift(1)  # DRIFT com r_{t-1}
    Wpre = _normalize_risky_only(Wlag * (1.0 + Rlag), mode=mode)

    I = Wn.index.intersection(Wpre.index)
    return 0.5 * (Wn.loc[I].sub(Wpre.loc[I])).abs().sum(axis=1)

def _guess_weights_df(res) -> Optional[pd.DataFrame]:
    for attr in ["weights_oos", "weights_history", "weights", "w_oos", "w_t"]:
        if hasattr(res, attr):
            w = getattr(res, attr)
            if isinstance(w, pd.DataFrame) and w.shape[0] >= 2:
                return w.copy()
            try:
                wdf = pd.DataFrame(w)
                if wdf.shape[0] >= 2:
                    return wdf
            except Exception:
                pass
    return None

def _guess_asset_returns_df(res) -> Optional[pd.DataFrame]:
    for attr in ["asset_rets_oos", "rets_risky_oos", "r_risky_oos", "rets_assets_oos", "R_assets_oos", "R_oos"]:
        if hasattr(res, attr):
            r = getattr(res, attr)
            if isinstance(r, pd.DataFrame) and r.shape[0] >= 2:
                return r.copy()
            try:
                rdf = pd.DataFrame(r)
                if rdf.shape[0] >= 2:
                    return rdf
            except Exception:
                pass
    return None

def _compute_tau_series_no_prenorm(
    weights: Optional[pd.DataFrame],
    asset_rets: Optional[pd.DataFrame],
) -> pd.Series:
    """
    τ_t = sum_i | w_t(i) - w_t^+(i) |, com w_t^+ ∝ w_{t-1} ⊙ (1 + r_{t-1}).
    NÃO pré-normaliza w_t nem w_{t-1}; normaliza apenas o w_plus por 1'(...).
    Alinha por índice/colunas e usa r em decimal.
    """
    if weights is None or not isinstance(weights, pd.DataFrame) or weights.shape[0] < 2:
        return pd.Series(dtype=float)
    if asset_rets is None or not isinstance(asset_rets, pd.DataFrame) or asset_rets.empty:
        return pd.Series(dtype=float)

    W = weights.copy().astype(float)
    R = _ensure_decimal_returns(asset_rets.copy())

    # alinhar por índice e colunas
    R = R.reindex(index=W.index, columns=W.columns)

    # DRIFT com t-1
    Wlag = W.shift(1)
    Rlag = R.shift(1).fillna(0.0)

    Wpre = Wlag * (1.0 + Rlag)                       # NÃO pré-normaliza
    denom = Wpre.sum(axis=1).replace(0.0, np.nan)
    Wpre = Wpre.div(denom, axis=0)                   # normaliza apenas w_plus
    Wpre = Wpre.fillna(Wlag)                         # fallback seguro

    I = W.index.intersection(Wpre.index)
    return (W.loc[I] - Wpre.loc[I]).abs().sum(axis=1)  # sem 0.5 — escala “manual”

# ---------- sumário no formato do artigo ----------
def build_summary(
    results: Dict[str, object],
    gamma: float = 1.0,
    freq: str = "M",
    as_percent: bool = False,
    turnover_mode: str = "sum",
) -> pd.DataFrame:
    k, sqrt_k = _ann_factors(freq)

    # ----- baseline EW p/ "Turnover / 1/N"
    turn_ew_mean = None
    if "EW" in results:
        ew_res = results["EW"]
        if hasattr(ew_res, "turnover_paper") and ew_res.turnover_paper is not None:
            tew = _as_series(ew_res.turnover_paper).dropna()
            if len(tew): turn_ew_mean = float(tew.mean())
        if turn_ew_mean in (None, 0, np.nan):
            w_ew = _guess_weights_df(ew_res); r_ew = _guess_asset_returns_df(ew_res)
            if w_ew is not None and r_ew is not None:
                tew2 = _compute_turnover_paper_from_weights(w_ew, r_ew, mode=turnover_mode)
                if len(tew2): turn_ew_mean = float(tew2.mean())
        if turn_ew_mean in (None, 0, np.nan) and hasattr(ew_res, "turnover"):
            tew_legacy = _as_series(ew_res.turnover).dropna()
            if len(tew_legacy): turn_ew_mean = float(tew_legacy.mean())

    rows = []
    for name, res in results.items():
        # --- retornos OOS em excesso (mensais) ---
        r = _as_series(getattr(res, "rets_net_excess", None)).dropna()
        if len(r) == 0:
            sr_m = sr_a = ceq_m = ceq_a = np.nan
        else:
            mu = float(r.mean())
            var_m = float(r.var(ddof=1))
            sd = var_m ** 0.5
            sr_m = np.nan if (sd == 0 or np.isnan(sd)) else (mu / sd)
            sr_a = sr_m * sqrt_k if np.isfinite(sr_m) else np.nan
            ceq_m = mu - 0.5 * gamma * var_m
            ceq_a = k * mu - 0.5 * gamma * (k * var_m)

        # --- turnover padrão (engine / reconstrução / legado) ---
        t_series = pd.Series(dtype=float)
        if hasattr(res, "turnover_paper") and res.turnover_paper is not None:
            t_series = _as_series(res.turnover_paper).dropna()
        if len(t_series) == 0:
            w_df = _guess_weights_df(res); r_df = _guess_asset_returns_df(res)
            if w_df is not None and r_df is not None:
                t_series = _compute_turnover_paper_from_weights(w_df, r_df, mode=turnover_mode)
        if len(t_series) == 0 and hasattr(res, "turnover") and res.turnover is not None:
            t_series = _as_series(res.turnover).dropna()

        # ========== NOVO: override explícito APENAS para MV-MIN ==========
        # Usa results["MV-MIN"].turnover (a série que você já gerou/corrigiu).
        if str(name).upper() == "MV-MIN" and hasattr(res, "turnover") and res.turnover is not None:
            ts_mvmin = _as_series(res.turnover).dropna()
            # tentar converter índice p/ datetime e valores para float (seguro p/ dict com strings)
            try:
                ts_mvmin.index = pd.to_datetime(ts_mvmin.index)
            except Exception:
                pass
            ts_mvmin = pd.to_numeric(ts_mvmin, errors="coerce").dropna()
            if len(ts_mvmin):
                t_series = ts_mvmin.copy()
        # ================================================================

        t_abs = float(t_series.mean()) if len(t_series) else np.nan

        # ---------- HOTFIX APENAS PARA MP (inalterado) ----------
        t_rel_override = None
        if str(name).upper() == "MP" and "EW" in results:
            try:
                w_mp = _guess_weights_df(res);    r_mp = _guess_asset_returns_df(res)
                ew_res = results["EW"]
                w_ew = _guess_weights_df(ew_res); r_ew = _guess_asset_returns_df(ew_res)

                if (w_mp is not None) and (r_mp is not None) and (w_ew is not None) and (r_ew is not None):
                    common_cols = [c for c in w_mp.columns if c in w_ew.columns]
                    if not common_cols:
                        common_cols = list(w_mp.columns)
                    w_mp2 = w_mp[common_cols]; r_mp2 = r_mp[common_cols]
                    w_ew2 = w_ew[common_cols]; r_ew2 = r_ew[common_cols]

                    tau_mp_man = _compute_tau_series_no_prenorm(w_mp2, r_mp2)
                    tau_ew_man = _compute_tau_series_no_prenorm(w_ew2, r_ew2)

                    I = tau_mp_man.index.intersection(tau_ew_man.index)

                    tau_ew_mod = _as_series(getattr(ew_res, "turnover_paper", None)).dropna()
                    if len(tau_ew_mod) == 0:
                        tau_ew_mod = _compute_turnover_paper_from_weights(w_ew2, r_ew2, mode=turnover_mode)
                    ew_abs_mod = float(tau_ew_mod.loc[I].mean()) if len(I) else float(tau_ew_mod.mean())

                    ew_abs_man = float(tau_ew_man.loc[I].mean()) if len(I) else float(tau_ew_man.mean())
                    S = ew_abs_mod / ew_abs_man if (np.isfinite(ew_abs_mod) and np.isfinite(ew_abs_man) and ew_abs_man>0) else 1.0

                    mp_abs_scaled = float(tau_mp_man.loc[I].mean()) * S if len(I) else float(tau_mp_man.mean()) * S

                    t_abs = mp_abs_scaled
                    t_rel_override = (mp_abs_scaled / ew_abs_mod) if ew_abs_mod > 0 else None
            except Exception:
                pass
        # ---------- FIM HOTFIX MP ----------

        # --- denominador alinhado (padrão) ---
        turn_ew_mean_this = np.nan
        if "EW" in results:
            ew_res = results["EW"]
            tau_ew = pd.Series(dtype=float)
            if hasattr(ew_res, "turnover_paper") and ew_res.turnover_paper is not None:
                tau_ew = _as_series(ew_res.turnover_paper).dropna()
            if len(tau_ew) == 0:
                w_ew = _guess_weights_df(ew_res); r_ew = _guess_asset_returns_df(ew_res)
                if w_ew is not None and r_ew is not None:
                    w_df = _guess_weights_df(res)
                    if w_df is not None:
                        common_cols = [c for c in w_df.columns if c in w_ew.columns]
                        if len(common_cols):
                            w_ew = w_ew[common_cols]; r_ew = r_ew[common_cols]
                    tau_ew = _compute_turnover_paper_from_weights(w_ew, r_ew, mode=turnover_mode)
            # >>> alinhar datas com a série usada para esta estratégia (inclusive MV-MIN)
            if len(tau_ew) and len(t_series):
                I = tau_ew.index.intersection(t_series.index)
                if len(I):
                    turn_ew_mean_this = float(tau_ew.loc[I].mean())

        # relativo final
        if t_rel_override is not None:
            t_rel = t_rel_override
        elif np.isfinite(t_abs) and np.isfinite(turn_ew_mean_this) and turn_ew_mean_this > 0:
            t_rel = t_abs / turn_ew_mean_this
        elif hasattr(res, "turnover_rel_to_EW") and res.turnover_rel_to_EW is not None:
            t_rel = float(res.turnover_rel_to_EW)
        else:
            t_rel = np.nan

        row = {
            "Strategy": name,
            "Sharpe (monthly)": sr_m,
            "Sharpe (annual)": sr_a,
            "CEQ (monthly, γ=%.0f)" % gamma: ceq_m,
            "CEQ (annual, γ=%.0f)" % gamma: ceq_a,
            "Avg Turnover (abs.)": t_abs,
            "Turnover / 1/N": t_rel,
            "T (OOS)": int(len(r)) if len(r) else 0,
        }

        for attr in ["ceq_mat_pm", "ceq_mat", "CEQ_mat_pm", "CEQ_mat"]:
            if hasattr(res, attr):
                row["CEQ (monthly) [from .MAT]"] = float(getattr(res, attr))
                break

        if as_percent:
            row["CEQ (monthly, γ=%.0f)" % gamma] *= 100.0
            row["CEQ (annual, γ=%.0f)" % gamma]  *= 100.0
            if "CEQ (monthly) [from .MAT]" in row:
                row["CEQ (monthly) [from .MAT]"] *= 100.0

        rows.append(row)

    df = pd.DataFrame(rows).set_index("Strategy").sort_index()
    return df





In [15]:
# ---------- uso ----------
tbl = build_summary(results, gamma=1.0, freq="M", as_percent=False).round(4)

# ordem desejada (a da figura)
strategy_order = [
    "EW",
    "MV (in sample)",
    "MV",
    "BS",
    "DM",
    "MIN",
    "VW",
    "MP",
    "MV-C",
    "BS-C",
    "MIN-C",
    "G-MIN-C",
    "MV-MIN",
    "EW-MIN",
]

# mantém apenas as que existem no tbl e na ordem da figura
order_existing = [s for s in strategy_order if s in tbl.index]
tbl = tbl.loc[order_existing]

display(tbl)

out_csv = f"/content/DGU_2009_{BASE_DGU_TYPE}_Summary_latest.csv"
tbl.to_csv(out_csv, float_format="%.6f")
print("Resumo salvo em:", out_csv)


Unnamed: 0_level_0,Sharpe (monthly),Sharpe (annual),"CEQ (monthly, γ=1)","CEQ (annual, γ=1)",Avg Turnover (abs.),Turnover / 1/N,T (OOS)
Strategy,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
EW,0.224,0.776,0.0039,0.0473,0.0238,1.0,377
MV (in sample),0.2866,0.9928,0.0047,0.0559,0.0236,0.9891,377
MV,0.2186,0.7572,0.0045,0.0538,0.0673,2.8226,377
BS,0.2536,0.8786,0.0043,0.0516,0.0438,1.8375,377
DM,0.0016,0.0055,-0.0084,-0.1011,1.7977,75.4499,377
MIN,0.2493,0.8634,0.0039,0.0464,0.0264,1.108,377
VW,0.1138,0.3941,0.0042,0.0507,0.0,0.0,377
MP,-0.0002,-0.0006,-0.0026,-0.0313,1.4045,58.9458,377
MV-C,0.1082,0.3749,0.003,0.0364,0.0983,4.1263,377
BS-C,0.1514,0.5243,0.0038,0.0456,0.0864,3.627,377


Resumo salvo em: /content/DGU_2009_SMB_HML_Summary_latest.csv


In [16]:
def sharpe_monthly_from_results(results: dict, tbl: pd.DataFrame,
                                col_name: str = "Sharpe (monthly)",
                                label_map: dict = None) -> pd.Series:
    """
    Lê o Sharpe mensal diretamente do 'tbl' (saída do build_summary),
    usando os nomes das estratégias presentes em 'results' como guia.

    Parâmetros
    ----------
    results : dict
        Dicionário de resultados apenas para definir o conjunto de estratégias (chaves).
    tbl : pd.DataFrame
        Tabela-resumo com índice = nome da estratégia e coluna 'Sharpe (monthly)'.
    col_name : str
        Nome da coluna do Sharpe mensal em 'tbl'.
    label_map : dict
        Mapeamento opcional de rótulos (ex.: {"MV (in sample)": "MV (in sample)"}).

    Retorno
    -------
    pd.Series
        Série com o Sharpe mensal para cada estratégia (nomeada 'Sharpe_m_tbl').
    """

    # mapeamento padrão (pode ajustar conforme seus rótulos no tbl)
    if label_map is None:
        label_map = {
            "EW": "EW",
            "MV": "MV",
            "BS": "BS",
            "DM": "DM",
            "MIN": "MIN",
            "VW": "VW",
            "MP": "MP",
            "MV-C": "MV-C",
            "BS-C": "BS-C",
            "MIN-C": "MIN-C",
            "G-MIN-C": "G-MIN-C",
            "MV-MIN": "MV-MIN",
            "EW-MIN": "EW-MIN",
            "MV (in sample)": "MV (in sample)",
            "MV (in-sample)": "MV (in sample)",
            "MV_in_sample": "MV (in sample)",
            "MV in sample": "MV (in sample)",
        }

    def _fetch_tbl_sharpe_monthly(_tbl: pd.DataFrame, key: str, col: str) -> float:
        # tentativa direta
        if key in _tbl.index:
            val = _tbl.loc[key, col]
            return float(val) if pd.notna(val) else np.nan
        # tentativa case-insensitive
        key_norm = str(key).strip().lower()
        matches = [idx for idx in _tbl.index if str(idx).strip().lower() == key_norm]
        if matches:
            val = _tbl.loc[matches[0], col]
            return float(val) if pd.notna(val) else np.nan
        return np.nan

    sr = {}
    for name in results.keys():
        canon = label_map.get(name, name)
        sr[name] = _fetch_tbl_sharpe_monthly(tbl, canon, col_name)

    return pd.Series(sr, name="Sharpe_m_tbl")



In [17]:
mine=sharpe_monthly_from_results(results, tbl)  # lê do tbl
# --- Sharpe (mensal) do PAPER — Table 3 (coluna Industry portfolios) ---
paper_industry_sr = {
    "EW":       get_value_by_strategy("EW", BASE_DGU_TYPE ,df_sharpe),   # 1/N
    "MV (in sample)": get_value_by_strategy("MV (in sample)", BASE_DGU_TYPE ,df_sharpe),
    "MV":       get_value_by_strategy("MV", BASE_DGU_TYPE ,df_sharpe),
    "BS":       get_value_by_strategy("BS", BASE_DGU_TYPE ,df_sharpe),
    "DM":       get_value_by_strategy("DM", BASE_DGU_TYPE ,df_sharpe),
    "MIN":      get_value_by_strategy("MIN", BASE_DGU_TYPE ,df_sharpe),
    "VW":       get_value_by_strategy("VW", BASE_DGU_TYPE ,df_sharpe),
    "MP":       get_value_by_strategy("MP", BASE_DGU_TYPE ,df_sharpe),
    "MV-C":     get_value_by_strategy("MV-C", BASE_DGU_TYPE ,df_sharpe),
    "BS-C":     get_value_by_strategy("BS-C", BASE_DGU_TYPE ,df_sharpe),
    "MIN-C":    get_value_by_strategy("MIN-C", BASE_DGU_TYPE ,df_sharpe),
    "G-MIN-C":  get_value_by_strategy("G-MIN-C", BASE_DGU_TYPE ,df_sharpe),
    "MV-MIN":   get_value_by_strategy("MV-MIN", BASE_DGU_TYPE ,df_sharpe),
    "EW-MIN":   get_value_by_strategy("EW-MIN", BASE_DGU_TYPE ,df_sharpe),
}

# --- (Opcional) Mapeie rótulos se seus nomes diferirem dos do paper ---
label_map = {
    "EW": "EW",
    "MV": "MV",
    "BS": "BS",
    "DM": "DM",
    "MIN": "MIN",
    "VW": "VW",
    "MP": "MP",
    "MV-C": "MV-C",
    "BS-C": "BS-C",
    "MIN-C": "MIN-C",
    "G-MIN-C": "G-MIN-C",
    "MV-MIN": "MV-MIN",
    "EW-MIN": "EW-MIN",
    "MV (in sample)": "MV (in sample)",
}

# Interseção entre o que você rodou e o que o paper reporta
common = [k for k in mine.index if label_map.get(k, k) in paper_industry_sr]

cmp = pd.DataFrame({
    "Sharpe_m_calc": mine.reindex(common),
    "Sharpe_m_paper": pd.Series({k: paper_industry_sr[label_map[k]] for k in common})
})
cmp["Δ (calc- paper)"] = cmp["Sharpe_m_calc"] - cmp["Sharpe_m_paper"]
cmp["Δ%"] = 100 * cmp["Δ (calc- paper)"] / cmp["Sharpe_m_paper"]

# Flag simples (ok se dentro de 0.01 ponto absoluto ou 10% relativo)
cmp["flag"] = np.where(
    (cmp["Δ (calc- paper)"].abs() <= 0.01) | (cmp["Δ%"].abs() <= 10),
    "ok", "diff"
)

# (Opcional) também exiba Sharpe anual (√12) lado a lado
cmp["Sharpe_a_calc"]  = cmp["Sharpe_m_calc"] * np.sqrt(12)
cmp["Sharpe_a_paper"] = cmp["Sharpe_m_paper"] * np.sqrt(12)

display(cmp.round({
    "Sharpe_m_calc": 4, "Sharpe_m_paper": 4, "Δ (calc- paper)": 4, "Δ%": 1,
    "Sharpe_a_calc": 4, "Sharpe_a_paper": 4
}))

Unnamed: 0,Sharpe_m_calc,Sharpe_m_paper,Δ (calc- paper),Δ%,flag,Sharpe_a_calc,Sharpe_a_paper
EW,0.224,0.224,0.0,0.0,ok,0.776,0.776
MV,0.2186,0.2186,0.0,0.0,ok,0.7573,0.7573
BS,0.2536,0.2536,0.0,0.0,ok,0.8785,0.8785
DM,0.0016,0.0016,0.0,0.0,ok,0.0055,0.0055
MIN,0.2493,0.2493,0.0,0.0,ok,0.8636,0.8636
VW,0.1138,0.1138,0.0,0.0,ok,0.3942,0.3942
MP,-0.0002,-0.0002,0.0,-0.0,ok,-0.0007,-0.0007
MV-C,0.1082,0.1084,-0.0002,-0.2,ok,0.3748,0.3755
BS-C,0.1514,0.1514,0.0,0.0,ok,0.5245,0.5245
MIN-C,0.2493,0.2493,0.0,0.0,ok,0.8636,0.8636


In [18]:
def ceq_monthly_from_results(results: dict,
                             tbl: pd.DataFrame,
                             gamma: float = 1.0,
                             label_map: dict = None) -> pd.Series:
    """
    Lê o CEQ mensal diretamente de 'tbl' (saída do build_summary),
    usando os nomes das estratégias presentes em 'results' como guia.

    - Procura por colunas do tipo:
        "CEQ (monthly, γ=1)", "CEQ (monthly, g=1)", "CEQ (monthly, gamma=1)", "CEQ (monthly)"
    - Se os valores aparentarem estar em p.p. (percentual), converte para decimal.
    """

    # 1) coluna alvo em 'tbl'
    g_int = int(gamma) if float(gamma).is_integer() else gamma
    col_candidates = [
        f"CEQ (monthly, γ={g_int})",
        f"CEQ (monthly, g={g_int})",
        f"CEQ (monthly, gamma={g_int})",
        "CEQ (monthly)"  # fallback
    ]
    col = next((c for c in col_candidates if c in tbl.columns), None)
    if col is None:
        raise KeyError(f"Não encontrei coluna de CEQ mensal no tbl. Procurei: {col_candidates}")

    # 2) mapeamento de rótulos (ajuste se seus nomes diferirem)
    if label_map is None:
        label_map = {
            "EW": "EW",
            "MV": "MV",
            "BS": "BS",
            "DM": "DM",
            "MIN": "MIN",
            "VW": "VW",
            "MP": "MP",
            "MV-C": "MV-C",
            "BS-C": "BS-C",
            "MIN-C": "MIN-C",
            "G-MIN-C": "G-MIN-C",
            "MV-MIN": "MV-MIN",
            "EW-MIN": "EW-MIN",
            "MV (in sample)": "MV (in sample)",
            "MV (in-sample)": "MV (in sample)",
            "MV_in_sample": "MV (in sample)",
            "MV in sample": "MV (in sample)",
        }

    def _fetch_tbl_ceq_monthly(_tbl: pd.DataFrame, key: str, colname: str) -> float:
        # acesso direto
        if key in _tbl.index:
            v = _tbl.loc[key, colname]
            return float(v) if pd.notna(v) else np.nan
        # case-insensitive
        kn = str(key).strip().lower()
        matches = [idx for idx in _tbl.index if str(idx).strip().lower() == kn]
        if matches:
            v = _tbl.loc[matches[0], colname]
            return float(v) if pd.notna(v) else np.nan
        return np.nan

    # 3) monta a série a partir do tbl
    ceq = {}
    for name in results.keys():
        canon = label_map.get(name, name)
        ceq[name] = _fetch_tbl_ceq_monthly(tbl, canon, col)

    s = pd.Series(ceq, name="CEQ_m_tbl")

    # 4) auto-correção de escala (se estiver em percentuais, converte p/ decimal)
    # Heurística robusta: CEQs reais ~< 2%/mês → |mediana| > 0.2 sugere p.p.
    if s.dropna().abs().median() > 0.2:
        s = s / 100.0

    return s


In [19]:
mine=ceq_monthly_from_results(results, tbl, gamma=1.0)
# --- CEQs do paper (Table 4, Industry Portf., todos em DECIMAL por mês) ---
paper_ceq = {
    "EW":      get_value_by_strategy("EW", BASE_DGU_TYPE ,df_ceq),   # 1/N
    "MV (in sample)": get_value_by_strategy("MV (in sample)", BASE_DGU_TYPE ,df_ceq),
    "MV":     get_value_by_strategy("MV", BASE_DGU_TYPE ,df_ceq),
    "BS":     get_value_by_strategy("BS", BASE_DGU_TYPE ,df_ceq),
    "DM":     get_value_by_strategy("DM", BASE_DGU_TYPE ,df_ceq),   # ω = 0.50
    "MIN":     get_value_by_strategy("MIN", BASE_DGU_TYPE ,df_ceq),
    "VW":      get_value_by_strategy("VW", BASE_DGU_TYPE ,df_ceq),
    "MP":      get_value_by_strategy("MP", BASE_DGU_TYPE ,df_ceq),
    "MV-C":    get_value_by_strategy("MV-C", BASE_DGU_TYPE ,df_ceq),
    "BS-C":    get_value_by_strategy("BS-C", BASE_DGU_TYPE ,df_ceq),
    "MIN-C":   get_value_by_strategy("MIN-C", BASE_DGU_TYPE ,df_ceq),
    "G-MIN-C": get_value_by_strategy("G-MIN-C", BASE_DGU_TYPE ,df_ceq),
    "MV-MIN": get_value_by_strategy("MV-MIN", BASE_DGU_TYPE ,df_ceq),
    "EW-MIN": get_value_by_strategy("EW-MIN", BASE_DGU_TYPE ,df_ceq),
}

# Mapeamento simples de labels (ajuste caso seu results use rótulos diferentes)
label_map = {
    "EW": "EW",
    "MV": "MV",
    "MIN": "MIN",
    "VW": "VW",
    "MP": "MP",
    "MV-C": "MV-C",
    "BS-C": "BS-C",
    "MIN-C": "MIN-C",
    "G-MIN-C": "G-MIN-C",
    "MV-MIN": "MV-MIN",
    "EW-MIN": "EW-MIN",
    "BS": "BS",
    "DM": "DM",
    "MV (in sample)": "MV (in sample)",
}

# Constrói DataFrame de comparação só para as estratégias que você de fato rodou
common = []
for k in mine.index:
    pkey = label_map.get(k, k)
    if pkey in paper_ceq:
        common.append(k)

cmp = pd.DataFrame({
    "CEQ_m_calc": mine.reindex(common),
    "CEQ_m_paper": pd.Series({k: paper_ceq[label_map[k]] for k in common})
})
cmp["Δ (calc- paper)"] = cmp["CEQ_m_calc"] - cmp["CEQ_m_paper"]
cmp["|Δ| (bps)"] = 10000 * cmp["Δ (calc- paper)"]   # diferença em bps/mês
# “ok” se dentro de 5 bps/mês (pode ajustar)
cmp["flag"] = np.where(cmp["|Δ| (bps)"].abs() <= 5, "ok", "diff")

# >>> Exibição sem notação científica
fmt = {
    "CEQ_m_calc": "{:.4f}",
    "CEQ_m_paper": "{:.4f}",
    "Δ (calc- paper)": "{:.4f}",
    "|Δ| (bps)": "{:.1f}",
}
display(cmp.style.format(fmt))


Unnamed: 0,CEQ_m_calc,CEQ_m_paper,Δ (calc- paper),|Δ| (bps),flag
EW,0.0039,0.0039,0.0,0.0,ok
MV,0.0045,0.0045,0.0,0.0,ok
BS,0.0043,0.0043,0.0,0.0,ok
DM,-0.0084,-0.0084,0.0,0.0,ok
MIN,0.0039,0.0039,0.0,0.0,ok
VW,0.0042,0.0042,0.0,0.0,ok
MP,-0.0026,-0.0026,0.0,0.0,ok
MV-C,0.003,0.003,0.0,0.0,ok
BS-C,0.0038,0.0038,0.0,0.0,ok
MIN-C,0.0039,0.0039,0.0,0.0,ok


In [20]:
def _pick_col(tbl: pd.DataFrame, candidates):
    """Retorna o primeiro nome de coluna existente em tbl dentre os candidatos."""
    for c in candidates:
        if c in tbl.columns:
            return c
    return None

# ---- 1) Seus números (lendo do tbl) ----
# candidatos para nomes com _calc
col_abs_calc = _pick_col(tbl, ["Turnover_abs_calc", "Turnover (abs.)_calc", "Turnover (abs)_calc"])
col_rel_calc = _pick_col(tbl, ["Rel_turnover_calc", "Rel Turnover_calc", "Turnover_rel_calc"])

# fallbacks: nomes do build_summary
if col_abs_calc is None:
    col_abs_calc = _pick_col(tbl, ["Avg Turnover (abs.)", "Average Turnover (abs.)"])
if col_rel_calc is None:
    col_rel_calc = _pick_col(tbl, ["Turnover / 1/N", "Rel Turnover"])

# índices-alvo
idx = sorted(set(list(results.keys())) | {"EW"})

# absoluto (se não houver, fica NaN)
mine_abs = (tbl[col_abs_calc] if col_abs_calc in tbl.columns else pd.Series(dtype=float)).reindex(idx)
mine_abs.name = "Turnover_abs_calc"

# relativo (se não houver no tbl, calculamos a partir do absoluto e do EW)
if col_rel_calc in tbl.columns:
    mine_rel = tbl[col_rel_calc].reindex(idx).rename("Rel_turnover_calc")
else:
    t_ew = mine_abs.get("EW", np.nan)
    mine_rel = (mine_abs / t_ew).rename("Rel_turnover_calc")

# ---- 2) Valores do paper (Table 5, Industry portfolios) ----

paper_rel = pd.Series({
    "MV":      get_value_by_strategy("MV", BASE_DGU_TYPE, df_turnover_rel),
    "BS":      get_value_by_strategy("BS", BASE_DGU_TYPE, df_turnover_rel),
    "DM":      get_value_by_strategy("DM", BASE_DGU_TYPE, df_turnover_rel),
    "MIN":     get_value_by_strategy("MIN", BASE_DGU_TYPE, df_turnover_rel),
    "VW":      get_value_by_strategy("VW", BASE_DGU_TYPE, df_turnover_rel),
    "MP":      get_value_by_strategy("MP", BASE_DGU_TYPE, df_turnover_rel),
    "MV-C":    get_value_by_strategy("MV-C", BASE_DGU_TYPE, df_turnover_rel),
    "BS-C":    get_value_by_strategy("BS-C", BASE_DGU_TYPE, df_turnover_rel),
    "MIN-C":   get_value_by_strategy("MIN-C", BASE_DGU_TYPE, df_turnover_rel),
    "G-MIN-C": get_value_by_strategy("G-MIN-C", BASE_DGU_TYPE, df_turnover_rel),
    "MV-MIN":  get_value_by_strategy("MV-MIN", BASE_DGU_TYPE, df_turnover_rel),
    "EW-MIN":  get_value_by_strategy("EW-MIN", BASE_DGU_TYPE, df_turnover_rel),
}, name="Rel_turnover_paper")

# alinhar 'paper_rel' ao índice alvo (paper não reporta MV (in sample) no relativo)
paper_rel_aligned = pd.Series({k: paper_rel.get(k, np.nan) for k in idx}, name="Rel_turnover_paper")

# ---- 3) Monta DataFrame de comparação ----
df_calc = pd.DataFrame({
    "Turnover_abs_calc": mine_abs,
    "Rel_turnover_calc": mine_rel
})

cmp = pd.concat([df_calc, paper_rel_aligned], axis=1)
# ---- 4) Métricas de diferença e flags ----
def flag_rel(a, b, tol_ratio=0.10, tol_abs=5.0):
    """
    Marca 'ok' se:
      - ambos não-nulos e
      - diferença absoluta <= tol_abs  OU
      - diferença relativa |a-b|/max(|b|,1) <= tol_ratio.
    """
    if pd.isna(a) or pd.isna(b):
        return ""
    if abs(a - b) <= tol_abs:
        return "ok"
    denom = max(abs(b), 1.0)
    return "ok" if abs(a - b) / denom <= tol_ratio else "diff"


cmp["Δ_rel"]    = cmp["Rel_turnover_calc"] - cmp["Rel_turnover_paper"]
cmp["flag_rel"] = [
    flag_rel(cmp.loc[i, "Rel_turnover_calc"], cmp.loc[i, "Rel_turnover_paper"])
    for i in cmp.index
]

# Ordenação (EW primeiro)
order = ["EW"] + sorted([i for i in cmp.index if i != "EW"])
cmp = cmp.reindex(order)


display(cmp.round({
    "Turnover_abs_calc": 4, "Δ_abs_EW": 4,
    "Rel_turnover_calc": 2, "Rel_turnover_paper": 2, "Δ_rel": 2
}))

Unnamed: 0,Turnover_abs_calc,Rel_turnover_calc,Rel_turnover_paper,Δ_rel,flag_rel
EW,0.0238,1.0,,,
BS,0.0438,1.84,1.85,-0.01,ok
BS-C,0.0864,3.63,3.65,-0.02,ok
DM,1.7977,75.45,76.3,-0.85,ok
EW-MIN,0.0265,1.11,1.11,0.0,ok
G-MIN-C,0.0259,1.09,1.09,-0.0,ok
MIN,0.0264,1.11,1.11,-0.0,ok
MIN-C,0.0264,1.11,1.11,-0.0,ok
MP,1.4045,58.95,59.41,-0.46,ok
MV,0.0673,2.82,2.83,-0.01,ok


In [22]:
def serialize_result_object(result_obj):
    serialized_data = {}
    if hasattr(result_obj, 'rets_net_excess') and result_obj.rets_net_excess is not None:
        # Convert Series index (Timestamps) to string before converting to dict
        serialized_data['rets_net_excess'] = result_obj.rets_net_excess.rename(index=str).to_dict()
    if hasattr(result_obj, 'turnover') and result_obj.turnover is not None:
        # Convert Series index (Timestamps) to string before converting to dict
        serialized_data['turnover'] = result_obj.turnover.rename(index=str).to_dict()
    if hasattr(result_obj, 'diag') and result_obj.diag is not None:
        # Convert DataFrame to a list of dictionaries (records) - this is already fine for JSON
        serialized_data['diag'] = result_obj.diag.to_dict(orient='records')
    return serialized_data

serialized_results = {}
for strategy_name, result_obj in results.items():
    serialized_results[strategy_name] = serialize_result_object(result_obj)

output_file_path = '/content/results_export.txt'
with open(output_file_path, 'w') as f:
    json.dump(serialized_results, f, indent=4) # Use indent for readability

print(f"Results successfully exported to {output_file_path}")

Results successfully exported to /content/results_export.txt


P-valores do Sharpe

In [23]:
def _normcdf(x):
    return 0.5*(1.0 + erf(x/sqrt(2.0)))

def _as_1d(s):
    a = np.asarray(pd.Series(s).dropna().values, float).reshape(-1)
    return a

def pval_sharpe_eq_zero(r):
    r = _as_1d(r)
    T  = len(r)
    if T == 0:
        return np.nan, np.nan, T
    mu = float(np.mean(r))
    sd = float(np.std(r, ddof=1))
    if not (isfinite(sd) and sd > 0):
        return np.nan, np.nan, T
    sr = mu / sd
    se = sqrt((1.0 + 0.5*sr*sr) / T)           # usado pelos autores
    z  = sr / se
    p  = 1.0 - _normcdf(abs(z))
    return sr, p, T

def jk_pvalue_vs_bench(r_bench, r):
    r1 = _as_1d(r_bench)
    r2 = _as_1d(r)
    T  = min(len(r1), len(r2))
    r1, r2 = r1[-T:], r2[-T:]
    mu1, mu2 = float(np.mean(r1)), float(np.mean(r2))
    sd1, sd2 = float(np.std(r1, ddof=1)), float(np.std(r2, ddof=1))
    cov12    = float(np.mean((r1 - mu1)*(r2 - mu2)))
    theta = (1.0/T)*(2*sd1**2*sd2**2 - 2*sd1*sd2*cov12
                     + 0.5*(mu1**2)*(sd2**2) + 0.5*(mu2**2)*(sd1**2)
                     - (mu1*mu2/(sd1*sd2))*cov12**2)
    if not (isfinite(theta) and theta > 0 and isfinite(sd1) and sd1>0 and isfinite(sd2) and sd2>0):
        return np.nan, np.nan, T
    zjk = (sd2*mu1 - sd1*mu2)/sqrt(theta)
    pjk = 1.0 - _normcdf(abs(zjk))             # convenção dos autores
    return zjk, pjk, T

# --- usar com o dicionário 'out' do seu backtest ---
bench = "EW"
r_bench = results[bench].rets_net_excess

rows = []
for k, res in results.items():
    r = res.rets_net_excess
    sr, p0, T0 = pval_sharpe_eq_zero(r)
    if k == bench:
        rows.append(dict(strategy=k, T=T0, Sharpe=sr, pval_SR_eq_0=p0,
                         z_JK_vs_EW=np.nan, pval_JK_vs_EW=np.nan))
    else:
        zjk, pjk, Tjk = jk_pvalue_vs_bench(r_bench, r)
        rows.append(dict(strategy=k, T=min(T0,Tjk), Sharpe=sr, pval_SR_eq_0=p0,
                         z_JK_vs_EW=zjk, pval_JK_vs_EW=pjk))

pvals_df = pd.DataFrame(rows).sort_values("strategy").reset_index(drop=True)
print(pvals_df.to_string(index=False))


      strategy   T    Sharpe  pval_SR_eq_0  z_JK_vs_EW  pval_JK_vs_EW
            BS 377  0.253630  6.258935e-07   -0.683285       0.247213
          BS-C 377  0.151364  1.737468e-03    1.355025       0.087705
            DM 377  0.001596  4.876386e-01    3.042874       0.001172
            EW 377  0.224026  8.686701e-06         NaN            NaN
        EW-MIN 377  0.250253  8.562295e-07   -0.927293       0.176887
       G-MIN-C 377  0.246726  1.183497e-06   -0.678083       0.248860
           MIN 377  0.249255  9.387002e-07   -0.737411       0.230436
         MIN-C 377  0.249255  9.387002e-07   -0.737411       0.230436
            MP 377 -0.000178  4.986185e-01    3.321831       0.000447
            MV 377  0.218590  1.367712e-05    0.098802       0.460648
MV (in sample) 377  0.286599  2.464047e-08   -1.776079       0.037860
          MV-C 377  0.108229  1.807249e-02    2.081067       0.018714
        MV-MIN 377  0.254747  5.638161e-07   -0.722484       0.234998
            VW 377  

In [25]:
from math import erf, sqrt, isfinite
import numpy as np
import pandas as pd

def _normcdf(x):
    return 0.5 * (1.0 + erf(x / sqrt(2.0)))

def _as_1d(s):
    a = np.asarray(pd.Series(s).dropna().values, float).reshape(-1)
    return a

def pval_sharpe_eq_zero(r):
    r = _as_1d(r)
    T  = len(r)
    if T == 0:
        return np.nan, np.nan, T
    mu = float(np.mean(r))
    sd = float(np.std(r, ddof=1))
    if not (isfinite(sd) and sd > 0):
        return np.nan, np.nan, T
    sr = mu / sd
    # erro-padrão usado pelos autores
    se = sqrt((1.0 + 0.5 * sr * sr) / T)
    z  = sr / se
    p  = 1.0 - _normcdf(abs(z))
    return sr, p, T

def jk_pvalue_vs_bench(r_bench, r):
    r1 = _as_1d(r_bench)
    r2 = _as_1d(r)
    T  = min(len(r1), len(r2))
    r1, r2 = r1[-T:], r2[-T:]
    mu1, mu2 = float(np.mean(r1)), float(np.mean(r2))
    sd1, sd2 = float(np.std(r1, ddof=1)), float(np.std(r2, ddof=1))
    cov12    = float(np.mean((r1 - mu1) * (r2 - mu2)))
    theta = (1.0 / T) * (
        2 * sd1**2 * sd2**2 - 2 * sd1 * sd2 * cov12
        + 0.5 * (mu1**2) * (sd2**2) + 0.5 * (mu2**2) * (sd1**2)
        - (mu1 * mu2 / (sd1 * sd2)) * cov12**2
    )
    if not (isfinite(theta) and theta > 0 and
            isfinite(sd1) and sd1 > 0 and
            isfinite(sd2) and sd2 > 0):
        return np.nan, np.nan, T
    zjk = (sd2 * mu1 - sd1 * mu2) / sqrt(theta)
    pjk = 1.0 - _normcdf(abs(zjk))  # convenção dos autores
    return zjk, pjk, T

# ===================== Montagem da tabela ===================== #

bench = "EW"
r_bench = results[bench].rets_net_excess

# ordem das estratégias no painel (igual à figura)
strategy_order = [
    "EW",
    "MV (in sample)",
    "MV",
    "BS",
    "DM",
    "MIN",
    "VW",
    "MP",
    "MV-C",
    "BS-C",
    "MIN-C",
    "G-MIN-C",
    "MV-MIN",
    "EW-MIN",
]

# nomes bonitos para imprimir
display_name = {
    "EW": "EW (1/N)",
    "DM": "DM (σ_α = 1%)",
}

# mapa de ordem pelas *chaves* das estratégias
order_map = {k: i for i, k in enumerate(strategy_order)}
# mapa de ordem pelos nomes exibidos (para usar no panel_df)
name_order_map = {display_name.get(k, k): order_map[k] for k in strategy_order}

# estratégias em que NÃO queremos linha de p-valor JK no painel
skip_jk = {bench, "MV (in sample)"}

rows_stats = []   # tabela técnica com SR, p0, JK etc.
panel_rows = []   # tabela no formato da figura

for k in strategy_order:
    if k not in results:
        continue  # se alguma não existir no dict, pula

    res = results[k]
    r = res.rets_net_excess

    sr, p0, T0 = pval_sharpe_eq_zero(r)
    name = display_name.get(k, k)

    if k in skip_jk:
        # só Sharpe, sem JK
        rows_stats.append(dict(
            strategy=k,
            T=T0,
            Sharpe=sr,
            pval_SR_eq_0=p0,
            z_JK_vs_EW=np.nan,
            pval_JK_vs_EW=np.nan,
        ))
        panel_rows.append(dict(
            Estratégia=name,
            # Sharpe com 4 casas decimais
            **{"1963-2004": f"{sr:.4f}"}
        ))
    else:
        zjk, pjk, Tjk = jk_pvalue_vs_bench(r_bench, r)
        rows_stats.append(dict(
            strategy=k,
            T=min(T0, Tjk),
            Sharpe=sr,
            pval_SR_eq_0=p0,
            z_JK_vs_EW=zjk,
            pval_JK_vs_EW=pjk,
        ))
        # 1ª linha: Sharpe (4 casas)
        panel_rows.append(dict(
            Estratégia=name,
            **{"1963-2004": f"{sr:.4f}"}
        ))
        # 2ª linha: p-valor JK vs EW entre parênteses (2 casas)
        panel_rows.append(dict(
            Estratégia="",
            **{"1963-2004": f"({pjk:.2f})"}
        ))

# -------- DataFrames finais com ordem garantida -------- #

# pvals_df na ordem de strategy_order
pvals_df = pd.DataFrame(rows_stats)
pvals_df["ord"] = pvals_df["strategy"].map(order_map)
pvals_df = pvals_df.sort_values("ord").drop(columns="ord").reset_index(drop=True)

# panel_df na mesma ordem (incluindo linhas em branco logo abaixo de cada estratégia)
panel_df = pd.DataFrame(panel_rows, columns=["Estratégia", "1963-2004"])
panel_df["ord"] = panel_df["Estratégia"].map(name_order_map)
panel_df["ord"] = panel_df["ord"].ffill()          # linhas em branco herdam a ordem da estratégia de cima
panel_df["idx0"] = np.arange(len(panel_df))        # para manter estabilidade dentro de cada bloco
panel_df = (
    panel_df.sort_values(["ord", "idx0"], kind="mergesort")
            .drop(columns=["ord", "idx0"])
            .reset_index(drop=True)
)

print("Tabela de estatísticas (completa):")
print(pvals_df.to_string(index=False))




Tabela de estatísticas (completa):
      strategy   T    Sharpe  pval_SR_eq_0  z_JK_vs_EW  pval_JK_vs_EW
            EW 377  0.224026  8.686701e-06         NaN            NaN
MV (in sample) 377  0.286599  2.464047e-08         NaN            NaN
            MV 377  0.218590  1.367712e-05    0.098802       0.460648
            BS 377  0.253630  6.258935e-07   -0.683285       0.247213
            DM 377  0.001596  4.876386e-01    3.042874       0.001172
           MIN 377  0.249255  9.387002e-07   -0.737411       0.230436
            VW 377  0.113771  1.383518e-02    2.813974       0.002447
            MP 377 -0.000178  4.986185e-01    3.321831       0.000447
          MV-C 377  0.108229  1.807249e-02    2.081067       0.018714
          BS-C 377  0.151364  1.737468e-03    1.355025       0.087705
         MIN-C 377  0.249255  9.387002e-07   -0.737411       0.230436
       G-MIN-C 377  0.246726  1.183497e-06   -0.678083       0.248860
        MV-MIN 377  0.254747  5.638161e-07   -0.722484 

P-valores do CEQ

In [27]:
import numpy as np, pandas as pd
from math import erf, sqrt, isfinite

# ===== Helpers =====
def _normcdf(x):
    return 0.5*(1.0 + erf(x/sqrt(2.0)))

def _as_1d(s):
    a = np.asarray(pd.Series(s).dropna().values, float).reshape(-1)
    return a

# CEQ = mean - (gamma/2)*var  (var amostral: ddof=1)
def ceq_value(r, gamma=1.0):
    r = _as_1d(r)
    if len(r) == 0:
        return np.nan, 0
    mu  = float(np.mean(r))
    var = float(np.var(r, ddof=1))
    return (mu - 0.5*gamma*var), len(r)

# Teste assintótico de diferença de CEQ (vs. EW)
def ceq_pvalue_vs_bench(r_bench, r, gamma=1.0):
    r1 = _as_1d(r_bench)
    r2 = _as_1d(r)
    T  = min(len(r1), len(r2))
    if T == 0:
        return np.nan, np.nan, 0

    r1, r2 = r1[-T:], r2[-T:]

    # Diferença de CEQ: (EW - estratégia)
    mu1, mu2   = float(np.mean(r1)), float(np.mean(r2))
    var1, var2 = float(np.var(r1, ddof=1)), float(np.var(r2, ddof=1))
    diff_ceq   = (mu1 - 0.5*gamma*var1) - (mu2 - 0.5*gamma*var2)

    # Omega por blocos (mesmas fórmulas do .m)
    C = np.cov(r1, r2, ddof=1)    # [[var1, cov12],[cov12, var2]]
    Z = np.zeros((2,2), float)
    B = 2.0 * np.square(C)        # 2 * (elementwise square)
    Omega = np.block([[C, Z],
                      [Z, B]])

    R = np.array([1.0, -1.0, -0.5*gamma, 0.5*gamma], float)  # [μ1, μ2, s1^2, s2^2]
    denom2 = float(R @ Omega @ R.T)
    if not (isfinite(denom2) and denom2 > 0.0):
        return np.nan, np.nan, T

    z = (diff_ceq * sqrt(T)) / sqrt(denom2)
    p = 1.0 - _normcdf(abs(z))    # convenção (unicaudal)
    return z, p, T

# ===================== Montagem da tabela ===================== #

gamma = 1.0
bench = "EW"
r_bench = results[bench].rets_net_excess

# ordem das estratégias no painel (pode ajustar se quiser)
strategy_order = [
    "EW",
    "MV (in sample)",
    "MV",
    "BS",
    "DM",
    "MIN",
    "VW",
    "MP",
    "MV-C",
    "BS-C",
    "MIN-C",
    "G-MIN-C",
    "MV-MIN",
    "EW-MIN",
]

# nomes bonitos
display_name = {
    "EW": "EW (1/N)",
    "DM": "DM (σ_α = 1%)",
}

# mapa de ordem (pelas chaves) e pelos nomes exibidos
order_map      = {k: i for i, k in enumerate(strategy_order)}
name_order_map = {display_name.get(k, k): order_map[k] for k in strategy_order}

# agora pulamos p-valor de EW **e** de MV (in sample)
skip_pval = {bench, "MV (in sample)"}

rows_stats = []   # tabela técnica com CEQ, p-valor etc.
panel_rows = []   # tabela no formato da figura

for k in strategy_order:
    if k not in results:
        continue

    res = results[k]
    r = res.rets_net_excess

    ceq_k, T_k = ceq_value(r, gamma=gamma)
    name = display_name.get(k, k)

    if k in skip_pval:
        rows_stats.append(dict(
            strategy=k,
            T=T_k,
            CEQ=ceq_k,
            z_CEQ_vs_EW=np.nan,
            pval_CEQ_vs_EW=np.nan,
        ))
        # 1ª (e única) linha: CEQ
        panel_rows.append(dict(
            Estratégia=name,
            **{"1963-2004": f"{ceq_k:.4f}"}
        ))
    else:
        z, p, T = ceq_pvalue_vs_bench(r_bench, r, gamma=gamma)
        rows_stats.append(dict(
            strategy=k,
            T=min(T_k, T),
            CEQ=ceq_k,
            z_CEQ_vs_EW=z,
            pval_CEQ_vs_EW=p,
        ))
        # 1ª linha: CEQ
        panel_rows.append(dict(
            Estratégia=name,
            **{"1963-2004": f"{ceq_k:.4f}"}
        ))
        # 2ª linha: p-valor CEQ vs EW entre parênteses (2 casas)
        panel_rows.append(dict(
            Estratégia="",
            **{"1963-2004": f"({p:.2f})"}
        ))

# -------- DataFrames finais com ordem garantida -------- #

ceq_df = pd.DataFrame(rows_stats)
ceq_df["ord"] = ceq_df["strategy"].map(order_map)
ceq_df = ceq_df.sort_values("ord").drop(columns="ord").reset_index(drop=True)

panel_df = pd.DataFrame(panel_rows, columns=["Estratégia", "1963-2004"])
panel_df["ord"]  = panel_df["Estratégia"].map(name_order_map)
panel_df["ord"]  = panel_df["ord"].ffill()
panel_df["idx0"] = np.arange(len(panel_df))
panel_df = (
    panel_df.sort_values(["ord", "idx0"], kind="mergesort")
            .drop(columns=["ord", "idx0"])
            .reset_index(drop=True)
)

print("Tabela de CEQ (completa):")
print(ceq_df.to_string(index=False))



Tabela de CEQ (completa):
      strategy   T       CEQ  z_CEQ_vs_EW  pval_CEQ_vs_EW
            EW 377  0.003941          NaN             NaN
MV (in sample) 377  0.004662          NaN             NaN
            MV 377  0.004482    -0.497519        0.309411
            BS 377  0.004299    -0.471106        0.318782
            DM 377 -0.008423     1.803211        0.035678
           MIN 377  0.003868     0.125611        0.450020
            VW 377  0.004226    -0.154040        0.438789
            MP 377 -0.002606     1.775119        0.037939
          MV-C 377  0.003033     0.585504        0.279104
          BS-C 377  0.003803     0.107738        0.457102
         MIN-C 377  0.003868     0.125611        0.450020
       G-MIN-C 377  0.003802     0.244283        0.403506
        MV-MIN 377  0.004373    -0.577126        0.281927
        EW-MIN 377  0.003859     0.170471        0.432320


Return Loss

In [28]:
def _get_series(obj, *names):
    """Retorna a primeira Series válida (não-vazia) encontrada em obj para os nomes dados."""
    for nm in names:
        if hasattr(obj, nm):
            s = getattr(obj, nm)
            if s is None:
                continue
            s = pd.Series(s)
            if s.size and s.notna().any():
                return s
    return None

def _rf_as_series(rf_m):
    """Aceita Series ou DataFrame; se DF, tenta coluna 'RF' ou a primeira disponível."""
    if isinstance(rf_m, pd.Series):
        return rf_m.astype(float)
    if isinstance(rf_m, pd.DataFrame):
        if 'RF' in rf_m.columns:
            return rf_m['RF'].astype(float)
        # pega a primeira coluna
        return rf_m.iloc[:, 0].astype(float)
    # último recurso: tentar converter
    return pd.Series(rf_m, dtype=float)

def compute_return_loss(results: dict, rf_m, cost_bps: float = 50.0) -> pd.DataFrame:
    """
    Calcula o return-loss relativo ao 1/N (EW) aplicando custos proporcionais c sobre o turnover.
    results[k] deve conter:
      - rets_net_excess (ou rets_excess): retornos OOS **em excesso** (GROSS)
      - turnover_paper (preferido) ou turnover: série mensal de turnover
    rf_m: série (ou DataFrame com coluna 'RF') do risk-free em decimal/mês.
    """
    c = float(cost_bps) / 10000.0  # ex.: 50 bps -> 0.005
    rf_s = _rf_as_series(rf_m)

    net_excess = {}
    for k, res in results.items():
        # retornos em excesso (GROSS)
        rex = _get_series(res, "rets_net_excess", "rets_excess")
        if rex is None or rex.dropna().empty:
            continue
        rex = rex.dropna().astype(float)

        # risk-free alinhado
        rf = rf_s.reindex(rex.index).astype(float)

        # turnover: preferir turnover_paper
        tau = _get_series(res, "turnover_paper", "turnover")
        if tau is None:
            # fallback: sem turnover disponível -> custo zero
            tau = pd.Series(0.0, index=rex.index)
        tau = tau.reindex(rex.index).fillna(0.0).astype(float)

        # aplica custo proporcional no retorno TOTAL e volta para excesso
        net_factor = (1.0 + rex + rf) * (1.0 - c * tau)
        rex_net = net_factor - 1.0 - rf

        net_excess[k] = rex_net.dropna()

    if "EW" not in net_excess:
        raise KeyError("A chave 'EW' não foi encontrada em results (necessária para o benchmark 1/N).")

    # estatísticas por estratégia (usando retornos líquidos em excesso)
    rows = {}
    for k, s in net_excess.items():
        s = pd.Series(s).dropna()
        if s.empty:
            continue
        mu = float(s.mean())
        sd = float(s.std(ddof=1))
        shp = mu / sd if (sd > 0 and np.isfinite(sd)) else np.nan
        rows[k] = {"mu": mu, "sd": sd, "sharpe": shp}

    stats = pd.DataFrame(rows).T

    # Sharpe do EW
    mu_ew = stats.loc["EW", "mu"]
    sd_ew = stats.loc["EW", "sd"]
    if not (sd_ew > 0 and np.isfinite(sd_ew)):
        raise ValueError("Desvio-padrão do EW inválido para calcular o Sharpe.")
    sr_ew = mu_ew / sd_ew

    # return-loss_k = SR_EW * sd_k - mu_k
    stats["return_loss_vs_EW"] = sr_ew * stats["sd"] - stats["mu"]
    return stats[["mu", "sd", "sharpe", "return_loss_vs_EW"]].sort_values("return_loss_vs_EW")


In [29]:
rl = compute_return_loss(results, rf, cost_bps=50.0)
print(rl.round(6))


                      mu        sd    sharpe  return_loss_vs_EW
MV (in sample)  0.004683  0.016763  0.279382          -0.001040
MV-MIN          0.004288  0.017805  0.240846          -0.000419
BS              0.004232  0.017566  0.240931          -0.000415
EW-MIN          0.003852  0.015936  0.241730          -0.000389
MIN             0.003864  0.016041  0.240870          -0.000378
MIN-C           0.003864  0.016041  0.240870          -0.000378
G-MIN-C         0.003798  0.015933  0.238403          -0.000336
EW              0.003990  0.018359  0.217313           0.000000
MV              0.004375  0.021566  0.202883           0.000311
BS-C            0.003750  0.027658  0.135568           0.002261
MV-C            0.003085  0.033140  0.093102           0.004116
VW              0.005319  0.046748  0.113771           0.004840
MP             -0.000537  0.071884 -0.007468           0.016158
DM             -0.007660  0.141127 -0.054275           0.038328
