### author by yangshichen
### 注意：脚本仅供参考，使用前请仔细阅读

In [5]:
import os
import pandas as pd
import subprocess
from multiprocessing import Pool
import json
from collections import defaultdict

### 配置数据

In [3]:
# 生成gene_chr_rename
gtf = pd.read_csv('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/genetics/gene_annotation.txt', sep='\t')
gtf["chr"] = gtf["chr"].astype(str).str.replace("^chr", "", regex=True)
gene_chr = dict(zip(gtf['gene_id'], gtf['chr']))
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/gene_chr_rename.json', 'w') as f:
    json.dump(gene_chr, f)

In [30]:
# 生成region_file
gtf = pd.read_csv('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/genetics/gene_annotation.txt', sep='\t')
gtf["chr"] = gtf["chr"].astype(str).str.replace("^chr", "", regex=True)
gtf.columns = ["gene", "chrom", "start", "end"]

# 转换 start 和 end 为整数
gtf["start"] = gtf["start"].astype(int)
gtf["end"] = gtf["end"].astype(int)

# 输出目录
outdir = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/region_file"
os.makedirs(outdir, exist_ok=True)

for _, row in gtf.iterrows():
    gene = row["gene"]
    chrom = row["chrom"]
    start = row["start"]

    start_new = max(0, start - 1000000)  # 避免负数
    end_new = start + 1000000

    outpath = os.path.join(outdir, f"{gene}.txt")
    with open(outpath, "w") as f:
        f.write(f"{chrom}\t{start_new}\t{end_new}\n")

In [5]:
# 生成celltype_gene_rename
phenofile_dir = '/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/phenofile/'
cell_type_genes = {}  # 存放每个细胞类型对应的基因列表

for file in os.listdir(phenofile_dir):
    if file.endswith('.txt'):
        file_path = os.path.join(phenofile_dir, file)
        cell_type_name = os.path.splitext(file)[0]  # 文件名去掉后缀作为细胞类型
        
        # 读取文件
        df = pd.read_csv(file_path, sep='\t')
        
        # 去掉前 18 列，取剩余列名作为基因
        gene_list = list(df.iloc[:, 18:].columns)
        
        # 保存每个细胞类型的基因列表
        cell_type_genes[cell_type_name] = gene_list

# 保存 JSON
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/celltype_gene_rename.json', 'w') as f:
    json.dump(cell_type_genes, f)

In [35]:
# 生成subcelltype_gene_rename
# 读取之前生成的小细胞类型基因字典
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/celltype_gene_rename.json', 'r') as f:
    cell_type_genes = json.load(f)

output_dir = '/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/'

# 定义每个小细胞类型对应的大类
# 可以根据你实际的小细胞类型命名规则填写
celltype_to_major = {"CD4_Naive_T-CCR7":"CD4+ T","CD4_Naive_T-SOX4":"CD4+ T","CD4_Tcm-GPR183":"CD4+ T",
                     "CD4_Tcm-SOX4":"CD4+ T","CD4_Tcm-CXCR5":"CD4+ T","CD4_Tcm-IFIT3":"CD4+ T",
                     "CD4_Tem-CCR7neg":"CD4+ T","CD4_Tfh_like-CXCR5":"CD4+ T","CD4_Th-TNFRSF11A":"CD4+ T",
                     "CD4_Th1-GZMK":"CD4+ T","CD4_Th17-RORC":"CD4+ T","CD4_Th22-CCR10":"CD4+ T",
                     "CD4_Treg-FCRL3":"CD4+ T","CD4_Treg-FOXP3":"CD4+ T","CD8_Naive_T-CCR7":
                     "CD8+ T & unconvensional T","CD8_Tcm-GPR183":"CD8+ T & unconvensional T",
                     "CD8_Tcm-IFI44L":"CD8+ T & unconvensional T","CD8_Tcm-GZMK":"CD8+ T & unconvensional T",
                     "CD8_Tem-GZMK":"CD8+ T & unconvensional T","CD8_CTL-GZMK":"CD8+ T & unconvensional T",
                     "CD8_CTL-GZMB":"CD8+ T & unconvensional T","MAIT-SLC4A10":"CD8+ T & unconvensional T",
                     "gdT2-GZMK":"CD8+ T & unconvensional T","gdT2-GZMH":"CD8+ T & unconvensional T",
                     "gdT2-IL12RB2":"CD8+ T & unconvensional T","Cycling_T-MKI67":"CD8+ T & unconvensional T",
                     "NKT-NCR1":"CD8+ T & unconvensional T","NKT-IFIT3":"CD8+ T & unconvensional T",
                     "NK_bright-XCL1":"NK","Transitional_NK-GZMK":"NK","Mature_NK_dim-FCGR3A":"NK",
                     "Inflamed_NK_dim-IFI44L":"NK","Terminal_NK_dim-CD160neg":"NK","cMono-CD14":"Myeloid",
                     "cMono-IFI44L":"Myeloid","cMono-IL1B":"Myeloid","ncMono-FCGR3A":"Myeloid",
                     "ncMono-IFI44L":"Myeloid","ncMono-IFIT1":"Myeloid","ncMono-C1QA":"Myeloid",
                     "cDC1-BATF3":"Myeloid","cDC2-CD1C":"Myeloid","pDC-LILRA4":"Myeloid","pDC-AXL":"Myeloid",
                     "Naive_B-TCL1A":"B","Naive_B-IL6":"B","Naive_B-IFIT3":"B","Transitional_B-NEIL1":"B",
                     "Aptypical_Memory_B-ITGAX":"B","Unswitched_Memory_B-CD1C":"B","Unswitched_Memory_B-IFIT3":"B",
                     "Unswitched_Memory_B-JAM3":"B","Switched_Memory_B-CD27":"B","Switched_Memory_B-CD86":"B",
                     "Switched_Memory_B-IGHE":"B","Plasma_B-IGHA1":"B","Plasma_B-IGHG1":"B",
                     "Plasmablast-MKI67":"B"}

# 初始化大类字典
major_type_genes = defaultdict(dict)
for cell_type, genes in cell_type_genes.items():
    major_type = celltype_to_major.get(cell_type)
    if major_type:
        major_type_genes[major_type][cell_type] = genes

# 为每个大类生成单独的 JSON 文件
for major_type, sub_dict in major_type_genes.items():
    output_file = os.path.join(output_dir, f"{major_type}.json")
    with open(output_file, 'w') as f:
        json.dump(sub_dict, f, indent=2)
    print(f"生成 JSON 文件: {output_file}")

生成 JSON 文件: /media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/CD8+ T & unconvensional T.json
生成 JSON 文件: /media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/NK.json
生成 JSON 文件: /media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/B.json
生成 JSON 文件: /media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/CD4+ T.json
生成 JSON 文件: /media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/Myeloid.json


### 配置参数 （Step 1 and Step 2）

In [None]:
PLINK_FILE = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/genetics/HIV_snp"
BASE_OUTPUT_DIR = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/result/"
REGION_FILE_DIR = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/region_file_ysc"
PHENO_INPUT_DIR = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/phenofile"
GENES_PER_BATCH = 80  # 每个步骤并行处理的基因数

In [None]:
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/gene_chr_rename.json', 'r') as f:
    gene_chr = json.load(f)
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/CD4+ T.json', 'r') as f:
    cell_type_genes_dict = json.load(f)

In [None]:
def run_step1(args):
    """
    并行执行step1 - 拟合NULL模型
    args: (cell_type, gene, pheno_file_path)
    """
    cell_type, gene, pheno_file = args
    try:
        # 创建输出目录
        cell_type_dir = os.path.join(BASE_OUTPUT_DIR, cell_type)
        gene_dir = os.path.join(cell_type_dir, gene)
        os.makedirs(gene_dir, exist_ok=True)
        
        # 第一步：拟合NULL模型
        step1_prefix = os.path.join(gene_dir, f"{gene}_null_model")
        step1_cmd = [
            "/home/yangshichen/mambaforge/envs/R/bin/Rscript", "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/script/step1_fitNULLGLMM_qtl.R",
            "--useSparseGRMtoFitNULL=FALSE", #没有强烈的群体结构
            "--useGRMtoFitNULL=FALSE", #没有强烈的群体结构
            f"--phenoFile={pheno_file}", #表型文件，行是细胞或样本，列包括表型（基因表达）、协变量等
            f"--phenoCol={gene}", #表型列名，就是当前要测试的基因的表达量
            "--covarColList=age,treat_time,geno_PC1,geno_PC2,pct_counts_mt,PC_1,PC_2,PC_3,PC_4,PC_5", #细胞级别的协变量
            "--sampleCovarColList=age,treat_time,geno_PC1,geno_PC2", #样本级别的协变量
            "--offsetCol=log_total_counts", #使用每个单元的总读取计数的对数作为偏移量的选项
            "--sampleIDColinphenoFile=orig.ident", #样本 ID 列
            "--cellIDColinphenoFile=cell_barcode", #细胞条形码列
            "--traitType=count", #表型是 count data (单细胞基因表达计数)
            f"--outputPrefix={step1_prefix}", #输出表头
            "--skipVarianceRatioEstimation=FALSE", #是否跳过方差比估计
            "--isRemoveZerosinPheno=FALSE", #是否在建模前去掉表型为 0 的细胞
            "--isCovariateOffset=FALSE", #协变量不是 offset，而是作为固定效应
            "--isCovariateTransform=TRUE", #对协变量进行标准化或变换
            "--skipModelFitting=FALSE", #不跳过模型拟合
            "--tol=0.00001", #模型收敛容忍度，越小模型收敛要求越严格
            f"--plinkFile={PLINK_FILE}", #基因型输入文件（plink 格式 .bed/.bim/.fam），用于后续 QTL 关联测试
            "--IsOverwriteVarianceRatioFile=TRUE" #如果已有 varianceRatio 文件，允许覆盖
        ]
        
        print(f"处理 {cell_type} - {gene}: 第一步拟合NULL模型")
        subprocess.run(step1_cmd, check=True)
        return (cell_type, gene, "step1成功")
    
    except subprocess.CalledProcessError as e:
        return (cell_type, gene, f"step1失败: {str(e)}")
    except Exception as e:
        return (cell_type, gene, f"step1其他错误: {str(e)}")

In [None]:
def run_step2(args):
    """
    并行执行step2 - 关联分析
    args: (cell_type, gene)
    """
    cell_type, gene = args
    try:
        # 获取当前基因的染色体
        chrom = gene_chr.get(gene)
        if not chrom:
            return (cell_type, gene, f"step2失败: 基因 {gene} 的染色体信息缺失")
        
        # 检查region文件是否存在
        gene_region_file = os.path.join(REGION_FILE_DIR, f"{gene}.txt")
        if not os.path.exists(gene_region_file):
            return (cell_type, gene, f"step2失败: region文件 {gene_region_file} 不存在")
        
        # 构建路径
        cell_type_dir = os.path.join(BASE_OUTPUT_DIR, cell_type)
        gene_dir = os.path.join(cell_type_dir, gene)
        step1_prefix = os.path.join(gene_dir, f"{gene}_null_model")
        step2_output = os.path.join(gene_dir, f"{gene}_step2_output")
        
        # 第二步：关联分析
        step2_cmd = [
            "/home/yangshichen/mambaforge/envs/R/bin/Rscript", "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/script/step2_tests_qtl.R",
            f"--bedFile={PLINK_FILE}.bed", #基因型数据（PLINK 格式）
            f"--bimFile={PLINK_FILE}.bim", #基因型数据（PLINK 格式）
            f"--famFile={PLINK_FILE}.fam", #基因型数据（PLINK 格式）
            f"--SAIGEOutputFile={step2_output}", #结果文件输出路径
            f"--chrom={chrom}", #要分析的染色体号（通常是数字 1–22 或 X）
            "--minMAF=0.1", #过滤掉次要等位基因频率小于 10% 的 SNP
            "--minMAC=20", #过滤掉次要等位基因支持数小于 20 的 SNP
            "--LOCO=FALSE", #是否使用 Leave-One-Chromosome-Out (LOCO) 方法计算 GRM（遗传相关矩阵）
            f"--GMMATmodelFile={step1_prefix}.rda", #输入 step1 的 null model 文件（广义线性混合模型）
            "--SPAcutoff=2", #SPA (saddlepoint approximation) 的阈值，控制用于小样本稀有变异的精确校正。默认 2
            f"--varianceRatioFile={step1_prefix}.varianceRatio.txt", #输入 step1 生成的 方差比文件
            f"--rangestoIncludeFile={gene_region_file}", #指定要测试的 SNP 区域（通常是某个基因 ±1Mb 的区间）
            "--markers_per_chunk=10000" #每次计算时处理的 SNP 数量，避免内存溢出
        ]
        
        print(f"处理 {cell_type} - {gene} (chr{chrom}): 第二步关联分析")
        subprocess.run(step2_cmd, check=True)
        return (cell_type, gene, "step2成功")
    
    except subprocess.CalledProcessError as e:
        return (cell_type, gene, f"step2失败: {str(e)}")
    except Exception as e:
        return (cell_type, gene, f"step2其他错误: {str(e)}")

In [None]:
def process_cell_type(cell_type, genes):
    """
    处理单个细胞类型的所有基因，分两步并行执行
    """
    # 准备表型文件路径
    pheno_file = os.path.join(PHENO_INPUT_DIR, f"{cell_type}.txt")
    
    # 检查表型文件是否存在
    if not os.path.exists(pheno_file):
        print(f"\n警告: 表型文件 {pheno_file} 不存在，跳过细胞类型 {cell_type}")
        return 0, len(genes), 0, len(genes)
    
    print(f"\n开始处理细胞类型: {cell_type}")
    print(f"待处理基因数: {len(genes)}")
    
    # 第一步：并行执行所有基因的step1
    print("\n开始第一步处理...")
    step1_tasks = [(cell_type, gene, pheno_file) for gene in genes]
    step1_success = 0
    step1_failures = 0
    
    with Pool(GENES_PER_BATCH) as pool:
        for result in pool.imap_unordered(run_step1, step1_tasks):
            cell, gene, status = result
            if "成功" in status:
                step1_success += 1
            else:
                step1_failures += 1
                print(f"第一步失败: {cell} - {gene}: {status}")
    
    # 第二步：并行执行所有基因的step2（仅对step1成功的基因）
    print("\n开始第二步处理...")
    step2_tasks = [(cell_type, gene) for gene in genes]
    step2_success = 0
    step2_failures = 0
    
    with Pool(GENES_PER_BATCH) as pool:
        for result in pool.imap_unordered(run_step2, step2_tasks):
            cell, gene, status = result
            if "成功" in status:
                step2_success += 1
            else:
                step2_failures += 1
                print(f"第二步失败: {cell} - {gene}: {status}")
    
    # 统计结果
    print(f"\n细胞类型 {cell_type} 处理结果:")
    print(f"第一步成功: {step1_success}, 失败: {step1_failures}")
    print(f"第二步成功: {step2_success}, 失败: {step2_failures}")
    
    return step1_success, step1_failures, step2_success, step2_failures

In [None]:
##开始运行
# 检查基因染色体映射是否完整
missing_chr = [gene for genes in cell_type_genes_dict.values() 
                for gene in genes if gene not in gene_chr]
if missing_chr:
    print(f"警告: 以下基因缺少染色体信息: {', '.join(missing_chr)}")
    
# 确保基础目录存在
os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)
    
total_step1_success = 0
total_step1_failures = 0
total_step2_success = 0
total_step2_failures = 0
    
# 按顺序处理每个细胞类型
for cell_type, genes in cell_type_genes_dict.items():
    s1_success, s1_failures, s2_success, s2_failures = process_cell_type(cell_type, genes)
    total_step1_success += s1_success
    total_step1_failures += s1_failures
    total_step2_success += s2_success
    total_step2_failures += s2_failures

print("\n分析完成")
print(f"第一步总成功: {total_step1_success}, 总失败: {total_step1_failures}")
print(f"第二步总成功: {total_step2_success}, 总失败: {total_step2_failures}")

### 运行：Step 1: Null Model Fitting and Step 2: Association Testing

In [None]:
nohup python run_saige_qtl_CD4_T.py > run_CD4.log 2>&1 &

### 配置参数 （Step 3）

In [10]:
BASE_OUTPUT_DIR = "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/result/"
with open('/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/input/dict/CD4+ T.json', 'r') as f:
    cell_type_genes_dict = json.load(f)
cell_type_genes_dict

{'CD4_Naive_T-CCR7': ['AAGAB',
  'AAK1',
  'AAMP',
  'AARS',
  'AARS2',
  'AARSD1',
  'AASDH',
  'AASDHPPT',
  'AATF',
  'ABCA5',
  'ABCA7',
  'ABCB7',
  'ABCC1',
  'ABCC5',
  'ABCD2',
  'ABCD3',
  'ABCD4',
  'ABCE1',
  'ABCF1',
  'ABCF2',
  'ABCF3',
  'ABHD10',
  'ABHD13',
  'ABHD14B',
  'ABHD16A',
  'ABHD17A',
  'ABHD18',
  'ABHD2',
  'ABHD3',
  'ABI1',
  'ABI2',
  'ABL2',
  'ABLIM1',
  'ABR',
  'ABRACL',
  'ABRAXAS1',
  'ABT1',
  'ABTB1',
  'ACAA1',
  'ACAA2',
  'ACAD11',
  'ACAD8',
  'ACAD9',
  'ACADM',
  'ACADSB',
  'ACADVL',
  'ACAP1',
  'ACAP2',
  'ACAT1',
  'ACAT2',
  'ACBD3',
  'ACBD5',
  'ACBD6',
  'ACCS',
  'ACD',
  'ACIN1',
  'ACLY',
  'ACO2',
  'ACP1',
  'ACP5',
  'ACSF3',
  'ACSL3',
  'ACSL4',
  'ACSL5',
  'ACSL6',
  'ACSS1',
  'ACTB',
  'ACTG1',
  'ACTL6A',
  'ACTN1',
  'ACTN4',
  'ACTR10',
  'ACTR1A',
  'ACTR1B',
  'ACTR2',
  'ACTR3',
  'ACTR6',
  'ACYP1',
  'ACYP2',
  'ADA',
  'ADA2',
  'ADAM10',
  'ADAM17',
  'ADAM8',
  'ADAR',
  'ADARB1',
  'ADAT1',
  'ADAT2',
  'ADC

In [11]:
def run_step3(args):
    """
    并行执行 step3 - 计算 gene-level p-value
    args: (cell_type, gene)
    """
    cell_type, gene = args
    try:
        # 构建输入输出路径
        gene_dir = os.path.join(BASE_OUTPUT_DIR, cell_type, gene)
        step2_prefix = os.path.join(gene_dir, f"{gene}_step2_output")
        step3_output = os.path.join(gene_dir, f"{gene}_genePval_output.csv")

        # step3 命令
        step3_cmd = [
            "/home/yangshichen/mambaforge/envs/R/bin/Rscript",
            "/media/AnalysisDisk2/Yangshichen/0_HIV_RNA/SAIGEQTL/script/step3_gene_pvalue_qtl.R",
            f"--assocFile={step2_prefix}",
            f"--geneName={gene}",
            f"--genePval_outputFile={step3_output}"
        ]

        subprocess.run(step3_cmd, check=True)
        return (cell_type, gene, "step3成功")

    except subprocess.CalledProcessError as e:
        return (cell_type, gene, f"step3失败: {str(e)}")
    except Exception as e:
        return (cell_type, gene, f"step3其他错误: {str(e)}")


In [12]:
def process_step3_for_celltype(cell_type, genes, threads=80):
    """
    对单个细胞类型的所有基因执行 step3 并行处理
    """
    print(f"\n开始处理细胞类型 {cell_type} 的 step3，共 {len(genes)} 个基因")

    step3_success = 0
    step3_failures = 0

    step3_tasks = [(cell_type, gene) for gene in genes]

    with Pool(threads) as pool:
        for result in pool.imap_unordered(run_step3, step3_tasks):
            cell, gene, status = result
            if "成功" in status:
                step3_success += 1
            else:
                step3_failures += 1
                print(f"step3失败: {cell} - {gene}: {status}")

    print(f"细胞类型 {cell_type} step3 完成: 成功 {step3_success}, 失败 {step3_failures}")
    return step3_success, step3_failures

### 运行：Step 3: Gene-level Analysis

In [None]:
total_success = 0
total_failures = 0

for cell_type, genes in cell_type_genes_dict.items():
    s_success, s_failures = process_step3_for_celltype(cell_type, genes, threads=80)
    total_success += s_success
    total_failures += s_failures

In [14]:
print(f"\n所有细胞类型 step3 完成: 成功 {total_success}, 失败 {total_failures}")


所有细胞类型 step3 完成: 成功 81783, 失败 5047
