# Run full epistasis pipeline

**Single config:** Sources, tools, and per-tool batch sizes live in `notebooks/processing/pipeline_config.py`. Edit that file to add sources or change tools. Output layout: `{output_base}/{source}/{model_key}.db` and `{output_base}/sheets/epistasis_metrics_{tool}.parquet`.

**Execution (one tool at a time):** For each conda env (main, evo2, alphagenome), we run all sources (okgp first) for that env's tools. `cov_inv` is fitted from sources whose name contains "okgp" (in this notebook they are selected via `COV_INV_SOURCE_NAMES` from `pipeline_config.py`) and is then used for the Mahalanobis metrics (epi_mahal, etc.) on all other sources. SpliceAI runs only for sources whose name contains "fas".

**Notebook flow:**  
1. **Config** — Import from `pipeline_config`; set `ENV_PROFILE` for this kernel (main / evo2 / alphagenome).  
2. **Step 1 (Embed)** — Run embeddings for the current env's tools over all sources (by-tool: one model load per tool).  
3. **Step 2 (cov_inv)** — Compute cov_inv from the okgp sources only (the sources listed in `COV_INV_SOURCE_NAMES`).  
4. **Step 3 (Sheets)** — Recompute metrics with cov_inv; save one parquet per tool to `output_base/sheets/`.

**On a cluster (Lambda, 8× A100):** Run once per env, then metrics:
```bash
bash scripts/run_pipeline_cluster.sh 2>&1 | tee pipeline.log
```
In another terminal, monitor progress and GPU usage with `bash scripts/monitor_pipeline.sh`. Progress is written to `output_base/pipeline_status.json` (or to the path given by `PIPELINE_STATUS_FILE`). To verify the GPU and the model wrappers before a full run, use `bash scripts/run_pipeline_cluster.sh --quick-test --env-profile main`. Alternatively, run one profile at a time — `--env-profile main`, then `--env-profile evo2`, then `--env-profile alphagenome` — and finish with `--phase metrics`.

In [None]:
# ---------------------------------------------------------------------------
# Config: from pipeline_config + paper_data_config
# ---------------------------------------------------------------------------
# Sources, tools and per-tool batch sizes are declared in pipeline_config.py;
# change them there rather than in this cell.
import sys
from pathlib import Path

# Find the repository root: start at the working directory and climb at most
# four levels, stopping at the first directory that contains
# notebooks/paper_data_config.py. If none matches, ROOT ends four levels up.
ROOT = Path.cwd()
_climbs_left = 4
while _climbs_left > 0 and not (ROOT / "notebooks" / "paper_data_config.py").exists():
    ROOT = ROOT.parent
    _climbs_left -= 1

# Put the root at the front of sys.path so the `notebooks` package imports below resolve.
_root_str = str(ROOT)
if _root_str not in sys.path:
    sys.path.insert(0, _root_str)

from notebooks.processing.pipeline_config import (
    SOURCE_COL,
    ID_COL,
    SOURCE_MODEL_MAP,
    COV_INV_SOURCE_NAMES,
    get_single_dataframe_path,
    resolve_sources,
    get_batch_size,
)
from notebooks.processing.process_epistasis import get_model_keys_for_env
from notebooks.paper_data_config import EPISTASIS_PAPER_ROOT, data_dir, embeddings_dir

# Env profile of the kernel executing this notebook (main | evo2 | alphagenome).
# Step 1 embeds only with the model keys returned for this profile.
ENV_PROFILE = "main"
MODEL_KEYS = get_model_keys_for_env(ENV_PROFILE)

# Output locations: per-source .db files live under OUTPUT_BASE, metric sheets under SHEETS_DIR.
OUTPUT_BASE = embeddings_dir()
SHEETS_DIR = OUTPUT_BASE / "sheets"

# Leave as None, or set a path here / via the OPENSPLICEAI_MODEL_DIR env var.
SPLICEAI_MODEL_DIR = None

print(f"ENV_PROFILE={ENV_PROFILE!r} -> MODEL_KEYS={MODEL_KEYS}")
print(f"Data root: {EPISTASIS_PAPER_ROOT}")
print(f"Output base: {OUTPUT_BASE}")

In [None]:
# ---------------------------------------------------------------------------
# Step 1: Embed (one tool at a time over all sources); one .db per model per source
# ---------------------------------------------------------------------------
import logging
import pandas as pd
from notebooks.processing.process_epistasis import run_sources_by_tool, run_from_single_dataframe

logging.basicConfig(level=logging.INFO)

# Look up the configured batch size for every model key of this env profile.
batch_size_by_model = {}
for _model_key in MODEL_KEYS:
    batch_size_by_model[_model_key] = get_batch_size(_model_key)

# Keyword arguments that both entry points receive unchanged.
# NOTE(review): batch_size=16 is presumably the fallback for models without an
# entry in batch_size_by_model — confirm in process_epistasis.
_embed_kwargs = dict(
    output_base=OUTPUT_BASE,
    model_keys=MODEL_KEYS,
    source_model_map=SOURCE_MODEL_MAP,
    id_col=ID_COL,
    show_progress=True,
    force=False,
    batch_size=16,
    batch_size_by_model=batch_size_by_model,
)

single_path = get_single_dataframe_path(data_dir)
if single_path is None:
    # Per-file mode: each resolved source is a (name, path) pair.
    # df_all stays None so the later steps know to re-read the individual files.
    sources = resolve_sources(data_dir)
    df_all = None
    SOURCE_NAMES = [name for name, _ in sources]
    run_sources_by_tool(
        sources,
        spliceai_model_dir=SPLICEAI_MODEL_DIR,
        **_embed_kwargs,
    )
else:
    # Single-dataframe mode: one table holds every source, distinguished by SOURCE_COL.
    # sep=None with the python engine lets pandas detect the delimiter.
    df_all = pd.read_csv(single_path, sep=None, engine="python")
    run_from_single_dataframe(
        df_all,
        source_col=SOURCE_COL,
        use_by_tool=True,
        **_embed_kwargs,
    )
    SOURCE_NAMES = df_all[SOURCE_COL].dropna().astype(str).unique().tolist()

In [None]:
# ---------------------------------------------------------------------------
# Step 2: Compute cov_inv from rows whose source is in COV_INV_SOURCE_NAMES (see pipeline_config)
# ---------------------------------------------------------------------------
from notebooks.processing.process_epistasis import compute_cov_inv, FULL_MODEL_CONFIG

if df_all is not None:
    # Single-dataframe mode (Step 1 loaded everything): keep only the reference rows.
    src = df_all[SOURCE_COL].astype(str)
    df_subset = df_all[src.isin(COV_INV_SOURCE_NAMES)]
    okgp_sources = [s for s in src.unique() if s in COV_INV_SOURCE_NAMES]
else:
    # Per-file mode: read only the reference source files and tag each row with its source name.
    sources = resolve_sources(data_dir)
    okgp_sources = [n for n, _ in sources if n in COV_INV_SOURCE_NAMES]
    dfs = []
    for name, path in sources:
        if name not in COV_INV_SOURCE_NAMES:
            continue
        d = pd.read_csv(path, sep=None, engine="python")
        d[SOURCE_COL] = name
        dfs.append(d)
    df_subset = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

if len(df_subset) == 0:
    # Wrap the operand in a 1-tuple: a bare tuple-valued COV_INV_SOURCE_NAMES would be
    # unpacked by `%` and raise TypeError instead of this ValueError.
    raise ValueError(
        "No rows with source in COV_INV_SOURCE_NAMES=%s; need data for cov_inv."
        % (COV_INV_SOURCE_NAMES,)
    )

# Use every configured model that already has a .db under at least one reference
# source directory ({OUTPUT_BASE}/{source}/{model_key}.db).
model_keys_for_cov = [
    k for k in FULL_MODEL_CONFIG
    if any((OUTPUT_BASE / source_name / f"{k}.db").exists() for source_name in okgp_sources)
]
if not model_keys_for_cov:
    # No .db found for any reference source: fall back to this env's model keys.
    model_keys_for_cov = MODEL_KEYS

cov_inv_by_model = compute_cov_inv(
    OUTPUT_BASE,
    source_df=df_subset,
    source_col=SOURCE_COL,
    id_col=ID_COL,
    model_keys=model_keys_for_cov,
    method="ledoit_wolf",
    show_progress=True,
)
print("cov_inv for models:", list(cov_inv_by_model.keys()))

In [None]:
# ---------------------------------------------------------------------------
# Step 3: Recompute metrics with cov_inv → one sheet (parquet) per tool
# ---------------------------------------------------------------------------
from notebooks.processing.process_epistasis import recompute_metrics_with_cov_inv

# Build the full input table (all sources). Reuse the frame from Step 1 when it
# exists; otherwise load the data again from disk.
if df_all is None:
    single_path = get_single_dataframe_path(data_dir)
    if single_path is not None:
        df_full = pd.read_csv(single_path, sep=None, engine="python")
    else:
        sources = resolve_sources(data_dir)
        dfs = []
        for name, path in sources:
            # Detect the delimiter the same way the other reads of these files do
            # (sep=None, python engine); a bare read_csv would mis-parse
            # non-comma-delimited sources into a single column.
            d = pd.read_csv(path, sep=None, engine="python")
            d[SOURCE_COL] = name
            dfs.append(d)
        if not dfs:
            # pd.concat([]) would raise a generic ValueError; say what is actually missing.
            raise ValueError("resolve_sources(data_dir) returned no sources; nothing to compute metrics for.")
        df_full = pd.concat(dfs, ignore_index=True)
else:
    df_full = df_all
if SOURCE_COL not in df_full.columns or ID_COL not in df_full.columns:
    raise ValueError(f"DataFrame must have {SOURCE_COL!r} and {ID_COL!r}")

# Metrics are recomputed only for the models that have a cov_inv entry from Step 2.
metrics_by_tool = recompute_metrics_with_cov_inv(
    OUTPUT_BASE,
    df_full,
    cov_inv_by_model,
    source_col=SOURCE_COL,
    model_keys=list(cov_inv_by_model),
    id_col=ID_COL,
    spliceai_model_dir=SPLICEAI_MODEL_DIR,
    show_progress=True,
)

# Write one parquet sheet per tool: {SHEETS_DIR}/epistasis_metrics_{tool}.parquet
SHEETS_DIR.mkdir(parents=True, exist_ok=True)
for tool, tbl in metrics_by_tool.items():
    out = SHEETS_DIR / f"epistasis_metrics_{tool}.parquet"
    tbl.to_parquet(out, index=False)
    print(f"  {tool}: {tbl.shape[0]} rows -> {out.name}")

In [None]:
# Downstream: cov, cov_inv = cov_inv_by_model["nt500_multi"]; sheets in SHEETS_DIR

In [None]:
# Optional: null cov packs are already in OUTPUT_BASE/null_cov/{model_key}_pack.npz