In [None]:
# 导入需要的包
import dxpy
import pandas as pd
import subprocess
import glob
import os
import pyspark
from pyspark.sql import SQLContext

# ======================================================================
# Step 1: list every phenotype field ID you want to extract.
# This is the only place you need to edit!
# ======================================================================
# Example IDs (replace with your own list): three single fields followed by
# the contiguous range 22000-22019, all stored as strings.
target_field_ids = ["31", "34", "52", *map(str, range(22000, 22020))]
print(f"目标：提取 {len(target_field_ids)} 个表型ID的数据。")


# ======================================================================
# Step 2: discover the dataset and fetch its data dictionary
# (same approach as the protein script)
# ======================================================================
print("\n--- 正在自动查找数据集信息... ---")
# Find the Dataset record whose name matches "app*.dataset" in the root
# folder, look up a project ID, and combine both as "<project>:<record>",
# the form passed to `dx extract_dataset` below.
dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset", name="app*.dataset", folder="/", name_mode="glob"
)
project_id = dxpy.find_one_project()["id"]
dataset = f"{project_id}:{dispensed_dataset['id']}"

# "-ddd" makes `dx extract_dataset` dump the dictionary files instead of data;
# they are written into the current working directory.
print("--- 正在下载数据字典... ---")
cmd = ["dx", "extract_dataset", dataset, "-ddd", "--delimiter", ","]
subprocess.check_call(cmd)

# Load the data dictionary.
path = os.getcwd()
data_dict_matches = glob.glob(os.path.join(path, "*.data_dictionary.csv"))
if not data_dict_matches:
    # Fail with an explicit message instead of an opaque IndexError.
    raise FileNotFoundError(
        f"No '*.data_dictionary.csv' file found in {path} after running "
        "'dx extract_dataset -ddd'."
    )
# glob order is arbitrary; if dictionaries from earlier runs are lying around,
# take the most recently written one (the one just downloaded).
data_dict_csv = max(data_dict_matches, key=os.path.getmtime)
data_dict_df = pd.read_csv(data_dict_csv)
print("数据字典下载并读取完毕。")


# ======================================================================
# Step 3: map the requested field IDs to full field names in the dictionary
# ======================================================================
print("\n--- 正在从字典中筛选您指定的表型字段... ---")
# Normalise the requested IDs to integers (also strips stray whitespace and
# rejects non-numeric entries early with a ValueError).
target_field_ids_int = [int(i) for i in target_field_ids]

if "field_id" in data_dict_df.columns:
    # The dictionary carries an explicit numeric field_id column: match on it.
    field_match_mask = data_dict_df["field_id"].isin(target_field_ids_int)
else:
    # NOTE(review): the dictionary written by `dx extract_dataset -ddd` is
    # documented with columns such as entity/name/type/title but no field_id;
    # indexing that column would raise KeyError. In that layout the field ID
    # is embedded in `name` as p<ID>[_i<instance>][_a<array>], so recover it
    # from there. Names that do not follow the pattern yield NaN and never match.
    name_field_ids = data_dict_df["name"].astype(str).str.extract(
        r"^p(\d+)(?:_i\d+)?(?:_a\d+)?$", expand=False
    )
    field_match_mask = name_field_ids.isin([str(i) for i in target_field_ids_int])

# Dictionary rows belonging to the requested fields (all instances/arrays).
pheno_fields_df = data_dict_df[field_match_mask]

# Build the query field names in "entity.name" form, with the participant ID
# ('participant.eid') always placed first.
field_names_str = ["participant.eid"] + [
    f"{entity}.{name}"
    for entity, name in zip(pheno_fields_df["entity"], pheno_fields_df["name"])
]
field_names_query = ",".join(field_names_str)
print(f"成功匹配到 {len(field_names_str) - 1} 个字段名，准备提取。")


# ======================================================================
# Step 4: generate the SQL and pull the data with Spark
# (same approach as the protein script)
# ======================================================================
print("\n--- 正在生成SQL查询并使用Spark提取数据... ---")
# Initialise Spark. getOrCreate() reuses an already-running SparkContext, so
# re-running this cell in the same kernel no longer fails with
# "Cannot run multiple SparkContexts at once". When a context already exists,
# `conf` is not re-applied to it.
conf = pyspark.SparkConf().set("spark.kryoserializer.buffer.max", "1024m")
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession(sc)
sqlContext = SQLContext(sc)

# With --sql, `dx extract_dataset` writes the query text to the --output file
# instead of extracting the data itself.
cmd = [
    "dx", "extract_dataset", dataset,
    "--fields", field_names_query,
    "--delimiter", ",",
    "--output", "extracted_phenotypes.sql",  # name of the generated SQL file
    "--sql",
]
subprocess.check_call(cmd)

# Read the SQL file and run it through Spark. Lines are joined with a single
# space (not "") so that a statement spanning several lines cannot have the
# last token of one line fused with the first token of the next; blank lines
# are dropped so the trailing ";" stays at the very end and can be stripped.
with open("extracted_phenotypes.sql", "r") as file:
    sql_lines = (line.strip() for line in file)
    retrieve_sql = " ".join(sql_line for sql_line in sql_lines if sql_line)

# The statement terminator is stripped before handing the query to spark.sql.
temp_df = spark.sql(retrieve_sql.strip(";"))

# Collect the Spark result into a pandas DataFrame on the driver.
pheno_pdf = temp_df.toPandas()
print("--- 数据提取完毕！---")
print(f"成功提取到 {pheno_pdf.shape[0]} 行, {pheno_pdf.shape[1]} 列的数据。")
print("数据预览:")
print(pheno_pdf.head())


# ======================================================================
# Step 5: save the result locally and upload it
# ======================================================================
# Local output file name and destination folder inside the project.
output_filename = "my_phenotypes.csv"
output_upload_path = "/my_results/"  # change to whichever folder you prefer

# Write the table to disk.
print(f"\n--- 正在将数据保存为 {output_filename}... ---")
# NOTE(review): the file is tab-delimited even though its extension is .csv.
# quoting=3 is the numeric value of csv.QUOTE_NONE (no field quoting);
# missing values are written as "NA".
pheno_pdf.to_csv(
    output_filename,
    sep="\t",
    na_rep="NA",
    index=False,
    quoting=3,
)

# Upload into the project folder on the RAP.
print(f"--- 正在上传文件到 {output_upload_path}... ---")
# Calling `dx upload` through subprocess works outside notebooks too, unlike
# a %%bash cell. (-p: create the destination folder if missing, per the
# `dx upload` help - confirm.)
upload_cmd = ["dx", "upload", output_filename]
upload_cmd += ["-p", "--path", output_upload_path, "--brief"]
subprocess.check_call(upload_cmd)
print("--- 脚本执行完毕！---")