In [44]:
# coding:utf-8
import pandas as pd
import numpy as np
from pathlib import Path
from collections import OrderedDict
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import psutil
import time

# Trait definition table used to map raw field values to tokens.
# NOTE(review): this cell's output shows a warning raised by this read
# (presumably a mixed-dtype DtypeWarning). Consider dtype=/low_memory=False,
# but verify trait values still match the normalised data values afterwards.
traits_df = pd.read_csv('ukb_traits.csv', encoding='latin1', quotechar='"')

# Participant data (read in chunks), final encoded output, and the scratch
# file that every processed chunk is appended to.
file_path, output_file = "ukb43164.csv", "output_data.csv"
outfile = Path('tmp_out.csv')


# 自定义函数去掉浮点数中的 .0
def convert_and_strip(value):
    """Normalise a single cell to a comparable string.

    - Anything ``pd.isna`` treats as missing becomes the string ``'nan'``.
    - A float with an integral value loses its ``.0`` (``3.0`` -> ``'3'``),
      so numbers parsed as floats compare equal to their integer spelling.
    - Every other value is stringified and stripped of surrounding whitespace.
    """
    if pd.isna(value):
        return 'nan'
    whole_float = isinstance(value, float) and value.is_integer()
    return str(int(value)) if whole_float else str(value).strip()
    
# Normalise every traits cell to a stripped string (missing -> 'nan',
# integral floats lose their '.0') so trait values compare equal to the data
# chunks, which receive the same convert_and_strip treatment before processing.
traits_df = traits_df.map(convert_and_strip)

def check_row(apply_row, df):
    """Encode one participant's answers for a single trait field as token ids.

    Used as ``selected_columns_chunk_df.apply(check_row, axis=1, args=(df,))``,
    so it is called once per participant row.

    Parameters
    ----------
    apply_row : pd.Series
        The participant's values for the field's array columns, indexed by
        column name. Values are expected to be strings normalised by
        ``convert_and_strip`` (missing -> ``'nan'``). The first index label is
        embedded in every output column name so the source can be traced.
    df : pd.DataFrame
        The traits rows of exactly one ``field_id``. Columns read:
        ``field_id``, ``value_type``, ``value``, ``trait``, ``token``,
        ``token_id`` and ``meaning``.

    Returns
    -------
    pd.Series
        Output column name -> token id (or NaN), depending on ``value_type``:

        * ``'22'`` (multiple choice), fields other than 20001/20002: one column
          ``<trait>_d<first col>-<k>`` per traits row whose value was answered,
          numbered ``k = 0, 1, ...`` in ``df`` order; unanswered options are
          omitted.
        * ``'22'``, fields 20001/20002: one column
          ``<trait>_d<first col>-<value>`` for every predefined value, set to
          the ``_"yes"`` token if the participant gave that value and to the
          ``_"not"`` token otherwise. Empty if nothing was answered at all.
        * any other type (e.g. 11, 21, 31): a single column
          ``<trait>_d<first col>`` holding the token of the first traits row
          whose value was answered; failing that, for quantile-coded fields
          (values ``Q1``..``Q4`` with numeric thresholds in ``meaning``) the
          token of the first quantile whose threshold is >= the mean answer;
          otherwise NaN.

    Raises
    ------
    KeyError
        If ``df`` does not contain exactly one distinct ``field_id``.
    """
    field_ids = df['field_id'].unique()
    # Validate before indexing so an empty ``df`` fails with the documented
    # KeyError rather than an IndexError.
    if len(field_ids) != 1:
        print(f"field_ids have more than one value: {field_ids}")
        raise KeyError(f"expected exactly one field_id, got {list(field_ids)}")
    field_id = field_ids[0]
    # NOTE(review): only the first value_type is used; assumes one per field.
    value_type = df['value_type'].unique()[0]

    first_col = apply_row.index[0]
    array_values = apply_row.to_list()
    out_dict = OrderedDict()

    if value_type == '22':  # multiple choice
        if field_id not in ('20002', '20001'):
            # Keep only the answered options, in df order. Each one gets its
            # own numbered column so answers from different arrays pack densely.
            matched_value_df = df[df['value'].isin(array_values)]
            matched_pairs = zip(matched_value_df['trait'], matched_value_df['token_id'])
            for count, (trait, token_id) in enumerate(matched_pairs):
                out_dict[f"{trait}_d{first_col}-{count}"] = token_id
        elif (apply_row.notna() & (apply_row != 'nan')).any():
            # 20001/20002: record every predefined option, answered or not;
            # only participants with no answer at all are skipped. Test for a
            # real answer instead of counting distinct values, otherwise a
            # single-array field (or identical answers everywhere) is dropped.
            answered = set(array_values)
            for trait, choice_value in zip(df['trait'], df['value']):
                suffix = '_"yes"' if choice_value in answered else '_"not"'
                token_rows = df[(df['value'] == choice_value)
                                & df['token'].str.contains(suffix, regex=False)]
                out_dict[f"{trait}_d{first_col}-{choice_value}"] = token_rows['token_id'].iat[0]
    else:  # value types 11, 31, 21: a single output column
        col = f"{df['trait'].iat[0]}_d{first_col}"
        matched_value_df = df[df['value'].isin(array_values)]
        if not matched_value_df.empty:
            out_dict[col] = matched_value_df['token_id'].iat[0]
        elif 'Q1' in df['value'].values:
            # No predefined value matched: treat the answers as continuous and
            # bucket their mean by the Q1..Q4 thresholds stored in 'meaning'.
            mean_value = float(apply_row.astype(float).mean())
            # NaN when no bucket applies (e.g. every answer is missing, so the
            # mean is NaN) - keeps the column present, like the branch below.
            out_dict[col] = np.nan
            for q in ('Q1', 'Q2', 'Q3', 'Q4'):
                q_df = df[df['value'] == q]
                if mean_value <= float(q_df['meaning'].iat[0]):
                    out_dict[col] = q_df['token_id'].iat[0]
                    break
        else:
            out_dict[col] = np.nan  # the participant gave no value

    return pd.Series(out_dict)

def process_chunk(chunk_index, chunk_df, only_traits_df):
    """Encode every trait field for one chunk of participants.

    Parameters
    ----------
    chunk_index : int
        0-based index of the chunk; used only in log messages.
    chunk_df : pd.DataFrame
        One chunk of the participant data, already normalised with
        ``convert_and_strip``. Must contain ``eid`` plus data columns named
        ``<field_id>-<instance>.<array>``.
    only_traits_df : pd.DataFrame
        One row per field to encode, with ``field_id``, ``instance_min``,
        ``array_min`` and ``array_max``.

    Returns
    -------
    tuple[pd.DataFrame, list[str]]
        The encoded chunk (``eid`` followed by the token columns produced by
        ``check_row`` for each field) and the ``<field_id>-<instance>``
        prefixes whose data columns were missing from ``chunk_df``.

    Side effects: reads the module-level ``traits_df`` and appends the encoded
    chunk to ``outfile``.
    """
    token_frames = [pd.DataFrame({'eid': chunk_df['eid']})]
    missing_field = []
    for row in only_traits_df.itertuples():
        field_prefix = f'{row.field_id}-{row.instance_min}'
        wanted_columns = [f'{field_prefix}.{array}'
                          for array in range(int(row.array_min), int(row.array_max) + 1)]
        try:
            selected_columns_chunk_df = chunk_df[wanted_columns]
        except KeyError:
            # At least one array column of this field is absent from the data.
            missing_field.append(field_prefix)
            continue
        # Filter the traits table once per field (used by both checks below).
        field_traits_df = traits_df[traits_df['field_id'] == row.field_id]
        if field_traits_df['meaning'].iat[0] in ['Q1', 'Q2', 'Q3', 'Q4']:
            continue
        if selected_columns_chunk_df.empty:
            print(f"No columns match for {field_prefix} in chunk {chunk_index + 1}")
            continue
        token_frames.append(
            selected_columns_chunk_df.apply(check_row, axis=1, args=(field_traits_df,)))

    # A single concat instead of one per field avoids quadratic copying.
    out_chunk_df = pd.concat(token_frames, axis=1)

    # Every chunk appends its own header because the column set differs
    # between chunks, so outfile is a per-chunk dump rather than one clean CSV.
    # NOTE(review): several worker processes append to the same file at once,
    # so chunks may interleave; consider one file per chunk if this matters.
    print('Save data chunk_index=', chunk_index)
    out_chunk_df.to_csv(outfile, mode='a', header=True, index=False)
    return out_chunk_df, missing_field
    

def submit_with_resource_limit(executor, func, *args, max_workers, cpu_threshold=80, mem_threshold=80, sleep_interval=10, **kwargs):
    """Submit ``func(*args, **kwargs)`` to ``executor`` once there is capacity.

    Polls every ``sleep_interval`` seconds until fewer than ``max_workers``
    work items are pending in the executor and both system-wide CPU and
    memory usage (percent, via psutil) are below ``cpu_threshold`` and
    ``mem_threshold``. Then it submits and returns the resulting Future.

    NOTE(review): relies on the private ``executor._pending_work_items``
    attribute of CPython's ProcessPoolExecutor, which may change between
    Python versions.
    NOTE(review): per the psutil docs, ``cpu_percent()`` without an interval
    compares against the previous call, and its very first call returns a
    meaningless 0.0.
    """
    while True:
        slots_full = len(executor._pending_work_items) >= max_workers
        if slots_full:
            print("Max workers limit reached. Waiting for an available slot...")
        else:
            # Sampled only when a slot is free, as cpu_percent() is stateful.
            cpu_now = psutil.cpu_percent()
            mem_now = psutil.virtual_memory().percent
            if cpu_now < cpu_threshold and mem_now < mem_threshold:
                return executor.submit(func, *args, **kwargs)
            print(f"High resource usage detected: CPU {cpu_now}%, Memory {mem_now}%. Waiting for resources to be available...")
        time.sleep(sleep_interval)

# Read the participant data in chunks and encode them in parallel.
# The __main__ guard keeps worker processes that re-import this module
# (spawn start method) from re-running the pipeline; it is a no-op when
# the code runs as a script or notebook cell.
if __name__ == '__main__':
    chunksize = 2
    max_chunks = None  # set to an int to process only the first N chunks (debugging)

    # Distinct (field, instance, array range) combinations to encode,
    # excluding fields whose 'private' flag is not '0'.
    only_traits_df = traits_df[['field_id', 'value_type', 'instance_min', 'array_min', 'array_max', 'private']].drop_duplicates()
    only_traits_df = only_traits_df[only_traits_df['private'] == '0']

    max_workers = 15  # maximum number of worker processes

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        outfile.unlink(missing_ok=True)
        # future -> chunk_index, so results can be put back into file order
        # even though as_completed yields them in completion order.
        futures = {}
        chunk_reader = pd.read_csv(file_path, chunksize=chunksize, encoding='latin1', low_memory=False, quotechar='"')
        for chunk_index, chunk_df in enumerate(chunk_reader):
            if max_chunks is not None and chunk_index >= max_chunks:
                break
            chunk_df = chunk_df.map(convert_and_strip)
            print('chunk_index=', chunk_index)
            future = submit_with_resource_limit(executor, process_chunk, chunk_index, chunk_df, only_traits_df, max_workers=max_workers)
            futures[future] = chunk_index

        chunk_results = {}
        all_missing_fields = []
        for n_future, future in enumerate(as_completed(futures)):
            out_chunk_df, missing_field = future.result()
            print('n_future=', n_future)
            chunk_results[futures[future]] = out_chunk_df
            all_missing_fields.extend(missing_field)

        # Concatenate once, in chunk order (concatenating inside the loop is
        # quadratic and produced rows in completion order).
        ordered_chunks = [chunk_results[i] for i in sorted(chunk_results)]
        out_df = pd.concat(ordered_chunks, ignore_index=True) if ordered_chunks else pd.DataFrame()
        out_df.to_csv(output_file, mode='w', header=True, index=False)

        # Every chunk reports the same missing fields; keep each one once,
        # in first-seen order.
        unique_missing_fields = list(dict.fromkeys(all_missing_fields))
        pd.DataFrame({'missing_field_id': unique_missing_fields}).to_csv('missing_field_id.csv', mode='w', header=True, index=False)

    print("Processing complete.")


  traits_df = pd.read_csv('ukb_traits.csv', encoding='latin1', quotechar='"')


chunk_index= 0
chunk_index= 1
chunk_index= 2
Save data chunk_index= 0
chunk_index= 3
chunk_index= 4
Save data chunk_index= 1
Save data chunk_index= 2
chunk_index= 5
chunk_index= 6
Save data chunk_index= 3
Save data chunk_index= 4
chunk_index= 7
Save data chunk_index= 5
chunk_index= 8
chunk_index= 9
Save data chunk_index= 6
chunk_index= 10
n_future= 0
n_future= 1
Save data chunk_index= 7
n_future= 2
n_future= 3
n_future= 4
n_future= 5
n_future= 6
n_future= 7
Save data chunk_index= 8
n_future= 8
Save data chunk_index= 9
n_future= 9
Save data chunk_index= 10
n_future= 10
Processing complete.
