# 30/30 Momentum test (monthly)

We test a simple cross-sectional momentum rule:

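- Signal at month **t**: **12M momentum excluding current month** (the window ends at the previous month-end, so the return earned during month t is never part of its own signal)  
  `log(P[t-1] / P[t-13])`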
- Rank tickers by that signal and select:
  - **Winners** = top 30%  → `selected = 1`
  - **Losers**  = bottom 30% → `selected = -1`
  - Others → `selected = 0`
- Next-month return (for each asset): `log(P[t] / P[t-1])`

Portfolio columns (log space, equal-weight within buckets):

- `winner` = average log return of winners
- `loser_long` = average log return of losers **if held long**
- `wml_spread` = `winner - loser_long`
- `wml_cap05` = `2 * wml_spread` (your “0.5 capital” convention)

Outputs (both written to a single workbook in `data/analysis/`):
- `monthly` sheet: a wide table with 2 header rows
- `decades` sheet: decade summary table (first/last decades can be partial)


## Step 0 — Locate repo root and ensure imports come from this repo

This avoids accidentally importing an old `site-packages` version while you’re editing code in `src/`.


In [None]:
from pathlib import Path
import sys

def find_repo_root(start: Path | None = None) -> Path:
    """Walk upwards from `start` (or CWD) until we find a repo marker."""
    start = (start or Path.cwd()).resolve()
    markers = ("pyproject.toml", ".git", "setup.cfg", "requirements.txt")
    for p in (start, *start.parents):
        if any((p / m).exists() for m in markers):
            return p
    return start

# Resolve the repository root once; the rest of the notebook builds paths from it.
REPO_ROOT = find_repo_root()

# Ensure the repo's src/ is imported (not an old site-packages install):
# inserting at position 0 puts it ahead of every other sys.path entry.
SRC = REPO_ROOT / "src"
if str(SRC) not in sys.path:  # skip the insert when the entry is already present (e.g. cell re-run)
    sys.path.insert(0, str(SRC))

# Echo the resolved locations so a wrong working directory is easy to spot.
print("CWD:", Path.cwd())
print("REPO_ROOT:", REPO_ROOT)
print("SRC:", SRC)


## Step 1 — Load bars from SQLite via infra

Uses `SQLiteBarStore.list_bars_multi()` to bulk-load bars. Adjust the `tickers=` list if you want to restrict the universe.


In [None]:
from pathlib import Path
import pandas as pd
import numpy as np

from scanner_bot.infra.db.sqllite_barstore import SQLiteBarStore

# Absolute path of the SQLite database under <repo>/data/db/.
DB_PATH = (REPO_ROOT / "data" / "db" / "scanner.db").resolve()

# NOTE(review): SQLiteBarStore is project code not shown here; presumably it
# opens the database at DB_PATH — confirm against its definition.
store = SQLiteBarStore(DB_PATH)

# Option A: all tickers in DB (fine if DB isn't huge)
# The result is iterated later as bar objects exposing .date, .ticker and
# .close (optionally .adj_close).
bars = store.list_bars_multi(interval="1d", tickers=None)

# Option B: restrict to a list (uncomment)
# tickers = ["AAPL", "MSFT", "NVDA"]
# bars = store.list_bars_multi(interval="1d", tickers=tickers)

def _px(b):
    # prefer adj_close when present
    return b.adj_close if getattr(b, "adj_close", None) is not None else b.close

# Long-format frame: one row per bar with its date, ticker and chosen price.
# Rows whose price is missing are dropped right away.
df = pd.DataFrame(
    {
        "date": [b.date for b in bars],
        "ticker": [b.ticker for b in bars],
        "px": [_px(b) for b in bars],
    }
).dropna(subset=["px"])

# Convert dates to pandas datetimes so the frame can be resampled by month.
df["date"] = pd.to_datetime(df["date"])
# Quick look: first rows, number of distinct tickers, and covered date range.
df.head(), df["ticker"].nunique(), df["date"].min(), df["date"].max()





## Step 2 — Resample to month-end prices

We use the last available daily price each month as the month-end close.


In [None]:
# Month-end price matrix: rows = month-end dates, columns = tickers.
# Each ticker is resampled on its own, taking the last daily price observed
# in every calendar month, then the result is pivoted wide.
# NOTE(review): newer pandas releases reportedly deprecate the "M" alias in
# favour of "ME" — confirm against the pandas version pinned for this repo.
px_m = (
    df.set_index("date")
      .groupby("ticker")["px"]
      .resample("M")
      .last()
      .unstack("ticker")
      .sort_index()
)

# Shape check, then preview the most recent months.
print("Months:", len(px_m), "| Tickers:", px_m.shape[1])
px_m.tail()


## Step 3 — Parameters / sanity check


In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

# Bucket sizes as fractions of the tickers that are rankable in a given month.
TOP_PCT = 0.30
BOT_PCT = 0.30
# NOTE(review): LOOKBACK_MONTHS and EXCLUDE_CURRENT are not read by any code
# visible in this notebook — the momentum cell hard-codes shift(1)/shift(13).
# Changing them here has no effect unless that cell is updated as well.
LOOKBACK_MONTHS = 12      # "last 12 months"
EXCLUDE_CURRENT = True    # use Pt-1 / Pt-13
MIN_TICKERS = 10          # skip months with too few names

# must already exist (built by the month-end resampling cell)
assert "px_m" in globals(), "Expected px_m (month-end price matrix) to exist."
# Chronological order matters: the later shift() calls work on row position.
px_m = px_m.sort_index()

print("px_m shape:", px_m.shape)
print("date range:", px_m.index.min(), "->", px_m.index.max())
print("example tickers:", list(px_m.columns[:10]))


## Step 4 — Log returns and 12M momentum excluding current month

- Monthly log return: `log(P[t]/P[t-1])`
- 12M momentum (exclude current month): `log(P[t-1]/P[t-13])`


In [None]:
# log return for current month: log(Pt / Pt-1)
logret_1m = np.log(px_m / px_m.shift(1))

# 12M momentum excluding current period: log(Pt-1 / Pt-13)
# (Pt-1 is shift(1), Pt-13 is shift(13))
mom_12m_excl = np.log(px_m.shift(1) / px_m.shift(13))

# optional: for readability in excel
ret_1m_simple = np.exp(logret_1m) - 1
mom_12m_simple = np.exp(mom_12m_excl) - 1

logret_1m.head()


## Step 5 — Winners/Losers selection and portfolio returns (log space)

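Selection size per month (N = tickers with a valid price, 1M return and momentum signal in that month):
- `k = floor(N * 30%)` for each bucket; months with `N < MIN_TICKERS` are skipped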


In [None]:
def monthly_select_and_portfolio(
    px_m: pd.DataFrame,
    logret_1m: pd.DataFrame,
    mom_12m_excl: pd.DataFrame,
    top_pct: float = 0.30,
    bot_pct: float = 0.30,
    min_tickers: int = 10,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Rank tickers by momentum each month and build winner/loser buckets.

    For every date in ``px_m.index`` only tickers with a non-NaN price,
    1-month log return and momentum signal are considered. They are ranked by
    the signal (highest first); the top ``floor(n * top_pct)`` become winners
    and the bottom ``floor(n * bot_pct)`` become losers. Months with fewer
    than ``min_tickers`` usable tickers, or with an empty bucket, are skipped.

    Args:
      px_m: month-end prices, rows = dates, columns = tickers.
      logret_1m: 1-month log returns, same layout as ``px_m``.
      mom_12m_excl: momentum signal used for ranking, same layout as ``px_m``.
      top_pct: fraction of usable tickers selected as winners.
      bot_pct: fraction of usable tickers selected as losers.
      min_tickers: minimum number of usable tickers required in a month.

    Returns:
      panel: rows = (date, ticker) with columns ``date``, ``ticker``,
        ``price``, ``logret_1m``, ``mom_12m_excl_log`` and ``selected``
        (1 = winner, -1 = loser, 0 = neither).
      port : rows = date with ``n_tickers``, ``k_winners``, ``k_losers``,
        ``winners_logret`` (mean log return of winners),
        ``losers_long_logret`` (mean log return of losers held long),
        ``wml_spread_logret`` (winners minus losers) and
        ``wml_cap05_logret`` (twice the spread).
      When no month qualifies, both frames are empty but still carry these
      columns.

    Raises:
      ValueError: if ``top_pct + bot_pct`` exceeds 1, because the two buckets
        would overlap and some winners would be relabelled as losers.
    """
    if top_pct + bot_pct > 1.0:
        raise ValueError(
            f"top_pct + bot_pct must not exceed 1 (got {top_pct} + {bot_pct}); "
            "winner and loser buckets would overlap."
        )

    panel_columns = [
        "date", "ticker", "price", "logret_1m", "mom_12m_excl_log", "selected",
    ]
    port_columns = [
        "n_tickers", "k_winners", "k_losers", "winners_logret",
        "losers_long_logret", "wml_spread_logret", "wml_cap05_logret",
    ]

    panel_rows = []
    port_rows = []

    for dt in px_m.index:
        s = mom_12m_excl.loc[dt].dropna()
        r = logret_1m.loc[dt].dropna()
        p = px_m.loc[dt].dropna()

        # Only tickers that have all three inputs this month can be ranked.
        common = s.index.intersection(r.index).intersection(p.index)
        n = len(common)
        if n < min_tickers:
            continue

        k = int(np.floor(n * top_pct))
        k2 = int(np.floor(n * bot_pct))
        if k <= 0 or k2 <= 0:
            continue

        s = s.loc[common]
        r = r.loc[common]
        p = p.loc[common]

        # Highest momentum first: head = winners, tail = losers.
        ranked = s.sort_values(ascending=False)
        winners = ranked.index[:k]
        losers = ranked.index[-k2:]

        # selection vector: 1 / 0 / -1
        sel = pd.Series(0, index=common, dtype=int)
        sel.loc[winners] = 1
        sel.loc[losers] = -1

        # Equal-weight averages of the members' log returns.
        winners_ret = float(r.loc[winners].mean())
        losers_long = float(r.loc[losers].mean())  # losers as if held long

        wml_spread = winners_ret - losers_long
        wml_cap05 = 2.0 * wml_spread  # "return on 0.5 capital" convention

        port_rows.append(
            {
                "date": dt,
                "n_tickers": n,
                "k_winners": k,
                "k_losers": k2,
                "winners_logret": winners_ret,
                "losers_long_logret": losers_long,
                "wml_spread_logret": wml_spread,
                "wml_cap05_logret": wml_cap05,
            }
        )

        # per-ticker panel rows for excel
        panel_rows.append(
            pd.DataFrame(
                {
                    "date": dt,
                    "ticker": common,
                    "price": p.values,
                    "logret_1m": r.values,
                    "mom_12m_excl_log": s.values,
                    "selected": sel.values,  # 1 / 0 / -1
                }
            )
        )

    if not port_rows:
        # Keep the schema even with no qualifying month so that downstream
        # column access / pivots do not fail on a column-less frame.
        empty_port = pd.DataFrame(columns=port_columns)
        empty_port.index.name = "date"
        return pd.DataFrame(columns=panel_columns), empty_port

    panel = pd.concat(panel_rows, ignore_index=True)
    port = pd.DataFrame(port_rows).set_index("date").sort_index()
    return panel, port

# Run the monthly selection with the notebook-level parameters.
panel, port = monthly_select_and_portfolio(
    px_m=px_m,
    logret_1m=logret_1m,
    mom_12m_excl=mom_12m_excl,
    top_pct=TOP_PCT,
    bot_pct=BOT_PCT,
    min_tickers=MIN_TICKERS,
)

# Row counts as a quick sanity check, then preview the first portfolio months.
print("panel rows:", len(panel), "| portfolio months:", len(port))
port.head()


## Step 6 — Optional convenience columns + selection sanity checks


In [None]:
# Optional convenience columns (simple returns) + sanity checks

# Per-ticker simple returns derived from the log columns: exp(x) - 1.
if not panel.empty:
    panel["ret_1m"] = np.exp(panel["logret_1m"]) - 1
    panel["mom_12m_excl"] = np.exp(panel["mom_12m_excl_log"]) - 1

# Sanity checks per month: the number of rows flagged 1 / -1 in the panel must
# equal the k_winners / k_losers recorded in the portfolio table.
if not panel.empty and not port.empty:
    sel_counts = (
        panel.groupby("date")["selected"]
             .agg(
                 winners=lambda x: int((x == 1).sum()),
                 losers=lambda x: int((x == -1).sum()),
             )
    )
    chk = port.join(sel_counts, how="left")
    chk["ok_winners"] = chk["winners"] == chk["k_winners"]
    chk["ok_losers"] = chk["losers"] == chk["k_losers"]
    # Show the last 12 months of the comparison.
    print(chk[["k_winners","winners","ok_winners","k_losers","losers","ok_losers"]].tail(12))

# Optional: convert leg log-returns to simple returns.
# NOTE: the leg log-returns are averages of the members' log returns, so
# exp(.) - 1 is the geometric-average simple return of the bucket; it is not
# in general equal to the equal-weight average of the members' simple returns.
if not port.empty:
    port["winners_ret"] = np.exp(port["winners_logret"]) - 1
    port["losers_long_ret"] = np.exp(port["losers_long_logret"]) - 1

port.tail()


## Step 7 — Build the wide output table for Excel

One row per month, 2 header rows:

Top header: `DATE | <TICKER> ... | PORTFOLIO`  
Second header: `price | logret_1m | logret_12m | selected` (per ticker) and `winner | loser_long | wml_cap05` (portfolio).


In [None]:
# --- Wide output table with 2 header rows: (TICKER, metric) and (PORTFOLIO, metric) ---

# Per-ticker blocks, one wide frame (date x ticker) per metric.
price_w   = panel.pivot(index="date", columns="ticker", values="price")
lr1_w     = panel.pivot(index="date", columns="ticker", values="logret_1m")
lr12_w    = panel.pivot(index="date", columns="ticker", values="mom_12m_excl_log")  # log(P[t-1]/P[t-13])
sel_w     = panel.pivot(index="date", columns="ticker", values="selected")

# Build as (metric, ticker) then swap to (ticker, metric)
tickers_block = pd.concat(
    {
        "price": price_w,
        "logret_1m": lr1_w,
        "logret_12m": lr12_w,
        "selected": sel_w,
    },
    axis=1
)
tickers_block = tickers_block.swaplevel(0, 1, axis=1)

# Order columns explicitly: tickers alphabetically, and within each ticker the
# documented metric order. A plain sort_index(axis=1, level=0) would also sort
# the metric level alphabetically (logret_12m, logret_1m, price, selected).
metric_order = ["price", "logret_1m", "logret_12m", "selected"]
tickers_block = tickers_block.reindex(
    columns=pd.MultiIndex.from_product(
        [sorted(price_w.columns), metric_order],
        names=tickers_block.columns.names,
    )
)

# Portfolio block (PORTFOLIO, metric)
# - loser_long is the losers' return if held long
# - wml_cap05 is 2 * (winner - loser_long) (your 0.5-capital convention)
port_cols = pd.DataFrame(
    {
        "winner": port["winners_logret"],
        "loser_long": port["losers_long_logret"],
        "wml_cap05": port["wml_cap05_logret"],
    },
    index=port.index
)
port_cols.columns = pd.MultiIndex.from_product([["PORTFOLIO"], port_cols.columns])

# Combine: keep only dates present in both the ticker block and the portfolio.
out = tickers_block.join(port_cols, how="inner").sort_index()

# Add DATE as first column (kept for display; Excel export uses DATE as index due to pandas limitation)
out = out.reset_index()
out.columns = pd.MultiIndex.from_tuples([("DATE", "")] + list(out.columns[1:]))

out.head()


## Step 8 — Prepare Excel output path

We keep `DATE` as index on export (pandas limitation with MultiIndex columns and `index=False`).


In [None]:
# Destination folder for analysis artifacts; created on first use.
analysis_dir = REPO_ROOT / "data" / "analysis"
analysis_dir.mkdir(parents=True, exist_ok=True)
out_path = analysis_dir / "momentum_30_30_monthly_wide.xlsx"

# pandas can't write MultiIndex columns with index=False, so DATE is moved
# into the index when that column is present; otherwise work on a plain copy.
out_xl = out.set_index(("DATE", "")) if ("DATE", "") in out.columns else out.copy()
out_xl.index.name = "DATE"

out_path


## Step 9 — Decade performance

A decade starts on years ending with **0** (e.g., 1990–1999, 2000–2009).  
First/last decades can be partial depending on your data range.


In [None]:
import numpy as np
import pandas as pd

def _max_dd_from_logrets(r_log: pd.Series) -> float:
    r_log = r_log.dropna()
    if r_log.empty:
        return np.nan
    equity = np.exp(r_log.cumsum())  # log-additive -> equity curve
    dd = equity / equity.cummax() - 1.0
    return float(dd.min())

def _stats_from_logrets(r_log: pd.Series) -> dict:
    """Summarise a series of monthly log returns.

    Args:
      r_log: monthly log returns; NaN entries are ignored.

    Returns:
      A dict with keys:
        months        number of non-NaN observations
        total_log     sum of the log returns
        total_return  exp(total_log) - 1
        cagr          exp(total_log / (months / 12)) - 1
        vol           sample std (ddof=1) of log returns times sqrt(12)
        sharpe        (mean * 12) / vol, no risk-free rate subtracted
        maxdd         result of _max_dd_from_logrets
      Every value except ``months`` is NaN for an empty series. ``vol`` and
      ``sharpe`` are NaN with fewer than two observations, and ``sharpe`` is
      also NaN when the volatility is exactly zero (ratio undefined).
    """
    r_log = r_log.dropna()
    months = len(r_log)
    if months == 0:
        return {
            "months": 0,
            "total_log": np.nan,
            "total_return": np.nan,
            "cagr": np.nan,
            "vol": np.nan,
            "sharpe": np.nan,
            "maxdd": np.nan,
        }

    total_log = float(r_log.sum())
    total_return = float(np.exp(total_log) - 1.0)

    # months >= 1 here, so the year count is always positive.
    years = months / 12.0
    cagr = float(np.exp(total_log / years) - 1.0)

    vol = np.nan
    sharpe = np.nan
    if months > 1:
        # Compute the sample std once and reuse it for both statistics.
        vol = float(r_log.std(ddof=1) * np.sqrt(12))
        if vol > 0:
            sharpe = float((r_log.mean() * 12) / vol)

    maxdd = _max_dd_from_logrets(r_log)

    return {
        "months": months,
        "total_log": total_log,
        "total_return": total_return,
        "cagr": cagr,
        "vol": vol,
        "sharpe": sharpe,
        "maxdd": maxdd,
    }

def decade_year(dt: pd.Timestamp) -> int:
    """Map a timestamp to the first year of its calendar decade (1997 -> 1990)."""
    year = dt.year
    # Removing the last digit's contribution leaves the year ending in 0.
    return year - year % 10

# Ensure datetime index (work on a copy so `port` itself is left untouched)
port2 = port.copy()
port2.index = pd.to_datetime(port2.index)

# Decade stats based on your log columns
cols = ["winners_logret", "losers_long_logret", "wml_spread_logret", "wml_cap05_logret"]

# One output row per decade; months are grouped by the decade start year of
# their index timestamp.
dec_rows = []
for dy, g in port2[cols].groupby(port2.index.map(decade_year)):
    # First and last month actually present in this decade (may be partial).
    start = g.index.min().date()
    end = g.index.max().date()

    row = {"decade_start_year": dy, "start": start, "end": end}
    for c in cols:
        s = _stats_from_logrets(g[c])
        # Column prefix = series name without the "_logret" suffix,
        # e.g. "winners_logret" -> "winners_cagr", "winners_sharpe", ...
        prefix = c.replace("_logret", "")
        row.update({f"{prefix}_{k}": v for k, v in s.items()})
    dec_rows.append(row)

# Final table indexed by decade start year, in chronological order.
decades = pd.DataFrame(dec_rows).sort_values("decade_start_year").set_index("decade_start_year")
decades


## Step 10 — Write workbook (monthly + decades)

This overwrites the workbook at the same path, ensuring both sheets are included.


In [None]:
# Write both the wide monthly sheet and the decade summary into one workbook.
# The output location is recomputed here so this cell does not depend on the
# path variables set by an earlier cell.
analysis_dir = (REPO_ROOT / "data" / "analysis")
analysis_dir.mkdir(parents=True, exist_ok=True)
out_path = analysis_dir / "momentum_30_30_monthly_wide.xlsx"

# Fail early with a readable message if a prerequisite cell was not run.
assert "out_xl" in globals(), "Expected out_xl (wide monthly table) to exist."
assert "decades" in globals(), "Expected decades table to exist."

# The context manager saves and closes the workbook on exit.
with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
    out_xl.to_excel(writer, sheet_name="monthly", index=True)
    decades.to_excel(writer, sheet_name="decades", index=True)

    # Freeze everything above row 3 and left of column B on the monthly
    # sheet, i.e. the two header rows and the DATE column stay visible.
    ws = writer.sheets["monthly"]
    ws.freeze_panes = "B3"

print("Saved:", out_path)
