# inferCNV (infercnvpy)

**Design goals (for GitHub / peer review):**
- All analysis parameters are collected in `CONFIG`.
- All paths are **relative** to the repository root.
- All outputs are written under `results/infercnv/`.
- No cell outputs are stored in this notebook.

**Input used in this project**
- `data/1_Hepato_count.h5ad` (the default value of `CONFIG["INPUT_H5AD"]`; edit `CONFIG` to use a different file)

> Notes  
> - `infercnvpy` expects **normalized and log-transformed** expression values (log-space).  
> - Genomic coordinates must be available in `adata.var[["chromosome","start","end"]]`.


In [None]:
from pathlib import Path

# =========================
# CONFIG (edit paths/keys here)
# =========================
# Single place for every tunable value read by the cells below.
# Paths are relative, so the notebook is expected to run from the repository root.
CONFIG = {
    # ---- Input ----
    # AnnData file loaded with sc.read_h5ad; must be a Path (``.exists()`` is called on it).
    "INPUT_H5AD": Path("data/1_Hepato_count.h5ad"),

    # ---- Output root ----
    # The figures/, adata/ and tables/ sub-directories are created below this path.
    "OUTDIR": Path("results/infercnv"),

    # ---- Reproducibility ----
    "SEED": 0,       # seeds Python's `random` and NumPy's global RNG
    "FIG_DPI": 200,  # forwarded to scanpy's set_figure_params

    # ---- Expression matrix for infercnvpy ----
    # infercnvpy expects log-transformed data (log-space).
    #
    # If your `INPUT_H5AD` stores raw counts in `adata.X` (common for "*_count*.h5ad"),
    # set AUTO_LOG_NORMALIZE=True to automatically create a log-normalized layer.
    # The layer is only built when `adata.X` looks like raw counts (heuristic check);
    # otherwise `adata.X` is used as-is.
    "AUTO_LOG_NORMALIZE": True,
    "COUNTS_LAYER": "counts",   # used when AUTO_LOG_NORMALIZE=True (falls back to X if missing)
    "TARGET_SUM": 1e4,          # normalize_total target sum

    # ---- Genomic gene positions ----
    # Option A (recommended offline): local 4-column TSV with:
    #   gene_name    chromosome    start    end
    # If the file is missing, the notebook can download it automatically.
    "GENE_POS_TSV": Path("resources/hg38_gencode_v27.txt"),
    "GENE_POS_URL": "https://data.broadinstitute.org/Trinity/CTAT/cnv/hg38_gencode_v27.txt",

    # Option B: use a (GENCODE) GTF file instead of the TSV above.
    # If GTF_PATH is not None, the notebook will prefer the GTF.
    "GTF_PATH": None,           # e.g. Path("resources/gencode.v38.annotation.gtf")
    "GTF_GENE_ID": "gene_name", # "gene_name" or "gene_id"
    "ADATA_GENE_ID": None,      # None -> use adata.var_names, or set to a column in adata.var

    # ---- Reference definition (normal cells) ----
    # If REFERENCE_KEY is None, infercnvpy uses the average of all cells as reference.
    # If you know which cells are "normal", set REFERENCE_KEY and REFERENCE_CAT.
    # Category values are compared as strings (the obs column is cast to str).
    "REFERENCE_KEY": "type",
    "REFERENCE_CAT": ["0"],     # one or multiple categories in obs[REFERENCE_KEY]

    # ---- Plotting ----
    # Optional: restrict chromosome heatmaps to a subset of cells.
    # Set PLOT_SUBSET_CAT=None to plot all cells.
    "PLOT_SUBSET_KEY": "type",
    "PLOT_SUBSET_CAT": ["1"],   # e.g. tumor cells only

    # Grouping variable for heatmaps (e.g., "subtype" / "leiden" / "sample")
    # Must be a column of adata.obs; the heatmap cell raises KeyError otherwise.
    "PLOT_GROUPBY": "subtype",

    # ---- infercnv parameters ----
    # All five values are forwarded unchanged to cnv.tl.infercnv.
    "WINDOW_SIZE": 100,
    "STEP": 10,
    "DYNAMIC_THRESHOLD": 1.5,
    "EXCLUDE_CHROMS": ("chrX", "chrY"),
    "N_JOBS": None,

    # ---- CNV embedding / clustering ----
    "LEIDEN_RES": 0.3,  # `resolution` argument of cnv.tl.leiden
}

# =========================
# Output directories
# =========================
# Every output location hangs off CONFIG["OUTDIR"]; "root" is the directory itself.
DIRS = {
    "root": CONFIG["OUTDIR"],
    **{
        label: CONFIG["OUTDIR"] / subdir
        for label, subdir in (
            ("fig", "figures"),
            ("adata", "adata"),
            ("tables", "tables"),
        )
    },
}

# Create the whole tree up front so later cells can write without checking.
for d in DIRS.values():
    d.mkdir(parents=True, exist_ok=True)

print(f"[CONFIG] INPUT_H5AD={CONFIG['INPUT_H5AD']}")
print(f"[CONFIG] OUTDIR={CONFIG['OUTDIR'].resolve()}")


In [None]:
import os
import random
import sys
import warnings
from importlib.metadata import PackageNotFoundError, version

import numpy as np
import pandas as pd
import scanpy as sc
from scipy import sparse

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

# -------------------------
# Reproducibility
# -------------------------
# Seed both global RNGs (stdlib and NumPy) from the same configured value.
random.seed(CONFIG["SEED"])
np.random.seed(CONFIG["SEED"])

# Matplotlib defaults (vector-friendly fonts): fonttype 42 for PDF and PS output
mpl.rcParams.update({"pdf.fonttype": 42, "ps.fonttype": 42})

# Scanpy defaults
sc.settings.verbosity = 2
sc.settings.set_figure_params(dpi=CONFIG["FIG_DPI"], facecolor="white")

# Silence FutureWarning for the rest of the session
warnings.filterwarnings(action="ignore", category=FutureWarning)

def _v(pkg: str) -> str:
    """Return the installed version of *pkg*, or "not-installed" if it has no metadata."""
    try:
        installed = version(pkg)
    except PackageNotFoundError:
        installed = "not-installed"
    return installed

# Report the interpreter version first, then every package of interest.
print("Versions:")
print(f" - python: {sys.version.split()[0]}")
for pkg in ["numpy", "pandas", "scanpy", "anndata", "matplotlib", "scipy", "infercnvpy"]:
    print(f" - {pkg}: {_v(pkg)}")

# infercnvpy is imported last so that a failure produces an actionable message;
# any import-time error is re-raised as ImportError with the original cause chained.
try:
    import infercnvpy as cnv
except Exception as e:
    install_hint = (
        "infercnvpy is required. Install it via `pip install infercnvpy` "
        "or use the project's environment.yml / requirements.txt."
    )
    raise ImportError(install_hint) from e


## Data loading

In [None]:
# Load input AnnData
if not CONFIG["INPUT_H5AD"].exists():
    raise FileNotFoundError(
        f"Missing input file: {CONFIG['INPUT_H5AD']}. "
        "Place it under `data/` (recommended) or edit CONFIG['INPUT_H5AD']."
    )

adata = sc.read_h5ad(CONFIG["INPUT_H5AD"])
adata.var_names_make_unique()

print(adata)

# Ensure key obs columns exist (for plotting / reference definition).
# dict.fromkeys de-duplicates while keeping order: by default REFERENCE_KEY and
# PLOT_SUBSET_KEY name the same column, which would otherwise be handled twice.
obs_keys = dict.fromkeys(
    [CONFIG["REFERENCE_KEY"], CONFIG["PLOT_SUBSET_KEY"], CONFIG["PLOT_GROUPBY"]]
)
for k in obs_keys:
    if k is None:
        continue
    if k not in adata.obs.columns:
        print(f"[WARN] obs['{k}'] not found in input. You may need to update CONFIG.")
    else:
        # work with string categories for robustness (e.g., '0'/'1')
        adata.obs[k] = adata.obs[k].astype(str)

# Optional: quick UMAP sanity check (if embeddings already exist)
if "X_umap" in adata.obsm:
    umap_colors = [
        k
        for k in dict.fromkeys([CONFIG["PLOT_GROUPBY"], CONFIG["REFERENCE_KEY"]])
        if k is not None and k in adata.obs.columns
    ]
    # An empty colour list is not a valid `color` argument; fall back to an
    # uncoloured embedding (color=None) so this optional check never aborts the run.
    sc.pl.umap(adata, color=umap_colors or None, wspace=0.4)


## Expression preprocessing (log-normalization)

In [None]:
# Work on an in-memory copy so the changes made below (extra layer, gene
# filtering, CNV results) do not alter the `adata` object loaded above.
adata_cnv = adata.copy()

def _matrix_stats(X, seed: int = 0, n: int = 200_000):
    """Heuristic summary statistics for (sparse) expression matrices.

    At most ``n`` values are sampled without replacement. For sparse input only
    the explicitly stored entries (``X.data``) are considered; dense input is
    flattened first.

    Parameters
    ----------
    X
        Dense array-like or SciPy sparse matrix exposing ``dtype``.
    seed
        Seed for the sampling RNG, so repeated calls give the same answer.
    n
        Maximum number of values to sample.

    Returns
    -------
    dict
        ``dtype`` (str), ``max`` and ``p99`` (floats over the sample) and
        ``intlike`` (True when every sampled value is close to an integer).
        A matrix without any values yields ``max == p99 == 0.0`` and
        ``intlike == True``, for dense and sparse input alike.
    """
    values = X.data if sparse.issparse(X) else np.asarray(X).ravel()

    # Nothing to sample: np.max / np.quantile would raise on an empty array.
    if values.size == 0:
        return {"dtype": str(X.dtype), "max": 0.0, "p99": 0.0, "intlike": True}

    rng = np.random.default_rng(seed)
    idx = rng.choice(values.size, size=min(n, values.size), replace=False)
    sample = values[idx]
    return {
        "dtype": str(X.dtype),
        "max": float(np.max(sample)),
        "p99": float(np.quantile(sample, 0.99)),
        "intlike": bool(np.all(np.isclose(sample, np.round(sample)))),
    }

CNV_LAYER = None  # None -> use adata_cnv.X

if not CONFIG["AUTO_LOG_NORMALIZE"]:
    print("[AUTO_LOG_NORMALIZE] Disabled. infercnvpy will use adata.X (must be log-normalized).")
else:
    # Heuristic: an integer dtype, or large values that are all whole numbers,
    # is taken to mean `adata.X` still holds raw counts.
    stats_X = _matrix_stats(adata_cnv.X, seed=CONFIG["SEED"])
    has_integer_dtype = np.issubdtype(adata_cnv.X.dtype, np.integer)
    large_whole_numbers = stats_X["max"] > 20 and stats_X["intlike"]
    looks_like_counts = has_integer_dtype or large_whole_numbers

    print("[X stats]", stats_X, "| looks_like_counts:", looks_like_counts)

    if not looks_like_counts:
        print("[AUTO_LOG_NORMALIZE] adata.X does not look like raw counts. Using adata.X directly.")
    else:
        # Prefer the configured counts layer; fall back to X when it is absent.
        base_layer = CONFIG["COUNTS_LAYER"] if CONFIG["COUNTS_LAYER"] in adata_cnv.layers else None
        if base_layer is None:
            print("[AUTO_LOG_NORMALIZE] Using adata.X as raw counts source.")
            counts_source = adata_cnv.X
        else:
            print(f"[AUTO_LOG_NORMALIZE] Using adata.layers['{base_layer}'] as raw counts source.")
            counts_source = adata_cnv.layers[base_layer]

        # Normalise and log-transform a copy, leaving the counts source untouched.
        adata_cnv.layers["log_norm"] = counts_source.copy()
        sc.pp.normalize_total(adata_cnv, target_sum=CONFIG["TARGET_SUM"], layer="log_norm")
        sc.pp.log1p(adata_cnv, layer="log_norm")

        CNV_LAYER = "log_norm"
        print("[AUTO_LOG_NORMALIZE] Created adata.layers['log_norm'] for infercnvpy.")


## Add genomic positions (adata.var)

`infercnvpy` requires genomic positions for each gene, stored in:

- `adata.var["chromosome"]`
- `adata.var["start"]`
- `adata.var["end"]`

This notebook supports two ways to annotate gene positions:

1. **GTF (preferred if you already have it)**: set `CONFIG["GTF_PATH"]` to a local GENCODE GTF file.  
2. **TSV mapping file**: `CONFIG["GENE_POS_TSV"]` (auto-download from `CONFIG["GENE_POS_URL"]` if missing).


In [None]:
from pathlib import Path
from urllib.request import urlretrieve

def _download_if_missing(url: str, dest: Path):
    """Download *url* to *dest* unless *dest* already exists.

    The payload is first written to a sibling ``<name>.part`` file and only
    renamed to *dest* once the transfer has finished. An interrupted or failed
    download therefore never leaves a truncated file behind that a later run
    would mistake for a complete one.

    Parameters
    ----------
    url
        Source URL, passed to ``urllib.request.urlretrieve``.
    dest
        Target path; missing parent directories are created.

    Raises
    ------
    OSError
        Propagated from ``urlretrieve`` (e.g. ``URLError``) or from the rename.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.exists():
        return
    print(f"[DOWNLOAD] {url} -> {dest}")

    partial = dest.with_name(dest.name + ".part")
    try:
        urlretrieve(url, partial)
        partial.replace(dest)  # same directory, so the rename does not cross filesystems
    finally:
        # After a successful rename this is a no-op; after a failure it removes the leftover.
        partial.unlink(missing_ok=True)

def _ensure_chr_prefix(chrom):
    """Normalise a chromosome label to the ``chr``-prefixed form.

    Missing values come back as NaN, labels that already start with ``chr`` are
    returned unchanged, ``M`` and ``MT`` both become ``chrM``, and any other
    label simply gets ``chr`` prepended.
    """
    if pd.isna(chrom):
        return np.nan

    label = str(chrom)
    if label.startswith("chr"):
        return label

    # The mitochondrial aliases are the only labels whose suffix is rewritten.
    suffix = "M" if label in ("M", "MT") else label
    return f"chr{suffix}"

# Remember the gene count so the filtering step can report how many survive.
n_genes_before = adata_cnv.n_vars

if CONFIG["GTF_PATH"] is None:
    # TSV option (auto-download if needed)
    tsv_path = Path(CONFIG["GENE_POS_TSV"])
    _download_if_missing(CONFIG["GENE_POS_URL"], tsv_path)

    # Expected columns: gene_name, chromosome, start, end (no header in Broad's file)
    pos = pd.read_csv(
        tsv_path,
        sep="\t",
        header=None,
        names=["gene_name", "chromosome", "start", "end"],
        dtype={"gene_name": str, "chromosome": str},
        on_bad_lines="skip",
    )
    pos = pos.drop_duplicates(subset=["gene_name"])

    # Map by gene id (var_names by default)
    if CONFIG["ADATA_GENE_ID"] is None:
        gene_ids = pd.Index(adata_cnv.var_names.astype(str))
    elif CONFIG["ADATA_GENE_ID"] in adata_cnv.var.columns:
        gene_ids = pd.Index(adata_cnv.var[CONFIG["ADATA_GENE_ID"]].astype(str))
    else:
        raise KeyError(f"adata.var['{CONFIG['ADATA_GENE_ID']}'] not found")

    # One row per gene of adata_cnv, in var order; unmatched genes become NaN rows.
    pos = pos.set_index("gene_name").reindex(gene_ids)

    # Assign by position (avoid pandas index alignment issues when ADATA_GENE_ID != var_names)
    adata_cnv.var["chromosome"] = pos["chromosome"].map(_ensure_chr_prefix).to_numpy()
    for coord in ("start", "end"):
        # Non-numeric coordinates are coerced to NaN and filtered out afterwards.
        adata_cnv.var[coord] = pd.to_numeric(pos[coord], errors="coerce").to_numpy()
else:
    # GTF option: let infercnvpy annotate adata_cnv.var in place
    gtf_path = Path(CONFIG["GTF_PATH"])
    if not gtf_path.exists():
        raise FileNotFoundError(f"GTF not found: {gtf_path}")
    cnv.io.genomic_position_from_gtf(
        gtf_file=gtf_path,
        adata=adata_cnv,
        gtf_gene_id=CONFIG["GTF_GENE_ID"],
        adata_gene_id=CONFIG["ADATA_GENE_ID"],
        inplace=True,
    )

# Filter genes without genomic coordinates
valid = (
    adata_cnv.var["chromosome"].notna()
    & pd.notnull(adata_cnv.var["start"])
    & pd.notnull(adata_cnv.var["end"])
)

n_valid = int(valid.sum())
print(f"[GENE_POS] {n_valid}/{n_genes_before} genes have valid genomic positions.")

# Stop here with an actionable message instead of handing an AnnData with
# zero genes to infercnvpy, where the failure would be much harder to read.
if n_valid == 0:
    raise ValueError(
        "No gene could be matched to a genomic position. Check that the gene "
        "identifiers (adata.var_names or CONFIG['ADATA_GENE_ID']) use the same "
        "naming scheme as the position file (e.g. gene symbols vs. Ensembl IDs)."
    )

# Index with a plain boolean array so no pandas index alignment is involved.
adata_cnv = adata_cnv[:, valid.to_numpy()].copy()

# Now that NaNs are removed, cast to integer coordinates for downstream use
adata_cnv.var["start"] = adata_cnv.var["start"].astype(int)
adata_cnv.var["end"] = adata_cnv.var["end"].astype(int)

print("[After gene filtering]", adata_cnv)


## Run infercnvpy

This computes a smoothed, denoised expression matrix along the genome and stores it in:

- `adata_cnv.obsm["X_cnv"]` (when `key_added="cnv"`)
- metadata in `adata_cnv.uns["cnv"]`

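If you provide known normal cells via `REFERENCE_KEY` / `REFERENCE_CAT`, infercnvpy uses them as the reference background.
If `REFERENCE_KEY` is `None`, infercnvpy uses the average of all cells as reference (its default behavior). The notebook also falls back to this default, with a printed warning, when `REFERENCE_KEY` is not a column of `adata.obs` or when any value in `REFERENCE_CAT` does not occur in that column.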


In [None]:
# Resolve reference settings
reference_key = CONFIG["REFERENCE_KEY"]
reference_cat = CONFIG["REFERENCE_CAT"]

if reference_key is not None and reference_key not in adata_cnv.obs.columns:
    print(f"[WARN] reference_key='{reference_key}' not found in adata.obs. Falling back to average-of-all-cells reference.")
    reference_key = reference_cat = None
elif reference_key is not None:
    # Ensure categories are strings
    adata_cnv.obs[reference_key] = adata_cnv.obs[reference_key].astype(str)

    if reference_cat is not None:
        reference_cat = [str(x) for x in reference_cat]
        observed = set(adata_cnv.obs[reference_key].unique())
        missing = sorted(set(reference_cat).difference(observed))
        if missing:
            # A single unknown category is enough to drop the whole reference definition.
            print(f"[WARN] reference_cat values not found in obs['{reference_key}']: {missing}")
            print("[WARN] Falling back to average-of-all-cells reference.")
            reference_key = reference_cat = None

print("[infercnv] reference_key =", reference_key)
print("[infercnv] reference_cat =", reference_cat)
print("[infercnv] layer =", CNV_LAYER)

# Collect the call arguments in one place; results are stored under the "cnv" key.
infercnv_kwargs = {
    "reference_key": reference_key,
    "reference_cat": reference_cat,
    "window_size": CONFIG["WINDOW_SIZE"],
    "step": CONFIG["STEP"],
    "dynamic_threshold": CONFIG["DYNAMIC_THRESHOLD"],
    "exclude_chromosomes": CONFIG["EXCLUDE_CHROMS"],
    "n_jobs": CONFIG["N_JOBS"],
    "layer": CNV_LAYER,
    "key_added": "cnv",
}
cnv.tl.infercnv(adata_cnv, **infercnv_kwargs)

print("CNV matrix stored in:", [k for k in adata_cnv.obsm_keys() if k.startswith("X_") and "cnv" in k])


## Chromosome-level heatmaps

We generate two common plots:

1. `chromosome_heatmap`: per-cell CNV heatmap along the genome (grouped by `PLOT_GROUPBY`)  
2. `chromosome_heatmap_summary`: average CNV profile per group


In [None]:
# Custom blue-white-red colormap (optional; change if you prefer the default 'bwr')
colors = [
    "#222d6a", "#3a4891", "#78a1df",  # blue side
    "#ffffff",                        # white midpoint
    "#af6c81", "#964e5a", "#6d3643",  # red side
]
custom_cnv = mcolors.LinearSegmentedColormap.from_list(name="custom_cnv", colors=colors)

def _find_figure(obj):
    """Return the first figure reachable from *obj*, or None when there is none."""
    if obj is None or isinstance(obj, (str, bytes)):
        return None
    if hasattr(obj, "get_figure"):
        return obj.get_figure()
    if hasattr(obj, "figure"):
        return obj.figure

    # Containers: dict values, or the items of a list / tuple / array of axes.
    members = obj.values() if isinstance(obj, dict) else obj
    try:
        members = iter(members)
    except TypeError:
        return None
    for member in members:
        fig = _find_figure(member)
        if fig is not None:
            return fig
    return None


def _savefig(obj, outpath):
    """Save the figure behind a plotting result to *outpath* and return it.

    Parameters
    ----------
    obj
        Whatever a plotting call returned: ``None``, an object exposing
        ``get_figure()`` or ``.figure``, a dict of such objects, or a
        (possibly nested) sequence / array of them, as returned by
        multi-panel plots. When no figure can be found this way, the current
        pyplot figure is used.
    outpath
        Target file; missing parent directories are created. The file format
        is left to ``Figure.savefig`` (chosen from the suffix).

    Returns
    -------
    The figure that was saved.
    """
    outpath = Path(outpath)
    outpath.parent.mkdir(parents=True, exist_ok=True)

    fig = _find_figure(obj)
    if fig is None:
        fig = plt.gcf()

    fig.savefig(outpath, bbox_inches="tight")
    return fig

# Subset cells for plotting if requested
adata_plot = adata_cnv
if CONFIG["PLOT_SUBSET_CAT"] is not None:
    key = CONFIG["PLOT_SUBSET_KEY"]
    if key not in adata_plot.obs:
        print(f"[WARN] PLOT_SUBSET_KEY='{key}' not found in obs. Plotting all cells.")
    else:
        # Compare as strings so numeric and string category labels both match.
        wanted = [str(x) for x in CONFIG["PLOT_SUBSET_CAT"]]
        keep = adata_plot.obs[key].astype(str).isin(wanted)
        adata_plot = adata_plot[keep].copy()
        print(f"[PLOT] Subset: {adata_plot.n_obs} cells kept by {key} in {CONFIG['PLOT_SUBSET_CAT']}")

# The grouping column is mandatory for both heatmaps.
if CONFIG["PLOT_GROUPBY"] not in adata_plot.obs:
    raise KeyError(f"PLOT_GROUPBY='{CONFIG['PLOT_GROUPBY']}' not found in adata.obs.")
groupby = CONFIG["PLOT_GROUPBY"]

# 1) Per-cell CNV heatmap, 2) Group-average CNV heatmap.
# Each one is saved as PDF and SVG, then all figures are closed.
heatmap_jobs = (
    (cnv.pl.chromosome_heatmap, ("chromosome_heatmap.pdf", "chromosome_heatmap.svg")),
    (
        cnv.pl.chromosome_heatmap_summary,
        ("chromosome_heatmap_summary.pdf", "chromosome_heatmap_summary.svg"),
    ),
)
for draw, filenames in heatmap_jobs:
    ax = draw(
        adata_plot,
        groupby=groupby,
        cmap=custom_cnv,
        show=False,
    )
    for filename in filenames:
        _savefig(ax, DIRS["fig"] / filename)
    plt.close("all")

print("[PLOT] Saved figures to:", DIRS["fig"].resolve())


## CNV embedding, clustering, and CNV score

infercnvpy provides Scanpy-like wrappers to work on the CNV matrix:

- `cnv.tl.pca` → `cnv.pp.neighbors` → `cnv.tl.leiden` → `cnv.tl.umap`
- `cnv.tl.cnv_score` assigns each CNV cluster a score (higher = more aberrant)


## Export

In [None]:
# Compute CNV-based PCA / neighbors / Leiden / UMAP
cnv.tl.pca(adata_cnv)  # stores CNV PCA in adata.obsm['X_cnv_pca']
cnv.pp.neighbors(adata_cnv)  # stores graph in adata.obsp['cnv_neighbors_*']
# Pass the configured seed explicitly so that CONFIG["SEED"] actually controls
# the stochastic steps (with the default SEED=0 this matches the library default).
cnv.tl.leiden(adata_cnv, resolution=CONFIG["LEIDEN_RES"], random_state=CONFIG["SEED"])
cnv.tl.umap(adata_cnv, random_state=CONFIG["SEED"])
cnv.tl.cnv_score(adata_cnv)

# Plot CNV UMAP
colors_to_plot = ["cnv_leiden", "cnv_score"]
for extra_key in (CONFIG["PLOT_GROUPBY"], CONFIG["REFERENCE_KEY"]):
    # Skip unset / missing keys, and avoid a duplicate panel when both
    # settings point at the same obs column.
    if extra_key is None or extra_key not in adata_cnv.obs:
        continue
    if extra_key not in colors_to_plot:
        colors_to_plot.append(extra_key)

ax = cnv.pl.umap(
    adata_cnv,
    color=colors_to_plot,
    wspace=0.4,
    show=False,
)
_savefig(ax, DIRS["fig"] / "cnv_umap.pdf")
_savefig(ax, DIRS["fig"] / "cnv_umap.svg")
plt.close("all")

print("[PLOT] Saved CNV UMAP to:", (DIRS["fig"] / "cnv_umap.pdf").resolve())


In [None]:
# Save the processed AnnData with CNV results.
# The file name follows the configured input ("<input stem>.infercnv.h5ad"), so
# pointing CONFIG["INPUT_H5AD"] at another dataset does not produce a mislabeled
# file or overwrite earlier results. With the default input the name is unchanged.
out_h5ad = DIRS["adata"] / f"{Path(CONFIG['INPUT_H5AD']).stem}.infercnv.h5ad"
adata_cnv.write_h5ad(out_h5ad, compression="gzip")
print("[EXPORT] Saved:", out_h5ad.resolve())

# Optional: export per-cell annotations to a TSV (useful for reviewers)
obs_export = DIRS["tables"] / "cell_metadata_with_cnv.tsv"
# dict.fromkeys keeps the order but drops repeats, so a column is written only
# once even when PLOT_GROUPBY and REFERENCE_KEY name the same obs column.
cols = list(
    dict.fromkeys(
        c
        for c in [CONFIG["PLOT_GROUPBY"], CONFIG["REFERENCE_KEY"], "cnv_leiden", "cnv_score"]
        if c is not None and c in adata_cnv.obs
    )
)
adata_cnv.obs[cols].to_csv(obs_export, sep="\t")
print("[EXPORT] Saved:", obs_export.resolve())
