# Multi-Market Quantitative Research Pipeline (v2)

**Institutional-grade**, multi-market (US + KOSPI + KOSDAQ), cost-aware, overfitting-controlled
framework for systematic strategy discovery.

## Key Enhancements over v1
- **Multi-market**: US large-cap, KOSPI, KOSDAQ with FX-adjusted returns
- **Dynamic universe**: point-in-time universe construction per market
- **Multi-horizon**: 5-day, 21-day, 63-day forward returns
- **Transaction costs**: cost-adjusted forward returns (commission + slippage)
- **Turnover control**: strategy turnover estimation and penalty
- **Regime analysis**: bull/bear regime stability testing
- **Multiple testing correction**: FDR (Benjamini-Hochberg)
- **Strengthened overfitting control**: higher thresholds, bootstrap min-samples guard
- **Cross-market portfolio**: diversification benefit analysis
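Benjamini-Hochberg limits the expected share of false discoveries among the strategies that pass. It does not limit the chance of any single false pass, which is what Bonferroni controls. The sketch below implements the standard BH step-up rule. It assumes each candidate strategy already has a p-value; how those p-values are produced is not shown here. `benjamini_hochberg` is a hypothetical helper name. `statsmodels.stats.multitest.multipletests(pvals, alpha=0.05, method="fdr_bh")` applies the same correction.

```python
import numpy as np


def benjamini_hochberg(pvals, alpha=0.05):
    """Return a boolean mask of hypotheses rejected by the BH step-up rule at FDR level alpha."""
    p = np.asarray(pvals, dtype=float)
    m = p.size
    reject = np.zeros(m, dtype=bool)
    if m == 0:
        return reject
    order = np.argsort(p)                              # indices that sort p ascending
    thresholds = alpha * np.arange(1, m + 1) / m       # i * alpha / m for rank i = 1..m
    passing = np.nonzero(p[order] <= thresholds)[0]
    if passing.size:
        k = passing.max()                              # largest rank that clears its threshold
        reject[order[:k + 1]] = True                   # reject it and every smaller p-value
    return reject


# Step-up behaviour: 0.02 misses its own threshold (0.05 / 3 = 0.0167) but is still
# rejected, because the largest p-value (0.045) clears 3 * 0.05 / 3 = 0.05.
print(benjamini_hochberg([0.02, 0.04, 0.045]))
```

The step-up rule makes BH less conservative than Bonferroni. With thousands of candidates, Bonferroni would leave almost nothing standing.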

## Pipeline Steps
1. Data Layer — Per-market OHLCV download (yfinance / pykrx)
2. Feature Engine — Momentum, volatility, regime + multi-horizon cost-adjusted returns
3. Candidate Generator — Decile, decision tree, logistic rank (per market, per horizon)
4. Edge Evaluation — Net-of-cost metrics per candidate
5. Walk-Forward Validation — Rolling train/test with embargo & turnover estimation
6. Overfitting Control — Stability, bootstrap CI, FDR correction
7. Turnover & Cost Filtering — Reject high-turnover strategies
8. Strategy Scoring — Turnover-penalised composite score + correlation clustering
9. Regime Analysis — Bull/bear performance consistency
10. Portfolio Combination — Per-market & cross-market equal-weight portfolios
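The rolling split in step 5 can be sketched as a small fold generator. This is a sketch under stated assumptions, not the pipeline's actual implementation. `walk_forward_folds` is a hypothetical name, and its defaults mirror `wf_train_years`, `wf_test_months`, `wf_step_months` and `wf_embargo_days`. The embargo is counted in calendar days here; the pipeline may count trading days instead. Labels look 5, 21 or 63 days ahead, so training labels near the end of the window reach into the test period unless the embargo is at least as long as the forward horizon.

```python
import pandas as pd


def walk_forward_folds(start, end, train_years=3, test_months=12,
                       step_months=6, embargo_days=5):
    """Return (train_start, train_end, test_start, test_end) tuples for a rolling walk-forward."""
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    folds = []
    train_start = start
    while True:
        train_end = train_start + pd.DateOffset(years=train_years)
        test_start = train_end + pd.Timedelta(days=embargo_days)    # gap between train and test
        test_end = test_start + pd.DateOffset(months=test_months)
        if test_end > end:                                           # full test window no longer fits
            break
        folds.append((train_start, train_end, test_start, test_end))
        train_start += pd.DateOffset(months=step_months)             # roll the window forward
    return folds


for fold in walk_forward_folds("2015-01-01", "2020-01-01"):
    print([d.date() for d in fold])
```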

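**Resume-safe:** every step writes its outputs, plus a `state.json` progress file, to Google Drive. If the Drive mount fails, it writes to local `/content` storage instead, and those checkpoints are lost on disconnect.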
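Colab sessions can disconnect at any time. A crash while `state.json` is being rewritten could leave a truncated checkpoint behind. The sketch below shows one way to make the save atomic: write to a temporary file in the same directory, then swap it in with `os.replace`. This is an assumption about how `ProgressTracker._save` could be hardened, not what it currently does. `atomic_json_dump` is a hypothetical helper. `os.replace` is atomic on POSIX when source and destination are on the same filesystem; on the Google Drive FUSE mount, treat it as best-effort.

```python
import json
import os
import tempfile


def atomic_json_dump(obj, path):
    """Write obj as JSON via a temp file in the same directory, then os.replace it over path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")   # temp file on the same filesystem
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(obj, f, indent=2, default=str)
            f.flush()
            os.fsync(f.fileno())                                    # push bytes to disk before the swap
        os.replace(tmp_path, path)                                  # rename over the old checkpoint
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)                                     # don't leave stray temp files
        raise


state = {"completed_steps": {"data_load_US": True}, "metadata": {}}
atomic_json_dump(state, os.path.join(tempfile.gettempdir(), "state.json"))
```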

In [None]:
!pip install -q yfinance pandas numpy scikit-learn scipy matplotlib pyarrow joblib pykrx finance-datareader statsmodels

## 0. Configuration

In [None]:
import os
import json
import time
import gc
import logging
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Tuple

warnings.filterwarnings('ignore')


@dataclass
class PipelineConfig:
    """Central configuration — 16 parameter groups."""

    # --- 1. Markets ---
    markets: List[str] = field(default_factory=lambda: ["US", "KOSPI", "KOSDAQ"])
    base_currency: str = "USD"
    apply_fx_conversion: bool = True

    us_tickers: List[str] = field(default_factory=lambda: [
        "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "BRK-B",
        "JPM", "JNJ", "V", "PG", "UNH", "HD", "MA", "DIS", "BAC", "NFLX",
        "ADBE", "CRM", "XOM", "VZ", "KO", "INTC", "PEP", "ABT", "CSCO",
        "COST", "MRK", "WMT", "AVGO", "ACN", "CVX", "NKE", "LLY", "MCD",
        "QCOM", "UPS", "BMY", "LIN", "NEE", "ORCL", "RTX", "HON", "TXN",
        "AMD", "PYPL", "CMCSA", "TMO", "DHR",
    ])

    kospi_tickers: List[str] = field(default_factory=lambda: [
        "005930", "000660", "373220", "207940", "005380", "000270",
        "035420", "006400", "105560", "051910", "005490", "034730",
        "068270", "055550", "035720", "086790", "012330", "003550",
        "028260", "033780", "000810", "032830", "017670", "010950",
        "316140", "066570", "009150", "018260", "011200", "034020",
    ])

    kosdaq_tickers: List[str] = field(default_factory=lambda: [
        "247540", "086520", "028300", "196170", "403870", "035900",
        "263750", "293490", "053800", "112040", "041510", "145020",
        "257720", "036930", "058470", "950160", "383310", "322000",
        "214150", "108320",
    ])

    market_index: Dict[str, str] = field(default_factory=lambda: {
        "US": "SPY", "KOSPI": "^KS11", "KOSDAQ": "^KQ11",
    })

    # --- 2. Dynamic Universe ---
    use_dynamic_universe: bool = True
    rebuild_universe_each_fold: bool = False
    min_market_cap: float = 0
    min_avg_volume: float = 100000
    min_listing_days: int = 252
    include_delisted: bool = False
    max_universe_size: int = 50

    # --- 3. Features ---
    momentum_windows: List[int] = field(default_factory=lambda: [5, 20, 60, 120])
    volatility_windows: List[int] = field(default_factory=lambda: [20, 60])
    regime_window: int = 60
    adaptive_quantiles: bool = True
    min_bin_size: int = 20

    # --- 4. Forward Returns ---
    forward_days_list: List[int] = field(default_factory=lambda: [5, 21, 63])
    avoid_overlapping_labels: bool = True

    # --- 5. Candidates ---
    max_candidates_total: int = 3000
    max_candidates_per_feature_pair: int = 200
    min_sample_size: int = 300

    # --- 6. Trees ---
    n_trees: int = 20
    tree_feature_subsample: float = 0.5
    tree_max_depth: int = 2
    tree_min_samples_leaf: int = 500

    # --- 7. Walk-Forward ---
    wf_train_years: int = 3
    wf_test_months: int = 12
    wf_step_months: int = 6
    wf_embargo_days: int = 5
    wf_min_folds: int = 4

    # --- 8. Overfitting ---
    min_stability: float = 0.5
    min_sharpe: float = 0.5
    min_win_rate: float = 0.52
    baseline_winrate_adjusted: bool = True
    compute_lift_against_market: bool = True

    # --- 9. Bootstrap ---
    bootstrap_n: int = 1000
    bootstrap_ci: float = 0.95
    bootstrap_min_samples: int = 200

    # --- 10. Transaction Costs ---
    transaction_cost_bps: float = 5.0
    slippage_bps: float = 2.0

    # --- 11. Turnover ---
    penalty_turnover: float = 0.1
    max_turnover: float = 3.0
    max_position_pct_of_adv: float = 0.05

    # --- 12. Correlation Filter ---
    max_strategy_correlation: float = 0.85
    cluster_strategies: bool = True

    # --- 13. Portfolio ---
    portfolio_max_strategies: int = 10
    portfolio_weight_method: str = "equal"

    # --- 14. Regime ---
    evaluate_by_regime: bool = True
    regime_split_method: str = "market_momentum"
    min_regime_performance_ratio: float = 0.7

    # --- 15. Multiple Testing ---
    apply_multiple_testing_correction: bool = True
    mtc_method: str = "fdr"

    # --- 16. Memory ---
    use_float32: bool = True
    max_ram_gb: float = 28
    batch_feature_size: int = 5
    gc_every_n_candidates: int = 100

    # --- Scoring weights ---
    w_stability: float = 0.30
    w_sharpe: float = 0.30
    w_lift: float = 0.20
    w_sample: float = 0.20

    # --- Paths ---
    drive_root: str = "/content/drive/MyDrive/quant_pipeline_v2"
    data_period: str = "10y"
    seed: int = 42
    logistic_top_pct: float = 0.20

    # --- Per-market directory helpers ---
    def data_dir(self, market: str) -> str:
        return os.path.join(self.drive_root, "data", market)

    def features_dir(self, market: str) -> str:
        return os.path.join(self.drive_root, "features", market)

    def candidates_dir(self, market: str) -> str:
        return os.path.join(self.drive_root, "candidates", market)

    def evaluation_dir(self, market: str) -> str:
        return os.path.join(self.drive_root, "evaluation", market)

    def walkforward_dir(self, market: str) -> str:
        return os.path.join(self.drive_root, "walkforward", market)

    @property
    def global_eval_dir(self) -> str:
        return os.path.join(self.drive_root, "evaluation", "_global")

    @property
    def logs_dir(self) -> str:
        return os.path.join(self.drive_root, "logs")

    @property
    def state_path(self) -> str:
        return os.path.join(self.drive_root, "state.json")

    @property
    def total_cost_bps(self) -> float:
        return self.transaction_cost_bps + self.slippage_bps


CFG = PipelineConfig()
print("Config created.  Drive root:", CFG.drive_root)
print("Markets:", CFG.markets)
print("Forward horizons:", CFG.forward_days_list)
print("Cost (round-trip bps): %.1f" % (CFG.total_cost_bps * 2))

## 1. Persistence & Resume System

In [None]:
DRIVE_MOUNTED = False
try:
    from google.colab import drive
    drive.mount('/content/drive', timeout_ms=60000)
    DRIVE_MOUNTED = True
    print("Google Drive mounted.")
except Exception as e:
    print("Drive mount failed: %s" % str(e)[:80])
    print("Using local storage (data lost on disconnect).")
    CFG.drive_root = "/content/quant_pipeline_v2"

# Create directories for all markets
for mkt in CFG.markets:
    for d in [CFG.data_dir(mkt), CFG.features_dir(mkt), CFG.candidates_dir(mkt),
              CFG.evaluation_dir(mkt), CFG.walkforward_dir(mkt)]:
        os.makedirs(d, exist_ok=True)
os.makedirs(CFG.global_eval_dir, exist_ok=True)
os.makedirs(CFG.logs_dir, exist_ok=True)
print("Directories ready.")

In [None]:
class ProgressTracker:
    """JSON-based checkpoint system for resumable execution."""

    def __init__(self, state_path: str):
        self.state_path = state_path
        self.state = self._load()

    def _load(self) -> dict:
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r') as f:
                return json.load(f)
        return {"completed_steps": {}, "metadata": {}}

    def _save(self):
        with open(self.state_path, 'w') as f:
            json.dump(self.state, f, indent=2, default=str)

    def is_completed(self, step_name: str) -> bool:
        return self.state["completed_steps"].get(step_name, False)

    def mark_completed(self, step_name: str, metadata: dict = None):
        self.state["completed_steps"][step_name] = True
        if metadata:
            self.state["metadata"][step_name] = metadata
        self._save()
        print("  [CHECKPOINT] %s completed." % step_name)

    def get_metadata(self, step_name: str) -> dict:
        return self.state["metadata"].get(step_name, {})

    def reset(self, step_name: str = None):
        if step_name:
            self.state["completed_steps"].pop(step_name, None)
            self.state["metadata"].pop(step_name, None)
        else:
            self.state = {"completed_steps": {}, "metadata": {}}
        self._save()

    def summary(self):
        completed = [k for k, v in self.state["completed_steps"].items() if v]
        print("=== Progress Summary ===")
        if completed:
            for s in completed:
                print("  [DONE] %s" % s)
        else:
            print("  No steps completed yet.")


tracker = ProgressTracker(CFG.state_path)
tracker.summary()

# Uncomment to force re-run from scratch:
# tracker.reset()

In [None]:
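log_file = os.path.join(CFG.logs_dir, "pipeline.log")
# force=True (Python 3.8+) removes handlers already attached to the root logger.
# Without it, basicConfig silently does nothing if the root logger already has
# handlers, as can happen in Colab/Jupyter.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(log_file, mode='a'),
    ],
    force=True,
)
logger = logging.getLogger("pipeline")
logger.info("Pipeline v2 started. Log: %s", log_file)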

import numpy as np
import pandas as pd
np.random.seed(CFG.seed)

## 2. Market Configuration Registry

Defines per-market data sources, index tickers, currencies, and download helpers.
- **US**: yfinance, SPY, USD
- **KOSPI**: pykrx (primary) / FinanceDataReader (fallback), ^KS11, KRW
- **KOSDAQ**: pykrx / FinanceDataReader, ^KQ11, KRW

In [None]:
MARKET_REGISTRY = {
    "US": {
        "data_source": "yfinance",
        "index_ticker": "SPY",
        "currency": "USD",
        "pykrx_market": None,
    },
    "KOSPI": {
        "data_source": "pykrx",
        "index_ticker": "^KS11",
        "currency": "KRW",
        "pykrx_market": "KOSPI",
    },
    "KOSDAQ": {
        "data_source": "pykrx",
        "index_ticker": "^KQ11",
        "currency": "KRW",
        "pykrx_market": "KOSDAQ",
    },
}


def get_static_tickers(market):
    """Return static fallback ticker list for a market."""
    if market == "US":
        return list(CFG.us_tickers)
    elif market == "KOSPI":
        return list(CFG.kospi_tickers)
    elif market == "KOSDAQ":
        return list(CFG.kosdaq_tickers)
    return []


def download_fx_rates(period="10y"):
    """Download USD/KRW exchange rate via yfinance."""
    import yfinance as yf
    try:
        fx = yf.download("USDKRW=X", period=period, progress=False, auto_adjust=True)
        if isinstance(fx.columns, pd.MultiIndex):
            fx.columns = fx.columns.get_level_values(0)
        fx = fx[["Close"]].copy()
        fx.columns = ["usdkrw"]
        fx.index = pd.to_datetime(fx.index).tz_localize(None)
        return fx
    except Exception as e:
        logger.warning("FX download failed: %s" % str(e)[:60])
        return pd.DataFrame(columns=["usdkrw"])


def convert_krw_to_usd(df_krw, fx_df):
    """Convert KRW-denominated OHLCV to USD using FX rates."""
    if fx_df.empty:
        logger.warning("No FX data — returning KRW prices.")
        return df_krw
    fx_aligned = fx_df["usdkrw"].reindex(df_krw.index.get_level_values(0), method="ffill")
    fx_aligned.index = df_krw.index
    for col in ["open", "high", "low", "close"]:
        if col in df_krw.columns:
            df_krw[col] = df_krw[col] / fx_aligned
    return df_krw


print("Market registry ready. Markets:", list(MARKET_REGISTRY.keys()))

## 3. Dynamic Universe Builder

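Constructs a stock universe per market as of `ref_date` (today if omitted). For Korean markets it uses
`pykrx.stock.get_market_ticker_list` with market-cap / volume filters. It falls back to a static
ticker list when the API is unavailable.

The data layer below calls `build_universe(market)` without a `ref_date`. The universe therefore
reflects current listings and is applied to the full history. That is not strictly point-in-time
and can introduce survivorship bias.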
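The filter-then-rank logic can be shown on a one-day snapshot. This sketch assumes a DataFrame indexed by ticker with hypothetical English column names `market_cap` and `volume`; pykrx's own columns are Korean, e.g. `시가총액` and `거래량`. `select_universe` is a hypothetical helper, and the tickers and numbers are toy values.

```python
import pandas as pd


def select_universe(snapshot: pd.DataFrame, min_market_cap: float = 0.0,
                    min_volume: float = 0.0, max_size: int = 50) -> list:
    """Keep tickers passing cap/volume floors, then the max_size largest by market cap."""
    eligible = snapshot[(snapshot["market_cap"] >= min_market_cap)
                        & (snapshot["volume"] >= min_volume)]
    return eligible.sort_values("market_cap", ascending=False).head(max_size).index.tolist()


# Toy snapshot: CCC fails the volume floor, then the two largest remaining caps are kept.
snapshot = pd.DataFrame(
    {"market_cap": [500e12, 80e12, 3e12, 40e12], "volume": [1.2e7, 3.0e6, 5.0e4, 8.0e5]},
    index=["AAA", "BBB", "CCC", "DDD"],
)
print(select_universe(snapshot, min_volume=100_000, max_size=2))
```

A point-in-time universe would take a snapshot dated at each walk-forward fold's start, rather than reusing a single snapshot dated today.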

In [None]:
def build_universe(market, ref_date=None, config=None):
    """Build a point-in-time ticker universe for *market*.

    Returns a list of ticker strings.
    """
    if config is None:
        config = CFG
    registry = MARKET_REGISTRY[market]

    # --- Korean markets: try pykrx dynamic universe ---
    if registry["data_source"] == "pykrx" and config.use_dynamic_universe:
        try:
            from pykrx import stock
            if ref_date is None:
                import datetime
                ref_date = datetime.datetime.now()
            date_str = pd.Timestamp(ref_date).strftime("%Y%m%d")

            tickers = stock.get_market_ticker_list(date_str, market=registry["pykrx_market"])
            if not tickers:
                raise ValueError("Empty ticker list from pykrx")

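            # Apply market-cap / volume filter. pykrx returns Korean column names:
            # 종가 (close), 시가총액 (market cap), 거래량 (volume), 거래대금 (traded value),
            # 상장주식수 (listed shares). Select by name rather than by position.
            try:
                cap_df = stock.get_market_cap_by_ticker(date_str, market=registry["pykrx_market"])
                if not cap_df.empty:
                    cap_col = "시가총액" if "시가총액" in cap_df.columns else cap_df.columns[1]
                    vol_col = "거래량" if "거래량" in cap_df.columns else cap_df.columns[2]
                    if config.min_market_cap > 0:
                        cap_df = cap_df[cap_df[cap_col] >= config.min_market_cap]
                    # Note: single-day volume on ref_date, not a true average volume.
                    if config.min_avg_volume > 0:
                        cap_df = cap_df[cap_df[vol_col] >= config.min_avg_volume]
                    cap_df = cap_df.sort_values(cap_col, ascending=False)
                    tickers = cap_df.head(config.max_universe_size).index.tolist()
            except Exception:
                tickers = tickers[:config.max_universe_size]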

            logger.info("Dynamic universe for %s: %d tickers (ref %s)" % (
                market, len(tickers), date_str))
            return tickers
        except Exception as e:
            logger.warning("Dynamic universe failed for %s: %s — using static list" % (
                market, str(e)[:60]))

    # --- Fallback: static list ---
    static = get_static_tickers(market)
    logger.info("Static universe for %s: %d tickers" % (market, len(static)))
    return static


print("Universe builder ready.")

## 4. Data Layer

Downloads OHLCV data per market:
- **US**: yfinance
- **KOSPI / KOSDAQ**: pykrx (primary), FinanceDataReader (fallback)

Korean prices are optionally converted to USD. Market indices and FX rates
are downloaded separately.

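**Safeguards:** this step stores only raw prices, volumes, index levels and FX rates.
Forward-return labels are computed later in the Feature Engine, so no future information
enters at this stage. Tickers with fewer than 252 rows (about one trading year) are skipped.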
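The FX conversion divides each KRW price by the KRW-per-USD rate for the same date. Where no FX quote exists for a date, the last available rate is carried forward. Below is a minimal sketch on toy numbers. `krw_to_usd` is a hypothetical helper that mirrors `convert_krw_to_usd` for a single price series.

```python
import pandas as pd


def krw_to_usd(close_krw: pd.Series, usdkrw: pd.Series) -> pd.Series:
    """Convert KRW prices to USD using the last available KRW-per-USD rate on or before each date."""
    fx = usdkrw.sort_index().reindex(close_krw.index, method="ffill")
    return close_krw / fx


dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"])
close_krw = pd.Series([70_000.0, 71_000.0, 72_000.0], index=dates)
# Toy FX series with no quote on 2024-01-03, so that day reuses the 2024-01-02 rate.
usdkrw = pd.Series([1_300.0, 1_320.0], index=pd.to_datetime(["2024-01-02", "2024-01-04"]))
print(krw_to_usd(close_krw, usdkrw).round(2))
```

Rates are matched by calendar date only, as in the pipeline code. Any intraday timing gap between the KRX close and the FX quote is ignored. Dates before the first FX quote come out as NaN.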

In [None]:
import yfinance as yf

# Storage for loaded data (populated either from cache or download)
ohlcv_data = {}        # market -> DataFrame (MultiIndex: date, ticker)
market_indices = {}    # market -> DataFrame (index: date, col: close)
fx_rates = pd.DataFrame()

# Download FX rates once (needed for KR markets)
fx_path = os.path.join(CFG.drive_root, "data", "fx_usdkrw.parquet")
if os.path.exists(fx_path):
    fx_rates = pd.read_parquet(fx_path)
    print("FX rates loaded: %d rows" % len(fx_rates))
else:
    if any(MARKET_REGISTRY[m]["currency"] == "KRW" for m in CFG.markets):
        fx_rates = download_fx_rates(CFG.data_period)
        if not fx_rates.empty:
            fx_rates.to_parquet(fx_path)
            print("FX rates downloaded: %d rows" % len(fx_rates))

# --- Per-market download ---
for market in CFG.markets:
    STEP = "data_load_%s" % market
    processed_path = os.path.join(CFG.data_dir(market), "processed.parquet")
    idx_path = os.path.join(CFG.data_dir(market), "market_index.parquet")
    registry = MARKET_REGISTRY[market]

    if tracker.is_completed(STEP):
        logger.info("[SKIP] %s — loading cache" % STEP)
        ohlcv_data[market] = pd.read_parquet(processed_path)
        market_indices[market] = pd.read_parquet(idx_path)
        print("%s loaded: %s" % (market, str(ohlcv_data[market].shape)))
        continue

    logger.info("[RUN] %s" % STEP)
    t0 = time.time()
    tickers = build_universe(market)
    all_dfs = []
    failed = []

    if registry["data_source"] == "yfinance":
        for i, ticker in enumerate(tickers):
            if (i + 1) % 10 == 0 or i == 0:
                print("  [%d/%d] %s" % (i + 1, len(tickers), ticker))
            try:
                df = yf.download(ticker, period=CFG.data_period, progress=False, auto_adjust=True)
                if df.empty or len(df) < 252:
                    failed.append(ticker)
                    continue
                if isinstance(df.columns, pd.MultiIndex):
                    df.columns = df.columns.get_level_values(0)
                df = df[["Open", "High", "Low", "Close", "Volume"]].copy()
                df.columns = ["open", "high", "low", "close", "volume"]
                df.index = pd.to_datetime(df.index).tz_localize(None)
                df["ticker"] = ticker
                all_dfs.append(df)
            except Exception as e:
                failed.append(ticker)

    elif registry["data_source"] == "pykrx":
        import datetime as _dt
        end_str = _dt.datetime.now().strftime("%Y%m%d")
        start_str = (_dt.datetime.now() - _dt.timedelta(days=365 * 10)).strftime("%Y%m%d")

        for i, ticker in enumerate(tickers):
            if (i + 1) % 10 == 0 or i == 0:
                print("  [%d/%d] %s" % (i + 1, len(tickers), ticker))
            try:
                from pykrx import stock as pykrx_stock
                df = pykrx_stock.get_market_ohlcv_by_date(start_str, end_str, ticker)
                if df.empty or len(df) < 252:
                    raise ValueError("Insufficient data")
                rename_map = {}
                for c in df.columns:
                    cl = c.strip()
                    if cl in ("시가", "Open"):
                        rename_map[c] = "open"
                    elif cl in ("고가", "High"):
                        rename_map[c] = "high"
                    elif cl in ("저가", "Low"):
                        rename_map[c] = "low"
                    elif cl in ("종가", "Close"):
                        rename_map[c] = "close"
                    elif cl in ("거래량", "Volume"):
                        rename_map[c] = "volume"
                df = df.rename(columns=rename_map)
                df = df[["open", "high", "low", "close", "volume"]].copy()
                df.index = pd.to_datetime(df.index).tz_localize(None)
                df["ticker"] = ticker
                all_dfs.append(df)
            except Exception as e1:
                # Fallback: FinanceDataReader
                try:
                    import FinanceDataReader as fdr
                    df2 = fdr.DataReader(ticker, start_str[:4] + "-" + start_str[4:6] + "-" + start_str[6:])
                    if df2.empty or len(df2) < 252:
                        failed.append(ticker)
                        continue
                    df2 = df2.rename(columns={
                        "Open": "open", "High": "high", "Low": "low",
                        "Close": "close", "Volume": "volume",
                    })
                    df2 = df2[["open", "high", "low", "close", "volume"]].copy()
                    df2.index = pd.to_datetime(df2.index).tz_localize(None)
                    df2["ticker"] = ticker
                    all_dfs.append(df2)
                except Exception:
                    failed.append(ticker)

    if not all_dfs:
        logger.warning("No data for market %s — skipping." % market)
        continue

    panel = pd.concat(all_dfs)
    panel = panel.set_index([panel.index, "ticker"])
    panel.index.names = ["date", "ticker"]
    panel = panel.sort_index()

    # FX conversion for KRW markets
    if registry["currency"] == "KRW" and CFG.apply_fx_conversion and not fx_rates.empty:
        panel = convert_krw_to_usd(panel, fx_rates)
        logger.info("FX conversion applied for %s" % market)

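    # float32 for memory (prices only)
    if CFG.use_float32:
        for col in ["open", "high", "low", "close"]:
            panel[col] = panel[col].astype(np.float32)
        # Volume stays float64: float32 represents integers exactly only up to 2**24
        # (~16.8M), and daily share volumes can exceed that.
        panel["volume"] = panel["volume"].astype(np.float64)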

    panel.to_parquet(processed_path)
    ohlcv_data[market] = panel

    # Market index
    idx_ticker = registry["index_ticker"]
    try:
        idx_df = yf.download(idx_ticker, period=CFG.data_period, progress=False, auto_adjust=True)
        if isinstance(idx_df.columns, pd.MultiIndex):
            idx_df.columns = idx_df.columns.get_level_values(0)
        idx_df = idx_df[["Close"]].copy()
        idx_df.columns = ["close"]
        idx_df.index = pd.to_datetime(idx_df.index).tz_localize(None)
        if registry["currency"] == "KRW" and CFG.apply_fx_conversion and not fx_rates.empty:
            fx_al = fx_rates["usdkrw"].reindex(idx_df.index, method="ffill")
            idx_df["close"] = idx_df["close"] / fx_al
    except Exception as e:
        logger.warning("Index download failed for %s: %s" % (market, str(e)[:60]))
        idx_df = pd.DataFrame(columns=["close"])
    idx_df.to_parquet(idx_path)
    market_indices[market] = idx_df

    elapsed = time.time() - t0
    meta = {"n_tickers": len(all_dfs), "failed": failed, "rows": len(panel), "time_sec": elapsed}
    tracker.mark_completed(STEP, meta)
    print("%s downloaded: %d/%d tickers in %.0fs" % (market, len(all_dfs), len(tickers), elapsed))
    if failed:
        print("  Failed: %s" % failed[:10])
    gc.collect()

# Summary
for m in CFG.markets:
    if m in ohlcv_data:
        p = ohlcv_data[m]
        tks = p.index.get_level_values(1).unique()
        dts = p.index.get_level_values(0).unique()
        print("%s — %s | tickers %d | %s to %s" % (
            m, str(p.shape), len(tks), dts.min().date(), dts.max().date()))

## 5. Feature Engine

Generates momentum, volatility, and regime features per market.

**Enhancements:**
- Multi-horizon forward returns: 5d, 21d, 63d
- Cost-adjusted forward returns: `net_return = raw_return - 2 * total_cost_bps / 10000`
- Adaptive quantile binning: quintiles if the universe has fewer than `5 * min_bin_size` tickers (100 with the default `min_bin_size = 20`), else deciles
- float32 downcasting for memory efficiency
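With the default config, `total_cost_bps` is 5 + 2 = 7 bps per side. Each label is therefore reduced by 2 × 7 / 10000 = 0.0014, i.e. 14 bps. The worked check below uses `net_forward_return`, a hypothetical helper whose defaults are copied from `transaction_cost_bps` and `slippage_bps`.

```python
def net_forward_return(raw_return, commission_bps=5.0, slippage_bps=2.0):
    """Subtract one round trip (entry + exit) of commission and slippage from a raw return."""
    one_way_bps = commission_bps + slippage_bps       # mirrors PipelineConfig.total_cost_bps
    round_trip = 2 * one_way_bps / 10_000             # bps -> fraction, both legs
    return raw_return - round_trip


# A raw 21-day return of +1.00% nets to about +0.86% after costs.
print(round(net_forward_return(0.01), 6))
```

The same 14 bps is deducted at every horizon. It therefore weighs more heavily on 5-day labels than on 63-day ones. It also assumes a full round trip per label period, even when consecutive signals would keep a position open.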

In [None]:
def compute_momentum_features(close: pd.Series, windows: list) -> pd.DataFrame:
    """Momentum (return) features for each window."""
    feats = {}
    for w in windows:
        feats["mom_%dd" % w] = close.pct_change(w)
    return pd.DataFrame(feats, index=close.index)


def compute_volatility_features(close: pd.Series, windows: list) -> pd.DataFrame:
    """Rolling volatility features."""
    daily_ret = close.pct_change()
    feats = {}
    for w in windows:
        feats["vol_%dd" % w] = daily_ret.rolling(w).std()
    if len(windows) >= 2:
        short_w, long_w = windows[0], windows[-1]
        feats["vol_change"] = (
            daily_ret.rolling(short_w).std()
            / daily_ret.rolling(long_w).std().replace(0, np.nan) - 1.0
        )
    return pd.DataFrame(feats, index=close.index)


def compute_regime_features(market_close: pd.Series, regime_window: int) -> pd.DataFrame:
    """Market-level regime features."""
    market_ret = market_close.pct_change()
    feats = {}
    feats["market_mom_%dd" % regime_window] = market_close.pct_change(regime_window)
    feats["market_vol_%dd" % regime_window] = market_ret.rolling(regime_window).std()
    mom = feats["market_mom_%dd" % regime_window]
    vol = feats["market_vol_%dd" % regime_window]
    vol_median = vol.rolling(252, min_periods=60).median()
    feats["regime_bull"] = ((mom > 0) & (vol < vol_median)).astype(float)
    return pd.DataFrame(feats, index=market_close.index)


def adaptive_n_bins(universe_size, config):
    """Return number of bins based on universe size."""
    if not config.adaptive_quantiles:
        return 10
    if universe_size < 5 * config.min_bin_size:
        return 5
    return 10


def to_cross_sectional_deciles(feature_series, date_level, n_bins=10):
    """Convert feature to cross-sectional quantile ranks (0 to n_bins-1)."""
    def rank_date(group):
        valid = group.dropna()
        if len(valid) < n_bins:
            return pd.Series(np.nan, index=group.index)
        ranks = valid.rank(method='first')
        deciles = pd.cut(ranks, bins=n_bins, labels=False)
        return deciles.reindex(group.index)
    return feature_series.groupby(level=date_level).transform(rank_date)


print("Feature functions defined.")
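`to_cross_sectional_deciles` ranks each date's values with `method='first'`, then cuts the ranks into equal-width bins. This produces quantile buckets of equal or nearly equal size. The stripped-down sketch below applies the same two calls to a single toy cross-section; `quantile_bins` is a hypothetical name. It also exposes a side effect of `method='first'`. A feature that takes the same value for every ticker on a date, such as the market-level regime columns, is split into bins purely by row order.

```python
import pandas as pd


def quantile_bins(values, n_bins):
    """Rank values (ties broken by position) and cut the ranks into n_bins equal-width bins."""
    ranks = pd.Series(values, dtype=float).rank(method="first")
    return pd.cut(ranks, bins=n_bins, labels=False)


# Ten distinct values split into quintiles: two names per bin.
print(quantile_bins([0.5, -0.2, 0.1, 0.9, -0.7, 0.3, 0.0, -0.1, 0.7, 0.2], 5).tolist())

# Ten identical values are still spread across all bins, purely by row order.
print(quantile_bins([1.0] * 10, 5).tolist())
```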

In [None]:
feature_panels = {}   # market -> DataFrame

for market in CFG.markets:
    if market not in ohlcv_data:
        continue

    STEP = "features_%s" % market
    fpath = os.path.join(CFG.features_dir(market), "all_features.parquet")

    if tracker.is_completed(STEP):
        logger.info("[SKIP] %s — loading" % STEP)
        feature_panels[market] = pd.read_parquet(fpath)
        print("%s features loaded: %s" % (market, str(feature_panels[market].shape)))
        continue

    logger.info("[RUN] %s" % STEP)
    t0 = time.time()
    panel = ohlcv_data[market]
    valid_tickers = panel.index.get_level_values(1).unique().tolist()

    # Universe size for adaptive binning
    n_bins = adaptive_n_bins(len(valid_tickers), CFG)
    logger.info("%s: %d tickers, using %d bins" % (market, len(valid_tickers), n_bins))

    # Market index for regime features
    mkt_close = market_indices.get(market, pd.DataFrame()).get("close", pd.Series(dtype=float))
    regime_feats = compute_regime_features(mkt_close, CFG.regime_window) if len(mkt_close) > 0 else pd.DataFrame()
    market_ret_20d = mkt_close.pct_change(20) if len(mkt_close) > 0 else pd.Series(dtype=float)

    all_features = []
    for ticker in valid_tickers:
        try:
            tdata = panel.loc[(slice(None), ticker), :].droplevel(1)
            close = tdata["close"]

            mom = compute_momentum_features(close, CFG.momentum_windows)

            # Market-relative return
            if len(market_ret_20d) > 0:
                stock_20d = close.pct_change(20)
                mkt_al = market_ret_20d.reindex(close.index, method='ffill')
                mom["market_relative_20d"] = stock_20d - mkt_al

            vol = compute_volatility_features(close, CFG.volatility_windows)

            reg = regime_feats.reindex(close.index, method='ffill') if len(regime_feats) > 0 else pd.DataFrame(index=close.index)

            combined = pd.concat([mom, vol, reg], axis=1)

            # Multi-horizon cost-adjusted forward returns
            for fwd_days in CFG.forward_days_list:
                raw_fwd = close.pct_change(fwd_days).shift(-fwd_days)
                net_fwd = raw_fwd - 2 * CFG.total_cost_bps / 10000
                combined["fwd_return_%dd" % fwd_days] = net_fwd

            combined["ticker"] = ticker
            combined.index.name = "date"
            all_features.append(combined)
        except Exception as e:
            logger.warning("Feature error %s/%s: %s" % (market, ticker, str(e)[:60]))

    if not all_features:
        logger.warning("No features for %s" % market)
        continue

    fp = pd.concat(all_features)
    fp = fp.reset_index().set_index(["date", "ticker"]).sort_index()

    fwd_cols = [c for c in fp.columns if c.startswith("fwd_return_")]
    feat_cols = [c for c in fp.columns if c not in fwd_cols]
    fp = fp.dropna(subset=feat_cols, how='all')

    # float32
    if CFG.use_float32:
        for c in fp.select_dtypes(include=['float64']).columns:
            fp[c] = fp[c].astype(np.float32)

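    # Cross-sectional deciles. Market-level regime columns take the same value for
    # every ticker on a date, so rank(method='first') would bin them purely by ticker
    # order; they are kept as raw columns only.
    logger.info("Computing cross-sectional deciles for %s..." % market)
    market_level_cols = [c for c in feat_cols
                         if c.startswith(("market_mom_", "market_vol_")) or c == "regime_bull"]
    for col in feat_cols:
        if col in market_level_cols:
            continue
        fp[col + "_decile"] = to_cross_sectional_deciles(fp[col], "date", n_bins)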

    fp.to_parquet(fpath)
    feature_panels[market] = fp

    elapsed = time.time() - t0
    tracker.mark_completed(STEP, {
        "n_features": len(feat_cols), "n_rows": len(fp),
        "n_bins": n_bins, "time_sec": elapsed,
    })
    print("%s features saved (%.0fs): %s" % (market, elapsed, str(fp.shape)))
    gc.collect()

# Summary
for m, fp in feature_panels.items():
    fwd_cols = [c for c in fp.columns if c.startswith("fwd_return_")]
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]
    decile_cols = [c for c in fp.columns if c.endswith("_decile")]
    print("%s: %d raw features, %d decile features, horizons: %s" % (
        m, len(feat_cols), len(decile_cols), fwd_cols))

## 6. Candidate Generator

Three methods, run **per market** and **per forward-return horizon**:
- **A. Decile Conditions** — single & 2-feature decile combos with explosion control
- **B. Decision Trees** — shallow trees with feature subsampling
- **C. Logistic Rank** — quintile strategies from logistic regression

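Early rejection: a condition is skipped if it has fewer than `min_sample_size` observations, a
negative preliminary Sharpe ratio (mean / std of net forward returns), or a lift below 1.02× the
unconditional mean return.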
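The three early-rejection tests can be written as one predicate. The sketch makes these assumptions:

- Lift is the condition's mean net return divided by the unconditional mean. It is checked only when the unconditional mean is positive, because the ratio is not meaningful otherwise.
- The preliminary Sharpe ratio is per-period and unannualised.
- `passes_early_rejection` is a hypothetical name.

Note that the decile code that follows applies only a subset of these checks to each candidate type.

```python
import numpy as np


def passes_early_rejection(returns, unconditional_mean, min_samples=300, min_lift=1.02):
    """Return True if a candidate's net forward returns survive the sample, Sharpe and lift screens."""
    r = np.asarray(returns, dtype=float)
    if r.size < min_samples:                                              # too few observations
        return False
    mean, std = r.mean(), r.std(ddof=1)
    if std > 1e-8 and mean / std < 0:                                     # negative preliminary Sharpe
        return False
    if unconditional_mean > 0 and mean / unconditional_mean < min_lift:   # weak lift vs. baseline
        return False
    return True


# Mean net return of 2% against a 1% baseline: lift of 2x, positive Sharpe, 300 samples.
print(passes_early_rejection([0.01] * 150 + [0.03] * 150, unconditional_mean=0.01))
```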

In [None]:
from itertools import combinations

all_candidates_list = []   # collect across markets & horizons

for market in CFG.markets:
    if market not in feature_panels:
        continue
    fp = feature_panels[market]
    fwd_cols = sorted([c for c in fp.columns if c.startswith("fwd_return_")])
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]
    decile_cols = [c for c in fp.columns if c.endswith("_decile")]

    for fwd_col in fwd_cols:
        horizon = fwd_col  # e.g. fwd_return_21d
        horizon_tag = fwd_col.replace("fwd_return_", "")  # e.g. 21d
        STEP = "candidates_decile_%s_%s" % (market, horizon_tag)
        save_path = os.path.join(CFG.candidates_dir(market), "decile_%s.parquet" % horizon_tag)

        if tracker.is_completed(STEP):
            logger.info("[SKIP] %s" % STEP)
            df_cached = pd.read_parquet(save_path)
            all_candidates_list.append(df_cached)
            print("  Loaded %d decile candidates for %s/%s" % (len(df_cached), market, horizon_tag))
            continue

        logger.info("[RUN] %s" % STEP)
        t0 = time.time()
        valid = fp.dropna(subset=[fwd_col]).copy()
        # Detect n_bins from actual decile values in data (fall back to 10 if none exist)
        if decile_cols and valid[decile_cols[0]].notna().any():
            n_bins = int(valid[decile_cols[0]].max()) + 1
        else:
            n_bins = 10

        unconditional_mean = valid[fwd_col].mean()
        candidates = []

        # --- Single-feature conditions ---
        for col in decile_cols:
            for dv in range(n_bins):
                mask = valid[col] == dv
                n_trades = int(mask.sum())
                if n_trades < CFG.min_sample_size:
                    continue
                ret = valid.loc[mask, fwd_col]
                mr = float(ret.mean())
                # Early rejection
                if mr <= 0 and (unconditional_mean > 0):
                    continue
                wr = float((ret > 0).mean())
                candidates.append({
                    "strategy_id": "%s_%s_%s_d%d" % (market, horizon_tag, col, dv),
                    "market": market, "horizon": horizon_tag,
                    "type": "single_decile",
                    "features": col,
                    "condition": "== %d" % dv,
                    "n_trades": n_trades,
                    "mean_return": mr,
                    "win_rate": wr,
                })

        logger.info("  Single decile: %d" % len(candidates))

        # --- 2-feature combos (extreme deciles only) ---
        extreme = [0, 1, n_bins - 2, n_bins - 1] if n_bins >= 4 else list(range(n_bins))
        n_before = len(candidates)
        pair_count = 0
        feature_pairs = list(combinations(decile_cols, 2))
        # Overall cap on combo candidates (computed once, not on every iteration)
        max_combo_cands = CFG.max_candidates_per_feature_pair * len(feature_pairs)

        for col_a, col_b in feature_pairs:
            if len(candidates) - n_before >= max_combo_cands:
                break
            pair_cands = 0
            for da in extreme:
                for db in extreme:
                    if pair_cands >= CFG.max_candidates_per_feature_pair:
                        break
                    mask = (valid[col_a] == da) & (valid[col_b] == db)
                    n_trades = int(mask.sum())
                    if n_trades < CFG.min_sample_size:
                        continue
                    ret = valid.loc[mask, fwd_col]
                    mr = float(ret.mean())
                    std_r = float(ret.std())
                    if std_r > 1e-8 and mr / std_r < 0:
                        continue  # preliminary Sharpe < 0
                    wr = float((ret > 0).mean())
                    candidates.append({
                        "strategy_id": "%s_%s_%s_d%d_AND_%s_d%d" % (
                            market, horizon_tag, col_a, da, col_b, db),
                        "market": market, "horizon": horizon_tag,
                        "type": "combo_decile",
                        "features": "%s, %s" % (col_a, col_b),
                        "condition": "%s==%d AND %s==%d" % (col_a, da, col_b, db),
                        "n_trades": n_trades,
                        "mean_return": mr,
                        "win_rate": wr,
                    })
                    pair_cands += 1
            pair_count += 1
            if pair_count % 50 == 0:
                print("    %d pairs, %d candidates" % (pair_count, len(candidates)))

            # Global cap
            if len(candidates) >= CFG.max_candidates_total:
                logger.info("  Hit max_candidates_total cap")
                break

        logger.info("  Combo decile: %d" % (len(candidates) - n_before))
        dc = pd.DataFrame(candidates)
        dc.to_parquet(save_path)
        all_candidates_list.append(dc)

        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(dc), "time_sec": elapsed})
        print("  Decile candidates %s/%s: %d (%.0fs)" % (market, horizon_tag, len(dc), elapsed))
        gc.collect()

print("\nTotal decile candidates across all markets/horizons: %d" % sum(len(d) for d in all_candidates_list))
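
The single-feature scan above reduces to a per-bucket aggregation of forward returns. The sketch below shows that step in isolation on plain NumPy arrays, assuming decile labels run from 0 to `n_bins - 1`; `scan_decile_buckets` is a hypothetical helper, and it reuses the cell's early-rejection rule.

```python
import numpy as np


def scan_decile_buckets(deciles, fwd_returns, n_bins=10, min_sample_size=30):
    """Per-bucket stats for one decile feature (hypothetical helper).

    deciles: bucket labels 0..n_bins-1 (NaN allowed for missing values)
    fwd_returns: forward returns aligned with `deciles`
    """
    deciles = np.asarray(deciles, dtype=float)
    fwd_returns = np.asarray(fwd_returns, dtype=float)
    ok = ~np.isnan(deciles) & ~np.isnan(fwd_returns)
    d, r = deciles[ok].astype(int), fwd_returns[ok]
    unconditional_mean = r.mean()
    out = []
    for dv in range(n_bins):
        ret = r[d == dv]
        if len(ret) < min_sample_size:
            continue  # too few observations in this bucket
        mean_ret = ret.mean()
        # Early rejection: non-positive bucket while the market drifts upward
        if mean_ret <= 0 and unconditional_mean > 0:
            continue
        out.append({"decile": dv, "n_trades": len(ret),
                    "mean_return": float(mean_ret),
                    "win_rate": float((ret > 0).mean())})
    return out


deciles = np.tile(np.arange(10), 100)   # 100 rows per bucket
fwd = 0.001 * (deciles - 3)             # buckets 0-3 have non-positive returns
stats = scan_decile_buckets(deciles, fwd, n_bins=10, min_sample_size=30)
print([s["decile"] for s in stats])     # [4, 5, 6, 7, 8, 9]
```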

In [None]:
from sklearn.tree import DecisionTreeClassifier
import pickle

for market in CFG.markets:
    if market not in feature_panels:
        continue
    fp = feature_panels[market]
    fwd_cols = sorted([c for c in fp.columns if c.startswith("fwd_return_")])
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]

    for fwd_col in fwd_cols:
        horizon_tag = fwd_col.replace("fwd_return_", "")
        STEP = "candidates_tree_%s_%s" % (market, horizon_tag)
        save_path = os.path.join(CFG.candidates_dir(market), "tree_%s.parquet" % horizon_tag)

        if tracker.is_completed(STEP):
            logger.info("[SKIP] %s" % STEP)
            all_candidates_list.append(pd.read_parquet(save_path))
            continue

        logger.info("[RUN] %s" % STEP)
        t0 = time.time()
        valid = fp.dropna(subset=[fwd_col] + feat_cols).copy()
        X = valid[feat_cols].values.astype(np.float32)
        y = (valid[fwd_col].values > 0).astype(int)

        tree_strats = []
        n_feat = len(feat_cols)
        n_sub = min(n_feat, max(3, int(n_feat * CFG.tree_feature_subsample)))
        # Seeded generator so feature subsets and row samples are reproducible
        rng = np.random.default_rng(CFG.seed)

        for ti in range(CFG.n_trees):
            feat_idx = rng.choice(n_feat, n_sub, replace=False)
            feat_names = [feat_cols[j] for j in feat_idx]
            X_sub = X[:, feat_idx]
            sample_idx = rng.choice(len(X_sub), min(len(X_sub), 50000), replace=False)

            tree = DecisionTreeClassifier(
                max_depth=CFG.tree_max_depth,
                min_samples_leaf=CFG.tree_min_samples_leaf,
                random_state=CFG.seed + ti,
            )
            tree.fit(X_sub[sample_idx], y[sample_idx])

            # Leaf assignment for every row, computed once and reused for each leaf
            leaf_ids = tree.apply(X_sub)
            for leaf in np.unique(leaf_ids):
                lmask = leaf_ids == leaf
                nt = int(lmask.sum())
                if nt < CFG.min_sample_size:
                    continue
                ret = valid[fwd_col].values[lmask]
                mr = float(np.nanmean(ret))
                if mr <= 0:
                    continue
                tree_strats.append({
                    "strategy_id": "%s_%s_tree_%d_leaf_%d" % (market, horizon_tag, ti, leaf),
                    "market": market, "horizon": horizon_tag,
                    "type": "decision_tree",
                    "features": ", ".join(feat_names[:5]),
                    "condition": "tree_%d/leaf_%d" % (ti, leaf),
                    "n_trades": nt,
                    "mean_return": mr,
                    "win_rate": float((ret > 0).mean()),
                })

            tp = os.path.join(CFG.candidates_dir(market), "tree_model_%s_%d.pkl" % (horizon_tag, ti))
            with open(tp, 'wb') as f:
                pickle.dump({"tree": tree, "features": feat_names, "feat_idx": feat_idx.tolist()}, f)

            if (ti + 1) % 5 == 0:
                print("    Tree %d/%d, %d candidates" % (ti + 1, CFG.n_trees, len(tree_strats)))

        tc = pd.DataFrame(tree_strats)
        tc.to_parquet(save_path)
        all_candidates_list.append(tc)

        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(tc), "time_sec": elapsed})
        print("  Tree candidates %s/%s: %d (%.0fs)" % (market, horizon_tag, len(tc), elapsed))
        gc.collect()
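
In the tree cell every leaf is scored through its own boolean mask. The same per-leaf statistics can be aggregated in one pass with `np.unique(..., return_inverse=True)` and `np.bincount`. A minimal sketch, assuming `leaf_ids` is the integer array that `DecisionTreeClassifier.apply` returns (plain integers here so it runs without scikit-learn) and that returns contain no NaNs; `leaf_stats` is a hypothetical helper.

```python
import numpy as np


def leaf_stats(leaf_ids, fwd_returns, min_sample_size=30):
    """Aggregate forward returns by leaf in a single pass (hypothetical helper)."""
    leaf_ids = np.asarray(leaf_ids)
    r = np.asarray(fwd_returns, dtype=float)
    leaves, inv = np.unique(leaf_ids, return_inverse=True)
    counts = np.bincount(inv)                                  # rows per leaf
    sums = np.bincount(inv, weights=r)                         # summed returns per leaf
    wins = np.bincount(inv, weights=(r > 0).astype(float))     # winning rows per leaf
    result = {}
    for k, leaf in enumerate(leaves):
        if counts[k] < min_sample_size:
            continue
        result[int(leaf)] = {"n_trades": int(counts[k]),
                             "mean_return": sums[k] / counts[k],
                             "win_rate": wins[k] / counts[k]}
    return result


leaf_ids = np.array([3] * 40 + [5] * 40 + [7] * 10)
rets = np.r_[np.full(40, 0.01), np.full(40, -0.01), np.full(10, 0.02)]
res = leaf_stats(leaf_ids, rets)
print(sorted(res))  # [3, 5]  (leaf 7 has only 10 rows)
```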

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

for market in CFG.markets:
    if market not in feature_panels:
        continue
    fp = feature_panels[market]
    fwd_cols = sorted([c for c in fp.columns if c.startswith("fwd_return_")])
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]

    for fwd_col in fwd_cols:
        horizon_tag = fwd_col.replace("fwd_return_", "")
        STEP = "candidates_logistic_%s_%s" % (market, horizon_tag)
        save_path = os.path.join(CFG.candidates_dir(market), "logistic_%s.parquet" % horizon_tag)

        if tracker.is_completed(STEP):
            logger.info("[SKIP] %s" % STEP)
            all_candidates_list.append(pd.read_parquet(save_path))
            continue

        logger.info("[RUN] %s" % STEP)
        t0 = time.time()
        valid = fp.dropna(subset=[fwd_col] + feat_cols).copy()
        X = valid[feat_cols].values.astype(np.float32)
        y = (valid[fwd_col].values > 0).astype(int)

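        # Chronological split at the 80th percentile of unique dates; the scaler
        # and the model are both fitted on the earlier portion only
        dates = valid.index.get_level_values(0)
        unique_dates = dates.unique().sort_values()
        split_date = unique_dates[int(len(unique_dates) * 0.8)]
        train_mask = np.asarray(dates <= split_date)

        scaler = StandardScaler()
        scaler.fit(X[train_mask])
        X_scaled = scaler.transform(X)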

        lr = LogisticRegression(max_iter=1000, C=0.1, penalty='l2',
                                random_state=CFG.seed, solver='lbfgs')
        lr.fit(X_scaled[train_mask], y[train_mask])

        proba = lr.predict_proba(X_scaled)[:, 1]
        quintile_edges = np.percentile(proba, [0, 20, 40, 60, 80, 100]).tolist()

        logistic_strats = []
        for q in range(5):
            if q == 4:
                qm = proba >= quintile_edges[q]
            else:
                qm = (proba >= quintile_edges[q]) & (proba < quintile_edges[q + 1])
            nt = int(qm.sum())
            if nt < CFG.min_sample_size:
                continue
            ret = valid[fwd_col].values[qm]
            logistic_strats.append({
                "strategy_id": "%s_%s_logistic_q%d" % (market, horizon_tag, q + 1),
                "market": market, "horizon": horizon_tag,
                "type": "logistic_rank",
                "features": "all_features",
                "condition": "logistic_quintile_%d" % (q + 1),
                "n_trades": nt,
                "mean_return": float(np.nanmean(ret)),
                "win_rate": float((ret > 0).mean()),
            })

        mp = os.path.join(CFG.candidates_dir(market), "logistic_model_%s.pkl" % horizon_tag)
        with open(mp, 'wb') as f:
            pickle.dump({"model": lr, "scaler": scaler, "features": feat_cols,
                         "quintile_edges": quintile_edges}, f)

        lc = pd.DataFrame(logistic_strats)
        lc.to_parquet(save_path)
        all_candidates_list.append(lc)

        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(lc), "time_sec": elapsed})
        print("  Logistic candidates %s/%s: %d (%.0fs)" % (market, horizon_tag, len(lc), elapsed))
        gc.collect()
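
The quintile masks in the logistic cell can also be expressed as a single bucket index per row. The sketch below assumes the six edges come from `np.percentile(proba, [0, 20, 40, 60, 80, 100])` as in the cell, and uses `np.searchsorted` on the four inner edges; `quintile_index` is a hypothetical helper. One difference is worth noting: a probability below `edges[0]` (possible out of sample) lands in bucket 0 here, whereas the notebook's masks exclude it.

```python
import numpy as np


def quintile_index(proba, edges):
    """Map probabilities to quintile indices 0..4 (hypothetical helper).

    Bucket q in 0..3 covers edges[q] <= p < edges[q + 1]; bucket 4 covers
    p >= edges[4], matching the half-open masks used in the logistic cell.
    """
    inner = np.asarray(edges[1:5], dtype=float)
    return np.searchsorted(inner, np.asarray(proba, dtype=float), side="right")


rng = np.random.default_rng(0)
proba = rng.random(1000)
edges = np.percentile(proba, [0, 20, 40, 60, 80, 100])
q = quintile_index(proba, edges)
print(np.bincount(q, minlength=5))  # roughly 200 rows per quintile
```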

In [None]:
all_candidates = pd.concat(all_candidates_list, ignore_index=True) if all_candidates_list else pd.DataFrame()

print("=== All Candidates ===")
print("Total: %d" % len(all_candidates))
if len(all_candidates) > 0:
    print("\nBy market:")
    print(all_candidates["market"].value_counts())
    print("\nBy horizon:")
    print(all_candidates["horizon"].value_counts())
    print("\nBy type:")
    print(all_candidates["type"].value_counts())
    print("\nTop 10 by mean return:")
    print(all_candidates.nlargest(10, "mean_return")[
        ["strategy_id", "market", "horizon", "type", "n_trades", "mean_return", "win_rate"]
    ].to_string(index=False))

## 7. Edge Evaluation Engine

For each candidate, compute net-of-cost metrics: win rate, annualised Sharpe, max drawdown,
lift (mean return minus the unconditional mean) and expectancy. Results are saved incrementally,
and already-evaluated strategies are skipped on resume.

In [None]:
def evaluate_strategy_edge(returns, horizon_days=21):
    """Compute edge metrics for an array of per-trade forward returns.

    horizon_days is the holding period in trading days; the Sharpe ratio is
    annualised by sqrt(252 / horizon_days). The default of 21 corresponds to
    the 21-day horizon; pass 5 or 63 for the other horizons.
    Returns None when fewer than 30 non-NaN returns are available.
    """
    returns = np.asarray(returns, dtype=float)
    returns = returns[~np.isnan(returns)]
    n = len(returns)
    if n < 30:
        return None
    mean_ret = float(np.mean(returns))
    std_ret = float(np.std(returns, ddof=1))
    win_rate = float((returns > 0).mean())
    avg_win = float(np.mean(returns[returns > 0])) if (returns > 0).any() else 0.0
    avg_loss = float(np.mean(returns[returns <= 0])) if (returns <= 0).any() else 0.0
    sharpe = (mean_ret / std_ret * np.sqrt(252 / max(1, horizon_days))) if std_ret > 1e-8 else 0.0
    # Drawdown of the cumulative sum of trade returns in row order; trades with
    # overlapping holding periods are summed sequentially, so this is a rough proxy
    cum = np.cumsum(returns)
    running_max = np.maximum.accumulate(cum)
    max_dd = float(np.min(cum - running_max))
    expectancy = avg_win * win_rate + avg_loss * (1 - win_rate)
    return {
        "n_trades": n, "mean_return": mean_ret, "std_return": std_ret,
        "win_rate": win_rate, "avg_win": avg_win, "avg_loss": avg_loss,
        "sharpe": float(sharpe), "max_drawdown": max_dd,
        "expectancy": float(expectancy),
    }


import pickle
from functools import lru_cache


@lru_cache(maxsize=None)
def _load_pickle(path):
    """Load a pickled model once and reuse it; masks are rebuilt for every
    strategy and fold. Call _load_pickle.cache_clear() if models are re-trained."""
    with open(path, 'rb') as f:
        return pickle.load(f)


def build_mask(data, stype, cand_row, sid, market, horizon_tag, feat_cols_list):
    """Build a boolean mask (aligned to data.index) selecting the rows a strategy trades."""
    if stype == "single_decile":
        # condition format: "== <decile>"
        col = cand_row["features"]
        dv = int(cand_row["condition"].split("== ")[1])
        return data[col] == dv
    elif stype == "combo_decile":
        # condition format: "<col_a>==<da> AND <col_b>==<db>"
        parts = cand_row["condition"].split(" AND ")
        ca, va = parts[0].split("==")
        cb, vb = parts[1].split("==")
        return (data[ca.strip()] == int(va)) & (data[cb.strip()] == int(vb))
    elif stype == "decision_tree":
        # strategy_id format: {market}_{horizon}_tree_{idx}_leaf_{id}
        parts = sid.split("_")
        tree_num = int(parts[parts.index("tree") + 1])
        leaf_id = int(parts[parts.index("leaf") + 1])
        tp = os.path.join(CFG.candidates_dir(market), "tree_model_%s_%d.pkl" % (horizon_tag, tree_num))
        td = _load_pickle(tp)
        missing = [fn for fn in td["features"] if fn not in feat_cols_list]
        if missing:
            raise ValueError("Missing features: %s" % missing)
        # Same feature order the tree was trained on
        X = data[td["features"]].values.astype(np.float32)
        np.nan_to_num(X, copy=False)
        return pd.Series(td["tree"].apply(X) == leaf_id, index=data.index)
    elif stype == "logistic_rank":
        # strategy_id ends with _q<1..5>; quintile 5 is open-ended at the top
        q_num = int(sid.rsplit("_q", 1)[1])
        mp = os.path.join(CFG.candidates_dir(market), "logistic_model_%s.pkl" % horizon_tag)
        ld = _load_pickle(mp)
        # Use the feature list saved with the model, not the caller's list
        X = data[ld["features"]].values.astype(np.float32)
        np.nan_to_num(X, copy=False)
        proba = ld["model"].predict_proba(ld["scaler"].transform(X))[:, 1]
        edges = ld["quintile_edges"]
        if q_num == 5:
            return pd.Series(proba >= edges[q_num - 1], index=data.index)
        return pd.Series((proba >= edges[q_num - 1]) & (proba < edges[q_num]), index=data.index)
    return pd.Series(False, index=data.index)


print("Edge evaluation & mask builder defined.")
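
The Sharpe annualisation depends on the holding period: Sharpe computed from 5-day trade returns scales by sqrt(252/5), a 63-day one by sqrt(252/63). The toy function below recomputes the main edge metrics on a short return list, assuming 252 trading days per year; `toy_edge_metrics` is a hypothetical helper, not the notebook's `evaluate_strategy_edge` (it has no 30-trade minimum).

```python
import numpy as np


def toy_edge_metrics(returns, horizon_days, unconditional_mean, days_per_year=252):
    """Sharpe, lift, win rate and drawdown for per-trade forward returns."""
    r = np.asarray(returns, dtype=float)
    mean_ret = r.mean()
    std_ret = r.std(ddof=1)
    sharpe = mean_ret / std_ret * np.sqrt(days_per_year / horizon_days)  # annualised
    cum = np.cumsum(r)
    max_dd = float(np.min(cum - np.maximum.accumulate(cum)))             # always <= 0
    return {"sharpe": float(sharpe), "lift": float(mean_ret - unconditional_mean),
            "win_rate": float((r > 0).mean()), "max_drawdown": max_dd}


r = [0.02, -0.01, 0.03, -0.02, 0.01]
m5 = toy_edge_metrics(r, 5, 0.001)
m63 = toy_edge_metrics(r, 63, 0.001)
print(m5["win_rate"], round(m5["max_drawdown"], 6))  # 0.6 -0.02
```

The same returns give a Sharpe about 3.5 times larger when treated as 5-day trades than as 63-day trades, which is why the horizon must be passed consistently.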

In [None]:
_EDGE_COLS = ["strategy_id", "market", "horizon", "type", "n_trades",
              "mean_return", "std_return", "win_rate", "avg_win", "avg_loss",
              "sharpe", "max_drawdown", "expectancy", "lift"]

all_edge_results = []

for market in CFG.markets:
    if market not in feature_panels:
        continue
    fp = feature_panels[market]
    fwd_cols = sorted([c for c in fp.columns if c.startswith("fwd_return_")])
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]

    for fwd_col in fwd_cols:
        horizon_tag = fwd_col.replace("fwd_return_", "")
        STEP = "edge_eval_%s_%s" % (market, horizon_tag)
        eval_path = os.path.join(CFG.evaluation_dir(market), "edge_%s.parquet" % horizon_tag)

        if tracker.is_completed(STEP):
            logger.info("[SKIP] %s" % STEP)
            all_edge_results.append(pd.read_parquet(eval_path))
            continue

        logger.info("[RUN] %s" % STEP)
        t0 = time.time()
        valid = fp.dropna(subset=[fwd_col] + feat_cols).copy()
        unconditional_mean = float(valid[fwd_col].mean())

        # Filter candidates for this market/horizon
        mh_cands = all_candidates[
            (all_candidates["market"] == market) & (all_candidates["horizon"] == horizon_tag)
        ]
        logger.info("Evaluating %d candidates for %s/%s" % (len(mh_cands), market, horizon_tag))

        # Incremental resume
        existing_ids = set()
        eval_rows = []
        if os.path.exists(eval_path):
            edf = pd.read_parquet(eval_path)
            existing_ids = set(edf["strategy_id"].values)
            eval_rows = edf.to_dict('records')

        to_eval = mh_cands[~mh_cands["strategy_id"].isin(existing_ids)]

        for idx, (_, row) in enumerate(to_eval.iterrows()):
            sid = row["strategy_id"]
            stype = row["type"]
            try:
                mask = build_mask(valid, stype, row, sid, market, horizon_tag, feat_cols)
                returns = valid.loc[mask, fwd_col].values
                edge = evaluate_strategy_edge(returns)
                if edge is not None:
                    edge["lift"] = edge["mean_return"] - unconditional_mean
                    edge["strategy_id"] = sid
                    edge["market"] = market
                    edge["horizon"] = horizon_tag
                    edge["type"] = stype
                    eval_rows.append(edge)
            except Exception as e:
                logger.warning("Eval err %s: %s" % (sid, str(e)[:60]))

            # Periodic checkpoint, outside the try block so candidates with too
            # few trades do not skip it; never write an empty frame
            if (idx + 1) % CFG.gc_every_n_candidates == 0 and eval_rows:
                pd.DataFrame(eval_rows).to_parquet(eval_path)
                gc.collect()
                print("    %d/%d evaluated, checkpoint" % (idx + 1, len(to_eval)))

        edf = pd.DataFrame(eval_rows) if eval_rows else pd.DataFrame(columns=_EDGE_COLS)
        edf.to_parquet(eval_path)
        all_edge_results.append(edf)

        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(edf), "time_sec": elapsed})
        print("  Edge eval %s/%s: %d (%.0fs)" % (market, horizon_tag, len(edf), elapsed))
        gc.collect()

edge_results = pd.concat(all_edge_results, ignore_index=True) if all_edge_results else pd.DataFrame(columns=_EDGE_COLS)
print("\nTotal edge results: %d" % len(edge_results))
if len(edge_results) > 0:
    print(edge_results[["mean_return", "win_rate", "sharpe", "lift"]].describe())

## 8. Walk-Forward Validation

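Rolling walk-forward with an embargo between train and test windows. Only the strongest
candidates per market/horizon are validated: the top 200 by full-sample Sharpe when more than
200 exist, otherwise those with Sharpe > 0.2 (falling back to the top 50 when fewer than 20
qualify). Each fold also estimates annualised turnover.

Candidate rules are not re-fitted per fold; each fold scores the fixed rule on its test window.
Tree and logistic candidates were fitted on data that can overlap these windows, so their fold
results are not strictly out-of-sample.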

**Controls:**
- `wf_min_folds` enforcement
- Embargo days between train and test
- Per-fold checkpointing
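
The fold schedule is a rolling window: a training span, an embargo gap, then a test span, with the whole block shifted forward by a fixed step until the test window runs past the data. A stdlib-only sketch of that schedule, assuming the embargo is counted in calendar days as in the notebook; `add_months` stands in for `dateutil.relativedelta`, and both helper names and the example parameters are hypothetical.

```python
import calendar
from datetime import date, timedelta


def add_months(d, months):
    """Shift a date by whole months, clamping to the month's last day."""
    m = d.month - 1 + months
    y, m = d.year + m // 12, m % 12 + 1
    return date(y, m, min(d.day, calendar.monthrange(y, m)[1]))


def make_folds(min_date, max_date, train_years, embargo_days, test_months, step_months):
    """Rolling (train_start, train_end, test_start, test_end) tuples."""
    folds, ts = [], min_date
    while True:
        te = add_months(ts, 12 * train_years)
        vs = te + timedelta(days=embargo_days)   # embargo gap in calendar days
        ve = add_months(vs, test_months)
        if ve > max_date:                        # test window would run past the data
            break
        folds.append((ts, te, vs, ve))
        ts = add_months(ts, step_months)
    return folds


folds = make_folds(date(2015, 1, 1), date(2020, 12, 31),
                   train_years=3, embargo_days=10, test_months=6, step_months=6)
print(len(folds))  # 5
```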

In [None]:
from dateutil.relativedelta import relativedelta

_WF_COLS = ["strategy_id", "market", "horizon", "fold_idx", "n_trades",
            "mean_return", "std_return", "win_rate", "avg_win", "avg_loss",
            "sharpe", "max_drawdown", "expectancy", "turnover",
            "test_start", "test_end"]

all_wf_results = []

for market in CFG.markets:
    if market not in feature_panels:
        continue
    fp = feature_panels[market]
    fwd_cols = sorted([c for c in fp.columns if c.startswith("fwd_return_")])
    feat_cols = [c for c in fp.columns if not c.startswith("fwd_return_") and not c.endswith("_decile")]

    for fwd_col in fwd_cols:
        horizon_tag = fwd_col.replace("fwd_return_", "")
        STEP = "walkforward_%s_%s" % (market, horizon_tag)
        wf_path = os.path.join(CFG.walkforward_dir(market), "wf_%s.parquet" % horizon_tag)

        if tracker.is_completed(STEP):
            logger.info("[SKIP] %s" % STEP)
            all_wf_results.append(pd.read_parquet(wf_path))
            continue

        logger.info("[RUN] %s" % STEP)
        t0 = time.time()
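        # Same row filter as edge evaluation, so tree/logistic masks never see
        # NaN features that build_mask would silently replace with 0
        valid = fp.dropna(subset=[fwd_col] + feat_cols).copy()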

        # Pre-filter: top candidates for this market/horizon
        mh_edge = edge_results[
            (edge_results["market"] == market) & (edge_results["horizon"] == horizon_tag)
        ]
        if len(mh_edge) > 200:
            top_ids = mh_edge.nlargest(200, "sharpe")["strategy_id"].tolist()
        elif len(mh_edge) > 0:
            top_ids = mh_edge[mh_edge["sharpe"] > 0.2]["strategy_id"].tolist()
            if len(top_ids) < 20:
                top_ids = mh_edge.nlargest(min(50, len(mh_edge)), "sharpe")["strategy_id"].tolist()
        else:
            top_ids = []

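        if not top_ids:
            logger.info("No candidates for WF %s/%s" % (market, horizon_tag))
            # Persist an empty result so the [SKIP] branch can reload it on resume
            pd.DataFrame(columns=_WF_COLS).to_parquet(wf_path)
            tracker.mark_completed(STEP, {"n": 0})
            continue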

        logger.info("WF on %d candidates for %s/%s" % (len(top_ids), market, horizon_tag))

        all_dates = valid.index.get_level_values(0).unique().sort_values()
        min_date, max_date = all_dates.min(), all_dates.max()

        # Generate folds
        folds = []
        ts = min_date
        while True:
            te = ts + relativedelta(years=CFG.wf_train_years)
            vs = te + pd.Timedelta(days=CFG.wf_embargo_days)
            ve = vs + relativedelta(months=CFG.wf_test_months)
            if ve > max_date:
                break
            folds.append((ts, te, vs, ve))
            ts += relativedelta(months=CFG.wf_step_months)

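        if len(folds) < CFG.wf_min_folds:
            logger.warning("Only %d folds for %s/%s (min %d), skipping" % (
                len(folds), market, horizon_tag, CFG.wf_min_folds))
            # Persist an empty result so the [SKIP] branch can reload it on resume
            pd.DataFrame(columns=_WF_COLS).to_parquet(wf_path)
            tracker.mark_completed(STEP, {"n": 0, "reason": "insufficient_folds"})
            continue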

        logger.info("  %d folds" % len(folds))

        # Partial resume
        partial_path = os.path.join(CFG.walkforward_dir(market), "wf_partial_%s.parquet" % horizon_tag)
        wf_rows = []
        completed_keys = set()
        if os.path.exists(partial_path):
            pdf = pd.read_parquet(partial_path)
            wf_rows = pdf.to_dict('records')
            completed_keys = set(zip(pdf["strategy_id"], pdf["fold_idx"].astype(int)))

        mh_cands = all_candidates[
            (all_candidates["market"] == market) & (all_candidates["horizon"] == horizon_tag)
        ]

        d_idx = valid.index.get_level_values(0)
        for fi, (train_s, train_e, test_s, test_e) in enumerate(folds):
            # Candidate rules are fixed (not re-fitted per fold), so only the
            # test window is scored; train_s/train_e just position the fold
            test_data = valid[(d_idx >= test_s) & (d_idx < test_e)]

            for sid in top_ids:
                if (sid, fi) in completed_keys:
                    continue
                try:
                    cr = mh_cands[mh_cands["strategy_id"] == sid].iloc[0]
                    stype = cr["type"]
                    test_mask_strat = build_mask(test_data, stype, cr, sid, market, horizon_tag, feat_cols)
                    test_returns = test_data.loc[test_mask_strat, fwd_col].values
                    if len(test_returns) < 20:
                        continue
                    edge = evaluate_strategy_edge(test_returns)
                    if edge is None:
                        continue

                    # Turnover estimation within fold: Jaccard distance between the
                    # tickers selected on rebalance dates 21 trading days apart (a
                    # monthly-rebalance assumption applied to every horizon),
                    # annualised by 252 / 21 rebalances per year
                    sig = np.asarray(test_mask_strat, dtype=bool)
                    test_dates = test_data.index.get_level_values(0)
                    rebal_dates = test_dates.unique().sort_values()[::21]
                    turnovers = []
                    prev_set = None
                    for dt in rebal_dates:
                        day_ix = (test_dates == dt) & sig
                        sel = set(test_data.index[day_ix].get_level_values(1))
                        if prev_set is not None and (prev_set or sel):
                            turnovers.append(len(prev_set ^ sel) / len(prev_set | sel))
                        prev_set = sel
                    ann_turnover = float(np.mean(turnovers) * 252 / 21) if turnovers else 0.0

                    edge["strategy_id"] = sid
                    edge["market"] = market
                    edge["horizon"] = horizon_tag
                    edge["fold_idx"] = fi
                    edge["turnover"] = ann_turnover
                    edge["test_start"] = str(test_s.date())
                    edge["test_end"] = str(test_e.date())
                    wf_rows.append(edge)
                except Exception as e:
                    logger.warning("WF err %s fold %d: %s" % (sid, fi, str(e)[:60]))

            if wf_rows:
                pd.DataFrame(wf_rows).to_parquet(partial_path)
            print("    Fold %d/%d done" % (fi + 1, len(folds)))

        wdf = pd.DataFrame(wf_rows) if wf_rows else pd.DataFrame(columns=_WF_COLS)
        wdf.to_parquet(wf_path)
        all_wf_results.append(wdf)

        if os.path.exists(partial_path):
            os.remove(partial_path)

        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(wdf), "folds": len(folds), "time_sec": elapsed})
        print("  WF %s/%s: %d results, %d folds (%.0fs)" % (
            market, horizon_tag, len(wdf), len(folds), elapsed))
        gc.collect()

wf_results = pd.concat(all_wf_results, ignore_index=True) if all_wf_results else pd.DataFrame(columns=_WF_COLS)
print("\nTotal WF results: %d" % len(wf_results))
if len(wf_results) > 0:
    print("Strategies: %d | Folds: %d" % (
        wf_results["strategy_id"].nunique(), wf_results["fold_idx"].nunique()))
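
The per-fold turnover above is the average Jaccard distance (symmetric difference over union) between the ticker sets held on consecutive rebalance dates, scaled to a year. A pure-Python sketch of that formula, assuming the notebook's 21-trading-day spacing and 252 trading days per year; `annualised_turnover` is a hypothetical helper.

```python
def annualised_turnover(holdings, rebalance_days=21, days_per_year=252):
    """Mean Jaccard distance between consecutive holding sets, scaled to a year.

    holdings: list of sets of tickers, one per rebalance date.
    Pairs where both sets are empty are skipped, as in the walk-forward cell.
    """
    turnovers = []
    for prev, cur in zip(holdings, holdings[1:]):
        union = prev | cur
        if union:
            turnovers.append(len(prev ^ cur) / len(union))
    if not turnovers:
        return 0.0
    return sum(turnovers) / len(turnovers) * days_per_year / rebalance_days


# A -> C swap (distance 2/3), then no change (0): mean 1/3, times 12 rebalances
print(annualised_turnover([{"A", "B"}, {"B", "C"}, {"B", "C"}]))  # ≈ 4.0
```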

## 9. Overfitting Control (Strengthened)

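Filters strategies using:
- **Stability** >= 0.5 (`min_stability`): fraction of folds with a positive mean return
- **Mean Sharpe** >= 0.5 (`min_sharpe`) across folds
- **Mean win rate** >= 0.52 (`min_win_rate`)
- **Sign flips**: fold-to-fold sign changes in mean return are recorded as `sign_flips` for
  inspection but are not used as a filter
- **Bootstrap CI**: win-rate CI lower bound >= 0.48, with a `bootstrap_min_samples` guard
  (strategies with fewer trades get an uninformative [0, 1] interval and so fail this check)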
- **FDR correction** (Benjamini-Hochberg) for multiple testing
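
The cell below relies on `scipy.stats.binomtest` for the one-sided win-rate test and on `statsmodels` `multipletests(method='fdr_bh')` for the correction. A dependency-free sketch of the same two computations, for checking the logic by hand; `binom_sf_greater` and `benjamini_hochberg` are hypothetical helpers.

```python
from math import comb


def binom_sf_greater(k, n, p=0.5):
    """One-sided exact binomial p-value P(X >= k) for X ~ Binomial(n, p)."""
    return sum(comb(n, i) * p**i * (1 - p)**(n - i) for i in range(k, n + 1))


def benjamini_hochberg(pvals, alpha=0.05):
    """Return (reject, adjusted p-values) from the BH step-up procedure."""
    m = len(pvals)
    order = sorted(range(m), key=lambda i: pvals[i])
    adj = [0.0] * m
    running_min = 1.0  # caps adjusted p-values at 1
    # Walk from the largest p-value down, scaling by m / rank and keeping
    # the adjusted values monotone
    for rank in range(m, 0, -1):
        i = order[rank - 1]
        running_min = min(running_min, pvals[i] * m / rank)
        adj[i] = running_min
    return [a <= alpha for a in adj], adj


print(binom_sf_greater(8, 10))  # 0.0546875 (56 / 1024)
reject, adj = benjamini_hochberg([0.01, 0.04, 0.03, 0.20])
print(reject)                   # [True, False, False, False]
```

Even with 8 wins out of 10, the one-sided p-value stays above 0.05, which is one reason the pipeline pools trades across folds before testing.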

In [None]:
STEP = "overfitting_control"
filtered_path = os.path.join(CFG.global_eval_dir, "filtered_strategies.parquet")

if tracker.is_completed(STEP):
    logger.info("[SKIP] %s" % STEP)
    filtered_strategies = pd.read_parquet(filtered_path)
    print("Loaded %d filtered strategies." % len(filtered_strategies))
else:
    logger.info("[RUN] %s" % STEP)
    t0 = time.time()

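    if len(wf_results) == 0:
        print("No walk-forward results. Skipping.")
        filtered_strategies = pd.DataFrame(columns=["strategy_id", "market", "horizon"])
        # Persist the empty result so the [SKIP] branch can reload it on resume
        filtered_strategies.to_parquet(filtered_path)
        tracker.mark_completed(STEP, {"n": 0})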
    else:
        strategy_stats = []
        for sid, group in wf_results.groupby("strategy_id"):
            n_folds = len(group)
            if n_folds < 2:
                continue
            fold_returns = group["mean_return"].values
            fold_sharpes = group["sharpe"].values
            fold_win_rates = group["win_rate"].values
            fold_turnovers = group["turnover"].values if "turnover" in group.columns else np.zeros(n_folds)

            stability = float((fold_returns > 0).mean())
            mean_sharpe = float(np.mean(fold_sharpes))
            sharpe_std = float(np.std(fold_sharpes, ddof=1))
            mean_win_rate = float(np.mean(fold_win_rates))
            min_win_rate = float(np.min(fold_win_rates))
            sign_flips = int(np.sum(np.diff(np.sign(fold_returns)) != 0))
            mean_turnover = float(np.mean(fold_turnovers))

            all_n = group["n_trades"].values
            total_trades = int(all_n.sum())

            # Reconstruct the pooled win count from per-fold win rates; round rather
            # than truncate, since rate * count can land just below the integer
            total_wins = int(round(float((fold_win_rates * all_n).sum())))

            # Parametric bootstrap CI for the pooled win rate (binomial resampling)
            if total_trades >= CFG.bootstrap_min_samples:
                bs = np.random.binomial(total_trades, total_wins / max(1, total_trades), CFG.bootstrap_n)
                bs_wr = bs / total_trades
                ci_low = float(np.percentile(bs_wr, (1 - CFG.bootstrap_ci) / 2 * 100))
                ci_high = float(np.percentile(bs_wr, (1 + CFG.bootstrap_ci) / 2 * 100))
            else:
                ci_low, ci_high = 0.0, 1.0  # too few trades: uninformative interval

            # One-sided binomial test of win rate > 0.5; it treats trades as independent,
            # which overlapping holding periods violate, so the p-value is optimistic
            from scipy import stats as sp_stats
            if total_trades > 0:
                pval = sp_stats.binomtest(total_wins, total_trades, 0.5, alternative='greater').pvalue
            else:
                pval = 1.0

            mkt = group["market"].iloc[0] if "market" in group.columns else ""
            hor = group["horizon"].iloc[0] if "horizon" in group.columns else ""

            strategy_stats.append({
                "strategy_id": sid, "market": mkt, "horizon": hor,
                "n_folds": n_folds, "stability": stability,
                "mean_sharpe": mean_sharpe, "sharpe_std": sharpe_std,
                "mean_win_rate": mean_win_rate, "min_win_rate": min_win_rate,
                "sign_flips": sign_flips, "total_trades": total_trades,
                "mean_turnover": mean_turnover,
                "wr_ci_low": ci_low, "wr_ci_high": ci_high,
                "pval_wr": pval,
                "mean_return": float(np.mean(fold_returns)),
            })

        stats_df = pd.DataFrame(strategy_stats)

        # FDR correction
        if CFG.apply_multiple_testing_correction and len(stats_df) > 0 and "pval_wr" in stats_df.columns:
            from statsmodels.stats.multitest import multipletests
            reject, pvals_corr, _, _ = multipletests(
                stats_df["pval_wr"].fillna(1.0).values,
                alpha=0.05, method='fdr_bh',
            )
            stats_df["pval_fdr"] = pvals_corr
            stats_df["fdr_reject"] = reject
            logger.info("FDR correction: %d / %d rejected null" % (reject.sum(), len(reject)))
        else:
            stats_df["pval_fdr"] = stats_df.get("pval_wr", 1.0)
            stats_df["fdr_reject"] = True

        # Apply filters
        n_before = len(stats_df)
        mask = (
            (stats_df["stability"] >= CFG.min_stability)
            & (stats_df["mean_sharpe"] >= CFG.min_sharpe)
            & (stats_df["mean_win_rate"] >= CFG.min_win_rate)
            & (stats_df["wr_ci_low"] >= 0.48)
            & stats_df["fdr_reject"].astype(bool)
        )
        filtered_strategies = stats_df[mask].copy()
        filtered_strategies = filtered_strategies.sort_values("mean_sharpe", ascending=False)

        print("\nOverfitting filters:")
        print("  Before: %d" % n_before)
        print("  Stability >= %.1f: %d" % (CFG.min_stability, (stats_df["stability"] >= CFG.min_stability).sum()))
        print("  Sharpe >= %.1f: %d" % (CFG.min_sharpe, (stats_df["mean_sharpe"] >= CFG.min_sharpe).sum()))
        print("  Win rate >= %.2f: %d" % (CFG.min_win_rate, (stats_df["mean_win_rate"] >= CFG.min_win_rate).sum()))
        print("  CI lower >= 0.48: %d" % (stats_df["wr_ci_low"] >= 0.48).sum())
        print("  FDR significant: %d" % stats_df["fdr_reject"].sum())
        print("  After ALL: %d" % len(filtered_strategies))

        filtered_strategies.to_parquet(filtered_path)
        elapsed = time.time() - t0
        tracker.mark_completed(STEP, {"n": len(filtered_strategies), "time_sec": elapsed})
        gc.collect()

print("\nFiltered strategies: %d" % len(filtered_strategies))
if len(filtered_strategies) > 0:
    print(filtered_strategies[
        ["strategy_id", "market", "horizon", "stability", "mean_sharpe",
         "mean_win_rate", "wr_ci_low", "mean_turnover", "total_trades"]
    ].head(20).to_string(index=False))

## 10. Turnover & Cost Filtering

Reject strategies whose mean estimated annualised turnover across walk-forward folds exceeds
`max_turnover`. The turnover penalty on the remaining strategies is applied in the scoring step
(Section 11).

In [None]:
STEP = "turnover_filter"
turnover_path = os.path.join(CFG.global_eval_dir, "turnover_filtered.parquet")

if tracker.is_completed(STEP):
    logger.info("[SKIP] %s" % STEP)
    turnover_filtered = pd.read_parquet(turnover_path)
    print("Loaded %d turnover-filtered strategies." % len(turnover_filtered))
else:
    logger.info("[RUN] %s" % STEP)

    if len(filtered_strategies) == 0:
        turnover_filtered = pd.DataFrame(columns=["strategy_id", "market", "horizon"])
    else:
        before = len(filtered_strategies)
        turnover_filtered = filtered_strategies[
            filtered_strategies["mean_turnover"] <= CFG.max_turnover
        ].copy()
        print("Turnover filter (max %.1f): %d -> %d" % (
            CFG.max_turnover, before, len(turnover_filtered)))

    # Always persist (even when empty) so the [SKIP] branch can reload it on resume
    turnover_filtered.to_parquet(turnover_path)
    tracker.mark_completed(STEP, {"n": len(turnover_filtered)})

print("Turnover-filtered: %d" % len(turnover_filtered))

## 11. Strategy Scoring (Turnover-Aware)

Composite score per strategy:
```
Score = w_stability * S + w_sharpe * Sh + w_lift * L + w_sample * N - penalty_turnover * T
```

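Here S, Sh, L, N and T are the stability, mean walk-forward Sharpe, full-sample lift,
log(1 + total trades) and mean turnover, each min-max normalised across the strategies being
scored. Redundant strategies are then removed by clustering on the correlation of their per-fold
returns (distance = 1 - |corr|) and keeping the highest-scoring strategy in each cluster.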
Keep top `portfolio_max_strategies` per market per horizon.
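
The scoring formula and the de-duplication idea can be tried on a handful of strategies without the rest of the pipeline. The sketch below assumes hypothetical weight values in `w`, mirrors the notebook's `norm` (constant columns map to 0.5), and, in place of the `AgglomerativeClustering` used in the cell, swaps in a simpler greedy filter: walk strategies in score order and skip any whose per-fold returns have |corr| above a hypothetical `max_abs_corr` with one already kept.

```python
import numpy as np


def minmax(x):
    """Min-max normalise to [0, 1]; constant input maps to 0.5."""
    x = np.asarray(x, dtype=float)
    span = x.max() - x.min()
    return (x - x.min()) / span if span > 1e-8 else np.full_like(x, 0.5)


def composite_score(stability, sharpe, lift, total_trades, turnover, w):
    """Turnover-penalised composite score; `w` holds hypothetical weights."""
    return (w["stability"] * minmax(stability)
            + w["sharpe"] * minmax(sharpe)
            + w["lift"] * minmax(lift)
            + w["sample"] * minmax(np.log1p(total_trades))
            - w["turnover"] * minmax(turnover))


def greedy_decorrelate(scores, fold_returns, max_abs_corr=0.7):
    """Greedy alternative to clustering: keep strategies in score order unless
    highly correlated with one already kept (fold_returns: folds x strategies)."""
    corr = np.corrcoef(fold_returns, rowvar=False)
    kept = []
    for j in np.argsort(-np.asarray(scores)):
        if all(abs(corr[j, k]) <= max_abs_corr for k in kept):
            kept.append(int(j))
    return kept


w = {"stability": 0.3, "sharpe": 0.3, "lift": 0.2, "sample": 0.2, "turnover": 0.1}
scores = composite_score([0.6, 0.8, 1.0], [0.5, 1.0, 1.5], [0.001, 0.002, 0.003],
                         [100, 1000, 10000], [2.0, 4.0, 6.0], w)
fold_returns = np.array([[0.01, 0.02, 0.01],
                         [-0.01, 0.01, 0.005],
                         [0.02, -0.01, -0.005],
                         [0.0, 0.03, 0.015]])   # strategy 1 = 2 x strategy 2
kept = greedy_decorrelate(scores, fold_returns)
print(kept)  # [2, 0]: strategy 1 duplicates the higher-scoring strategy 2
```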

In [None]:
from sklearn.cluster import AgglomerativeClustering

STEP = "scoring"
scored_path = os.path.join(CFG.global_eval_dir, "scored_strategies.parquet")

if tracker.is_completed(STEP):
    logger.info("[SKIP] %s" % STEP)
    scored_strategies = pd.read_parquet(scored_path)
    print("Loaded %d scored strategies." % len(scored_strategies))
else:
    logger.info("[RUN] %s" % STEP)

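    if len(turnover_filtered) == 0:
        print("No strategies passed turnover filter.")
        scored_strategies = pd.DataFrame(columns=["strategy_id", "market", "horizon"])
        # Persist the empty result so the [SKIP] branch can reload it on resume
        scored_strategies.to_parquet(scored_path)
        tracker.mark_completed(STEP, {"n": 0})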
    else:
        df = turnover_filtered.copy()

        def norm(s):
            r = s.max() - s.min()
            return (s - s.min()) / r if r > 1e-8 else pd.Series(0.5, index=s.index)

        df["stability_norm"] = norm(df["stability"])
        df["sharpe_norm"] = norm(df["mean_sharpe"])

        # Lift from edge results
        lift_map = edge_results.set_index("strategy_id")["lift"].to_dict() if len(edge_results) > 0 else {}
        df["lift"] = df["strategy_id"].map(lift_map).fillna(0)
        df["lift_norm"] = norm(df["lift"])
        df["sample_score"] = norm(np.log1p(df["total_trades"]))

        # Turnover penalty
        turnover_norm = norm(df["mean_turnover"]) if "mean_turnover" in df.columns else 0

        df["composite_score"] = (
            CFG.w_stability * df["stability_norm"]
            + CFG.w_sharpe * df["sharpe_norm"]
            + CFG.w_lift * df["lift_norm"]
            + CFG.w_sample * df["sample_score"]
            - CFG.penalty_turnover * turnover_norm
        )

        # --- Correlation filter via clustering ---
        if CFG.cluster_strategies and len(df) > 5 and len(wf_results) > 0:
            pivot = wf_results.pivot_table(
                values="mean_return", index="fold_idx",
                columns="strategy_id", aggfunc="first",
            )
            scored_ids = df["strategy_id"].tolist()
            pivot = pivot[[c for c in pivot.columns if c in scored_ids]].dropna(axis=1, how='all').fillna(0)

            if pivot.shape[1] >= 5:
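                # Constant return columns give NaN correlations; treat them as uncorrelated
                corr = np.nan_to_num(pivot.corr().values, nan=0.0)
                distance = 1 - np.abs(corr)
                np.fill_diagonal(distance, 0)
                distance = np.maximum(distance, 0)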

                n_clust = max(3, min(CFG.portfolio_max_strategies, len(pivot.columns) // 3))
                clustering = AgglomerativeClustering(
                    n_clusters=n_clust, metric='precomputed', linkage='average',
                )
                labels = clustering.fit_predict(distance)
                cluster_map = dict(zip(pivot.columns, labels))
                df["cluster"] = df["strategy_id"].map(cluster_map)

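                # Keep the highest-scoring row per cluster; groupby().first() would
                # stitch together the first non-null values from different rows
                df = df.sort_values("composite_score", ascending=False)
                best_per = df.dropna(subset=["cluster"]).drop_duplicates(subset="cluster", keep="first")
                # Strategies without walk-forward returns were not clustered; keep them too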
                unclustered = df[df["cluster"].isna()]
                df = pd.concat([best_per, unclustered], ignore_index=True)
                print("Correlation clustering: %d clusters, %d strategies kept" % (n_clust, len(df)))

        # Keep top N per market per horizon
        final_parts = []
        for (m, h), grp in df.groupby(["market", "horizon"]):
            top = grp.nlargest(CFG.portfolio_max_strategies, "composite_score")
            final_parts.append(top)

        scored_strategies = pd.concat(final_parts, ignore_index=True) if final_parts else pd.DataFrame()
        scored_strategies = scored_strategies.sort_values("composite_score", ascending=False).reset_index(drop=True)
        scored_strategies["rank"] = range(1, len(scored_strategies) + 1)
        scored_strategies.to_parquet(scored_path)

        tracker.mark_completed(STEP, {"n": len(scored_strategies)})

print("\n=== SCORED STRATEGIES ===")
if len(scored_strategies) > 0:
    print(scored_strategies[
        ["rank", "strategy_id", "market", "horizon", "composite_score",
         "stability", "mean_sharpe", "mean_win_rate", "mean_turnover"]
    ].head(30).to_string(index=False))

## 12. Regime Analysis

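Split walk-forward results into **bull** and **bear** regimes: a fold is labelled bull if the market index closed higher at the end of its test window than at the start, and bear otherwise. For each strategy, the regime ratio is the lower of the two regime-mean Sharpes divided by the larger absolute one, and strategies whose ratio falls below `min_regime_performance_ratio` are rejected. A regime with no folds counts as a Sharpe of 0; if the filter would remove every strategy, the top half by regime ratio is kept instead.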
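The per-fold labelling and the ratio test can be sketched in plain Python as below. `classify_regime` and `regime_ratio` are hypothetical helper names; the inputs are assumed to be a list of index closes over one test window and lists of per-fold Sharpe ratios.

```python
from statistics import fmean

def classify_regime(index_closes):
    """Label a test window by the sign of the index return over it."""
    if len(index_closes) < 2:
        return "unknown"
    ret = index_closes[-1] / index_closes[0] - 1
    return "bull" if ret > 0 else "bear"

def regime_ratio(bull_sharpes, bear_sharpes):
    """Lower regime-mean Sharpe over the larger absolute one; an empty regime counts as 0."""
    bull = fmean(bull_sharpes) if bull_sharpes else 0.0
    bear = fmean(bear_sharpes) if bear_sharpes else 0.0
    denom = max(abs(bull), abs(bear))
    return min(bull, bear) / denom if denom > 1e-8 else 0.0

print(classify_regime([100.0, 98.0, 103.0]))   # bull
print(regime_ratio([1.2, 0.8], [0.4, 0.6]))   # 0.5
print(regime_ratio([1.0], [-0.5]))            # -0.5
print(regime_ratio([1.0], []))                # 0.0
```

The last call shows an edge case: a strategy that was only ever tested in bull folds gets a ratio of 0, so it fails any positive `min_regime_performance_ratio`.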

In [None]:
STEP = "regime_analysis"
regime_path = os.path.join(CFG.global_eval_dir, "regime_results.parquet")

if tracker.is_completed(STEP):
    logger.info("[SKIP] %s" % STEP)
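    # Empty results are not written to disk, so fall back to an empty frame on resume
    regime_filtered = (pd.read_parquet(regime_path)
                       if os.path.exists(regime_path) else pd.DataFrame())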
    print("Loaded %d regime-filtered strategies." % len(regime_filtered))
else:
    logger.info("[RUN] %s" % STEP)

    if not CFG.evaluate_by_regime or len(scored_strategies) == 0 or len(wf_results) == 0:
        regime_filtered = scored_strategies.copy() if len(scored_strategies) > 0 else pd.DataFrame()
        if len(regime_filtered) > 0:
            regime_filtered.to_parquet(regime_path)
        tracker.mark_completed(STEP, {"n": len(regime_filtered), "skipped": True})
    else:
        # Classify each WF fold as bull or bear
        wf_with_regime = wf_results.copy()
        wf_with_regime["regime"] = "unknown"

        for idx, row in wf_with_regime.iterrows():
            mkt = row.get("market", "US")
            ts = pd.Timestamp(row["test_start"])
            te = pd.Timestamp(row["test_end"])
            mkt_idx = market_indices.get(mkt, pd.DataFrame())
            if "close" not in mkt_idx.columns or mkt_idx.empty:
                continue
            mc = mkt_idx["close"]
            period = mc.loc[ts:te]
            if len(period) >= 2:
                ret = (period.iloc[-1] / period.iloc[0]) - 1
                wf_with_regime.at[idx, "regime"] = "bull" if ret > 0 else "bear"

        # Per-strategy regime performance
        regime_stats = []
        scored_ids = set(scored_strategies["strategy_id"])
        for sid, grp in wf_with_regime[wf_with_regime["strategy_id"].isin(scored_ids)].groupby("strategy_id"):
            bull = grp[grp["regime"] == "bull"]["sharpe"]
            bear = grp[grp["regime"] == "bear"]["sharpe"]
            bull_mean = float(bull.mean()) if len(bull) > 0 else 0.0
            bear_mean = float(bear.mean()) if len(bear) > 0 else 0.0
            max_regime = max(abs(bull_mean), abs(bear_mean))
            min_regime = min(bull_mean, bear_mean)
            ratio = min_regime / max_regime if max_regime > 1e-8 else 0.0
            regime_stats.append({
                "strategy_id": sid,
                "bull_sharpe": bull_mean,
                "bear_sharpe": bear_mean,
                "regime_ratio": ratio,
            })

        rdf = pd.DataFrame(regime_stats)
        # Merge and filter
        merged = scored_strategies.merge(rdf, on="strategy_id", how="left")
        merged["regime_ratio"] = merged["regime_ratio"].fillna(0)

        before = len(merged)
        regime_filtered = merged[merged["regime_ratio"] >= CFG.min_regime_performance_ratio].copy()
        # If the filter removes everything, fall back to the top half by regime ratio
        if len(regime_filtered) == 0 and len(merged) > 0:
            logger.warning("Regime filter removed all strategies; relaxing to keep top half")
            merged = merged.sort_values("regime_ratio", ascending=False)
            regime_filtered = merged.head(max(1, len(merged) // 2)).copy()

        print("Regime filter: %d -> %d" % (before, len(regime_filtered)))
        regime_filtered.to_parquet(regime_path)
        tracker.mark_completed(STEP, {"n": len(regime_filtered)})

print("Regime-filtered strategies: %d" % len(regime_filtered))

## 13. Portfolio-Level Evaluation

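- Per market: equal-weight average of the selected strategies' walk-forward returns in each fold
- Cross-market: the same equal-weight combination across all selected strategies, with FX-adjusted returns
- Diversification benefit: portfolio Sharpe divided by the average individual Sharpe (a ratio above 1 means combining helps)

Fold-level Sharpe is annualised with `sqrt(252 / 21)`, which treats each walk-forward fold as one 21-trading-day period regardless of the strategy's horizon.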
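A minimal NumPy sketch of the equal-weight portfolio and its diversification ratio. It makes one simplification, labelled here as an assumption: each strategy's individual Sharpe is computed from its own fold returns with the same formula, whereas the notebook cell averages the per-fold `sharpe` column from `wf_results`. The helper names are hypothetical.

```python
import numpy as np

PERIODS_PER_YEAR = 252 / 21  # same factor the cell uses: one fold ~ 21 trading days

def annualised_sharpe(fold_returns):
    """Mean/std of per-fold returns (sample std, like pandas), scaled by sqrt(252/21)."""
    r = np.asarray(fold_returns, dtype=float)
    sd = r.std(ddof=1) if r.size > 1 else float("nan")
    if not sd > 1e-8:  # also catches NaN when there is a single fold
        return 0.0
    return float(r.mean() / sd * np.sqrt(PERIODS_PER_YEAR))

def equal_weight_portfolio(strategy_returns):
    """strategy_returns: folds x strategies matrix of per-fold mean returns."""
    R = np.asarray(strategy_returns, dtype=float)
    port = R.mean(axis=1)  # equal weight within each fold
    port_sharpe = annualised_sharpe(port)
    avg_indiv = float(np.mean([annualised_sharpe(R[:, j]) for j in range(R.shape[1])]))
    ratio = port_sharpe / avg_indiv if avg_indiv > 1e-8 else 1.0
    return port_sharpe, avg_indiv, ratio

R = np.column_stack([
    [0.02, -0.01, 0.03, 0.00],
    [-0.01, 0.02, 0.00, 0.03],
])
port_sharpe, avg_indiv, ratio = equal_weight_portfolio(R)
print(round(port_sharpe, 3), round(avg_indiv, 3), round(ratio, 3))
```

The ratio exceeds 1 here because the two strategies lose in different folds, so the equal-weight average is smoother than either leg on its own.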

In [None]:
STEP = "portfolio_eval"
port_path = os.path.join(CFG.global_eval_dir, "portfolio_results.json")

if tracker.is_completed(STEP):
    logger.info("[SKIP] %s" % STEP)
    with open(port_path, 'r') as f:
        portfolio_results = json.load(f)
    print("Portfolio results loaded.")
else:
    logger.info("[RUN] %s" % STEP)
    portfolio_results = {"per_market": {}, "cross_market": {}}

    final_ids = regime_filtered["strategy_id"].tolist() if len(regime_filtered) > 0 else []

    # Per-market portfolio
    for mkt in CFG.markets:
        mkt_ids = [s for s in final_ids if s.startswith(mkt + "_")]
        if not mkt_ids:
            continue
        mkt_wf = wf_results[wf_results["strategy_id"].isin(mkt_ids)]
        if mkt_wf.empty:
            continue
        port_ret = mkt_wf.groupby("fold_idx")["mean_return"].mean()
        port_sharpe = float(
            port_ret.mean() / port_ret.std() * np.sqrt(252 / 21)
        ) if port_ret.std() > 1e-8 else 0.0
        port_total = float(port_ret.sum())
        avg_indiv_sharpe = float(
            mkt_wf.groupby("strategy_id")["sharpe"].mean().mean()
        )
        diversification = port_sharpe / avg_indiv_sharpe if avg_indiv_sharpe > 1e-8 else 1.0

        portfolio_results["per_market"][mkt] = {
            "n_strategies": len(mkt_ids),
            "portfolio_sharpe": round(port_sharpe, 4),
            "total_return": round(port_total, 6),
            "avg_individual_sharpe": round(avg_indiv_sharpe, 4),
            "diversification_ratio": round(diversification, 4),
            "win_folds": int((port_ret > 0).sum()),
            "total_folds": len(port_ret),
        }
        print("%s portfolio: Sharpe=%.2f, Return=%.4f, Diversification=%.2f" % (
            mkt, port_sharpe, port_total, diversification))

    # Cross-market portfolio
    if final_ids and len(wf_results) > 0:
        cross_wf = wf_results[wf_results["strategy_id"].isin(final_ids)]
        if not cross_wf.empty:
            cross_ret = cross_wf.groupby("fold_idx")["mean_return"].mean()
            cross_sharpe = float(
                cross_ret.mean() / cross_ret.std() * np.sqrt(252 / 21)
            ) if cross_ret.std() > 1e-8 else 0.0
            portfolio_results["cross_market"] = {
                "n_strategies": len(final_ids),
                "portfolio_sharpe": round(cross_sharpe, 4),
                "total_return": round(float(cross_ret.sum()), 6),
                "win_folds": int((cross_ret > 0).sum()),
                "total_folds": len(cross_ret),
            }
            print("\nCross-market portfolio: Sharpe=%.2f, Return=%.4f" % (
                cross_sharpe, float(cross_ret.sum())))

    with open(port_path, 'w') as f:
        json.dump(portfolio_results, f, indent=2)
    tracker.mark_completed(STEP, portfolio_results)

print("\n=== Portfolio Summary ===")
print(json.dumps(portfolio_results, indent=2))

## 14. Final Output & Dashboard

- Per-market ranked strategy tables
- Net performance after costs
- Equity curves per market
- Turnover metrics
- Cross-market portfolio & diversification analysis

In [None]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

if len(scored_strategies) == 0:
    print("No strategies to visualize.")
else:
    fig = plt.figure(figsize=(22, 18))
    gs = gridspec.GridSpec(3, 2, hspace=0.40, wspace=0.30)

    # --- Panel 1: Composite score bar (top 20) ---
    ax1 = fig.add_subplot(gs[0, 0])
    top20 = scored_strategies.head(20)
    colors = ['#4CAF50' if s >= 0.7 else '#FFC107' if s >= 0.4 else '#F44336'
              for s in top20['composite_score']]
    ax1.barh(range(len(top20)), top20['composite_score'], color=colors, edgecolor='white')
    ax1.set_yticks(range(len(top20)))
    labels = (top20['market'].astype(str) + "/" + top20['horizon'].astype(str)
              + " " + top20['strategy_id'].str[-20:])
    ax1.set_yticklabels(labels, fontsize=6)
    ax1.set_xlabel('Composite Score')
    ax1.set_title('Strategy Rankings (Top 20)', fontweight='bold')
    ax1.invert_yaxis()

    # --- Panel 2: Sharpe stability across folds (top 5) ---
    ax2 = fig.add_subplot(gs[0, 1])
    top5 = scored_strategies.head(5)
    for _, row in top5.iterrows():
        sid = row['strategy_id']
        fd = wf_results[wf_results['strategy_id'] == sid].sort_values('fold_idx')
        if len(fd) > 0:
            ax2.plot(fd['fold_idx'], fd['sharpe'], 'o-',
                     label=sid[:30], linewidth=2, markersize=5)
    ax2.axhline(y=0, color='gray', linewidth=0.5, linestyle='--')
    ax2.set_xlabel('Fold')
    ax2.set_ylabel('Sharpe Ratio')
    ax2.set_title('Stability: Sharpe Across Folds (Top 5)', fontweight='bold')
    ax2.legend(fontsize=6)
    ax2.grid(alpha=0.3)

    # --- Panel 3: Equity curves (top 5) ---
    ax3 = fig.add_subplot(gs[1, 0])
    for _, row in top5.iterrows():
        sid = row['strategy_id']
        fd = wf_results[wf_results['strategy_id'] == sid].sort_values('fold_idx')
        if len(fd) > 0:
            cum = np.cumsum(fd['mean_return'].values)
            ax3.plot(range(len(cum)), cum, 'o-', label=sid[:30], linewidth=2)
    ax3.axhline(y=0, color='gray', linewidth=0.5, linestyle='--')
    ax3.set_xlabel('Walk-Forward Fold')
    ax3.set_ylabel('Cumulative Return (net of costs)')
    ax3.set_title('Equity Curves (Top 5)', fontweight='bold')
    ax3.legend(fontsize=6)
    ax3.grid(alpha=0.3)

    # --- Panel 4: Per-market portfolio ---
    ax4 = fig.add_subplot(gs[1, 1])
    for mkt in CFG.markets:
        mkt_ids = [s for s in regime_filtered["strategy_id"].tolist() if s.startswith(mkt + "_")] if len(regime_filtered) > 0 else []
        if not mkt_ids:
            continue
        mkt_wf = wf_results[wf_results["strategy_id"].isin(mkt_ids)]
        if mkt_wf.empty:
            continue
        pret = mkt_wf.groupby("fold_idx")["mean_return"].mean().sort_index()
        cum = np.cumsum(pret.values)
        ax4.plot(range(len(cum)), cum, 'o-', label="%s (%d strats)" % (mkt, len(mkt_ids)), linewidth=2)
    ax4.axhline(y=0, color='gray', linewidth=0.5, linestyle='--')
    ax4.set_xlabel('Walk-Forward Fold')
    ax4.set_ylabel('Cumulative Return')
    ax4.set_title('Per-Market Portfolios', fontweight='bold')
    ax4.legend(fontsize=8)
    ax4.grid(alpha=0.3)

    # --- Panel 5: Turnover distribution ---
    ax5 = fig.add_subplot(gs[2, 0])
    if "mean_turnover" in scored_strategies.columns:
        ax5.hist(scored_strategies["mean_turnover"].dropna(), bins=30, color='steelblue', edgecolor='white')
        ax5.axvline(x=CFG.max_turnover, color='red', linestyle='--', label='Max turnover (%.1f)' % CFG.max_turnover)
        ax5.set_xlabel('Annualised Turnover')
        ax5.set_ylabel('Count')
        ax5.set_title('Turnover Distribution', fontweight='bold')
        ax5.legend()

    # --- Panel 6: Cross-market diversification ---
    ax6 = fig.add_subplot(gs[2, 1])
    sharpes = []
    labels_bar = []
    for mkt in CFG.markets:
        pm = portfolio_results.get("per_market", {}).get(mkt, {})
        if pm:
            sharpes.append(pm["portfolio_sharpe"])
            labels_bar.append(mkt)
    cm = portfolio_results.get("cross_market", {})
    if cm:
        sharpes.append(cm["portfolio_sharpe"])
        labels_bar.append("Cross-Market")
    if sharpes:
        # Highlight the cross-market bar (if present) in green
        bar_colors = ['#4CAF50' if lbl == "Cross-Market" else '#2196F3' for lbl in labels_bar]
        ax6.bar(labels_bar, sharpes, color=bar_colors, edgecolor='white')
        ax6.set_ylabel('Portfolio Sharpe')
        ax6.set_title('Diversification Benefit', fontweight='bold')
        ax6.grid(axis='y', alpha=0.3)

    fig.suptitle('Multi-Market Quant Pipeline v2 — Results Dashboard',
                 fontsize=16, fontweight='bold', y=1.01)
    plt.savefig(os.path.join(CFG.drive_root, 'pipeline_results_v2.png'),
                dpi=150, bbox_inches='tight')
    plt.show()
    print("Dashboard saved.")

In [None]:
print("=" * 70)
print("PIPELINE v2 COMPLETE")
print("=" * 70)

tracker.summary()

if len(scored_strategies) > 0:
    print("\n=== Top Strategy ===")
    best = scored_strategies.iloc[0]
    print("  ID:           %s" % best['strategy_id'])
    print("  Market:       %s" % best.get('market', ''))
    print("  Horizon:      %s" % best.get('horizon', ''))
    print("  Composite:    %.4f" % best['composite_score'])
    print("  Stability:    %.2f" % best['stability'])
    print("  Sharpe:       %.2f" % best['mean_sharpe'])
    print("  Win Rate:     %.2f%%" % (best['mean_win_rate'] * 100))
    print("  Turnover:     %.2f" % best.get('mean_turnover', 0))
    print("  Total Trades: %d" % best['total_trades'])

    # Portfolio summary
    print("\n=== Portfolio Summary ===")
    for mkt, pm in portfolio_results.get("per_market", {}).items():
        print("  %s: Sharpe=%.2f  Return=%.4f  Diversification=%.2f  Strats=%d" % (
            mkt, pm["portfolio_sharpe"], pm["total_return"],
            pm["diversification_ratio"], pm["n_strategies"]))
    cm = portfolio_results.get("cross_market", {})
    if cm:
        print("  CROSS-MARKET: Sharpe=%.2f  Return=%.4f  Strats=%d" % (
            cm["portfolio_sharpe"], cm["total_return"], cm["n_strategies"]))

    # Save final report
    report = {
        "pipeline_version": "v2",
        "markets": CFG.markets,
        "forward_horizons": CFG.forward_days_list,
        "cost_bps_roundtrip": CFG.total_cost_bps * 2,
        "n_candidates_total": len(all_candidates),
        "n_edge_evaluated": len(edge_results),
        "n_walk_forward": len(wf_results),
        "n_filtered": len(filtered_strategies),
        "n_turnover_filtered": len(turnover_filtered) if 'turnover_filtered' in globals() else 0,
        "n_scored": len(scored_strategies),
        "n_regime_filtered": len(regime_filtered) if 'regime_filtered' in globals() else 0,
        "top_strategy": best['strategy_id'],
        "top_composite": float(best['composite_score']),
        "portfolio": portfolio_results,
    }
    report_path = os.path.join(CFG.drive_root, 'report_v2.json')
    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)
    print("\nReport saved to:", report_path)
else:
    print("\nNo viable strategies found. Consider:")
    print("  - Relaxing filter thresholds")
    print("  - Adding more tickers or longer data period")
    print("  - Reducing min_sample_size")

print("\n" + "=" * 70)