In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 设置可视化风格
plt.style.use('tableau-colorblind10')
# 设置字体为SimHei(黑体)
plt.rcParams['font.sans-serif'] = ['SimHei']
# 解决中文字体下坐标轴负数的负号显示问题
plt.rcParams['axes.unicode_minus'] = False


### 筛选出与肾衰竭有关的代号

In [9]:
import psycopg2
#建立数据库连接
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")
#调用游标对象
cur = con.cursor()

# cur.execute('''SELECT DISTINCT d.row_id, d.subject_id, d.hadm_id, d.seq_num, d.icd9_code, lab.value, lab.valueuom, lab.charttime
# FROM diagnoses_icd d
# JOIN labevents lab ON d.hadm_id = lab.hadm_id
# WHERE (d.icd9_code LIKE '584%' OR d.icd9_code LIKE '586%');''')

cur.execute('''select distinct* from vitals_first_day vfd
join kdigo_uo ku on ku.icustay_id = vfd.icustay_id
join diagnoses_icd d on d.hadm_id=vfd.hadm_id
join d_icd_diagnoses di on di.icd9_code=d.icd9_code
join patients p  on p.subject_id = d.subject_id
where (di.long_title ilike '% renal fail%' 
	or di.long_title ilike '%kidney fail%'
	or di.long_title ilike '%liver fail%'
	or di.long_title ilike '%spleen fail%');''')
            
rows = cur.fetchall()

# 获取列名（可选，方便 DataFrame 的列标题）
column_names = [desc[0] for desc in cur.description]

# 将查询结果转换为 DataFrame
df = pd.DataFrame(rows, columns=column_names)

# 输出查看数据
print(df.head())
df.to_csv("住院第一天的生命体征与aki肾衰竭患者尿量数据.csv", index=False,header=True)
con.close()

   subject_id  hadm_id  icustay_id  heartrate_min  heartrate_max  \
0           3   145834      211552           75.0          168.0   
1           3   145834      211552           75.0          168.0   
2           3   145834      211552           75.0          168.0   
3           3   145834      211552           75.0          168.0   
4           3   145834      211552           75.0          168.0   

   heartrate_mean  sysbp_min  sysbp_max  sysbp_mean  diasbp_min  ...  \
0      111.785714       64.0      217.0      102.96        28.0  ...   
1      111.785714       64.0      217.0      102.96        28.0  ...   
2      111.785714       64.0      217.0      102.96        28.0  ...   
3      111.785714       64.0      217.0      102.96        28.0  ...   
4      111.785714       64.0      217.0      102.96        28.0  ...   

                short_title                         long_title  row_id  \
0  Acute kidney failure NOS  Acute kidney failure, unspecified       2   
1  Acute k

In [None]:
import psycopg2
#建立数据库连接
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")
#调用游标对象
cur = con.cursor()

batch_size = 10000
offset = 0

# 打开文件并写入数据
with open("CHARTEVENTS.csv", mode="w", newline='', encoding="utf-8") as file:
    first_batch = True  # 是否是第一批数据

    while True:
        # 分批查询
        query = f'''
        SELECT DISTINCT 
            C.charttime, C.itemid, C.value, C.valueuom,
            d.row_id, d.icd9_code, d.hadm_id,
            p.subject_id, p.gender, p.dob
        FROM CHARTEVENTS C
        JOIN diagnoses_icd d ON d.hadm_id = C.hadm_id
        JOIN patients p ON p.subject_id = d.subject_id
        WHERE (d.icd9_code LIKE '584%' OR d.icd9_code LIKE '586%')
        LIMIT {batch_size} OFFSET {offset};
        '''
        
        # 执行查询
        cur.execute(query)
        rows = cur.fetchall()

        # 如果没有更多数据，退出循环
        if not rows:
            break

        # 获取列名，确保列名与数据列数匹配
        column_names = [desc[0] for desc in cur.description]

        # 创建 DataFrame
        df = pd.DataFrame(rows, columns=column_names)

        # 写入 CSV 文件
        df.to_csv(file, index=False, header=first_batch, mode="a")
        first_batch = False  # 仅第一批数据包含表头

        # 更新 offset
        offset += batch_size


### 血气

In [4]:
import psycopg2
import pandas as pd
import random

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句
query_positive = '''
SELECT DISTINCT 
    bl.subject_id, bl.hadm_id, d.icd9_code, bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, p.gender, bl.requiredo2, bl.tidalvolume, bl.calcium,
    1 AS match_flag
FROM blood_gas_first_day_arterial bl
JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%';
'''

query_negative = '''
SELECT DISTINCT 
    bl.subject_id, bl.hadm_id, d.icd9_code, bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, p.gender, bl.requiredo2, bl.tidalvolume, bl.calcium,
    0 AS match_flag
FROM blood_gas_first_day_arterial bl
JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%');
'''

try:
    cur = con.cursor()
    
    # 获取急性肾衰竭患者数据
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])
    
    # 获取非急性肾衰竭患者数据
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])
    
    # 随机抽取与急性肾衰竭患者数量相等的非急性肾衰竭数据
    sample_negative_df = negative_df.sample(n=len(positive_df), random_state=42)

    # 合并数据
    combined_df = pd.concat([positive_df, sample_negative_df], ignore_index=True)

    # 保存到 CSV 文件
    output_file = "blood_gas_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"抽样结果已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


抽样结果已保存到 blood_gas_sampled_with_flags.csv


### aki患者第一天实验室数据

In [5]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句
query_positive = '''
SELECT DISTINCT 
    lf.*, kc.*, d.icd9_code, di.long_title, p.gender,
    1 AS match_flag
FROM labs_first_day lf
JOIN kdigo_creatinine kc ON kc.icustay_id = lf.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = lf.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%';
'''

query_negative = '''
SELECT DISTINCT 
    lf.*, kc.*, d.icd9_code, di.long_title, p.gender,
    0 AS match_flag
FROM labs_first_day lf
JOIN kdigo_creatinine kc ON kc.icustay_id = lf.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = lf.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%');
'''

try:
    cur = con.cursor()

    # 获取急性肾衰竭患者数据
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])

    # 获取非急性肾衰竭患者数据
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])

    # 随机抽取与急性肾衰竭患者数量相等的非急性肾衰竭数据
    sample_negative_df = negative_df.sample(n=len(positive_df), random_state=42)

    # 合并数据
    combined_df = pd.concat([positive_df, sample_negative_df], ignore_index=True)

    # 保存到 CSV 文件
    output_file = "aki_patients_labs_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"抽样结果已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


抽样结果已保存到 aki_patients_labs_sampled_with_flags.csv


### aki患者微生物实验室结果

In [2]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句
query_positive = '''
SELECT DISTINCT 
    mb.*, icd.*, kc.*, d.icd9_code, di.long_title, p.gender,
    1 AS match_flag
FROM microbiologyevents mb
JOIN icustay_detail icd ON icd.hadm_id = mb.hadm_id
JOIN kdigo_creatinine kc ON kc.icustay_id = icd.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = mb.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 20000;
'''

query_negative = '''
SELECT DISTINCT 
    mb.*, icd.*, kc.*, d.icd9_code, di.long_title, p.gender,
    0 AS match_flag
FROM microbiologyevents mb
JOIN icustay_detail icd ON icd.hadm_id = mb.hadm_id
JOIN kdigo_creatinine kc ON kc.icustay_id = icd.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = mb.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%')
LIMIT 20000;
'''

try:
    cur = con.cursor()

    # 获取急性肾衰竭患者数据（前 20,000 条）
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])

    # 获取非急性肾衰竭患者数据（前 20,000 条）
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 保存到 CSV 文件
    output_file = "aki_microbiology_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"前 20,000 条每类数据已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


前 20,000 条每类数据已保存到 aki_microbiology_sampled_with_flags.csv


### aki肾衰竭患者尿量数据+第一天实验室数据

In [1]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句：急性肾衰竭
query_positive = '''
SELECT DISTINCT 
    lf.*, ku.*, d.icd9_code, di.long_title, p.gender,
    1 AS match_flag
FROM labs_first_day lf
JOIN kdigo_uo ku ON ku.icustay_id = lf.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = lf.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 20000;
'''

# SQL 查询语句：非急性肾衰竭
query_negative = '''
SELECT DISTINCT 
    lf.*, ku.*, d.icd9_code, di.long_title, p.gender,
    0 AS match_flag
FROM labs_first_day lf
JOIN kdigo_uo ku ON ku.icustay_id = lf.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = lf.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%')
LIMIT 20000;
'''

try:
    cur = con.cursor()
    
    # 获取急性肾衰竭患者数据
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])

    # 获取非急性肾衰竭患者数据
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 保存到 CSV 文件
    output_file = "aki_urine_labs_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"随机抽样结果已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


随机抽样结果已保存到 aki_urine_labs_sampled_with_flags.csv


### 住院第一天的生命体征与aki肾衰竭患者尿量数据

In [2]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句：急性肾衰竭
query_positive = '''
SELECT DISTINCT 
    vfd.*, ku.*, d.icd9_code, di.long_title, p.gender,
    1 AS match_flag
FROM vitals_first_day vfd
JOIN kdigo_uo ku ON ku.icustay_id = vfd.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = vfd.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 20000;
'''

# SQL 查询语句：非急性肾衰竭
query_negative = '''
SELECT DISTINCT 
    vfd.*, ku.*, d.icd9_code, di.long_title, p.gender,
    0 AS match_flag
FROM vitals_first_day vfd
JOIN kdigo_uo ku ON ku.icustay_id = vfd.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = vfd.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%')
LIMIT 20000;
'''

try:
    cur = con.cursor()
    
    # 获取急性肾衰竭患者数据
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])

    # 获取非急性肾衰竭患者数据
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 保存到 CSV 文件
    output_file = "vitals_aki_urine_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"随机抽样结果已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


随机抽样结果已保存到 vitals_aki_urine_sampled_with_flags.csv


### 语言 宗教 婚姻 种族

In [1]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句：急性肾衰竭
query_positive = '''
SELECT DISTINCT 
    a.language,
    a.religion,
    a.marital_status,
    a.ethnicity,
    1 AS match_flag
FROM admissions a
JOIN diagnoses_icd d ON d.hadm_id = a.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
WHERE di.long_title ILIKE '% renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 20000;
'''

# SQL 查询语句：非急性肾衰竭
query_negative = '''
SELECT DISTINCT 
    a.language,
    a.religion,
    a.marital_status,
    a.ethnicity,
    0 AS match_flag
FROM admissions a
JOIN diagnoses_icd d ON d.hadm_id = a.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
WHERE NOT (di.long_title ILIKE '% renal fail%' 
           OR di.long_title ILIKE '%kidney fail%'
           OR di.long_title ILIKE '%liver fail%'
           OR di.long_title ILIKE '%spleen fail%')
LIMIT 20000;
'''

try:
    cur = con.cursor()
    
    # 获取急性肾衰竭数据
    cur.execute(query_positive)
    positive_rows = cur.fetchall()
    positive_df = pd.DataFrame(positive_rows, columns=[desc[0] for desc in cur.description])

    # 获取非急性肾衰竭数据
    cur.execute(query_negative)
    negative_rows = cur.fetchall()
    negative_df = pd.DataFrame(negative_rows, columns=[desc[0] for desc in cur.description])

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 保存为 CSV 文件
    output_file = "language_religion_ethnicity_sampled_with_flags.csv"
    combined_df.to_csv(output_file, index=False, header=True)
    print(f"随机抽样结果已保存到 {output_file}")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


随机抽样结果已保存到 language_religion_ethnicity_sampled_with_flags.csv


In [None]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# SQL 查询语句：符合条件的数据
query_positive = '''
SELECT DISTINCT 
    bl.subject_id, bl.hadm_id, bl.icustay_id, -- 主键
    bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, bl.requiredo2, bl.tidalvolume, bl.calcium,
    p.gender,
    lf.*, -- 第一日实验室数据
    mb.*, -- 微生物事件
    rfd.*, -- 血液净化治疗
    uof.*, -- 第一日尿液排出量
    vfd.*, -- 第一日生命体征
    1 AS match_flag -- 匹配标志
FROM blood_gas_first_day_arterial bl
JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
JOIN icustay_detail icd ON icd.hadm_id = d.hadm_id
JOIN kdigo_creatinine kdc ON kdc.icustay_id = icd.icustay_id
JOIN kdigo_uo ku ON ku.icustay_id = icd.icustay_id
JOIN labs_first_day lf ON lf.icustay_id = icd.icustay_id
JOIN microbiologyevents mb ON mb.hadm_id = d.hadm_id
JOIN rrt_first_day rfd ON rfd.hadm_id = d.hadm_id
JOIN urine_output_first_day uof ON uof.hadm_id = d.hadm_id
JOIN vitals_first_day vfd ON vfd.icustay_id = icd.icustay_id
WHERE di.long_title ILIKE '%renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 1;
'''

# SQL 查询语句：不符合条件的数据
query_negative = '''
SELECT DISTINCT 
    bl.subject_id, bl.hadm_id, bl.icustay_id, -- 主键
    bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, bl.requiredo2, bl.tidalvolume, bl.calcium,
    p.gender,
    lf.*, -- 第一日实验室数据
    mb.*, -- 微生物事件
    rfd.*, -- 血液净化治疗
    uof.*, -- 第一日尿液排出量
    vfd.*, -- 第一日生命体征
    0 AS match_flag -- 不匹配标志
FROM blood_gas_first_day_arterial bl
LEFT JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
LEFT JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
LEFT JOIN patients p ON p.subject_id = bl.subject_id
LEFT JOIN icustay_detail icd ON icd.hadm_id = bl.hadm_id
LEFT JOIN kdigo_creatinine kdc ON kdc.icustay_id = icd.icustay_id
LEFT JOIN kdigo_uo ku ON ku.icustay_id = icd.icustay_id
LEFT JOIN labs_first_day lf ON lf.icustay_id = icd.icustay_id
LEFT JOIN microbiologyevents mb ON mb.hadm_id = bl.hadm_id
LEFT JOIN rrt_first_day rfd ON rfd.hadm_id = bl.hadm_id
LEFT JOIN urine_output_first_day uof ON uof.hadm_id = bl.hadm_id
LEFT JOIN vitals_first_day vfd ON vfd.icustay_id = icd.icustay_id
WHERE di.icd9_code IS NULL
LIMIT 1;
'''

output_file = "aki_sampled_record.csv"

try:
    cur = con.cursor()
    
    # 创建一个空的 DataFrame，用于保存记录
    columns_written = False
    
    for query in [query_positive, query_negative]:
        cur.execute(query)
        row = cur.fetchone()
        
        if row:
            # 获取列名
            column_names = [desc[0] for desc in cur.description]
            record_df = pd.DataFrame([row], columns=column_names)

            # 如果是首次写入 CSV 文件，包含表头
            if not columns_written:
                record_df.to_csv(output_file, mode='w', index=False, header=True)
                columns_written = True
            else:
                # 追加到 CSV 文件中，无需表头
                record_df.to_csv(output_file, mode='a', index=False, header=False)
            
            print(f"已保存一条记录到 {output_file}")
    
except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


In [1]:
import psycopg2
import pandas as pd
import os
import gc

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# 查询语句
query_positive = '''
SELECT DISTINCT 
    bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, bl.requiredo2, bl.tidalvolume, bl.calcium,
    p.gender,
    rfd.rrt,
    uof.urineoutput,
    vfd.*,
    1 AS match_flag
FROM blood_gas_first_day_arterial bl
JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
JOIN icustay_detail icd ON icd.hadm_id = d.hadm_id
JOIN rrt_first_day rfd ON rfd.hadm_id = d.hadm_id
JOIN urine_output_first_day uof ON uof.hadm_id = d.hadm_id
JOIN vitals_first_day vfd ON vfd.icustay_id = icd.icustay_id
WHERE di.long_title ILIKE '%renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 30000;
'''

query_negative = '''
SELECT DISTINCT 
    bl.so2, bl.spo2, bl.po2, bl.pco2, bl.fio2, bl.aado2, bl.pao2fio2,
    bl.ph, bl.totalco2, bl.temperature, bl.requiredo2, bl.tidalvolume, bl.calcium,
    p.gender,
    rfd.rrt,
    uof.urineoutput,
    vfd.*,
    0 AS match_flag
FROM blood_gas_first_day_arterial bl
LEFT JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
LEFT JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
LEFT JOIN patients p ON p.subject_id = bl.subject_id
LEFT JOIN icustay_detail icd ON icd.hadm_id = bl.hadm_id
LEFT JOIN rrt_first_day rfd ON rfd.hadm_id = bl.hadm_id
LEFT JOIN urine_output_first_day uof ON uof.hadm_id = bl.hadm_id
LEFT JOIN vitals_first_day vfd ON vfd.icustay_id = icd.icustay_id
WHERE di.icd9_code IS NULL
LIMIT 30000;
'''

output_file = "aki_data3.csv"

try:
    cur = con.cursor()
    queries = [query_positive, query_negative]

    for query in queries:
        cur.execute(query)
        
        # 分批提取数据
        while True:
            rows = cur.fetchmany(1000)  # 每次提取 1000 行
            if not rows:
                break
            
            # 获取列名并创建 DataFrame
            column_names = [desc[0] for desc in cur.description]
            batch_df = pd.DataFrame(rows, columns=column_names)

            # 去除重复行
            batch_df.drop_duplicates(inplace=True)

            # 写入 CSV 文件
            batch_df.to_csv(output_file, mode='a', index=False, header=not os.path.exists(output_file))
            
            # 释放内存
            del batch_df
            gc.collect()

        print(f"完成 {query[:50]} 的记录写入")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


完成 
SELECT DISTINCT 
    bl.so2, bl.spo2, bl.po2, bl. 的记录写入
完成 
SELECT DISTINCT 
    bl.so2, bl.spo2, bl.po2, bl. 的记录写入


In [2]:
import psycopg2
import pandas as pd
import os
import gc

# 数据库连接配置
con = psycopg2.connect(database="mimiciii",
                       user="postgres",
                       password="123456",
                       host="localhost",
                       port="5433")

# 查询语句
query_positive = '''
SELECT DISTINCT 
    mb.isolate_num, mb.dilution_value, mb.interpretation,
    lf.*,
    bl.hadm_id,
    1 AS match_flag
FROM blood_gas_first_day_arterial bl
JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN labs_first_day lf ON lf.icustay_id = (
    SELECT icustay_id
    FROM icustay_detail icd
    WHERE icd.hadm_id = d.hadm_id
    LIMIT 1
)
JOIN microbiologyevents mb ON mb.hadm_id = d.hadm_id
WHERE di.long_title ILIKE '%renal fail%' 
   OR di.long_title ILIKE '%kidney fail%'
   OR di.long_title ILIKE '%liver fail%'
   OR di.long_title ILIKE '%spleen fail%'
LIMIT 30000;
'''

query_negative = '''
SELECT DISTINCT 
    mb.isolate_num, mb.dilution_value, mb.interpretation,
    lf.*,
    bl.hadm_id,
    0 AS match_flag
FROM blood_gas_first_day_arterial bl
LEFT JOIN diagnoses_icd d ON d.hadm_id = bl.hadm_id
LEFT JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
LEFT JOIN labs_first_day lf ON lf.icustay_id = (
    SELECT icustay_id
    FROM icustay_detail icd
    WHERE icd.hadm_id = bl.hadm_id
    LIMIT 1
)
LEFT JOIN microbiologyevents mb ON mb.hadm_id = bl.hadm_id
WHERE di.icd9_code IS NULL
LIMIT 30000;
'''

output_file = "aki_data4.csv"

try:
    cur = con.cursor()
    queries = [query_positive, query_negative]

    for query in queries:
        cur.execute(query)
        
        # 分批提取数据
        while True:
            rows = cur.fetchmany(1000)  # 每次提取 1000 行
            if not rows:
                break
            
            # 获取列名并创建 DataFrame
            column_names = [desc[0] for desc in cur.description]
            batch_df = pd.DataFrame(rows, columns=column_names)

            # 去除重复行
            batch_df.drop_duplicates(inplace=True)

            # 写入 CSV 文件
            batch_df.to_csv(output_file, mode='a', index=False, header=not os.path.exists(output_file))
            
            # 释放内存
            del batch_df
            gc.collect()

        print(f"完成 {query[:50]} 的记录写入")

except Exception as e:
    print(f"发生错误: {e}")
finally:
    if con:
        cur.close()
        con.close()


完成 
SELECT DISTINCT 
    mb.isolate_num, mb.dilution_ 的记录写入
完成 
SELECT DISTINCT 
    mb.isolate_num, mb.dilution_ 的记录写入


In [5]:
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder

# 读取数据
file_path = 'aki_data4.csv'
df = pd.read_csv(file_path)

# 1. 删除 ID 类和文本描述相关的列
columns_to_remove = ['row_id', 'subject_id', 'hadm_id', 'icustay_id', 'charttime',
                     'icd9_code', 'long_title', ]  # 添加需要移除的列名
df_cleaned = df.drop(columns=[col for col in columns_to_remove if col in df.columns])

# 2. 去除重复列
df_cleaned = df_cleaned.loc[:, ~df_cleaned.columns.duplicated()]

# 3. 将性别转换为数值类型
if 'gender' in df_cleaned.columns:
    df_cleaned['gender'] = df_cleaned['gender'].apply(lambda x: 1 if x == 'M' else 0)

# 4. 对非文本类型的分类特征进行编码
for col in df_cleaned.select_dtypes(include=['object']).columns:
    label_encoder = LabelEncoder()
    df_cleaned[col] = label_encoder.fit_transform(df_cleaned[col].astype(str))

# 5. 去除缺失值比例超过 50% 的列
missing_threshold = 0.5
df_cleaned = df_cleaned.loc[:, df_cleaned.isnull().mean() <= missing_threshold]

# 6. 填充剩余的缺失值
imputer = SimpleImputer(strategy='median')  # 使用中位数填充
df_cleaned[df_cleaned.columns] = imputer.fit_transform(df_cleaned)

# 7. 提取数值特征并标准化
features = df_cleaned.drop(columns=['match_flag'])  # 假设 'match_flag' 是标签列
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

# 进一步归一化到 [0, 1] 范围
scaler = MinMaxScaler()
features_scaled = scaler.fit_transform(features_scaled)

# 转换为 DataFrame 并添加回标签列
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final['match_flag'] = df_cleaned['match_flag'].values

# 保存清洗后的数据
output_file = 'cleaned_aki_data4.csv'
df_final.to_csv(output_file, index=False)
print(f"清洗后的数据已保存到 {output_file}")


清洗后的数据已保存到 cleaned_aki_data4.csv


In [11]:
import psycopg2
import pandas as pd

# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# 阳性样本查询
query_positive = '''
SELECT DISTINCT 
    wfd.weight,
    p.gender,
    1 AS match_flag
FROM weight_first_day wfd
JOIN icustays ic ON ic.icustay_id = wfd.icustay_id
JOIN diagnoses_icd d ON d.hadm_id = ic.hadm_id
JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
JOIN patients p ON p.subject_id = d.subject_id
WHERE (di.long_title ILIKE '% renal fail%' 
    OR di.long_title ILIKE '%kidney fail%'
    OR di.long_title ILIKE '%liver fail%'
    OR di.long_title ILIKE '%spleen fail%')
'''

# 优化后的阴性样本查询
query_negative = '''
SELECT * FROM (
    SELECT DISTINCT 
        wfd.weight,
        p.gender,
        0 AS match_flag
    FROM weight_first_day wfd
    JOIN icustays ic ON ic.icustay_id = wfd.icustay_id
    JOIN diagnoses_icd d ON d.hadm_id = ic.hadm_id
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    JOIN patients p ON p.subject_id = d.subject_id
    WHERE NOT EXISTS (
        SELECT 1
        FROM diagnoses_icd d2
        JOIN d_icd_diagnoses di2 ON di2.icd9_code = d2.icd9_code
        WHERE d2.hadm_id = d.hadm_id
          AND (di2.long_title ILIKE '% renal fail%' 
              OR di2.long_title ILIKE '%kidney fail%'
              OR di2.long_title ILIKE '%liver fail%'
              OR di2.long_title ILIKE '%spleen fail%')
    )
) AS sub
ORDER BY RANDOM()
LIMIT 20000;
'''

try:
    cur = con.cursor()
    
    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_df = pd.DataFrame(cur.fetchall(), columns=[desc[0] for desc in cur.description])
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")
    
    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_df = pd.DataFrame(cur.fetchall(), columns=[desc[0] for desc in cur.description])
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")
    
    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)
    
    # 保存结果
    output_file = "./data/weight_first_day.csv"
    combined_df.to_csv(output_file, index=False)
    print(f"数据集已保存到 {output_file}")
    print(f"总样本数：{len(combined_df)}（阳性 {len(positive_df)}，阴性 {len(negative_df)}）")

except Exception as e:
    print(f"操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()
        
import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer  # 启用 IterativeImputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder

# 读取数据
file_path = './data/weight_first_day.csv'
df = pd.read_csv(file_path)

# 1. 删除 ID 类和文本描述相关的列
columns_to_remove = ['row_id', 'subject_id',  'icustay_id', 'charttime',
                     'icd9_code', 'long_title', 'subject_id1', 'hadm_id1', 'expire_flag', 'short_title', 'row_id1', 'icd9_code1'
                    ,'dod','dob','dod_ssn','dod_hosp']  # 添加需要移除的列名
df_cleaned = df.drop(columns=[col for col in columns_to_remove if col in df.columns])

# 2. 去除重复列
df_cleaned = df_cleaned.loc[:, ~df_cleaned.columns.duplicated()]

# 3. 将性别转换为数值类型
if 'gender' in df_cleaned.columns:
    df_cleaned['gender'] = df_cleaned['gender'].apply(lambda x: 1 if x == 'M' else 0)
# 4. 对非文本类型的分类特征进行编码
categorical_cols = []  # 记录分类列用于后续处理
for col in df_cleaned.select_dtypes(include=['object']).columns:
    if col not in ['intime', 'outtime']:  # 跳过时间列
        categorical_cols.append(col)
        label_encoder = LabelEncoder()
        df_cleaned[col] = label_encoder.fit_transform(df_cleaned[col].astype(str))

# 5. 将时间列转换为Unix时间戳并处理缺失
time_cols = ['intime', 'outtime']
for col in time_cols:
    if col in df_cleaned.columns:
        # 转换为datetime并处理无效值
        df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
        # 转换为Unix时间戳（秒）
        df_cleaned[col] = (df_cleaned[col].astype('int64') // 10**9).replace(-9223372036854775808, np.nan)

# 6. 删除高缺失率列
missing_threshold = 0.5
df_cleaned = df_cleaned.loc[:, df_cleaned.isnull().mean() <= missing_threshold]

# 7. 使用XGBoost进行缺失值填补
from sklearn.impute import IterativeImputer
from xgboost import XGBRegressor

# 对所有列进行填补（包括时间列和标签列）
columns_to_impute = df_cleaned.columns.tolist()
imputer = IterativeImputer(
    estimator=XGBRegressor(n_estimators=100, random_state=42),
    max_iter=10,
    random_state=42
)
df_cleaned[columns_to_impute] = imputer.fit_transform(df_cleaned[columns_to_impute])

# 8. 后处理：分类列和标签列取整
for col in categorical_cols + ['match_flag']:
    if col in df_cleaned.columns:
        df_cleaned[col] = df_cleaned[col].round().astype(int)

# 时间列取整（确保为整数时间戳）
for col in time_cols:
    if col in df_cleaned.columns:
        df_cleaned[col] = df_cleaned[col].round().astype('int64')

# 9. 标准化处理（排除指定列）
# features = df_cleaned.drop(columns=['match_flag', 'intime', 'outtime',])
features = df_cleaned.drop(columns=['match_flag'])
# 标准化+归一化
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
scaler = MinMaxScaler()
features_scaled = scaler.fit_transform(features_scaled)

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
# for col in ['intime', 'outtime','match_flag']:
for col in ['match_flag']:
    if col in df_cleaned.columns:
        df_final[col] = df_cleaned[col].values

# 保存结果
output_file = './data/cleaned_weight_first_day.csv'
df_final.to_csv(output_file, index=False)
print(f"清洗后的数据已保存到 {output_file}")


正在获取阳性样本...
阳性样本获取完成，共 1991 条
正在获取阴性样本...
阴性样本获取完成，共 2534 条
数据集已保存到 ./data/weight_first_day.csv
总样本数：4525（阳性 1991，阴性 2534）
清洗后的数据已保存到 ./data/cleaned_weight_first_day.csv


In [6]:
import psycopg2
import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
SELECT DISTINCT lfd.*,
                p.gender,
                1 AS match_flag
FROM labs_first_day lfd
LEFT JOIN diagnoses_icd d ON d.hadm_id = lfd.hadm_id
LEFT JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
LEFT JOIN patients p ON p.subject_id = d.subject_id
WHERE (di.long_title ILIKE '% renal fail%' 
    OR di.long_title ILIKE '%kidney fail%'
    OR di.long_title ILIKE '%liver fail%'
    OR di.long_title ILIKE '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%' 
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT 
    lfd.*,
    p.gender,
    0 AS match_flag 
FROM labs_first_day lfd
LEFT JOIN patients p ON p.subject_id = lfd.subject_id
WHERE NOT EXISTS (
    SELECT 1 
    FROM positive_hadm ph 
    WHERE ph.hadm_id = lfd.hadm_id
)
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);
'''

# ================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]
    
    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")
    
    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]
    
    return keep_columns

try:
    cur = con.cursor()
    
    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")
    
    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))
    
    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")
    
    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)
    
    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")
    
    # 仅保留需要的列
    combined_df = combined_df[keep_columns]
    
    # 保存原始数据
    raw_output = "./data/labs_first_day_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
# 读取数据
df = pd.read_csv('./data/labs_first_day_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=XGBRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])

# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
features_scaled = MinMaxScaler().fit_transform(scaler.transform(features))

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_labs_first_day.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 12888 条
正在获取阴性样本...
阴性样本获取完成，共 12888 条
动态保留的列：['hadm_id', 'aniongap_min', 'aniongap_max', 'albumin_min', 'albumin_max', 'bands_min', 'bands_max', 'bicarbonate_min', 'bicarbonate_max', 'bilirubin_min', 'bilirubin_max', 'creatinine_min', 'creatinine_max', 'chloride_min', 'chloride_max', 'glucose_min', 'glucose_max', 'hematocrit_min', 'hematocrit_max', 'hemoglobin_min', 'hemoglobin_max', 'lactate_min', 'lactate_max', 'platelet_min', 'platelet_max', 'potassium_min', 'potassium_max', 'ptt_min', 'ptt_max', 'inr_min', 'inr_max', 'pt_min', 'pt_max', 'sodium_min', 'sodium_max', 'bun_min', 'bun_max', 'wbc_min', 'wbc_max', 'gender', 'match_flag']
原始数据已保存到 ./data/labs_first_day_raw.csv




处理后的数据已保存到 ./data/cleaned_labs_first_day.csv
数据特征分布：
              hadm_id  aniongap_min  aniongap_max   albumin_min   albumin_max  \
count   24087.000000  24087.000000  24087.000000  24087.000000  24087.000000   
mean   149804.298252      0.291386      0.258762      0.337905      0.350923   
std     28875.239187      0.081605      0.089616      0.113909      0.117026   
min    100001.000000      0.000000      0.000000      0.000000      0.000000   
25%    124775.500000      0.232558      0.196429      0.256857      0.264151   
50%    149804.000000      0.279070      0.250000      0.322706      0.337007   
75%    174792.500000      0.325581      0.285714      0.410794      0.429168   
max    199999.000000      1.000000      1.000000      1.000000      1.000000   

          bands_min     bands_max  bicarbonate_min  bicarbonate_max  \
count  24087.000000  24087.000000     24087.000000     24087.000000   
mean       0.127821      0.137647         0.401015         0.408652   
std        0

#### LGBMRegressor

In [3]:
import psycopg2
import pandas as pd
import joblib
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
SELECT DISTINCT lfd.*,
                p.gender,
                1 AS match_flag
FROM labs_first_day lfd
LEFT JOIN diagnoses_icd d ON d.hadm_id = lfd.hadm_id
LEFT JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
LEFT JOIN patients p ON p.subject_id = d.subject_id
WHERE (di.long_title ILIKE '% renal fail%'
    OR di.long_title ILIKE '%kidney fail%'
    OR di.long_title ILIKE '%liver fail%'
    OR di.long_title ILIKE '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%'
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT
    lfd.*,
    p.gender,
    0 AS match_flag
FROM labs_first_day lfd
LEFT JOIN patients p ON p.subject_id = lfd.subject_id
WHERE NOT EXISTS (
    SELECT 1
    FROM positive_hadm ph
    WHERE ph.hadm_id = lfd.hadm_id
)
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);
'''


#================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]

    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")

    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]

    return keep_columns


try:
    cur = con.cursor()

    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")

    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))

    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")

    # 仅保留需要的列
    combined_df = combined_df[keep_columns]

    # 保存原始数据
    raw_output = "./data/labs_first_day_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
# 读取数据
df = pd.read_csv('./data/labs_first_day_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=LGBMRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])
# 保存插值模型
joblib.dump(imputer, "./models/labs_first_day_lgbm_xgb.pkl")
print("插值模型已保存！")

# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
# 保存标准化模型
scaler_path = "./models/labs_first_day_lgbm_standard_scaler.pkl"
joblib.dump(scaler, scaler_path)
print(f"标准化模型已保存至 {scaler_path}")

features_scaled = scaler.transform(features)

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_labs_first_day_lgbm.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 12888 条
正在获取阴性样本...
阴性样本获取完成，共 12888 条
动态保留的列：['hadm_id', 'aniongap_min', 'aniongap_max', 'albumin_min', 'albumin_max', 'bands_min', 'bands_max', 'bicarbonate_min', 'bicarbonate_max', 'bilirubin_min', 'bilirubin_max', 'creatinine_min', 'creatinine_max', 'chloride_min', 'chloride_max', 'glucose_min', 'glucose_max', 'hematocrit_min', 'hematocrit_max', 'hemoglobin_min', 'hemoglobin_max', 'lactate_min', 'lactate_max', 'platelet_min', 'platelet_max', 'potassium_min', 'potassium_max', 'ptt_min', 'ptt_max', 'inr_min', 'inr_max', 'pt_min', 'pt_max', 'sodium_min', 'sodium_max', 'bun_min', 'bun_max', 'wbc_min', 'wbc_max', 'gender', 'match_flag']
原始数据已保存到 ./data/labs_first_day_raw.csv
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002856 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 5834
[LightGBM] [Info] Number of data points in the train set: 24117, number of used features: 38
[Ligh



插值模型已保存！
标准化模型已保存至 ./models/labs_first_day_lgbm_standard_scaler.pkl
处理后的数据已保存到 ./data/cleaned_labs_first_day_lgbm.csv
数据特征分布：
              hadm_id  aniongap_min  aniongap_max   albumin_min   albumin_max  \
count   24117.000000  2.411700e+04  2.411700e+04  2.411700e+04  2.411700e+04   
mean   149951.696189 -3.016941e-16 -1.508471e-16  4.336853e-16  5.091089e-16   
std     28840.616513  1.000021e+00  1.000021e+00  1.000021e+00  1.000021e+00   
min    100001.000000 -3.581268e+00 -2.310412e+00 -4.306053e+00 -4.537382e+00   
25%    125056.000000 -7.356458e-01 -7.130917e-01 -4.457192e-01 -4.420069e-01   
50%    149947.000000 -1.665215e-01 -1.140965e-01  2.897292e-02  3.738769e-03   
75%    174893.000000  4.811374e-01  3.527051e-01  6.074105e-01  6.084877e-01   
max    199999.000000  8.654906e+00  8.471500e+00  5.177099e+00  5.291518e+00   

          bands_min     bands_max  bicarbonate_min  bicarbonate_max  \
count  2.411700e+04  2.411700e+04     2.411700e+04     2.411700e+04   
mean  -9.8

In [9]:
import psycopg2
import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
select  distinct m.hadm_id,
				m.spec_itemid,
				m.org_itemid,
				m.isolate_num,
				m.ab_itemid,
				m.dilution_text,
				m.dilution_value,
				p.gender,
				1 AS match_flag
from microbiologyevents m
left join labevents l on l.hadm_id=m.hadm_id
left join diagnoses_icd d on d.hadm_id=m.hadm_id
left join d_icd_diagnoses di on di.icd9_code=d.icd9_code
left join patients p  on p.subject_id = d.subject_id
where l.itemid = 50912
	AND l.valuenum <= 150
	and(di.long_title ilike '% renal fail%' 
	or di.long_title ilike '%kidney fail%'
	or di.long_title ilike '%liver fail%'
	or di.long_title ilike '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%' 
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT * FROM (
    SELECT 
        DISTINCT m.hadm_id,
        m.spec_itemid,
        m.org_itemid,
        m.isolate_num,
        m.ab_itemid,
        m.dilution_text,
        m.dilution_value,
        p.gender,
        0 AS match_flag 
    FROM microbiologyevents m
    LEFT JOIN patients p ON p.subject_id = m.subject_id
    WHERE NOT EXISTS (
        SELECT 1 
        FROM positive_hadm ph 
        WHERE ph.hadm_id = m.hadm_id
    )
) AS subquery
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);

'''

# ================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]
    
    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")
    
    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]
    
    return keep_columns

try:
    cur = con.cursor()
    
    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")
    
    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))
    
    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")
    
    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)
    
    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")
    
    # 仅保留需要的列
    combined_df = combined_df[keep_columns]
    
    # 保存原始数据
    raw_output = "./data/microbiologyevents_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
from sklearn.preprocessing import LabelEncoder
# 读取数据
df = pd.read_csv('./data/microbiologyevents_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

df['dilution_text'] = df['dilution_text'].str.extract(r'([\d\.]+)').astype(float)


# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag','dilution_comparison']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=XGBRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])

# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
features_scaled = MinMaxScaler().fit_transform(scaler.transform(features))

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_microbiologyevents.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 148751 条
正在获取阴性样本...
阴性样本获取完成，共 148751 条
动态保留的列：['hadm_id', 'spec_itemid', 'org_itemid', 'isolate_num', 'ab_itemid', 'dilution_text', 'dilution_value', 'gender', 'match_flag']
原始数据已保存到 ./data/microbiologyevents_raw.csv




处理后的数据已保存到 ./data/cleaned_microbiologyevents.csv
数据特征分布：
              hadm_id   spec_itemid    org_itemid   isolate_num     ab_itemid  \
count   40786.000000  40786.000000  40786.000000  40786.000000  40786.000000   
mean   149950.636689      0.474512      0.361828      0.050977      0.512851   
std     28847.476642      0.361430      0.243739      0.044876      0.206961   
min    100001.000000      0.000000      0.000000      0.000000      0.000000   
25%    124962.250000      0.109890      0.158974      0.036316      0.342567   
50%    150013.500000      0.483516      0.352154      0.037114      0.474153   
75%    174886.750000      0.846154      0.421053      0.060519      0.695523   
max    199999.000000      1.000000      1.000000      1.000000      1.000000   

       dilution_text  dilution_value        gender    match_flag  
count   40786.000000    40786.000000  40786.000000  40786.000000  
mean        0.004159        0.004464      0.548178      0.267567  
std         0.017880

In [2]:
import psycopg2
import pandas as pd
import numpy as np
import joblib
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
select  distinct vfd.hadm_id,
				vfd.heartrate_min,vfd.heartrate_max,vfd.heartrate_mean,
				vfd.sysbp_min,vfd.sysbp_max,vfd.sysbp_mean,
				vfd.diasbp_min,vfd.diasbp_max,vfd.diasbp_mean,
				vfd.meanbp_min,vfd.meanbp_max,vfd.meanbp_mean,
				vfd.resprate_min,vfd.resprate_max,vfd.resprate_mean,
				vfd.tempc_min,vfd.tempc_max,vfd.tempc_mean,
				vfd.spo2_min,vfd.spo2_max,vfd.spo2_mean,
				vfd.glucose_min,vfd.glucose_max,vfd.glucose_mean,
				p.gender,
				1 AS match_flag
from vitals_first_day vfd
left join labevents l on l.hadm_id=vfd.hadm_id
left join diagnoses_icd d on d.hadm_id=vfd.hadm_id
left join d_icd_diagnoses di on di.icd9_code=d.icd9_code
left join patients p  on p.subject_id = d.subject_id
where l.itemid = 50912
	AND l.valuenum <= 150
	and(di.long_title ilike '% renal fail%' 
	or di.long_title ilike '%kidney fail%'
	or di.long_title ilike '%liver fail%'
	or di.long_title ilike '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%' 
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT * FROM (
select  distinct vfd.hadm_id,
				vfd.heartrate_min,vfd.heartrate_max,vfd.heartrate_mean,
				vfd.sysbp_min,vfd.sysbp_max,vfd.sysbp_mean,
				vfd.diasbp_min,vfd.diasbp_max,vfd.diasbp_mean,
				vfd.meanbp_min,vfd.meanbp_max,vfd.meanbp_mean,
				vfd.resprate_min,vfd.resprate_max,vfd.resprate_mean,
				vfd.tempc_min,vfd.tempc_max,vfd.tempc_mean,
				vfd.spo2_min,vfd.spo2_max,vfd.spo2_mean,
				vfd.glucose_min,vfd.glucose_max,vfd.glucose_mean,
				p.gender,
				0 AS match_flag
from vitals_first_day vfd
    LEFT JOIN patients p ON p.subject_id = vfd.subject_id
    WHERE NOT EXISTS (
        SELECT 1 
        FROM positive_hadm ph 
        WHERE ph.hadm_id = vfd.hadm_id
    )
) AS subquery
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);

'''

# ================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]
    
    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")
    
    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]
    
    return keep_columns

try:
    cur = con.cursor()
    
    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")
    
    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))
    
    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")
    
    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)
    
    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")
    
    # 仅保留需要的列
    combined_df = combined_df[keep_columns]
    
    # 保存原始数据
    raw_output = "./data/vitals_first_day_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
from sklearn.preprocessing import LabelEncoder
# 读取数据
df = pd.read_csv('./data/vitals_first_day_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

# df['dilution_text'] = df['dilution_text'].str.extract(r'([\d\.]+)').astype(float)


# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=XGBRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])

# 保存插值模型
joblib.dump(imputer, "./models/microbiologyevents_plus_xgb.pkl")
print("插值模型已保存！")

# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
# 保存标准化模型
scaler_path = "./models/vitals_first_day_standard_scaler.pkl"
joblib.dump(scaler, scaler_path)
print(f"标准化模型已保存至 {scaler_path}")

features_scaled = scaler.transform(features)

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_vitals_first_day.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 12637 条
正在获取阴性样本...
阴性样本获取完成，共 12637 条
动态保留的列：['hadm_id', 'heartrate_min', 'heartrate_max', 'heartrate_mean', 'sysbp_min', 'sysbp_max', 'sysbp_mean', 'diasbp_min', 'diasbp_max', 'diasbp_mean', 'meanbp_min', 'meanbp_max', 'meanbp_mean', 'resprate_min', 'resprate_max', 'resprate_mean', 'tempc_min', 'tempc_max', 'tempc_mean', 'spo2_min', 'spo2_max', 'spo2_mean', 'glucose_min', 'glucose_max', 'glucose_mean', 'gender', 'match_flag']
原始数据已保存到 ./data/vitals_first_day_raw.csv




插值模型已保存！
标准化模型已保存至 ./models/vitals_first_day_standard_scaler.pkl
处理后的数据已保存到 ./data/cleaned_vitals_first_day.csv
数据特征分布：
              hadm_id  heartrate_min  heartrate_max  heartrate_mean  \
count   23680.000000   2.368000e+04   2.368000e+04    2.368000e+04   
mean   149928.050127   3.072617e-16  -7.681543e-17   -4.800964e-16   
std     28858.657107   1.000021e+00   1.000021e+00    1.000021e+00   
min    100001.000000  -3.475526e+00  -3.060160e+00   -2.790386e+00   
25%    124811.750000  -6.655692e-01  -7.002555e-01   -6.875893e-01   
50%    149996.000000  -1.670532e-01  -1.496112e-01   -1.671795e-01   
75%    174913.500000   4.000370e-01   5.190283e-01    4.690276e-01   
max    199999.000000   5.311091e+00   5.396164e+00    4.841986e+00   

          sysbp_min     sysbp_max    sysbp_mean    diasbp_min    diasbp_max  \
count  2.368000e+04  2.368000e+04  2.368000e+04  2.368000e+04  2.368000e+04   
mean  -5.185042e-16  2.856574e-16  6.025210e-16 -1.488299e-16 -1.344270e-16   
std    1.00

In [7]:
import psycopg2
import pandas as pd
import joblib
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
select  distinct m.hadm_id,
				m.spec_itemid,
				m.org_itemid,
				m.isolate_num,
				m.ab_itemid,
				m.dilution_text,
				m.dilution_value,
    			uofd.urineoutput,
				p.gender,
				1 AS match_flag
from microbiologyevents m
left join labevents l on l.hadm_id=m.hadm_id
left join urine_output_first_day uofd on uofd.hadm_id = m.hadm_id
left join diagnoses_icd d on d.hadm_id=m.hadm_id
left join d_icd_diagnoses di on di.icd9_code=d.icd9_code
left join patients p  on p.subject_id = d.subject_id
where l.itemid = 50912
	AND l.valuenum <= 150
	and(di.long_title ilike '% renal fail%' 
	or di.long_title ilike '%kidney fail%'
	or di.long_title ilike '%liver fail%'
	or di.long_title ilike '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%' 
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT * FROM (
    SELECT 
        DISTINCT m.hadm_id,
        m.spec_itemid,
        m.org_itemid,
        m.isolate_num,
        m.ab_itemid,
        m.dilution_text,
        m.dilution_value,
        uofd.urineoutput,
        p.gender,
        0 AS match_flag 
    FROM microbiologyevents m
    left join urine_output_first_day uofd on uofd.hadm_id = m.hadm_id
    LEFT JOIN patients p ON p.subject_id = m.subject_id
    WHERE NOT EXISTS (
        SELECT 1 
        FROM positive_hadm ph 
        WHERE ph.hadm_id = m.hadm_id
    )
) AS subquery
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);

'''

# ================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]
    
    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")
    
    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]
    
    return keep_columns

try:
    cur = con.cursor()
    
    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")
    
    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))
    
    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")
    
    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)
    
    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")
    
    # 仅保留需要的列
    combined_df = combined_df[keep_columns]
    
    # 保存原始数据
    raw_output = "./data/microbiologyevents_plus_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
from sklearn.preprocessing import LabelEncoder
# 读取数据
df = pd.read_csv('./data/microbiologyevents_plus_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

df['dilution_text'] = df['dilution_text'].str.extract(r'([\d\.]+)').astype(float)


# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag','dilution_comparison']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=XGBRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])

# 保存插值模型
joblib.dump(imputer, "./models/microbiologyevents_plus_xgb.pkl")
print("插值模型已保存！")

# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
# 保存标准化模型
scaler_path = "./models/microbiologyevents_plus_standard_scaler.pkl"
joblib.dump(scaler, scaler_path)
print(f"标准化模型已保存至 {scaler_path}")

features_scaled = scaler.transform(features)

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_microbiologyevents_plus.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 175293 条
正在获取阴性样本...
阴性样本获取完成，共 175293 条
动态保留的列：['hadm_id', 'spec_itemid', 'org_itemid', 'isolate_num', 'ab_itemid', 'dilution_text', 'dilution_value', 'urineoutput', 'gender', 'match_flag']
原始数据已保存到 ./data/microbiologyevents_plus_raw.csv




插值模型已保存！
标准化模型已保存至 ./models/microbiologyevents_plus_standard_scaler.pkl
处理后的数据已保存到 ./data/cleaned_microbiologyevents_plus.csv
数据特征分布：
              hadm_id   spec_itemid    org_itemid   isolate_num     ab_itemid  \
count   41765.000000  4.176500e+04  4.176500e+04  4.176500e+04  4.176500e+04   
mean   149927.860840 -9.032748e-14  3.831300e-14 -6.018305e-16  8.359753e-14   
std     28887.730593  1.000012e+00  1.000012e+00  1.000012e+00  1.000012e+00   
min    100001.000000 -1.318042e+00 -1.730640e+00 -1.407520e+00 -3.267225e+00   
25%    124925.000000 -1.014602e+00 -6.531906e-01 -2.777443e-01 -6.116099e-01   
50%    149918.000000  4.743961e-02  7.943077e-02 -2.202983e-01  2.764951e-02   
75%    174934.000000  1.018449e+00  3.356062e-01 -2.106527e-02  5.996970e-01   
max    199999.000000  1.443266e+00  3.281220e+00  2.140600e+01  2.946137e+00   

       dilution_text  dilution_value   urineoutput        gender    match_flag  
count   4.176500e+04    4.176500e+04  4.176500e+04  4.176500e+0

In [6]:
import psycopg2
import pandas as pd
import numpy as np
import joblib
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor

# ================== 数据库配置部分 ==================
# 数据库连接配置
con = psycopg2.connect(
    database="mimiciii",
    user="postgres",
    password="123456",
    host="localhost",
    port="5433"
)

# ================== 修改后的SQL查询 ==================
# 新的阳性样本查询
query_positive = '''
select  distinct lfd.*,
				vfd.hadm_id,
				vfd.heartrate_min,vfd.heartrate_max,vfd.heartrate_mean,
				vfd.sysbp_min,vfd.sysbp_max,vfd.sysbp_mean,
				vfd.diasbp_min,vfd.diasbp_max,vfd.diasbp_mean,
				vfd.meanbp_min,vfd.meanbp_max,vfd.meanbp_mean,
				vfd.resprate_min,vfd.resprate_max,vfd.resprate_mean,
				vfd.tempc_min,vfd.tempc_max,vfd.tempc_mean,
				vfd.spo2_min,vfd.spo2_max,vfd.spo2_mean,
				vfd.glucose_min,vfd.glucose_max,vfd.glucose_mean,
				p.gender,
				1 AS match_flag
from vitals_first_day vfd
left join labs_first_day lfd on lfd.hadm_id = vfd.hadm_id
left join labevents l on l.hadm_id=vfd.hadm_id
left join diagnoses_icd d on d.hadm_id=vfd.hadm_id
left join d_icd_diagnoses di on di.icd9_code=d.icd9_code
left join patients p  on p.subject_id = d.subject_id
where l.itemid = 50912
	AND l.valuenum <= 150
	and(di.long_title ilike '% renal fail%' 
	or di.long_title ilike '%kidney fail%'
	or di.long_title ilike '%liver fail%'
	or di.long_title ilike '%spleen fail%')
'''

# 新的阴性样本查询（动态匹配阳性样本数量）
query_negative = '''
WITH positive_hadm AS (
    SELECT DISTINCT d.hadm_id
    FROM diagnoses_icd d
    JOIN d_icd_diagnoses di ON di.icd9_code = d.icd9_code
    WHERE di.long_title ILIKE '% renal fail%'
        OR di.long_title ILIKE '%kidney fail%'
        OR di.long_title ILIKE '%liver fail%'
        OR di.long_title ILIKE '%spleen fail%'
)
SELECT
    lfd.*,
    vfd.hadm_id,
    vfd.heartrate_min,vfd.heartrate_max,vfd.heartrate_mean,
    vfd.sysbp_min,vfd.sysbp_max,vfd.sysbp_mean,
    vfd.diasbp_min,vfd.diasbp_max,vfd.diasbp_mean,
    vfd.meanbp_min,vfd.meanbp_max,vfd.meanbp_mean,
    vfd.resprate_min,vfd.resprate_max,vfd.resprate_mean,
    vfd.tempc_min,vfd.tempc_max,vfd.tempc_mean,
    vfd.spo2_min,vfd.spo2_max,vfd.spo2_mean,
    vfd.glucose_min,vfd.glucose_max,vfd.glucose_mean,
    p.gender,
    0 AS match_flag
FROM vitals_first_day vfd
LEFT JOIN labs_first_day lfd on lfd.hadm_id = vfd.hadm_id
LEFT JOIN patients p ON p.subject_id = lfd.subject_id
WHERE NOT EXISTS (
    SELECT 1
    FROM positive_hadm ph
    WHERE ph.hadm_id = lfd.hadm_id
)
ORDER BY RANDOM()
LIMIT (SELECT COUNT(*) FROM positive_hadm);
'''


#================== 数据获取部分 ==================
def get_dynamic_keep_columns(cursor_description):
    """
    从数据库查询结果中动态提取列名，并确保保留关键列。
    :param cursor_description: 数据库查询结果的description属性
    :return: 动态生成的keep_columns列表
    """
    # 提取所有列名
    all_columns = [desc[0] for desc in cursor_description]

    # 确保关键列存在
    required_columns = ['hadm_id', 'match_flag']
    for col in required_columns:
        if col not in all_columns:
            raise ValueError(f"关键列 {col} 不在查询结果中！")

    # 动态构建keep_columns
    # 排除不需要的列（可根据需要调整）
    exclude_columns = ['row_id', 'subject_id', 'icustay_id']  # 示例：排除这些列
    keep_columns = [col for col in all_columns if col not in exclude_columns]

    return keep_columns


try:
    cur = con.cursor()

    # 获取阳性样本
    print("正在获取阳性样本...")
    cur.execute(query_positive)
    positive_columns = [desc[0] for desc in cur.description]  # 提取列名
    positive_df = pd.DataFrame(cur.fetchall(), columns=positive_columns)
    print(f"阳性样本获取完成，共 {len(positive_df)} 条")

    # 动态更新阴性样本查询
    query_negative = query_negative.replace('SELECT COUNT(*) FROM positive_hadm', str(len(positive_df)))

    # 获取阴性样本
    print("正在获取阴性样本...")
    cur.execute(query_negative)
    negative_columns = [desc[0] for desc in cur.description]  # 提取列名
    negative_df = pd.DataFrame(cur.fetchall(), columns=negative_columns)
    print(f"阴性样本获取完成，共 {len(negative_df)} 条")

    # 合并数据
    combined_df = pd.concat([positive_df, negative_df], ignore_index=True)

    # 动态生成keep_columns
    keep_columns = get_dynamic_keep_columns(cur.description)
    print(f"动态保留的列：{keep_columns}")

    # 仅保留需要的列
    combined_df = combined_df[keep_columns]

    # 保存原始数据
    raw_output = "./data/labs_first_day_plus_vitals_first_day_raw.csv"
    combined_df.to_csv(raw_output, index=False)
    print(f"原始数据已保存到 {raw_output}")

except Exception as e:
    print(f"数据库操作失败：{str(e)}")
finally:
    if con:
        cur.close()
        con.close()

# ================== 数据预处理部分 ==================
# 读取数据
df = pd.read_csv('./data/labs_first_day_plus_vitals_first_day_raw.csv')

# 1. 动态保留列（基于查询结果）
keep_columns = get_dynamic_keep_columns([(col,) for col in df.columns])  # 模拟cursor.description
df = df[keep_columns]

# 2. 去除重复记录（基于hadm_id）
df = df.drop_duplicates(subset=['hadm_id'], keep='first')

# 3. 性别编码
df['gender'] = df['gender'].map({'M': 1, 'F': 0}).fillna(-1).astype(int)

# 4. 缺失值处理（保留hadm_id）
impute_cols = [col for col in df.columns if col not in ['hadm_id', 'match_flag']]  # 动态选择需要填补的列
imputer = IterativeImputer(
    estimator=LGBMRegressor(n_estimators=50, random_state=42),
    max_iter=10,
    random_state=42
)
df[impute_cols] = imputer.fit_transform(df[impute_cols])

# 保存插值模型
joblib.dump(imputer, "./models/labs_first_day_plus_vitals_first_day_lgbm.pkl")
print("插值模型已保存！")


# 5. 标准化处理（排除hadm_id和标签列）
features = df.drop(columns=['hadm_id', 'match_flag'])
scaler = StandardScaler().fit(features)
# 保存标准化模型
scaler_path = "./models/labs_first_day_plus_vitals_first_day_standard_scaler.pkl"
joblib.dump(scaler, scaler_path)
print(f"标准化模型已保存至 {scaler_path}")


# features_scaled = MinMaxScaler().fit_transform(scaler.transform(features))
features_scaled = scaler.transform(features)

# 重组最终DataFrame
df_final = pd.DataFrame(features_scaled, columns=features.columns)
df_final = pd.concat([
    df[['hadm_id']].reset_index(drop=True),
    df_final,
    df['match_flag'].reset_index(drop=True)
], axis=1)

# 保存最终数据
final_output = './data/cleaned_labs_first_day_plus_vitals_first_day_raw.csv'
df_final.to_csv(final_output, index=False)
print(f"处理后的数据已保存到 {final_output}")
print("数据特征分布：\n", df_final.describe())

# ================== 新增验证部分 ==================
# 验证hadm_id保留情况
assert 'hadm_id' in df_final.columns, "hadm_id列丢失！"
print("\n验证结果：")
print(f"总样本数：{len(df_final)}")
print(f"阳性样本数：{df_final.match_flag.sum()}")
print(f"阴性样本数：{len(df_final) - df_final.match_flag.sum()}")
print(f"唯一hadm_id数量：{df_final.hadm_id.nunique()}")

正在获取阳性样本...
阳性样本获取完成，共 16127 条
正在获取阴性样本...
阴性样本获取完成，共 16127 条
动态保留的列：['hadm_id', 'aniongap_min', 'aniongap_max', 'albumin_min', 'albumin_max', 'bands_min', 'bands_max', 'bicarbonate_min', 'bicarbonate_max', 'bilirubin_min', 'bilirubin_max', 'creatinine_min', 'creatinine_max', 'chloride_min', 'chloride_max', 'glucose_min', 'glucose_max', 'hematocrit_min', 'hematocrit_max', 'hemoglobin_min', 'hemoglobin_max', 'lactate_min', 'lactate_max', 'platelet_min', 'platelet_max', 'potassium_min', 'potassium_max', 'ptt_min', 'ptt_max', 'inr_min', 'inr_max', 'pt_min', 'pt_max', 'sodium_min', 'sodium_max', 'bun_min', 'bun_max', 'wbc_min', 'wbc_max', 'hadm_id', 'heartrate_min', 'heartrate_max', 'heartrate_mean', 'sysbp_min', 'sysbp_max', 'sysbp_mean', 'diasbp_min', 'diasbp_max', 'diasbp_mean', 'meanbp_min', 'meanbp_max', 'meanbp_mean', 'resprate_min', 'resprate_max', 'resprate_mean', 'tempc_min', 'tempc_max', 'tempc_mean', 'spo2_min', 'spo2_max', 'spo2_mean', 'glucose_min', 'glucose_max', 'glucose_mea