# 04 SNP statistics QC

**Origin:** `0_4_snp_stats.ipynb`  
**This annotated version was generated on:** 2025-10-13 06:41

**What this notebook does (high level):**  
- Compute SNP-level descriptive statistics (MAF, INFO, HWE proxies), counts by annotation, and QC summaries.

**How to use:**  
1. Review the markdown notes before each code cell.  
2. Adjust input/output paths as needed for your environment.  
3. Run cell-by-cell to reproduce artifacts for downstream steps.

---


**Step 1:** Load tabular data (summary stats / annotations).

In [1]:
# %% [markdown]
# Regulatory-variant stats per cohort (fast; RSID join; lymphoid precedence)

import os, re, gzip
from pathlib import Path
import pandas as pd

# ---------- Paths ----------
mapped_dir  = "/mnt/f/0.datasets/ens_vcf_dhs/"  # rsid_in_DHS_chr{n}_{lym|mye}.tsv
results_dir = "/mnt/f/10_osteo_MR/_results_tables/"
Path(results_dir).mkdir(parents=True, exist_ok=True)

# Cohort files (already "within DHS" but we re-confirm by RSID join)
cohorts = {
    "outcome_osteo": "/mnt/f/10_osteo_MR/MR_ready/outcome_osteo_within_DHS.tsv",
    "exposure_eqtlgen": "/mnt/f/10_osteo_MR/MR_ready/exposure_eqtlgen_dhs_index.tsv",
    "exposure_gtex_wb_eqtl": "/mnt/f/10_osteo_MR/MR_ready/exposure_gtex_whole_blood_eqtl_dhs_index.tsv",
    "exposure_decode_pqtl": "/mnt/f/10_osteo_MR/MR_ready/exposure_pqtl_decode_MR_dhs.tsv.gz",
    "exposure_ukbppp_pqtl": "/mnt/f/10_osteo_MR/MR_ready/exposure_ukbppp_pqtl_MR_dhs.tsv.gz",
}

# Column hints for each file (so we only read what we need)
# - 'SNP' (rsID), 'pval' always read if present
# - 'gene' used for DHS-per-gene
needed_cols = {
    "outcome_osteo":       ["SNP", "pval"],
    "exposure_eqtlgen":    ["SNP", "pval", "gene"],
    "exposure_gtex_wb_eqtl":["SNP", "pval", "gene"],   # 'gene' present per your example
    "exposure_decode_pqtl":["SNP", "pval", "gene"],
    "exposure_ukbppp_pqtl":["SNP", "pval", "gene"],
}

# ---------- Parameters ----------
PVAL_THRESH = 5e-8  # change if you prefer FDR or dataset-specific cutoffs

# ---------- Build RSID -> (identifier, component) with lymphoid precedence ----------
rsid2dhs = {}  # rsid -> (identifier:str, component:str)

for chr_i in range(1, 23):
    # 1) load myeloid/ery first
    p_mye = os.path.join(mapped_dir, f"rsid_in_DHS_chr{chr_i}_mye.tsv")
    if Path(p_mye).exists():
        df_m = pd.read_csv(p_mye, sep=r"\s+|,", engine="python",
                           usecols=["rsid","identifier","component"],
                           dtype={"rsid":"string","identifier":"string","component":"string"})
        for r, ident, comp in zip(df_m["rsid"], df_m["identifier"], df_m["component"]):
            if pd.isna(r): continue
            rsid2dhs[str(r)] = (None if pd.isna(ident) else str(ident),
                                None if pd.isna(comp)  else str(comp))
    # 2) override with lymphoid (precedence)
    p_lym = os.path.join(mapped_dir, f"rsid_in_DHS_chr{chr_i}_lym.tsv")
    if Path(p_lym).exists():
        df_l = pd.read_csv(p_lym, sep=r"\s+|,", engine="python",
                           usecols=["rsid","identifier","component"],
                           dtype={"rsid":"string","identifier":"string","component":"string"})
        for r, ident, comp in zip(df_l["rsid"], df_l["identifier"], df_l["component"]):
            if pd.isna(r): continue
            rsid2dhs[str(r)] = (None if pd.isna(ident) else str(ident),
                                None if pd.isna(comp)  else str(comp))

print(f"[info] RSID→DHS mappings loaded: {len(rsid2dhs):,}")

# ---------- Helpers ----------
def read_table(path, usecols=None):
    return pd.read_csv(path, sep="\t", compression="infer", dtype="string",
                       usecols=[c for c in (usecols or []) if c is not None])

def stats_for_cohort(name, path):
    cols = needed_cols.get(name, ["SNP", "pval"])
    df = read_table(path, cols)
    if "SNP" not in df.columns:
        raise ValueError(f"{name}: missing 'SNP' column.")

    # Normalize dtypes
    df["SNP"]  = df["SNP"].astype("string")
    if "pval" in df.columns:
        # Convert to float; tolerate sci-notation in strings
        df["pval"] = pd.to_numeric(df["pval"], errors="coerce")

    # Attach DHS identifier/component via RSID map
    mapped = df["SNP"].map(rsid2dhs.get)
    has_map = mapped.notna()
    df_map = pd.DataFrame(mapped.tolist(), index=df.index, columns=["identifier","component"])
    df = pd.concat([df, df_map], axis=1)

    # Filter to rows with a valid DHS mapping (safety)
    df_in = df.loc[has_map].copy()

    # ---- Totals / significance (row-level) ----
    total_rows_in_dhs = int(len(df_in))
    sig_rows_in_dhs   = int((df_in["pval"] < PVAL_THRESH).sum()) if "pval" in df_in.columns else None
    frac_sig          = (sig_rows_in_dhs / total_rows_in_dhs) if (sig_rows_in_dhs is not None and total_rows_in_dhs>0) else None

    # ---- Per-DHS: unique SNPs per DHS identifier ----
    # Use unique SNPs to avoid counting duplicate variant-gene associations multiple times.
    snps_per_dhs = (df_in.dropna(subset=["identifier"])
                       .drop_duplicates(subset=["SNP","identifier"])
                       .groupby(["identifier","component"], as_index=False)["SNP"]
                       .nunique()
                       .rename(columns={"SNP":"n_unique_snps"}))

    # ---- Per-gene: unique DHS count per gene (if gene column exists) ----
    dhs_per_gene = None
    if "gene" in df_in.columns:
        dhs_per_gene = (df_in.dropna(subset=["gene","identifier"])
                           .drop_duplicates(subset=["gene","identifier"])
                           .groupby("gene", as_index=False)["identifier"]
                           .nunique()
                           .rename(columns={"identifier":"n_unique_dhs"}))

    # ---- Unique SNPs total in DHS (optional extra QC) ----
    n_unique_snps_in_dhs = int(df_in["SNP"].nunique())

    # ---- Save outputs ----
    base = os.path.join(results_dir, f"{name}")
    # summary
    summary_rows = [
        {"metric":"rows_within_DHS", "value": total_rows_in_dhs},
        {"metric":"unique_snps_within_DHS", "value": n_unique_snps_in_dhs},
    ]
    if sig_rows_in_dhs is not None:
        summary_rows += [
            {"metric":f"significant_rows_p<{PVAL_THRESH}", "value": sig_rows_in_dhs},
            {"metric":"fraction_significant_rows", "value": round(frac_sig, 6) if frac_sig is not None else None},
        ]
    pd.DataFrame(summary_rows).to_csv(base + ".summary.csv", index=False)

    # per DHS
    snps_per_dhs.to_csv(base + ".per_dhs_unique_snp_counts.csv", index=False)

    # per gene (when available)
    if dhs_per_gene is not None:
        dhs_per_gene.to_csv(base + ".per_gene_unique_dhs_counts.csv", index=False)

    # Return a compact dict for the combined overview
    return {
        "cohort": name,
        "rows_within_DHS": total_rows_in_dhs,
        "unique_snps_within_DHS": n_unique_snps_in_dhs,
        "significant_rows_p<{}".format(PVAL_THRESH): sig_rows_in_dhs,
        "fraction_significant_rows": (round(frac_sig, 6) if frac_sig is not None else None),
        "per_dhs_file": os.path.basename(base + ".per_dhs_unique_snp_counts.csv"),
        "per_gene_file": os.path.basename(base + ".per_gene_unique_dhs_counts.csv") if "gene" in (dhs_per_gene.columns if dhs_per_gene is not None else []) else None,
    }

# ---------- Run ----------
overview = []
for name, path in cohorts.items():
    print(f"[run] {name}")
    overview.append(stats_for_cohort(name, path))

overview_df = pd.DataFrame(overview)
overview_path = os.path.join(results_dir, "regulatory_variant_stats.overview.csv")
overview_df.to_csv(overview_path, index=False)
print(f"\n[done] Wrote overview → {overview_path}")
print("Per-cohort summaries & tables saved beside it.")



[info] RSID→DHS mappings loaded: 32,364,424
[run] outcome_osteo
[run] exposure_eqtlgen
[run] exposure_gtex_wb_eqtl
[run] exposure_decode_pqtl
[run] exposure_ukbppp_pqtl

[done] Wrote overview → /mnt/f/10_osteo_MR/_results_tables/regulatory_variant_stats.overview.csv
Per-cohort summaries & tables saved beside it.


**Step 2:** Load tabular data (summary stats / annotations).

In [2]:
# %% [markdown]
# Aggregate stats: mean ± s.e. (SNPs per DHS, DHS per gene) per cohort

import os, math, glob
from pathlib import Path
import pandas as pd
import numpy as np

results_dir = "/mnt/f/10_osteo_MR/_results_tables/"
Path(results_dir).mkdir(parents=True, exist_ok=True)

# Find per-cohort files produced earlier
per_dhs_files  = {Path(p).name.split(".per_dhs_unique_snp_counts.csv")[0]: p
                  for p in glob.glob(os.path.join(results_dir, "*.per_dhs_unique_snp_counts.csv"))}
per_gene_files = {Path(p).name.split(".per_gene_unique_dhs_counts.csv")[0]: p
                  for p in glob.glob(os.path.join(results_dir, "*.per_gene_unique_dhs_counts.csv"))}

cohorts = sorted(set(per_dhs_files) | set(per_gene_files))

rows = []
for cohort in cohorts:
    # --- SNPs per DHS ---
    n_dhs = mean_snps = se_snps = None
    if cohort in per_dhs_files:
        df_dhs = pd.read_csv(per_dhs_files[cohort])
        # Expect columns: identifier, component, n_unique_snps
        if "n_unique_snps" in df_dhs.columns and len(df_dhs) > 0:
            n_dhs   = int(len(df_dhs))
            mean_snps = float(df_dhs["n_unique_snps"].mean())
            se_snps   = float(df_dhs["n_unique_snps"].std(ddof=1) / math.sqrt(n_dhs)) if n_dhs > 1 else np.nan

    # --- DHS per gene ---
    n_genes = mean_dhs = se_dhs = None
    if cohort in per_gene_files:
        df_gene = pd.read_csv(per_gene_files[cohort])
        # Expect columns: gene, n_unique_dhs
        if "n_unique_dhs" in df_gene.columns and len(df_gene) > 0:
            n_genes = int(len(df_gene))
            mean_dhs = float(df_gene["n_unique_dhs"].mean())
            se_dhs   = float(df_gene["n_unique_dhs"].std(ddof=1) / math.sqrt(n_genes)) if n_genes > 1 else np.nan

    # Pretty “mean ± s.e.” strings (rounded)
    def fmt(mean, se, digits=3):
        if mean is None or (isinstance(mean, float) and np.isnan(mean)):
            return "NA"
        if se is None or (isinstance(se, float) and np.isnan(se)):
            return f"{mean:.{digits}f} ± NA"
        return f"{mean:.{digits}f} ± {se:.{digits}f}"

    rows.append({
        "cohort": cohort,
        "n_DHS": n_dhs,
        "mean_snps_per_DHS": None if mean_snps is None else round(mean_snps, 6),
        "se_snps_per_DHS": None if se_snps is None or np.isnan(se_snps) else round(se_snps, 6),
        "mean±se_snps_per_DHS": fmt(mean_snps, se_snps),

        "n_genes": n_genes,
        "mean_DHS_per_gene": None if mean_dhs is None else round(mean_dhs, 6),
        "se_DHS_per_gene": None if se_dhs is None or np.isnan(se_dhs) else round(se_dhs, 6),
        "mean±se_DHS_per_gene": fmt(mean_dhs, se_dhs),
    })

summary_df = pd.DataFrame(rows).sort_values("cohort")

out_path = os.path.join(results_dir, "regulatory_variant_stats.mean_se_by_cohort.csv")
summary_df.to_csv(out_path, index=False)

print(summary_df.to_string(index=False))
print(f"\nSaved → {out_path}")


               cohort  n_DHS  mean_snps_per_DHS  se_snps_per_DHS mean±se_snps_per_DHS  n_genes  mean_DHS_per_gene  se_DHS_per_gene mean±se_DHS_per_gene
 exposure_decode_pqtl  72005           1.475634         0.005277        1.476 ± 0.005   4055.0          75.265845         1.980471       75.266 ± 1.980
     exposure_eqtlgen 109650           1.392795         0.002752        1.393 ± 0.003  15833.0          23.756521         0.187132       23.757 ± 0.187
exposure_gtex_wb_eqtl  52538           1.395580         0.006022        1.396 ± 0.006  10881.0          10.869497         0.168233       10.869 ± 0.168
 exposure_ukbppp_pqtl  54158           1.468204         0.006670        1.468 ± 0.007   2493.0         107.072603         3.145632      107.073 ± 3.146
        outcome_osteo 292131           1.991781         0.002604        1.992 ± 0.003      NaN                NaN              NaN                   NA

Saved → /mnt/f/10_osteo_MR/_results_tables/regulatory_variant_stats.mean_se_by_cohort.c

**Step 3:** Load tabular data (summary stats / annotations).

In [4]:


# %% [markdown]
# Subset exposure SNPs to variants inside each row's gene body ±500 bp (RSID→coord + gene coords)

import os, re, glob, gzip
from pathlib import Path
import pandas as pd
import numpy as np

# ---------------- Paths ----------------
mapped_dir  = "/mnt/f/0.datasets/ens_vcf_dhs/"  # rsid_in_DHS_chr{n}_{lym|mye}.tsv (columns: rsid, POS, component, identifier)
gene_path   = "/mnt/f/10_osteo_MR/datasets/mart_export.txt"  # Ensembl Biomart export
cohorts = {
    "exposure_eqtlgen":       "/mnt/f/10_osteo_MR/MR_ready/exposure_eqtlgen_dhs_index.tsv",
    "exposure_gtex_wb_eqtl":  "/mnt/f/10_osteo_MR/MR_ready/exposure_gtex_whole_blood_eqtl_dhs_index.tsv",
    "exposure_decode_pqtl":   "/mnt/f/10_osteo_MR/MR_ready/exposure_pqtl_decode_MR_dhs.tsv.gz",
    "exposure_ukbppp_pqtl":   "/mnt/f/10_osteo_MR/MR_ready/exposure_ukbppp_pqtl_MR_dhs.tsv.gz",
}
out_dir = "/mnt/f/10_osteo_MR/_results_tables/exposures_within_gene_window"
Path(out_dir).mkdir(parents=True, exist_ok=True)

# ---------------- Parameters ----------------
UPSTREAM_BP   = 500000   # extend upstream
DOWNSTREAM_BP = 500000  # extend downstream
CHUNKSIZE = 1_000_000

# ---------------- 1) Build RSID → (CHR, POS) with Lymphoid precedence ----------------
rsid2coord = {}  # rsid -> (chr_str, pos_int)

# Helper: parse chr index from filename (supports numbers, X, Y, MT)
chr_pat = re.compile(r"rsid_in_DHS_chr([0-9XYMT]+)_(lym|mye)\.tsv$")

# First load mye, then override with lym (precedence)
for prio in ("mye", "lym"):
    for p in glob.glob(os.path.join(mapped_dir, f"rsid_in_DHS_chr*_{prio}.tsv")):
        m = chr_pat.search(os.path.basename(p))
        if not m: 
            continue
        chr_str = m.group(1)  # '1'..'22', or 'X','Y','MT'
        df = pd.read_csv(p, sep=r"\s+|,", engine="python",
                         usecols=["rsid","POS"], dtype={"rsid":"string","POS":"Int64"})
        df = df.dropna(subset=["rsid", "POS"])
        for r, pos in zip(df["rsid"].astype(str), df["POS"].astype(int)):
            # Lymphoid overrides myeloid if same RSID appears in both
            if prio == "lym" or r not in rsid2coord:
                rsid2coord[r] = (chr_str, int(pos))

print(f"[info] RSIDs with coordinates: {len(rsid2coord):,}")

# ---------------- 2) Load + collapse gene coordinates; build ±500bp windows ----------------
# Expect columns: Gene stable ID, Gene name, Chromosome/scaffold name, Gene start (bp), Gene end (bp)
genes = pd.read_csv(gene_path, sep="\t", dtype="string")
genes = genes.rename(columns={
    "Gene stable ID": "gene_id",
    "Gene name": "gene",
    "Chromosome/scaffold name": "chr",
    "Gene start (bp)": "start",
    "Gene end (bp)": "end",
})
# Clean/convert
genes["chr"]   = genes["chr"].astype(str).str.replace("^chr","",regex=True)  # normalize (e.g., '1','X','MT')
genes["start"] = pd.to_numeric(genes["start"], errors="coerce")
genes["end"]   = pd.to_numeric(genes["end"], errors="coerce")
genes = genes.dropna(subset=["gene","chr","start","end"])
genes["start"] = genes["start"].astype(int)
genes["end"]   = genes["end"].astype(int)
genes = genes[genes["end"] >= genes["start"]].copy()

# Some genes may appear multiple times (alt loci / transcripts).
# Rule: choose the span with the largest length per (gene, chr), then pick the chromosome with the largest span overall.
genes["span"] = genes["end"] - genes["start"]
largest_per_chr = (genes.sort_values("span", ascending=False)
                        .groupby(["gene","chr"], as_index=False)
                        .first())  # largest span per gene-chr
# Now pick the single best chr per gene (largest span)
best_per_gene = (largest_per_chr.sort_values("span", ascending=False)
                               .groupby("gene", as_index=False)
                               .first()[["gene","chr","start","end"]])

# Build window
best_per_gene["win_start"] = (best_per_gene["start"] - UPSTREAM_BP).clip(lower=1)
best_per_gene["win_end"]   = best_per_gene["end"] + DOWNSTREAM_BP

# index for fast lookups
gene2coord = {g: (c, int(s), int(e)) 
              for g, c, s, e in zip(best_per_gene["gene"], best_per_gene["chr"],
                                     best_per_gene["win_start"], best_per_gene["win_end"])}

print(f"[info] Genes with windows: {len(gene2coord):,}")

# ---------------- 3) Per-cohort filtering ----------------
def process_cohort(name: str, path: str):
    out_path = os.path.join(out_dir, f"{name}.within_gene±{UPSTREAM_BP}_{DOWNSTREAM_BP}.tsv.gz")
    # Columns we need
    usecols = None  # read all to preserve, but we access SNP + gene (+ pval) by name
    written = total = matched_gene = matched_rsid = 0
    first = True

    def row_mask(df):
        nonlocal matched_gene, matched_rsid
        # Normalize SNP & gene cols
        if "SNP" not in df.columns or "gene" not in df.columns:
            missing = [c for c in ("SNP","gene") if c not in df.columns]
            raise ValueError(f"{name}: missing columns: {missing}")
        snp = df["SNP"].astype("string")
        gen = df["gene"].astype("string")

        # Map rsid to coord
        coords = snp.map(rsid2coord.get)  # (chr, pos) or None
        has_rsid = coords.notna()
        matched_rsid += int(has_rsid.sum())

        # Split into two arrays for speed
        chr_arr = []
        pos_arr = []
        for t in coords.fillna("").tolist():
            if not t:
                chr_arr.append(None); pos_arr.append(None)
            else:
                c,p = t
                chr_arr.append(str(c)); pos_arr.append(int(p))
        chr_arr = pd.Series(chr_arr, index=df.index, dtype="object")
        pos_arr = pd.Series(pos_arr, index=df.index, dtype="float").astype("Int64")

        # Map gene to window
        gene_win = gen.map(gene2coord.get)  # (chr, wstart, wend) or None
        has_gene = gene_win.notna()
        matched_gene += int(has_gene.sum())

        gw_chr = []; gw_s = []; gw_e = []
        for t in gene_win.fillna("").tolist():
            if not t:
                gw_chr.append(None); gw_s.append(None); gw_e.append(None)
            else:
                c,s,e = t
                gw_chr.append(str(c)); gw_s.append(int(s)); gw_e.append(int(e))
        gw_chr = pd.Series(gw_chr, index=df.index, dtype="object")
        gw_s   = pd.Series(gw_s,   index=df.index, dtype="float").astype("Int64")
        gw_e   = pd.Series(gw_e,   index=df.index, dtype="float").astype("Int64")

        # Mask: rsid coord known AND gene window known AND chr match AND pos within [start,end]
        m = has_rsid & has_gene
        if m.any():
            m = m & (chr_arr == gw_chr) & (pos_arr >= gw_s) & (pos_arr <= gw_e)
        return m

    with gzip.open(out_path, "wt") as gzout:
        for chunk in pd.read_csv(path, sep="\t", compression="infer", chunksize=CHUNKSIZE, dtype="string", usecols=usecols):
            total += len(chunk)
            m = row_mask(chunk)
            sub = chunk.loc[m]
            if not sub.empty:
                sub.to_csv(gzout, sep="\t", index=False, header=first)
                first = False
                written += len(sub)

    return {
        "cohort": name,
        "input_rows": total,
        "rows_with_mapped_rsid": matched_rsid,
        "rows_with_mapped_gene": matched_gene,
        "rows_within_gene_window": written,
        "outfile": os.path.basename(out_path),
    }

overview = []
for name, path in cohorts.items():
    print(f"[run] {name}")
    res = process_cohort(name, path)
    overview.append(res)
    print(f"  -> kept {res['rows_within_gene_window']:,} / {res['input_rows']:,} rows")

ov = pd.DataFrame(overview)
ov_path = os.path.join(out_dir, "overview.within_gene±{}_{}.csv".format(UPSTREAM_BP, DOWNSTREAM_BP))
ov.to_csv(ov_path, index=False)
print("\n[done] Saved overview →", ov_path)
print("Outputs in:", out_dir)




[info] RSIDs with coordinates: 32,364,424
[info] Genes with windows: 41,164
[run] exposure_eqtlgen
  -> kept 333,980 / 487,985 rows
[run] exposure_gtex_wb_eqtl
  -> kept 113,984 / 186,199 rows
[run] exposure_decode_pqtl
  -> kept 55,844 / 675,513 rows
[run] exposure_ukbppp_pqtl
  -> kept 68,686 / 633,569 rows

[done] Saved overview → /mnt/f/10_osteo_MR/_results_tables/exposures_within_gene_window/overview.within_gene±500000_500000.csv
Outputs in: /mnt/f/10_osteo_MR/_results_tables/exposures_within_gene_window


**Step 4:** Load tabular data (summary stats / annotations).

In [10]:
# %% [markdown]
# SNPs within 500 kb of any TSS, INCLUDING DHS metadata (rsid, chr, pos, dhs_id, dhs_tissue_type)
# - Lymphoid precedence when an rsid maps to both lymphoid & myeloid DHS

import os, re, glob, gzip
from pathlib import Path
import pandas as pd
import numpy as np

# ---------- Paths ----------
mapped_dir    = "/mnt/f/0.datasets/ens_vcf_dhs/"    # rsid_in_DHS_chr{n}_{lym|mye}.tsv
tss_gene_path = "/mnt/f/10_osteo_MR/datasets/tss_mart_export.txt.gz"  # Biomart export
tss_out_dir   = "/mnt/f/0.datasets/ens_vsf_dhs_tss50k/"
Path(tss_out_dir).mkdir(parents=True, exist_ok=True)

# ---------- Parameters ----------
WINDOW = 50_000  # 500 kb

# ---------- Helpers ----------
def merge_intervals(starts: np.ndarray, ends: np.ndarray):
    if starts.size == 0:
        return starts, ends
    order = np.argsort(starts, kind="mergesort")
    s = starts[order].astype(np.int64, copy=False)
    e = ends[order].astype(np.int64, copy=False)
    ms, me = [], []
    cs, ce = int(s[0]), int(e[0])
    for i in range(1, s.size):
        si, ei = int(s[i]), int(e[i])
        if si <= ce + 1:
            if ei > ce:
                ce = ei
        else:
            ms.append(cs); me.append(ce)
            cs, ce = si, ei
    ms.append(cs); me.append(ce)
    return np.array(ms, dtype=np.int64), np.array(me, dtype=np.int64)

def positions_in_intervals(positions, starts, ends):
    if positions.size == 0 or starts.size == 0:
        return np.zeros(positions.shape[0], dtype=bool)
    idx = np.searchsorted(starts, positions, side="right") - 1
    idx = np.clip(idx, 0, ends.size - 1)
    return positions <= ends[idx]

# ---------- 1) Build RSID -> (chr, pos, dhs_id, component) with Lymphoid precedence ----------
# Load myeloid first, then override with lymphoid.
rsid2info = {}  # rsid -> (chr, pos, identifier, component)
chr_tag_re = re.compile(r"rsid_in_DHS_chr([0-9XYMT]+)_(lym|mye)\.tsv$")

for priority in ("mye", "lym"):
    for p in glob.glob(os.path.join(mapped_dir, "rsid_in_DHS_chr*_[lm]ye.tsv")):
        m = chr_tag_re.search(os.path.basename(p))
        if not m:
            continue
        chr_str, tag = m.group(1), m.group(2)
        if tag != priority:
            continue
        try:
            df = pd.read_csv(
                p, sep=r"\s+|,", engine="python",
                usecols=["rsid","POS","identifier","component"],
                dtype={"rsid":"string","POS":"Int64","identifier":"string","component":"string"},
            ).dropna(subset=["rsid","POS"])
            # If an rsid appears multiple times within the same file, keep the first
            df = df.drop_duplicates(subset=["rsid"], keep="first")
        except Exception as e:
            print(f"[WARN] Skipping {p}: {e}")
            continue
        for r, pos, ident, comp in zip(df["rsid"].astype(str),
                                       df["POS"].astype(int),
                                       df["identifier"].astype("string"),
                                       df["component"].astype("string")):
            if priority == "lym" or r not in rsid2info:
                rsid2info[r] = (str(chr_str), int(pos), (None if pd.isna(ident) else str(ident)),
                                (None if pd.isna(comp) else str(comp)))

print(f"[info] RSIDs with DHS metadata: {len(rsid2info):,}")

# Build per-chromosome aligned arrays for fast filtering
chr2arrays = {}
for rs, (c, p, ident, comp) in rsid2info.items():
    chr2arrays.setdefault(c, {"rsid": [], "pos": [], "ident": [], "comp": []})
    chr2arrays[c]["rsid"].append(rs)
    chr2arrays[c]["pos"].append(p)
    chr2arrays[c]["ident"].append(ident)
    chr2arrays[c]["comp"].append(comp)
for c, d in list(chr2arrays.items()):
    order = np.argsort(np.array(d["pos"], dtype=np.int64))
    chr2arrays[c] = {
        "rsid":  np.array(d["rsid"], dtype=object)[order],
        "pos":   np.array(d["pos"], dtype=np.int64)[order],
        "ident": np.array(d["ident"], dtype=object)[order],
        "comp":  np.array(d["comp"], dtype=object)[order],
    }

# ---------- 2) Load TSS and build merged ±500 kb windows per chromosome ----------
tss = pd.read_csv(tss_gene_path, sep="\t", compression="infer", dtype="string")
tss = tss.rename(columns={
    "Gene stable ID": "gene_id",
    "Gene name": "gene",
    "Chromosome/scaffold name": "chr",
    "Transcription start site (TSS)": "tss"
})
tss["chr"] = tss["chr"].astype(str).str.replace("^chr","",regex=True).replace({"M":"MT"})
tss["tss"] = pd.to_numeric(tss["tss"], errors="coerce")
tss = tss.dropna(subset=["chr","tss"])
tss["tss"] = tss["tss"].astype(np.int64)

valid_chr = {str(i) for i in range(1,23)} | {"X","Y","MT"}
tss = tss[tss["chr"].isin(valid_chr)].copy()

chr2intervals = {}
for c, sub in tss.groupby("chr", sort=False):
    starts = (sub["tss"].to_numpy(np.int64) - WINDOW).clip(min=1)
    ends   = sub["tss"].to_numpy(np.int64) + WINDOW
    ms, me = merge_intervals(starts, ends)
    chr2intervals[str(c)] = (ms, me)
print(f"[info] Chromosomes with TSS windows: {len(chr2intervals)}")

# ---------- 3) Select SNPs within any TSS window and WRITE with DHS metadata ----------
rows = []
for c, arrays in chr2arrays.items():
    if c not in chr2intervals:
        continue
    s, e = chr2intervals[c]
    pos = arrays["pos"]
    inside = positions_in_intervals(pos, s, e)
    if inside.any():
        idx = np.where(inside)[0]
        for i in idx:
            rows.append((
                arrays["rsid"][i],
                c,
                int(arrays["pos"][i]),
                arrays["ident"][i],
                arrays["comp"][i],
            ))

out_path = os.path.join(tss_out_dir, "rsids_in_DHS_within_TSS50kb.with_dhs.tsv.gz")
with gzip.open(out_path, "wt") as gz:
    gz.write("rsid\tchr\tpos\tdhs_id\tdhs_tissue_type\n")
    for r in rows:
        # r = (rsid, chr, pos, identifier, component)
        gz.write(f"{r[0]}\t{r[1]}\t{r[2]}\t{'' if r[3] is None else r[3]}\t{'' if r[4] is None else r[4]}\n")

print(f"[done] Wrote {len(rows):,} SNPs → {out_path}")



[info] RSIDs with DHS metadata: 13,010,252
[info] Chromosomes with TSS windows: 25
[done] Wrote 12,245,788 SNPs → /mnt/f/0.datasets/ens_vsf_dhs_tss50k/rsids_in_DHS_within_TSS50kb.with_dhs.tsv.gz


**Step 5:** Load tabular data (summary stats / annotations).

In [11]:


# %% [markdown]
# Filter exposures to SNPs within 50 kb of any TSS (RSID membership only)

import os, gzip
from pathlib import Path
import pandas as pd

# ---------- Inputs ----------
tss50k_path = "/mnt/f/0.datasets/ens_vsf_dhs_tss50k/rsids_in_DHS_within_TSS50kb.with_dhs.tsv.gz"
cohorts = {
    "exposure_eqtlgen":       "/mnt/f/10_osteo_MR/MR_ready/exposure_eqtlgen_dhs_index.tsv",
    "exposure_gtex_wb_eqtl":  "/mnt/f/10_osteo_MR/MR_ready/exposure_gtex_whole_blood_eqtl_dhs_index.tsv",
    "exposure_decode_pqtl":   "/mnt/f/10_osteo_MR/MR_ready/exposure_pqtl_decode_MR_dhs.tsv.gz",
    "exposure_ukbppp_pqtl":   "/mnt/f/10_osteo_MR/MR_ready/exposure_ukbppp_pqtl_MR_dhs.tsv.gz",
}

# ---------- Output ----------
out_dir = "/mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb"
Path(out_dir).mkdir(parents=True, exist_ok=True)

# ---------- Load RSIDs in TSS±50kb ----------
rsids_50k = pd.read_csv(
    tss50k_path, sep="\t", compression="infer",
    usecols=["rsid"], dtype={"rsid":"string"}
)["rsid"].dropna().astype(str).unique().tolist()
rsid_set = set(rsids_50k)
print(f"[info] RSIDs in TSS±50kb: {len(rsid_set):,}")

# ---------- Filter helper ----------
def filter_cohort(name: str, in_path: str, rsid_set: set, chunksize: int = 1_000_000):
    out_path = os.path.join(out_dir, f"{name}.within_TSS50kb.tsv.gz")
    total = kept = 0
    wrote_header = False

    with gzip.open(out_path, "wt") as gzout:
        for chunk in pd.read_csv(in_path, sep="\t", compression="infer", dtype="string", chunksize=chunksize):
            if "SNP" not in chunk.columns:
                raise ValueError(f"{name}: missing 'SNP' column.")
            total += len(chunk)
            mask = chunk["SNP"].astype("string").isin(rsid_set)
            sub = chunk.loc[mask]
            if not sub.empty:
                sub.to_csv(gzout, sep="\t", index=False, header=not wrote_header)
                wrote_header = True
                kept += len(sub)

    print(f"[{name}] kept {kept:,} / {total:,} rows → {out_path}")
    return {"cohort": name, "input_rows": total, "kept_rows": kept, "outfile": os.path.basename(out_path)}

# ---------- Run ----------
overview = [filter_cohort(n, p, rsid_set) for n, p in cohorts.items()]
ov = pd.DataFrame(overview)
ov_path = os.path.join(out_dir, "overview.filtered_in_TSS50kb.csv")
ov.to_csv(ov_path, index=False)
print("\nOverview saved →", ov_path)



[info] RSIDs in TSS±50kb: 12,245,788
[exposure_eqtlgen] kept 185,386 / 487,985 rows → /mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb/exposure_eqtlgen.within_TSS50kb.tsv.gz
[exposure_gtex_wb_eqtl] kept 62,492 / 186,199 rows → /mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb/exposure_gtex_wb_eqtl.within_TSS50kb.tsv.gz
[exposure_decode_pqtl] kept 147,494 / 675,513 rows → /mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb/exposure_decode_pqtl.within_TSS50kb.tsv.gz
[exposure_ukbppp_pqtl] kept 139,707 / 633,569 rows → /mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb/exposure_ukbppp_pqtl.within_TSS50kb.tsv.gz

Overview saved → /mnt/f/10_osteo_MR/_results_tables/exposures_in_TSS50kb/overview.filtered_in_TSS50kb.csv
