# NIPT的时点选择与胎儿的异常判定
核心目标（来自题文约定）：

1. 男胎：当 Y 染色体浓度 ≥ 4% 时认为“达标”，此时 NIPT 结果基本可靠；需尽量 早 达到达标
   * 12周内 → 低风险
   * 13–27周 → 高风险
   * ≥28周 → 极高风险

2. 女胎：无 Y 染色体，需判定 是否异常（以 21/18/13 号染色体“非整倍体 AB 列”为真值标签），综合利用 Z 值、GC 含量、读段数/比例、BMI 等信息构建判定方法。

In [22]:
import os, re, numpy as np, pandas as pd, matplotlib.pyplot as plt
from patsy import dmatrix
import statsmodels.formula.api as smf
import statsmodels.api as sm
from sklearn.tree import DecisionTreeRegressor
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score, cross_val_predict
from sklearn.metrics import roc_curve, auc, classification_report, confusion_matrix, f1_score, roc_auc_score
import scienceplots
plt.style.use(['science', 'no-latex'])
# 设置字体
plt.rc('font', family= 'STIXGeneral', size= 12)  # 统一字体
plt.rc('axes', titlesize= 12, labelsize= 10)
plt.rc('legend', fontsize= 10)
plt.rc('xtick', labelsize= 10)
plt.rc('ytick', labelsize= 10) 

In [23]:
# ------------------------
# 0. 工具 & 读取/清洗
# ------------------------
def parse_week_str(s):
    """把 '11w+6' 等形式解析为 11+6/7；其他数字直接转浮点。"""
    if pd.isna(s): return np.nan
    txt2 = re.sub(r'[^\d\+]', '', str(s))   # 只保留数字与'+'，如 '11w+6' -> '11+6'
    m = re.search(r'(\d+)\+?(\d*)', txt2)
    if not m:
        try: return float(txt2)
        except: return np.nan
    w = int(m.group(1))
    d = int(m.group(2)) if m.group(2) != "" else 0
    return w + d/7.0

def find_col(df, keyword):
    cands = [c for c in df.columns if keyword in str(c)]
    return cands[0] if cands else None

def load_and_clean(xlsx_path, outdir):
    os.makedirs(outdir, exist_ok=True)

    # 优先尝试只读取两个目标 sheet（如果存在），否则读取所有 sheet
    try:
        sheets = pd.read_excel(xlsx_path, sheet_name=["男胎检测数据", "女胎检测数据"])
        if isinstance(sheets, pd.DataFrame):
            sheets = {"Sheet1": sheets}
    except Exception:
        sheets = pd.read_excel(xlsx_path, sheet_name=None)

    # 规范化每个 sheet 的列名并补齐缺失列（保证各表列一致，女表缺列用 NaN 补全）
    df_list = {}
    for name, df in (sheets.items() if isinstance(sheets, dict) else []):
        df = df.copy()
        df.columns = [str(c).strip() for c in df.columns]
        df_list[name] = df

    if isinstance(df_list, dict) and len(df_list) > 1:
        all_cols = set().union(*(set(df.columns) for df in df_list.values()))
        for name, df in df_list.items():
            for c in all_cols:
                if c not in df.columns:
                    df[c] = np.nan
            df = df[[c for c in all_cols]]
            df["_sheet"] = name
            df_list[name] = df
        df = pd.concat(list(df_list.values()), ignore_index=True)
    else:
        if isinstance(sheets, dict):
            df_all = []
            for name, df0 in sheets.items():
                df0 = df0.copy()
                df0.columns = [str(c).strip() for c in df0.columns]
                df0["_sheet"] = name
                df_all.append(df0)
            df = pd.concat(df_all, ignore_index=True)
        else:
            df = sheets.copy()
            df.columns = [str(c).strip() for c in df.columns]
            df["_sheet"] = "Sheet1"

    # 尝试定位每个目标列（若列名略有不同则通过关键字匹配）
    col_subject = find_col(df, "孕妇代码") or find_col(df, "subject") or find_col(df, "id")
    col_week    = find_col(df, "检测孕周") or find_col(df, "孕周") or find_col(df, "week")
    col_bmi     = find_col(df, "孕妇BMI") or find_col(df, "BMI")
    col_age     = find_col(df, "年龄") or find_col(df, "age")
    col_height  = find_col(df, "身高") or find_col(df, "height")
    col_weight  = find_col(df, "体重") or find_col(df, "weight")
    col_yconc   = find_col(df, "Y染色体浓度") or find_col(df, "Y浓度")
    col_yz      = find_col(df, "Y染色体的Z值") or find_col(df, "Y_Z")
    col_xz      = find_col(df, "X染色体的Z值") or find_col(df, "X_Z")
    col_xconc   = find_col(df, "X染色体浓度") or find_col(df, "X_conc")
    col_z13     = find_col(df, "13号染色体的Z值") or find_col(df, "Z13")
    col_z18     = find_col(df, "18号染色体的Z值") or find_col(df, "Z18")
    col_z21     = find_col(df, "21号染色体的Z值") or find_col(df, "Z21")
    col_gc_all  = find_col(df, "GC含量") or find_col(df, "GC_all")
    col_map     = find_col(df, "在参考基因组上比对的比例") or find_col(df, "map_ratio")
    col_rep     = find_col(df, "重复读段的比例") or find_col(df, "rep_ratio")
    col_reads_t = find_col(df, "原始读段数") or find_col(df, "reads_total")
    col_reads_u = find_col(df, "唯一比对的读段数") or find_col(df, "reads_unique")
    col_filt    = find_col(df, "被过滤掉读段数的比例") or find_col(df, "filter_ratio")
    col_ab      = find_col(df, "染色体的非整倍体") or find_col(df, "AB")
    col_health  = find_col(df, "胎儿是否健康") or find_col(df, "health")

    # 使用用户提供的 dfp 构建方式，兼容缺失列
    dfp = pd.DataFrame({
        "subject_id": df[col_subject] if col_subject in df.columns else pd.Series(df.index, index=df.index),
        "gest_week": df[col_week].apply(parse_week_str) if (col_week in df.columns) else pd.Series([np.nan]*len(df)),
        "BMI": pd.to_numeric(df[col_bmi], errors="coerce") if (col_bmi in df.columns) else pd.Series([np.nan]*len(df)),
        "age": pd.to_numeric(df[col_age], errors="coerce") if (col_age in df.columns) else pd.Series([np.nan]*len(df)),
        "height": pd.to_numeric(df[col_height], errors="coerce") if (col_height in df.columns) else pd.Series([np.nan]*len(df)),
        "weight": pd.to_numeric(df[col_weight], errors="coerce") if (col_weight in df.columns) else pd.Series([np.nan]*len(df)),
        "Y_conc": pd.to_numeric(df[col_yconc], errors="coerce") if (col_yconc in df.columns) else pd.Series([np.nan]*len(df)),
        "Y_Z": pd.to_numeric(df[col_yz], errors="coerce") if (col_yz in df.columns) else pd.Series([np.nan]*len(df)),
        "X_Z": pd.to_numeric(df[col_xz], errors="coerce") if (col_xz in df.columns) else pd.Series([np.nan]*len(df)),
        "X_conc": pd.to_numeric(df[col_xconc], errors="coerce") if (col_xconc in df.columns) else pd.Series([np.nan]*len(df)),
        "Z13": pd.to_numeric(df[col_z13], errors="coerce") if (col_z13 in df.columns) else pd.Series([np.nan]*len(df)),
        "Z18": pd.to_numeric(df[col_z18], errors="coerce") if (col_z18 in df.columns) else pd.Series([np.nan]*len(df)),
        "Z21": pd.to_numeric(df[col_z21], errors="coerce") if (col_z21 in df.columns) else pd.Series([np.nan]*len(df)),
        "GC_all": pd.to_numeric(df[col_gc_all], errors="coerce") if (col_gc_all in df.columns) else pd.Series([np.nan]*len(df)),
        "map_ratio": pd.to_numeric(df[col_map], errors="coerce") if (col_map in df.columns) else pd.Series([np.nan]*len(df)),
        "rep_ratio": pd.to_numeric(df[col_rep], errors="coerce") if (col_rep in df.columns) else pd.Series([np.nan]*len(df)),
        "reads_total": pd.to_numeric(df[col_reads_t], errors="coerce") if (col_reads_t in df.columns) else pd.Series([np.nan]*len(df)),
        "reads_unique": pd.to_numeric(df[col_reads_u], errors="coerce") if (col_reads_u in df.columns) else pd.Series([np.nan]*len(df)),
        "filter_ratio": pd.to_numeric(df[col_filt], errors="coerce") if (col_filt in df.columns) else pd.Series([np.nan]*len(df)),
        "AB": df[col_ab].astype(str).str.strip() if (col_ab in df.columns) else pd.Series([""]*len(df)),
        "health": df[col_health] if (col_health in df.columns) else pd.Series([np.nan]*len(df)),
        "_sheet": df["_sheet"],
    })

    # 兼容老逻辑：保证 subject_id 为字符串，gest_week 已解析为数值
    dfp["subject_id"] = dfp["subject_id"].astype(str)
    if "gest_week" in dfp:
        dfp["gest_week"] = pd.to_numeric(dfp["gest_week"], errors="coerce")

    # 性别：根据 sheet 判定
    dfp["sex"] = dfp["_sheet"].apply(lambda x: "male" if "男" in str(x) else "female")

    # 女胎异常标签（保留原 parse_ab 逻辑）
    def parse_ab_local(x):
        s = str(x).strip().lower()
        if s in ["", "nan", "none", "无异常", "正常"]:
            return 0
        if "异常" in s or "阳性" in s:
            return 1
        return 0
    dfp["abnormal"] = dfp["AB"].apply(parse_ab_local)

    # 质量控制（与原逻辑相同）
    dfp["QC_pass"] = True
    if "GC_all" in dfp: dfp.loc[~dfp["GC_all"].between(0.40,0.60), "QC_pass"] = False
    if "map_ratio" in dfp: dfp.loc[dfp["map_ratio"]<0.60, "QC_pass"] = False
    if "filter_ratio" in dfp: dfp.loc[dfp["filter_ratio"]>0.60, "QC_pass"] = False

    out_csv = os.path.join(outdir, "cleaned_data.csv")
    dfp.to_csv(out_csv, index=False, encoding="utf-8-sig")
    return dfp
# ------------------------

## 二、各子问题方案与结果

### 问题1：男胎 Y 浓度与孕周、BMI 关系

* **方法**：B 样条回归 + BMI 交互项
* **结果**：`Q1_ols_summary.txt`（系数检验）与 `Q1_pred_curves.png`（曲线图）

  * 不同 BMI 下的预测曲线
  * 4% 横线标注达标时间


In [24]:
# ------------------------
# 1. 问题1：Y~week+BMI（样条）
# ------------------------
def q1_model_and_plot(dfp, outdir, thr=0.04):
    male = dfp[(dfp["sex"]=="male") & dfp["gest_week"].notna() & dfp["BMI"].notna() & dfp["Y_conc"].notna()].copy()
    if len(male) < 30: return None
    male["BMI_c"] = male["BMI"] - male["BMI"].mean()
    des = dmatrix("bs(gest_week, df=4, include_intercept=False)", {"gest_week": male["gest_week"]}, return_type='dataframe')
    for i,c in enumerate(des.columns): male[f"s{i+1}"] = des[c].values
    fml = "Y_conc ~ s1+s2+s3+s4 + BMI_c + BMI_c:s1 + BMI_c:s2 + BMI_c:s3 + BMI_c:s4"
    fit = smf.ols(fml, data=male).fit(cov_type="HC3")
    with open(os.path.join(outdir, "Q1_ols_summary.txt"), "w", encoding="utf-8") as f:
        f.write(fit.summary().as_text())
    # 画预测曲线（%）
    weeks = np.linspace(max(10.0, male["gest_week"].min()), male["gest_week"].max(), 200)
    bmi_p = np.percentile(male["BMI"], [10,50,90])
    plt.figure()
    for b in bmi_p:
        dm = dmatrix("bs(w, df=4, include_intercept=False)", {"w": weeks}, return_type='dataframe')
        Xp = pd.DataFrame({
            "s1": dm.iloc[:,0].values, "s2": dm.iloc[:,1].values, "s3": dm.iloc[:,2].values, "s4": dm.iloc[:,3].values,
            "BMI_c": b - male["BMI"].mean()
        })
        for k in ["s1","s2","s3","s4"]:
            Xp[f"BMI_c:{k}"] = Xp["BMI_c"]*Xp[k]
        yhat = fit.predict(Xp)*100.0
        plt.plot(weeks, yhat, label=f"BMI≈{b:.1f}")
    plt.axhline(4.0, linestyle="--")
    plt.xlabel("Gestational age (weeks)")
    plt.ylabel("Predicted Y concentration (%)")
    plt.title("Y concentration vs gestational age and BMI (spline regression)")
    plt.legend()
    plt.savefig(os.path.join(outdir, "Q1_pred_curves.png"), bbox_inches="tight")
    plt.close()
    return fit

### 问题2：BMI 分组与推荐时点

* **步骤**：

  1. 逐孕妇确定最早达标孕周
  2. 回归树分组
  3. 每组取 95% 分位作为推荐时点
* **结果**：`Q2_BMI_bins_recommendations.csv`，包含：

  * BMI 区间、人数、推荐时点
  * 风险等级（低/高/极高）
  * 阈值敏感性（3.5%、4.5%）

---

In [25]:
# ------------------------
# 2. 问题2：BMI 分组 + 推荐时点（95%分位）+ 阈值敏感性
# ------------------------
def earliest_reach(group, thr=0.04):
    g = group.sort_values("gest_week")
    conc = pd.to_numeric(g["Y_conc"], errors="coerce")
    idx = np.where(conc >= thr)[0]
    return g.iloc[idx[0]]["gest_week"] if len(idx)>0 else np.nan

def q2_bmi_grouping(dfp, outdir, thr=0.04):
    male = dfp[(dfp["sex"]=="male") & dfp["gest_week"].notna() & dfp["BMI"].notna()].copy()
    reach = male.groupby("subject_id").apply(lambda g: earliest_reach(g, thr)).rename("earliest_week").reset_index()
    subj = male.drop_duplicates("subject_id")[["subject_id","BMI"]].merge(reach, on="subject_id", how="left")
    obs = subj.dropna(subset=["earliest_week"]).copy()
    if len(obs) < 30: return None
    tree = DecisionTreeRegressor(max_leaf_nodes=5, min_samples_leaf=10, random_state=42).fit(obs[["BMI"]].values, obs["earliest_week"].values)
    # 提取树阈值→BMI 分箱
    t = tree.tree_; thresholds=[]
    def walk(node):
        if t.feature[node] != -2:
            thresholds.append(t.threshold[node]); walk(t.children_left[node]); walk(t.children_right[node])
    walk(0)
    thr_sorted = sorted(set([x for x in thresholds if np.isfinite(x)]))
    bmin, bmax = float(obs["BMI"].min()), float(obs["BMI"].max())
    edges = [bmin-1e-6] + thr_sorted + [bmax+1e-6]
    bins = pd.IntervalIndex.from_tuples(list(zip(edges[:-1], edges[1:])), closed="right")
    obs["bmi_bin"] = pd.cut(obs["BMI"], bins)
    rows=[]
    for iv in bins:
        g = obs[obs["bmi_bin"]==iv]["earliest_week"].dropna()
        rec = float(np.percentile(g,95)) if len(g)>0 else np.nan
        risk = "低风险(≤12周)" if rec<=12 else ("高风险(13-27周)" if rec<28 else "极高风险(≥28周)")
        rows.append({"BMI区间": f"({iv.left:.1f}, {iv.right:.1f}]", "人数": int(len(g)), "推荐时点(周)": round(rec,2) if pd.notna(rec) else np.nan, "风险等级": risk})
    tab = pd.DataFrame(rows)
    # 阈值敏感性
    def recs_for(th):
        r2 = male.groupby("subject_id").apply(lambda g: earliest_reach(g, th)).rename("earliest_week")
        s2 = male.drop_duplicates("subject_id")[["subject_id","BMI"]].merge(r2, on="subject_id", how="left").dropna(subset=["earliest_week"]).copy()
        s2["bmi_bin"] = pd.cut(s2["BMI"], bins)
        out=[]
        for iv in bins:
            g = s2[s2["bmi_bin"]==iv]["earliest_week"].dropna()
            out.append(float(np.percentile(g,95)) if len(g)>0 else np.nan)
        return out
    tab["阈值3.5%推荐(周)"] = recs_for(0.035)
    tab["阈值4.5%推荐(周)"] = recs_for(0.045)
    tab.to_csv(os.path.join(outdir, "Q2_BMI_bins_recommendations.csv"), index=False, encoding="utf-8-sig")
    return tab

### 问题3：多因素逻辑回归与推荐时点

* **模型**：带正则的逻辑回归（class\_weight=balanced）
* **评估**：5 折 AUC
* **输出**：

  * `Q3_multifactor_bins.csv`：BMI 分组与推荐时点
  * `Q3_recommendation_CI.csv`：推荐时点的不确定区间（bootstrap）

---

In [26]:
# ------------------------
# 3. 问题3：多因素逻辑回归（正则） + BMI 依赖推荐时点 + bootstrap CI
# ------------------------
def q3_multifactor_logit(dfp, outdir, thr=0.04):
    male = dfp[(dfp["sex"]=="male") & dfp["gest_week"].notna()].copy()
    if len(male) < 80: return None
    male["hit"] = (pd.to_numeric(male["Y_conc"], errors="coerce") >= thr).astype(int)
    des = dmatrix("bs(gest_week, df=4, include_intercept=False)", {"gest_week": male["gest_week"]}, return_type='dataframe').values
    BMI_c = (male["BMI"] - male["BMI"].mean()).values.reshape(-1,1)
    X = np.column_stack([des, BMI_c, des*BMI_c,
                         (male["age"]-male["age"].mean()).values,
                         (male["height"]-male["height"].mean()).values,
                         (male["weight"]-male["weight"].mean()).values])
    y = male["hit"].values.astype(int)
    pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
        ("clf", LogisticRegression(max_iter=4000, class_weight="balanced", solver="liblinear", C=1.0))
    ])
    # 5折 AUC
    y_score_cv = cross_val_predict(pipe, X, y, cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42), method="predict_proba")[:,1]
    auc_cv = roc_auc_score(y, y_score_cv)
    # 拟合全量并生成 BMI→推荐孕周
    pipe.fit(X, y)
    weeks = np.linspace(9.5, 30.0, 410)
    dm = dmatrix("bs(w, df=4, include_intercept=False)", {"w": weeks}, return_type='dataframe').values
    def rec_week_for_bmi(bmi_val, p_target=0.95):
        BMI_c0 = bmi_val - male["BMI"].mean()
        Xb = np.column_stack([dm, np.full((len(weeks),1), BMI_c0), dm*BMI_c0, np.zeros((len(weeks),3))])
        p = pipe.predict_proba(Xb)[:,1]
        idx = np.where(p>=p_target)[0]
        return float(weeks[idx[0]]) if len(idx)>0 else np.nan
    bmi_grid = np.linspace(np.nanpercentile(male["BMI"],5), np.nanpercentile(male["BMI"],95), 25)
    recs = [rec_week_for_bmi(b) for b in bmi_grid]
    q3_recs = pd.DataFrame({"BMI": np.round(bmi_grid,2), "推荐时点P95(周)": np.round(recs,2)})
    # 以推荐时点的“阶跃变化”决定分组边界
    edges=[float(q3_recs["BMI"].min())]; last=None
    for _,r in q3_recs.iterrows():
        t=r["推荐时点P95(周)"]; 
        if pd.isna(t): continue
        if last is None: last=t; continue
        if abs(t-last)>0.5: edges.append(float(r["BMI"]))
        last=t
    edges.append(float(q3_recs["BMI"].max()))
    edges=sorted(set([round(e,1) for e in edges if np.isfinite(e)]))
    bins = pd.IntervalIndex.from_tuples(list(zip(edges[:-1], edges[1:])), closed="right")
    q3_recs["bin"]=pd.cut(q3_recs["BMI"], bins)
    group = q3_recs.groupby("bin", observed=True)["推荐时点P95(周)"].agg(["count","median","min","max"]).reset_index()
    group["BMI区间"]=group["bin"].apply(lambda iv:f"({iv.left:.1f}, {iv.right:.1f}]")
    group["推荐时点(周)"]=group["median"].round(2)
    group[["BMI区间","count","推荐时点(周)","min","max"]].to_csv(os.path.join(outdir, "Q3_multifactor_bins.csv"), index=False, encoding="utf-8-sig")
    return auc_cv

def q3_bootstrap_ci(dfp, outdir, thr=0.04, B=150):
    # 计算 BMI 10/50/90 分位对应的推荐时点区间
    male = dfp[(dfp["sex"]=="male") & dfp["gest_week"].notna()].copy()
    if len(male) < 50: return None
    male["hit"] = (pd.to_numeric(male["Y_conc"], errors="coerce") >= thr).astype(int)
    rng = np.random.default_rng(7)
    def once(data, bmi_val, p_target=0.95):
        from patsy import dmatrix
        from sklearn.pipeline import Pipeline
        from sklearn.impute import SimpleImputer
        from sklearn.preprocessing import StandardScaler
        from sklearn.linear_model import LogisticRegression
        des = dmatrix("bs(gest_week, df=4, include_intercept=False)", {"gest_week": data["gest_week"]}, return_type='dataframe').values
        BMI_c = (data["BMI"] - data["BMI"].mean()).values.reshape(-1,1)
        X = np.column_stack([des, BMI_c, des*BMI_c,
                             (data["age"]-data["age"].mean()).values,
                             (data["height"]-data["height"].mean()).values,
                             (data["weight"]-data["weight"].mean()).values])
        y = data["hit"].values.astype(int)
        pipe = Pipeline([("imp", SimpleImputer(strategy="median")), ("sc", StandardScaler()), ("clf", LogisticRegression(max_iter=4000, class_weight="balanced", solver="liblinear", C=1.0))])
        try: pipe.fit(X, y)
        except: return np.nan
        weeks = np.linspace(9.5, 30.0, 410)
        dm = dmatrix("bs(w, df=4, include_intercept=False)", {"w": weeks}, return_type='dataframe').values
        BMI_c0 = bmi_val - data["BMI"].mean()
        Xb = np.column_stack([dm, np.full((len(weeks),1), BMI_c0), dm*BMI_c0, np.zeros((len(weeks),3))])
        p = pipe.predict_proba(Xb)[:,1]
        idx = np.where(p>=p_target)[0]
        return float(weeks[idx[0]]) if len(idx)>0 else np.nan
    bmis = [np.nanpercentile(male["BMI"], p) for p in [10,50,90]]
    rows=[]
    for b in bmis:
        meds=[]
        for _ in range(B):
            idx = rng.choice(male.index.values, size=len(male), replace=True)
            d = male.loc[idx]
            val = once(d, b)
            if pd.notna(val): meds.append(val)
        if len(meds):
            rows.append({"BMI": round(b,1), "推荐时点中位数(周)": round(float(np.median(meds)),2),
                         "2.5%": round(float(np.percentile(meds,2.5)),2),
                         "97.5%": round(float(np.percentile(meds,97.5)),2)})
    if rows:
        pd.DataFrame(rows).to_csv(os.path.join(outdir, "Q3_recommendation_CI.csv"), index=False, encoding="utf-8-sig")

### 问题4：女胎异常判定

* **透明规则**：

  * 若 $\max(Z_{13},Z_{18},Z_{21}) ≥ z_0$，判异常
  * 最优阈值及对应 F1 输出到 `Q4_simple_rule.csv`
* **逻辑回归融合模型**：

  * 特征：Z 值 + GC 含量 + 读段质量 + BMI + 年龄
  * 输出：`Q4_eval.csv`，包含 ROC-AUC、最佳阈值、混淆矩阵

---

In [27]:
# ------------------------
# 4. 问题4：女胎异常判定（基线规则 + 逻辑回归）
# ------------------------
def q4_female_anomaly(dfp, outdir):
    female = dfp[dfp["sex"]=="female"].copy()

    # ---- 改进标签识别 ----
    def parse_ab(x):
        s = str(x).strip().lower()
        if s in ["", "nan", "none", "无异常", "正常"]:
            return 0
        if "异常" in s or "阳性" in s:
            return 1
        return 0
    female["abnormal"] = female["AB"].apply(parse_ab)

    # ---- 数据检查 ----
    if len(female) < 20:
        pd.DataFrame({"note": [f"Female samples too few: {len(female)}"]}) \
          .to_csv(os.path.join(outdir, "Q4_note.csv"), index=False, encoding="utf-8-sig")
        return None
    if female["abnormal"].nunique() < 2:
        pd.DataFrame({"note": ["No positive abnormal cases found in female samples"]}) \
          .to_csv(os.path.join(outdir, "Q4_note.csv"), index=False, encoding="utf-8-sig")
        return None

    # ---- 规则法 ----
    y = female["abnormal"].astype(int)
    zmax = female[["Z13","Z18","Z21"]].max(axis=1, skipna=True)
    best_f1, best_z = -1.0, 3.0
    for z0 in np.linspace(2.0, 5.0, 31):
        f1 = f1_score(y, (zmax >= z0).astype(int), zero_division=0)
        if f1 > best_f1:
            best_f1, best_z = f1, z0
    pd.DataFrame({
        "Rule": ["If max(Z13,Z18,Z21) ≥ threshold → Abnormal"],
        "Best_threshold": [round(best_z, 2)],
        "F1_score(train)": [round(best_f1, 3)]
    }).to_csv(os.path.join(outdir, "Q4_simple_rule.csv"), index=False, encoding="utf-8-sig")

    # ---- 逻辑回归法 ----
    X = female[["Z13","Z18","Z21","X_Z","X_conc","GC_all","reads_total",
                "reads_unique","map_ratio","rep_ratio","filter_ratio",
                "BMI","age"]]
    pipe = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler()),
        ("clf", LogisticRegression(max_iter=2000, class_weight="balanced"))
    ])
    aucs = cross_val_score(pipe, X, y, cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
                           scoring="roc_auc")
    pipe.fit(X, y)
    y_score = pipe.predict_proba(X)[:,1]
    fpr, tpr, thr = roc_curve(y, y_score)
    J = tpr - fpr
    t_idx = np.argmax(J)
    best_thr = thr[t_idx]
    y_pred = (y_score >= best_thr).astype(int)
    cm = confusion_matrix(y, y_pred)

    pd.DataFrame({
        "metric": ["ROC-AUC(CV5-mean)", "Best_threshold", "TP", "FP", "FN", "TN"],
        "value": [round(float(np.mean(aucs)), 3),
                  round(float(best_thr), 3),
                  int(cm[1,1]), int(cm[0,1]), int(cm[1,0]), int(cm[0,0])]
    }).to_csv(os.path.join(outdir, "Q4_eval.csv"), index=False, encoding="utf-8-sig")


In [28]:
# ------------------------
# main
# ------------------------
def main():
    xlsx = "附件.xlsx"       # 放在同目录或给出绝对路径
    outdir = "nipt_outputs"
    dfp = load_and_clean(xlsx, outdir)
    #dfp.to_csv("dfp_all_data.csv", index=False, encoding="utf-8-sig")
    q1_model_and_plot(dfp, outdir, thr=0.04)
    q2_bmi_grouping(dfp, outdir, thr=0.04)
    auc_cv = q3_multifactor_logit(dfp, outdir, thr=0.04)
    q3_bootstrap_ci(dfp, outdir, thr=0.04, B=150)
    q4_female_anomaly(dfp, outdir)
    print("All done. Results in:", outdir, "AUC(Q3)=", auc_cv)

if __name__ == "__main__":
    main()


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["_sheet"] = name
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["_sheet"] = name
  reach = male.groupby("subject_id").apply(lambda g: earliest_reach(g, thr)).rename("earliest_week").reset_index()
  r2 = male.groupby("subject_id").apply(lambda g: earliest_reach(g, th)).rename("earliest_week")
  r2 = male.groupby("subject_id").apply(lambda g: earliest_reach(g, th)).rename("earliest_week")


All done. Results in: nipt_outputs AUC(Q3)= 0.6282044676701137
