# Import libraries

In [None]:
import subprocess
from pathlib import Path
import os
import pandas as pd
import numpy as np
import pandas as pd
import tempfile
import shutil

In [1]:
def make_AFs(path_data, name_file_input, path_plink, path_output, threads=4,
             chroms=range(1, 23)):
    """Run ``plink2 --freq`` once per chromosome on a PLINK1 bfile trio.

    Parameters
    ----------
    path_data : str or Path
        Directory holding ``<name_file_input>.{bed,bim,fam}``. plink2 is run with
        this directory as its working directory so ``--bfile`` can stay relative.
    name_file_input : str
        bfile prefix (no extension); also embedded in the output file names.
    path_plink : str or Path
        Either the plink2 executable itself or a directory containing ``plink2``.
    path_output : str or Path
        Directory receiving ``chrom_<c>_AFs_<name_file_input>.*`` outputs; created
        if needed. Interpreted relative to the caller's working directory.
    threads : int
        Value passed to ``--threads``.
    chroms : iterable, optional
        Chromosomes to process (default: autosomes 1-22).

    Raises
    ------
    subprocess.CalledProcessError
        If plink2 exits non-zero (its stderr is available on ``.stderr``).
    """
    # Make both the executable and the output prefix absolute: the child runs with
    # cwd=path_data, and on POSIX a relative executable (and plink's relative
    # --out) would otherwise be resolved against path_data, not the caller's cwd.
    plink = Path(path_plink)
    if not plink.is_file():
        plink = plink / "plink2"
    plink_bin = os.path.abspath(plink)
    out_dir = Path(os.path.abspath(path_output))
    out_dir.mkdir(parents=True, exist_ok=True)

    for chrom in chroms:
        name_file_output = f"chrom_{chrom}_AFs_{name_file_input}"
        cmd = [
            plink_bin,
            "--bfile", name_file_input,
            "--chr", str(chrom),
            "--freq",
            "--threads", str(threads),
            "--out", str(out_dir / name_file_output),
        ]
        # run inside path_data so relative --bfile works
        subprocess.run(cmd, cwd=path_data, check=True, capture_output=True, text=True)


In [9]:

def concat_AFs(path_input, path_output):
    """Merge per-population plink2 ``.afreq`` files into one table per chromosome.

    Reads files named ``chrom_<chrom>_AFs_<pop>.afreq`` from ``path_input``; other
    files (including other ``.afreq`` files) are ignored. For each chromosome the
    populations are inner-joined on the variant-identity columns, and a pooled ALT
    frequency is computed by weighting each population's ``ALT_FREQS`` by its
    ``OBS_CT``::

        TOTAL_ALT_FREQ = sum_p(ALT_FREQS_p * OBS_CT_p) / sum_p(OBS_CT_p)
        TOTAL_MAF      = min(TOTAL_ALT_FREQ, 1 - TOTAL_ALT_FREQ)

    Both totals are stored as float32. Each table is pickled to
    ``<path_output>/global_AF_<chrom>.pkl``.

    Raises
    ------
    FileNotFoundError
        If a population has no file for a chromosome that other populations have.
    """
    Path(path_output).mkdir(parents=True, exist_ok=True)

    # (chrom, pop) -> file. Strip the suffix explicitly and split on the first
    # "_AFs_" so prefixes containing dots (e.g. "1000g.v2") are kept intact.
    files = {}
    for f in os.listdir(path_input):
        if not (f.startswith("chrom_") and f.endswith(".afreq") and "_AFs_" in f):
            continue
        chrom_part, pop = f[: -len(".afreq")].split("_AFs_", 1)
        files[(chrom_part[len("chrom_"):], pop)] = Path(path_input) / f

    chroms = sorted({chrom for chrom, _ in files})
    pops = sorted({pop for _, pop in files})
    key_candidates = ["#CHROM", "ID", "REF", "ALT", "PROVISIONAL_REF?"]

    for chrom in chroms:
        dfs = []
        for pop in pops:
            p = files.get((chrom, pop))
            if p is None:
                raise FileNotFoundError(
                    f"No .afreq file for population '{pop}' on chromosome {chrom} in {path_input}"
                )
            df = pd.read_csv(p, sep=r"\s+")
            # ALT_FREQS is AF for ALT; fold to MAF only at the end after weighting
            df = df.rename(columns={"ALT_FREQS": f"ALT_FREQS_{pop}", "OBS_CT": f"OBS_CT_{pop}"})
            dfs.append(df)

        # Join only on identity columns every file has; the PROVISIONAL_REF?
        # column may be absent depending on plink2 version/input.
        keys = [k for k in key_candidates if all(k in d.columns for d in dfs)]
        base = dfs[0]
        for d in dfs[1:]:
            base = base.merge(d, on=keys, how="inner")

        # Weighted global ALT freq across populations
        total_alt = 0.0
        total_obs = 0.0
        for pop in pops:
            base[f"ALT_COUNT_{pop}"] = base[f"ALT_FREQS_{pop}"] * base[f"OBS_CT_{pop}"]
            total_alt += base[f"ALT_COUNT_{pop}"]
            total_obs += base[f"OBS_CT_{pop}"]
        base["TOTAL_ALT_FREQ"] = (total_alt / total_obs).astype("float32")

        # Convert to **MAF** in [0, 0.5]
        base["TOTAL_MAF"] = np.minimum(base["TOTAL_ALT_FREQ"], 1.0 - base["TOTAL_ALT_FREQ"]).astype("float32")

        base.to_pickle(Path(path_output) / f"global_AF_{chrom}.pkl")


In [19]:
import os, shutil, subprocess, tempfile
from pathlib import Path
import numpy as np, pandas as pd

# Metadata (non-genotype) columns of a .raw export, in the order they are
# placed at the front of each written chunk.
PRIORITY = "FID IID PAT MAT SEX PHENOTYPE".split()

def _resolve_plink2(path_plink: str) -> str:
    p = Path(path_plink)
    if p.is_file() and os.access(p, os.X_OK):
        return str(p)
    cand = p / "plink2"
    if cand.is_file() and os.access(cand, os.X_OK):
        return str(cand)
    which = shutil.which("plink2")
    if which: return which
    raise FileNotFoundError(f"plink2 not found at '{path_plink}' and not in PATH")

def _find_bfiles(path_dir: str):
    d = Path(path_dir)
    fams = {p.stem for p in d.glob("*.fam")}
    trios = [pref for pref in sorted(fams)
             if (d/f"{pref}.bed").exists() and (d/f"{pref}.bim").exists()]
    return trios

def _load_raw_genotypes(raw_path):
    """Read a plink2 ``--recode A`` .raw file into PRIORITY meta columns + Int8 genotypes.

    Meta columns from PRIORITY that are present are kept (SEX as nullable Int8,
    the others as strings); every remaining column is recoded 0/1/2/NaN ->
    -1/0/1/<NA> as nullable Int8.
    """
    # Stable dtypes for meta columns avoid mixed-type DtypeWarnings.
    meta_dtype = {c: "string" for c in PRIORITY if c != "SEX"}
    meta_dtype["SEX"] = "Int8"
    df = pd.read_csv(raw_path, sep=r"\s+", low_memory=False, dtype=meta_dtype)

    meta = [c for c in PRIORITY if c in df.columns]
    gcols = [c for c in df.columns if c not in meta]
    df_geno = (df[gcols].astype("Float32") - 1.0).astype("Int8")
    return pd.concat([df[meta], df_geno], axis=1)


def divide_into_chunks(path_input_afs,
                       path_input_plink_bfiles,
                       path_plink,
                       path_output,
                       size_chunk=20_000,
                       min_maf=0.01,
                       threads=4,
                       chroms=None,
                       bfiles=None,      # explicitly choose, e.g. ["1000g"]
                       verbose=True):
    """Export genotypes in MAF-sorted chunks of SNPs, one parquet file per chunk.

    For each ``global_AF_<chrom>.pkl`` in ``path_input_afs`` (optionally limited to
    ``chroms``), SNPs with ``TOTAL_MAF >= min_maf`` are sorted by MAF and split
    into ``ceil(n / size_chunk)`` near-equal chunks. For each chunk, plink2
    ``--extract ... --recode A`` is run on every selected bfile; rows from all
    bfiles are stacked, meta columns first, and written to
    ``<path_output>/chrom_<chrom>/chunk_<i>_size_<n>_maf_<min>-<max>.parquet``.

    Parameters
    ----------
    path_input_afs : directory with ``global_AF_*.pkl`` (e.g. from concat_AFs).
    path_input_plink_bfiles : directory holding the bfile trios; plink2 runs here.
    path_plink : plink2 binary or a directory containing it.
    path_output : root output directory (created if needed).
    size_chunk : target number of SNPs per chunk (must be >= 1).
    min_maf : minimum pooled MAF to keep a SNP.
    threads : value for plink2 ``--threads``.
    chroms : optional iterable of chromosome labels to process.
    bfiles : optional list of bfile prefixes; prefixes without a complete
        .bed/.bim/.fam trio are dropped. Defaults to every complete trio.
    verbose : print progress.

    Raises
    ------
    ValueError : if ``size_chunk < 1``.
    RuntimeError : if no bfiles/AF pickles are found or plink2 fails.
    FileNotFoundError : if plink2 cannot be located.
    """
    if int(size_chunk) < 1:
        raise ValueError(f"size_chunk must be >= 1, got {size_chunk}")

    plink2_bin = _resolve_plink2(path_plink)
    out_root = Path(path_output)
    out_root.mkdir(parents=True, exist_ok=True)

    auto_bfiles = _find_bfiles(path_input_plink_bfiles)
    if bfiles is None:
        bfiles = auto_bfiles
    else:
        # keep only those that truly have .bed/.bim/.fam, but say what was dropped
        dropped = [b for b in bfiles if b not in auto_bfiles]
        bfiles = [b for b in bfiles if b in auto_bfiles]
        if dropped and verbose:
            print(f"[divide_into_chunks] Ignoring bfiles without a complete .bed/.bim/.fam trio: {dropped}")

    if not bfiles:
        raise RuntimeError(f"No valid PLINK bfiles found. Present trios: {auto_bfiles}")

    if verbose:
        print(f"[divide_into_chunks] Using plink2: {plink2_bin}")
        print(f"[divide_into_chunks] Bfiles: {bfiles}")

    af_pkls_all = sorted(Path(path_input_afs).glob("global_AF_*.pkl"))
    if not af_pkls_all:
        raise RuntimeError(f"No AF files in {path_input_afs} (expected global_AF_*.pkl)")

    def _chr(p): return p.stem.split("_")[-1]
    if chroms is not None:
        keep = {str(c) for c in chroms}
        af_pkls = [p for p in af_pkls_all if _chr(p) in keep]
    else:
        af_pkls = af_pkls_all

    if verbose:
        print(f"[divide_into_chunks] Chromosomes: {[_chr(p) for p in af_pkls]}")

    for af_pkl in af_pkls:
        chrom = _chr(af_pkl)
        chrom_dir = out_root / f"chrom_{chrom}"
        chrom_dir.mkdir(parents=True, exist_ok=True)

        # NOTE: pickle can execute code on load — only point this at trusted files.
        AF = pd.read_pickle(af_pkl)
        AF = AF.sort_values("TOTAL_MAF")
        AF = AF.loc[AF["TOTAL_MAF"] >= float(min_maf)].reset_index(drop=True)
        if AF.empty:
            if verbose: print(f"[chr{chrom}] No SNPs ≥ MAF {min_maf}. Skipping.")
            continue

        n_snps = len(AF)
        n_chunks = max(1, int(np.ceil(n_snps / size_chunk)))
        # AF is MAF-sorted, so each chunk is a contiguous row range. Slicing by
        # position (not ID lookup) keeps the MAF range right even if IDs repeat.
        chunk_rows = np.array_split(np.arange(n_snps), n_chunks)
        if verbose:
            print(f"[chr{chrom}] {n_snps:,} SNPs ≥ {min_maf}; {n_chunks} chunk(s) of ~{size_chunk}")

        for i, rows in enumerate(chunk_rows, start=1):
            if len(rows) == 0: continue
            chunk_af = AF.iloc[rows[0]: rows[-1] + 1]
            snp_list = chunk_af["ID"].astype(str).tolist()
            maf_min = float(chunk_af["TOTAL_MAF"].min())
            maf_max = float(chunk_af["TOTAL_MAF"].max())
            if verbose and (i == 1 or i % 5 == 0 or i == len(chunk_rows)):
                print(f"[chr{chrom}] chunk {i}/{len(chunk_rows)} size={len(snp_list)} maf=[{maf_min:.4f},{maf_max:.4f}]")

            with tempfile.TemporaryDirectory() as td:
                snp_file = Path(td) / "extract.snps"
                snp_file.write_text("\n".join(snp_list))
                # plink2 outputs go to the private temp dir (absolute path), not the
                # shared data directory: no collisions between runs, nothing left behind.
                out_prefix = Path(td) / "tmp_chunk"

                pop_frames = []
                for bfile in bfiles:
                    cmd = [
                        plink2_bin, "--bfile", bfile,
                        "--extract", str(snp_file),
                        "--recode", "A",
                        "--threads", str(threads),
                        "--out", str(out_prefix),
                    ]
                    res = subprocess.run(cmd, cwd=path_input_plink_bfiles,
                                         capture_output=True, text=True)
                    if res.returncode != 0:
                        raise RuntimeError(
                            f"[chr{chrom}][{bfile}] PLINK2 failed (code {res.returncode}).\n"
                            f"STDERR (head):\n{res.stderr[:2000]}\n"
                            f"STDOUT (tail):\n{res.stdout[-2000:]}"
                        )

                    raw_path = out_prefix.with_suffix(".raw")
                    if not raw_path.exists():
                        raise RuntimeError(f"[chr{chrom}][{bfile}] Missing {raw_path}")
                    pop_frames.append(_load_raw_genotypes(raw_path))

            combined = pd.concat(pop_frames, ignore_index=True)
            # Meta columns first (only those actually present), then genotypes.
            meta_cols = [c for c in PRIORITY if c in combined.columns]
            combined = combined[meta_cols + [c for c in combined.columns if c not in meta_cols]]

            out_name = f"chunk_{i}_size_{len(snp_list)}_maf_{maf_min:.4f}-{maf_max:.4f}.parquet"
            combined.to_parquet(chrom_dir / out_name, index=False)


In [6]:
# Pipeline configuration: make_AFs -> concat_AFs -> divide_into_chunks.
# All paths are relative to the notebook's working directory.
path_data      = "../../data/1000G"           # holds 1000g.bed/.bim/.fam
name_file      = "1000g"                      # bfile prefix (no extension)
path_plink     = "../../"                     # directory containing the plink2 binary
path_afs       = "../../data/1000G/afs"       # per-chromosome .afreq outputs of make_AFs
path_afs_glob  = "../../data/1000G/global_af" # pooled AFs written by concat_AFs
path_chunks    = "../../data/1000G/chunks"    # parquet chunks written by divide_into_chunks

for _dir in (path_afs, path_afs_glob, path_chunks):
    Path(_dir).mkdir(parents=True, exist_ok=True)
del _dir


In [7]:
make_AFs(
    path_data=path_data,
    name_file_input=name_file,
    path_plink=path_plink,
    path_output=path_afs,
    threads=8,
)

In [10]:
concat_AFs(path_input=path_afs, path_output=path_afs_glob)

In [20]:
# Paths for the chunking step. Here path_plink names the binary itself;
# _resolve_plink2 accepts either the binary or its directory.
path_data      = "../../data/1000G"            # has 1000g.{bed,bim,fam}
path_plink     = "../../plink2"                # plink2 binary
path_afs_glob  = "../../data/1000G/global_af"  # written by concat_AFs
path_chunks    = "../../data/1000G/chunks"

divide_into_chunks(
    path_input_afs=path_afs_glob,
    path_input_plink_bfiles=path_data,
    path_plink=path_plink,
    path_output=path_chunks,
    bfiles=["1000g"],      # only the 1000g trio; skip any other trios (e.g. 'new')
    chroms=["1"],          # chr1 only while verifying the pipeline
    size_chunk=20_000,
    min_maf=0.01,
    threads=8,
    verbose=True,
)


[divide_into_chunks] Using plink2: ../../plink2
[divide_into_chunks] Bfiles: ['1000g']
[divide_into_chunks] Chromosomes: ['1']
[chr1] 948,646 SNPs ≥ 0.01; 48 chunk(s) of ~20000
[chr1] chunk 1/48 size=19764 maf=[0.0100,0.0104]


KeyboardInterrupt: 

In [None]:
from pathlib import Path

import pandas as pd

# Peek at the first chr1 chunk. Its file name encodes the chunk's actual size
# and MAF range (set by divide_into_chunks), so find it by pattern rather than
# hard-coding a name that goes stale whenever the data change.
chunk_dir = Path("../../data/1000G/chunks/chrom_1")
chunk_files = sorted(chunk_dir.glob("chunk_1_size_*.parquet"))
if not chunk_files:
    raise FileNotFoundError(f"No chunk_1 parquet file in {chunk_dir}; run divide_into_chunks first")

chunk = pd.read_parquet(chunk_files[0])
print(chunk.shape)
print(chunk.iloc[:5, :10])   # peek
