# In [4]:
from pathlib import Path

import numpy as np
import pandas as pd

# ---------------------------
# 数值字段清洗函数
# ---------------------------
def clean_numeric(val, lower, upper) -> float:
    """Convert ``val`` to a float, nulling it when it lies outside the bounds.

    Args:
        val: Raw cell value; anything ``float()`` accepts (number, numeric
            string, ...).
        lower: Smallest accepted value (inclusive).
        upper: Largest accepted value (inclusive).

    Returns:
        ``val`` as a ``float`` when it converts and satisfies
        ``lower <= val <= upper``; otherwise ``np.nan``. A NaN input fails
        neither comparison and is therefore returned unchanged as NaN.
    """
    try:
        number = float(val)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "unusable data". Anything else
        # (KeyboardInterrupt, SystemExit, ...) must propagate to the caller.
        return np.nan

    if number < lower or number > upper:
        return np.nan
    return number

# ---------------------------
# 主诉字段标准化函数
# ---------------------------
def standardize_complaints(df):
    """Add a normalized ``chiefcomplaint_clean`` column to ``df``.

    The text of ``chiefcomplaint`` is lower-cased, stripped of leading and
    trailing whitespace, and every internal whitespace run is collapsed to a
    single space. Missing complaints stay missing rather than being rendered
    as the literal text ``"nan"`` / ``"none"``.

    Args:
        df: DataFrame containing a ``chiefcomplaint`` column. It is modified
            in place.

    Returns:
        The same DataFrame object, with ``chiefcomplaint_clean`` added.
    """
    raw = df["chiefcomplaint"]
    normalized = (
        raw.astype(str)
        .str.lower()
        .str.strip()
        .str.replace(r"\s+", " ", regex=True)
    )
    # astype(str) stringifies NaN/None; mask those positions back to missing
    # so downstream null checks and the CSV export treat them as empty.
    df["chiefcomplaint_clean"] = normalized.where(raw.notna())
    return df

# ---------------------------
# 每行字段清洗函数
# ---------------------------
# Accepted (inclusive) range for each numeric field; values outside are nulled.
_NUMERIC_BOUNDS = {
    "age_at_visit": (0, 120),
    "pain": (0, 10),
    "temperature": (90, 110),  # degrees Fahrenheit
    "heartrate": (30, 220),
    "sbp": (50, 250),
    "dbp": (30, 150),
    "o2sat": (50, 100),
}

# Chief-complaint texts (after strip + lower-case) that mean "no information".
_EMPTY_COMPLAINTS = {"", "unknown", "none", "n/a", "nan"}

# Canonical spelling for known arrival modes; extend with more rules as needed.
_TRANSPORT_MAP = {
    "ambulance": "ambulance",
    "walk in": "walk-in",
    "walk-in": "walk-in",
    "wheelchair": "wheelchair",
    "private car": "private",
    "unknown": "unknown",
}


def clean_row(row):
    """Clean the fields of a single triage record in place.

    Args:
        row: One record (a ``pandas.Series`` as passed by
            ``DataFrame.apply(axis=1)``, or any mapping with ``get`` and item
            assignment). Cell values are assumed to be scalars. The numeric
            fields listed in ``_NUMERIC_BOUNDS`` must be present; ``gender``,
            ``chiefcomplaint`` and ``arrival_transport`` may be absent.

    Returns:
        The same ``row`` object after these changes:

        * numeric fields are converted to float, or NaN when unparseable or
          outside their accepted range;
        * ``gender`` is ``"M"``, ``"F"`` or NaN;
        * ``chiefcomplaint`` is set to NaN when it is empty or a placeholder,
          and otherwise left untouched;
        * ``arrival_transport`` is stripped, lower-cased and mapped to its
          canonical spelling; a missing value becomes ``"unknown"``.

    Raises:
        KeyError: If one of the numeric fields is absent from ``row``.
    """
    for field, (lower, upper) in _NUMERIC_BOUNDS.items():
        row[field] = clean_numeric(row[field], lower, upper)

    # Gender: anything other than M/F (including a missing value, which
    # stringifies to "NAN"/"NONE") is treated as unknown.
    gender = str(row.get("gender")).strip().upper()
    row["gender"] = gender if gender in {"M", "F"} else np.nan

    # Chief complaint: unify the various "no information" spellings as NaN.
    complaint = str(row.get("chiefcomplaint")).strip().lower()
    if complaint in _EMPTY_COMPLAINTS:
        row["chiefcomplaint"] = np.nan

    # Arrival transport: check for missing BEFORE stringifying, otherwise
    # NaN would be stored as the literal text "nan" instead of "unknown".
    transport = row.get("arrival_transport")
    if pd.isna(transport):
        row["arrival_transport"] = "unknown"
    else:
        key = str(transport).strip().lower()
        row["arrival_transport"] = _TRANSPORT_MAP.get(key, key)

    return row

# ---------------------------
# Step 1: load the raw data
# ---------------------------
triage = pd.read_csv("./raw/triage.csv")
edstays = pd.read_csv("./raw/edstays.csv")
patients = pd.read_csv("./raw/patients.csv")

# ---------------------------
# Step 2: parse timestamps and extract the admission year
# ---------------------------
# Unparseable timestamps become NaT, which yields a NaN admit_year and
# therefore a NaN age_at_visit; such rows are dropped in Step 6.
edstays["intime"] = pd.to_datetime(edstays["intime"], errors="coerce")
edstays["admit_year"] = edstays["intime"].dt.year

# ---------------------------
# Step 3: merge stay-level and patient-level fields onto triage
# ---------------------------
edstays_sub = edstays[[
    "stay_id", "subject_id", "admit_year", "intime", "outtime",
    "arrival_transport", "disposition"
]]

patients_sub = patients[["subject_id", "anchor_age", "anchor_year", "gender"]]

triage = triage.merge(edstays_sub, on=["stay_id", "subject_id"], how="left")
triage = triage.merge(patients_sub, on="subject_id", how="left")

# ---------------------------
# Step 4: compute the age at the time of the visit
# ---------------------------
triage["age_at_visit"] = (
    triage["anchor_age"] + (triage["admit_year"] - triage["anchor_year"])
)

# ---------------------------
# Step 5: field-level cleaning + chief-complaint normalization
# ---------------------------
# NOTE(review): apply(axis=1) runs clean_row once per record in Python;
# consider vectorizing if this step becomes a bottleneck.
triage = triage.apply(clean_row, axis=1)
triage = standardize_complaints(triage)

# ---------------------------
# Step 6: select the final columns and drop rows missing key fields
# ---------------------------
final_cols = [
    "subject_id", "age_at_visit", "gender", "pain", "temperature", "heartrate",
    "sbp", "dbp", "o2sat", "chiefcomplaint", "chiefcomplaint_clean", "acuity",
    "intime", "outtime", "arrival_transport", "disposition"
]

required_fields = [
    "age_at_visit", "gender", "heartrate", "sbp", "dbp", "o2sat", "acuity"
]

final_data = triage[final_cols].drop_duplicates()
final_data = final_data.dropna(subset=required_fields)

# ---------------------------
# Step 7: save the cleaned data
# ---------------------------
# A single path object drives both the write and the success message, so the
# message always names the file that was actually written.
output_path = Path("./processed_data/triage_final.csv")
output_path.parent.mkdir(parents=True, exist_ok=True)
final_data.to_csv(output_path, index=False)
print(f"✅ 清洗完成：已保存 {len(final_data)} 条 triage 记录到 {output_path}")


# Output: ✅ 清洗完成：已保存 400058 条 triage 记录到 data/processed/triage_final.csv
