# In [2]:
import pandas as pd
import numpy as np

# ---------- Read the source workbook ----------
src_path = "G:\\25国赛\\C题\\PythonProject\\code2\\聚类.xlsx"
df_raw = pd.read_excel(src_path)

# Work on a renamed copy so the raw frame stays untouched.
column_aliases = {"孕妇BMI": "BMI", "检测孕期": "first_time"}
df = df_raw.rename(columns=column_aliases).copy()

# ---------- Express `first_time` in weeks ----------
# Non-numeric entries become NaN (errors="coerce") and are dropped further down.
first_time_numeric = pd.to_numeric(df["first_time"], errors="coerce")

# Unit heuristic: a median above 40 is taken to mean the column is in days,
# in which case it is divided by 7; otherwise the values are used as weeks.
recorded_in_days = first_time_numeric.median() > 40
df["weeks"] = first_time_numeric / 7.0 if recorded_in_days else first_time_numeric

# Keep only rows inside the 6-35 week window (bounds inclusive), retain the
# two columns used downstream, and discard rows with a missing BMI or week.
within_window = df["weeks"].between(6, 35)
df = df.loc[within_window, ["BMI", "weeks"]].dropna().reset_index(drop=True)

# ---------- Split BMI into five equal-frequency (quintile) groups ----------
labels = ["Q1", "Q2", "Q3", "Q4", "Q5"]
bmi_group, bmi_bins = pd.qcut(df["BMI"], q=5, labels=labels, retbins=True)
df["BMI组"] = bmi_group


def _describe_bmi_group(lab, sub):
    """Build one summary row (size, BMI range/median, median week) for a group."""
    bmi = sub["BMI"]
    return {
        "BMI组": lab,
        "人数": int(len(sub)),
        "BMI范围": f"[{bmi.min():.4f}, {bmi.max():.4f}]",
        "BMI中位数": float(bmi.median()),
        "首次达标周(中位数)": float(sub["weeks"].median()),
    }


# One summary row per quintile label, then ordered by the group's median BMI.
group_info_rows = [
    _describe_bmi_group(lab, df.loc[df["BMI组"] == lab]) for lab in labels
]
group_info = pd.DataFrame(group_info_rows)
group_info = group_info.sort_values("BMI中位数").reset_index(drop=True)

# ---------- Risk-model parameters ----------
# Candidate test weeks are searched inside the closed window [W_MIN, W_MAX].
W_MIN = 10.0
W_MAX = 25.0

# Tunable cost weights used by the risk function.
C_FAIL = 1.0   # cost charged when the test fails (a redraw is needed)
C_WAIT = 0.04  # cost charged per week of waiting beyond W_MIN
# No extra penalty (e.g. a `C_OVER` term) is modelled for weeks past W_MAX,
# because the search is restricted to weeks <= W_MAX.

def empirical_success_prob(weeks_array, w):
    """Estimate the probability that a test taken at week ``w`` succeeds.

    The estimate is the empirical CDF of the observed first-success weeks
    evaluated at ``w``, i.e. the share of entries that are ``<= w``.

    Parameters
    ----------
    weeks_array : array-like
        First-success weeks. Any array-like is accepted (list, tuple,
        ``numpy.ndarray``, ``pandas.Series``); it is coerced to a float array.
    w : float
        Candidate test week.

    Returns
    -------
    float
        Proportion of entries ``<= w``, or ``numpy.nan`` when there is no data.
        NaN entries compare as False and therefore count as "not yet successful".
    """
    # Coerce first: a plain list/tuple does not support `<=` against a scalar.
    weeks = np.asarray(weeks_array, dtype=float)
    if weeks.size == 0:
        return np.nan
    return float(np.mean(weeks <= w))

def risk(w, weeks_array):
    """Return the expected cost of scheduling the test at week ``w``.

    The cost is ``C_FAIL * (1 - P(success at w)) + C_WAIT * max(0, w - W_MIN)``,
    where the success probability comes from ``empirical_success_prob``.
    When that probability is NaN (no data), ``+inf`` is returned so the
    week can never be picked as a minimum.
    """
    success = empirical_success_prob(weeks_array, w)
    if np.isnan(success):
        return np.inf

    failure_term = C_FAIL * (1.0 - success)
    # Waiting is only charged for the part of `w` that lies beyond W_MIN.
    waiting_term = C_WAIT * max(0.0, w - W_MIN)
    return failure_term + waiting_term

# Candidate weeks: 151 evenly spaced points on [W_MIN, W_MAX], rounded to
# two decimals (a 0.1-week step for a 15-week-wide window).
w_grid = np.linspace(W_MIN, W_MAX, 151).round(2)

# The parameter tag is the same for every group, so format it once.
param_tag = f"({C_FAIL},{C_WAIT},{W_MIN},{W_MAX})"

opt_rows = []
for lab in labels:
    sub = df.loc[df["BMI组"] == lab]
    wk_arr = sub["weeks"].to_numpy()

    # Evaluate the risk at every candidate week; argmin keeps the earliest
    # week when several candidates tie for the minimum.
    risks = np.fromiter(
        (risk(w, wk_arr) for w in w_grid), dtype=float, count=len(w_grid)
    )
    idx = int(risks.argmin())
    w_star = float(w_grid[idx])
    p_star = empirical_success_prob(wk_arr, w_star)

    bmi = sub["BMI"]
    row = {
        "BMI组": lab,
        "BMI范围": f"[{bmi.min():.4f}, {bmi.max():.4f}]",
        "人数": int(len(sub)),
        "推荐最优NIPT周数": round(w_star, 2),
        "该周达标概率估计": round(p_star, 3),
        "风险函数最小值": round(float(risks[idx]), 3),
        "参数(C_FAIL,C_WAIT,W_MIN,W_MAX)": param_tag,
    }
    opt_rows.append(row)

# Final table, ordered by the recommended week.
opt_table = pd.DataFrame(opt_rows)
opt_table = opt_table.sort_values("推荐最优NIPT周数").reset_index(drop=True)

# ---------- Write the result tables to disk ----------
out_csv1 = "BMI_五分位分组信息.csv"
out_csv2 = "BMI组_最优NIPT周_与达标概率.csv"
# NOTE(review): `out_xlsx` is not written anywhere in this section; presumably
# it is used further down - confirm.
out_xlsx = "问题二_分组与最优NIPT.xlsx"

# Both tables are saved without the index column, relative to the working dir.
# NOTE(review): the headers are Chinese and the default encoding is used; if
# these files are opened in Excel, encoding="utf-8-sig" may be needed - confirm.
for table, csv_path in ((group_info, out_csv1), (opt_table, out_csv2)):
    table.to_csv(csv_path, index=False)