In [18]:
# -*- coding: utf-8 -*-
"""
2025高教社杯数学建模竞赛 C题 代码实现
作者：Qwen
日期：2025年9月6日
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体（可选）
import matplotlib

fm = matplotlib.font_manager.fontManager
fm.addfont("./仿宋_GB2312.TTF")
fm.addfont("./times.ttf")
print(fm)
# 设置中文字体和负号正常显示
plt.rcParams["font.sans-serif"] = ["FangSong_GB2312", "times"]
plt.rcParams["axes.unicode_minus"] = False

# ========================
# 1. 读取并合并数据
# ========================

# 读取男胎数据（多个sheet或重复表头）
male_files = ['附件 - 男胎检测数据.xlsx'] * 3  # 实际上文件被分成了多个sheet-like块
male_dfs = []
for file in male_files:
    df = pd.read_excel(file, engine='openpyxl', header=None)
    # 找到真正的表头行
    header_row = df[df.eq('序号').any(axis=1)].index[0]
    df = pd.read_excel(file, engine='openpyxl', header=header_row)
    male_dfs.append(df)

# 合并所有男胎数据
male_df = pd.concat(male_dfs, ignore_index=True)
male_df.drop_duplicates(subset=['序号'], keep='first', inplace=True)

# 读取女胎数据
female_files = ['附件 - 女胎检测数据.xlsx'] * 3
female_dfs = []
for file in female_files:
    df = pd.read_excel(file, engine='openpyxl', header=None)
    header_row = df[df.eq('序号').any(axis=1)].index[0]
    df = pd.read_excel(file, engine='openpyxl', header=header_row)
    female_dfs.append(df)

female_df = pd.concat(female_dfs, ignore_index=True)
female_df.drop_duplicates(subset=['序号'], keep='first', inplace=True)

print("男胎数据形状:", male_df.shape)
print("女胎数据形状:", female_df.shape)

# 添加性别标签
male_df['性别'] = '男胎'
female_df['性别'] = '女胎'

# 合并所有数据
data = pd.concat([male_df, female_df], ignore_index=True)

# ========================
# 2. 数据清洗与特征工程
# ========================

# 处理孕周列：如 "12w+3" → 12 + 3/7 ≈ 12.43
# def parse_gestational_week(week_str):
#     if pd.isna(week_str):
#         return np.nan
#     try:
#         if 'w+' in week_str:
#             w, d = week_str.split('w+')
#             return float(w) + float(d)/7
#         elif 'W+' in week_str:
#             w, d = week_str.split('W+')
#             return float(w) + float(d)/7
#         elif 'w' in week_str:
#             return float(week_str.replace('w', ''))
#         else:
#             return float(week_str)
#     except:
#         return np.nan

data['孕周_数值'] = data['检测孕周'].apply(lambda x:(int(x[::-1][1:][::-1])*7 if len(x.split('+'))==1 else int(x.split('+')[0][::-1][1:][::-1])*7+int(x.split('+')[1]))/7)

# # 处理末次月经和检测日期（统一为datetime）
# def parse_date(date_str):
#     if pd.isna(date_str):
#         return np.nan
#     try:
#         if isinstance(date_str, str):
#             return pd.to_datetime(date_str)
#         elif isinstance(date_str, (int, float)):
#             # Excel日期格式转换（从1900年开始）
#             return pd.to_datetime('1900-01-01') + pd.Timedelta(days=int(date_str)-2)
#     except:
#         return np.nan
data.dropna(subset=['末次月经'],inplace=True)
def formate(x):
    print(type(x))
    if isinstance(x,str):
        print(len(x.strip()))
    return pd.Timestamp(x)

data['末次月经_日期'] = data['末次月经'].apply(formate)
data['检测日期_日期'] = data['检测日期'].apply(lambda x:pd.Timestamp(year=int(x/10000),month=int(int(x/100)%100),day=int(x%100)) if isinstance(x,int) else x)



# 计算实际孕周（天数差）
# data['实际孕周_天'] = (data['检测日期_日期'] - data['末次月经_日期']).dt.days
data['实际孕周_天'] = data[['末次月经_日期','检测日期_日期']].apply(lambda x:np.abs((x['检测日期_日期']-x['末次月经_日期']).days))

data['实际孕周_周'] = data['实际孕周_天'].apply(lambda x:x/7.0)

# 使用“检测孕周”为主，校验一致性
data['孕周'] = data['孕周_数值'].fillna(data['实际孕周_周'])

# 清理 BMI
data['BMI'] = pd.to_numeric(data['孕妇BMI'], errors='coerce')

# 提取男胎数据（用于问题1-3）
male_data = data[data['性别'] == '男胎'].copy()
male_data = male_data.dropna(subset=['Y染色体浓度', '孕周', 'BMI'])

# 转换 Y染色体浓度 为数值
male_data['Y染色体浓度'] = pd.to_numeric(male_data['Y染色体浓度'], errors='coerce')

# 过滤异常值
male_data = male_data[(male_data['Y染色体浓度'] >= 0) & (male_data['Y染色体浓度'] <= 0.2)]  # 浓度≤20%
male_data = male_data[(male_data['孕周'] >= 10) & (male_data['孕周'] <= 25)]
male_data = male_data[male_data['BMI'] >= 20]

print("清洗后男胎数据量:", len(male_data))

<matplotlib.font_manager.FontManager object at 0x78e4b273acf0>
男胎数据形状: (1082, 31)
女胎数据形状: (605, 31)
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<class 'datetime.datetime'>
<cla

DateParseError: Unable to parse datetime string:  

In [None]:
# ========================
# 问题1：Y染色体浓度 ~ 孕周 + BMI
# ========================
from scipy import stats
import statsmodels.api as sm
import statsmodels.formula.api as smf

# 对数变换Y浓度（更符合正态）
male_data['log_Y'] = np.log1p(male_data['Y染色体浓度'] * 100)  # ×100避免太小

# 方法1：多元线性回归
model1 = smf.ols('log_Y ~ 孕周 + BMI + 孕周:BMI', data=male_data).fit()
print("\n=== 问题1：多元线性回归结果 ===")
print(model1.summary())

# 方法2：广义可加模型（GAM）——允许非线性
from pygam import LinearGAM, s, f

gam = LinearGAM(s('孕周') + s('BMI'), fit_intercept=True)
# gam = LinearGAM(s(0, name='孕周') + s(1, name='BMI'), fit_intercept=True)
X_gam = male_data[['孕周', 'BMI']].values
y_gam = male_data['Y染色体浓度'].values
gam.fit(X_gam, y_gam)

print("\n=== GAM 模型解释方差 (R²):", gam.score(X_gam, y_gam))

# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# 散点图：Y浓度 vs 孕周，按BMI分色
sns.scatterplot(data=male_data, x='孕周', y='Y染色体浓度', hue='BMI', palette='viridis', ax=axes[0])
axes[0].set_title('Y染色体浓度 vs 孕周（按BMI着色）')
axes[0].set_ylabel('Y染色体浓度')

# 孕周与log_Y回归线
axes[1].scatter(male_data['孕周'], male_data['log_Y'], alpha=0.6)
axes[1].plot(male_data['孕周'], model1.fittedvalues, color='red', label='OLS拟合')
axes[1].set_xlabel('孕周')
axes[1].set_ylabel('log(Y浓度)')
axes[1].set_title('线性回归拟合')
axes[1].legend()

# GAM 预测曲面（简化为2D）
XX = gam.generate_X_grid()
axes[2].plot(XX[:, 0], gam.predict(XX), color='blue', label='GAM预测（孕周主效应）')
axes[2].scatter(male_data['孕周'], gam.predict(X_gam), alpha=0.4, color='gray')
axes[2].set_xlabel('孕周')
axes[2].set_ylabel('预测 log(Y浓度)')
axes[2].set_title('GAM 拟合曲线')
axes[2].legend()

plt.tight_layout()
plt.show()

# 相关性矩阵
corr_cols = ['Y染色体浓度', '孕周', 'BMI']
corr = male_data[corr_cols].corr()
sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
plt.title('Y浓度、孕周、BMI相关性热力图')
plt.show()

NameError: name 'male_data' is not defined

In [None]:
# ========================
# 问题2：BMI分组 + 最佳时点
# ========================
from lifelines import KaplanMeierFitter
from lifelines.utils import concordance_index

# 步骤1：为每个孕妇找首次Y≥4%的孕周
male_grouped = male_data.groupby('孕妇代码')

达标_events = []

for code, group in male_grouped:
    group = group.sort_values('孕周')
    y_conc_percent = group['Y染色体浓度'] * 100  # 转为百分比
    达标_time = np.nan
    event = 0  # 1=事件发生（达标），0=删失
    for idx, row in group.iterrows():
        if row['Y染色体浓度'] * 100 >= 4:
            达标_time = row['孕周']
            event = 1
            break
    if np.isnan(达标_time):
        达标_time = group['孕周'].max()  # 删失
        event = 0
    bmi = group['BMI'].iloc[0]
    达标_events.append({'孕妇代码': code, '达标时间': 达标_time, '事件': event, 'BMI': bmi})

df_surv = pd.DataFrame(达标_events)

# BMI分组（基于临床经验）
df_surv['BMI组'] = pd.cut(df_surv['BMI'], bins=[20, 28, 32, 36, 40, 100], 
                         labels=['[20,28)', '[28,32)', '[32,36)', '[36,40)', '≥40'])

# Kaplan-Meier 生存分析（此处“生存”=未达标）
kmf = KaplanMeierFitter()
fig, ax = plt.subplots(figsize=(10, 6))

for name, group in df_surv.groupby('BMI组'):
    T = group['达标时间']
    E = group['事件']
    kmf.fit(T, E, label=name)
    kmf.plot_survival_function(ax=ax, ci_show=True)

ax.set_title('不同BMI组Y染色体浓度≥4%的累积达标概率')
ax.set_xlabel('孕周')
ax.set_ylabel('达标概率')
ax.grid(True)
plt.legend()
plt.show()

# 计算每组P90达标时间
recommendations = []
for name, group in df_surv.groupby('BMI组'):
    T = group['达标时间']
    E = group['事件']
    kmf.fit(T, E)
    p90_time = kmf.percentile(0.9)  # 90%达标所需时间
    optimal_time = min(p90_time, 12)  # 不超过12周
    recommendations.append({
        'BMI组': name,
        'P90达标时间': round(p90_time, 2),
        '推荐检测时间': round(optimal_time, 2)
    })

rec_df = pd.DataFrame(recommendations)
print("\n=== 问题2：各BMI组推荐检测时间 ===")
print(rec_df)

# 检测误差影响：模拟±0.5%误差
def simulate_error_impact(df, error_std=0.005):
    df_copy = df.copy()
    np.random.seed(42)
    df_copy['Y染色体浓度_扰动'] = df_copy['Y染色体浓度'] + np.random.normal(0, error_std, len(df_copy))
    df_copy['Y染色体浓度_扰动'] = df_copy['Y染色体浓度_扰动'].clip(lower=0)
    
    # 重新计算达标时间
    new_events = []
    for code, group in df_copy.groupby('孕妇代码'):
        group = group.sort_values('孕周')
        for idx, row in group.iterrows():
            if row['Y染色体浓度_扰动'] * 100 >= 4:
                new_events.append({'孕妇代码': code, '新达标时间': row['孕周']})
                break
    return pd.DataFrame(new_events)

# 可选：运行误差模拟
# error_df = simulate_error_impact(male_data)

In [2]:
# ========================
# 问题3：多因素 + 机器学习预测
# ========================
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# 构造特征：每个孕妇取第一次检测数据（或平均）
first_test = male_data.sort_values('孕周').groupby('孕妇代码').first().reset_index()

features = ['BMI', '年龄', '身高', '体重', '孕周']
X = first_test[features]
y = first_test['Y染色体浓度'] * 100  # 百分比

# 分训练测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 随机森林回归
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)

print(f"\n=== 问题3：随机森林预测Y浓度（测试集RMSE）: {np.sqrt(mean_squared_error(y_test, y_pred)):.3f}")

# 特征重要性
importances = rf.feature_importances_
feat_importance = pd.DataFrame({'feature': features, 'importance': importances}).sort_values('importance', ascending=False)
print("\n特征重要性:")
print(feat_importance)

# 聚类分组（K-means）
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_scaled = scaler.fit_transform(first_test[features])
kmeans = KMeans(n_clusters=3, random_state=42)
first_test['cluster'] = kmeans.fit_predict(X_scaled)

# 每组计算推荐时间（同问题2逻辑）
cluster_recs = []
for cid, group in first_test.groupby('cluster'):
    # 找该组所有检测记录中首次达标时间
    codes = group['孕妇代码'].tolist()
    sub_data = male_data[male_data['孕妇代码'].isin(codes)]
    sub_grouped = sub_data.groupby('孕妇代码')
    events = []
    for code, g in sub_grouped:
        g = g.sort_values('孕周')
        for idx, row in g.iterrows():
            if row['Y染色体浓度'] * 100 >= 4:
                events.append({'time': row['孕周'], 'event': 1})
                break
        else:
            events.append({'time': g['孕周'].max(), 'event': 0})
    df_ev = pd.DataFrame(events)
    if len(df_ev) == 0: continue
    kmf.fit(df_ev['time'], df_ev['event'])
    p90 = kmf.percentile(0.9)
    optimal = min(p90, 12)
    cluster_recs.append({'cluster': cid, 'P90': round(p90, 2), '推荐时间': round(optimal, 2)})

print("\n=== 问题3：聚类分组推荐时间 ===")
print(pd.DataFrame(cluster_recs))

NameError: name 'male_data' is not defined

In [None]:
# ========================
# 问题4：女胎异常判定
# ========================
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, roc_curve
from sklearn.model_selection import cross_val_score

# 准备女胎数据
female_data = data[data['性别'] == '女胎'].copy()
female_data['label'] = (female_data['胎儿是否健康'] == '否').astype(int)  # 1=异常

# 特征选择
z_features = ['21号染色体的Z值', '18号染色体的Z值', '13号染色体的Z值', 'X染色体的Z值']
other_features = ['GC含量', '原始读段数', '唯一比对的读段数', '被过滤掉读段数的比例', 'BMI']
features_f = z_features + other_features

female_data = female_data.dropna(subset=features_f + ['label'])

X_f = female_data[features_f]
y_f = female_data['label']

# 标准化
X_f_scaled = StandardScaler().fit_transform(X_f)

# 逻辑回归 + 随机森林
lr = LogisticRegression()
rf_clf = RandomForestClassifier()

# 交叉验证评估
lr_scores = cross_val_score(lr, X_f_scaled, y_f, cv=5, scoring='f1')
rf_scores = cross_val_score(rf_clf, X_f_scaled, y_f, cv=5, scoring='f1')

print(f"\n=== 问题4：女胎异常判定模型性能（F1-score）===")
print(f"逻辑回归: {lr_scores.mean():.3f} ± {lr_scores.std():.3f}")
print(f"随机森林: {rf_scores.mean():.3f} ± {rf_scores.std():.3f}")

# 使用随机森林训练最终模型
rf_clf.fit(X_f_scaled, y_f)
fpr, tpr, thr = roc_curve(y_f, rf_clf.predict_proba(X_f_scaled)[:,1])
auc = roc_auc_score(y_f, rf_clf.predict_proba(X_f_scaled)[:,1])

plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC={auc:.3f})')
plt.plot([0,1],[0,1],'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('女胎异常判定模型 ROC 曲线')
plt.legend()
plt.grid(True)
plt.show()

# 特征重要性
importance_df = pd.DataFrame({
    'feature': features_f,
    'importance': rf_clf.feature_importances_
}).sort_values('importance', ascending=False)

print("\n女胎判定特征重要性:")
print(importance_df)

# 输出判定规则示例
print("\n=== 示例：新样本判定 ===")
example = X_f_scaled[0:1]
prob = rf_clf.predict_proba(example)[0,1]
print(f"预测异常概率: {prob:.3f}, 判定: {'异常' if prob > 0.5 else '正常'}")