In [None]:
'''
把标准格式转化为xTrimol格式。
'''
import pandas as pd

def process_csv(file_path, col1_name, col2_name, col3_name, output_file_path):
    """Convert a three-column CSV into the xTrimo layout and save it.

    Semicolons are removed from the first two columns, which are then joined
    as ``"<first>|<second>"``. The first output column is replaced by a
    generated row identifier ``uid_<row position>`` and the third column is
    passed through unchanged.

    Args:
        file_path: Path of the CSV to read; its first row is the header.
        col1_name: Output name of the generated identifier column.
        col2_name: Output name of the merged ``first|second`` column.
        col3_name: Output name of the untouched third column.
        output_file_path: Destination CSV path (written without the index).

    Returns:
        The converted ``pandas.DataFrame`` (same rows as written to disk).

    Raises:
        ValueError: If the input does not have exactly three columns.
    """
    df = pd.read_csv(file_path)

    # Fail early with a readable message instead of a late "length mismatch"
    # when the three new column names are assigned.
    if df.shape[1] != 3:
        raise ValueError(
            f"{file_path} must have exactly 3 columns, found {df.shape[1]}"
        )

    # Work on fresh Series rather than writing strings back into the original
    # columns: a column parsed as numeric cannot hold the string result
    # in place. ``regex=False`` makes the literal replacement explicit.
    first = df.iloc[:, 0].astype(str).str.replace(';', '', regex=False)
    second = df.iloc[:, 1].astype(str).str.replace(';', '', regex=False)
    uids = pd.Series([f'uid_{i}' for i in range(len(df))], index=df.index)

    converted = pd.concat([uids, first + '|' + second, df.iloc[:, 2]], axis=1)
    converted.columns = [col1_name, col2_name, col3_name]

    converted.to_csv(output_file_path, index=False)

    return converted

# Example usage: convert one dataset and print the resulting DataFrame.
file_path = '/home/users/hcdai/AI-peptide/Seq2Score/Henya/dataset/Example_majority2.csv'  # replace with the path to your CSV file
col1_name = 'unique_id'
col2_name = 'aa_seq'
col3_name = 'label'
output_file_path = '/home/users/hcdai/AI-peptide/Seq2Score/Henya/dataset/xTrimo_tcr_train.csv'

result_df = process_csv(file_path, col1_name, col2_name, col3_name, output_file_path)
print(result_df)

       unique_id                        aa_seq  label
0          uid_0      ATDALMTGY|CAISESQGNTEAFF      1
1          uid_1      ATDALMTGY|CAISEDRALVSYTF      1
2          uid_2      ATDALMTGY|CAISEDRALNEQFF      1
3          uid_3      ATDALMTGY|CAVQPGQGMQPQHF      1
4          uid_4     ATDALMTGY|CAISEGAMGNQPQHF      1
...          ...                           ...    ...
22277  uid_22277     NLVPMVATV|CASRAGTGYYNEQFF      0
22278  uid_22278      NLVPMVATV|CASKRGVGEDTQYF      0
22279  uid_22279      NLVPMVATV|CASSLPRTRDTQYF      0
22280  uid_22280    NLVPMVATV|CASSYSGQGSSYGYTF      0
22281  uid_22281  NLVPMVATV|CASSRLPATGGVTQPQHF      0

[22282 rows x 3 columns]


In [None]:
# %%
'''
RunRosetta.py,试图改成可提取序列的
'''
import time
# import json
import os
import subprocess
import shutil
import datetime
import csv
import pandas as pd
from Bio.PDB.PDBParser import PDBParser
from Bio.PDB import Selection, PDBIO

# Load the config JSON file. The rest of this cell reads `config`
# (config['pdb_index'], config['input'], config['temp'], config['rosetta'],
# config['output'], config['output_csv'], config['error_pdb']), so the loader
# has to run; with it commented out every later statement raised NameError.
import json

config_path = "/home/users/hcdai/AI-peptide/RunRosetta/config.json"
with open(config_path, "r", encoding="utf-8") as f:
    config = json.load(f)

# NOTE(review): the "workspace" key is taken from the previously commented-out
# loader; an empty/missing value means "stay in the current directory".
workspace = config.get("workspace", '')
rosetta_path = config["rosetta"]
index_csv_path = ''

# os.chdir('') and pd.read_csv('') always raise, so the empty placeholders are
# skipped instead of aborting the cell.
if workspace:
    os.chdir(workspace)
index_csv_pd = pd.read_csv(index_csv_path, index_col=0) if index_csv_path else None

# %%
# Load the PDB index table; its first column becomes the row index.
pdb_index = pd.read_csv(config['pdb_index'], index_col=0)
print(pdb_index.head())
# ab = pdb_index.loc['1wej']["ligand_chain"].split(';')
# print(ab)

# %%
# Inputs for the generator over the PDB files to process.

pdb_dir = config['input']
pdb_file_list = os.listdir(pdb_dir)  # snapshot of the folder taken once, here
pdb_file_num = len(pdb_file_list)

def pdb_generator(directory=None):
    """Yield the path of every ``.pdb`` file in the input folder.

    Args:
        directory: Folder to scan. When omitted, the module-level ``pdb_dir``
            and its cached listing ``pdb_file_list`` are used, which is the
            original behaviour.

    Yields:
        ``os.path.join(directory, name)`` for each entry that is a regular
        file whose path ends with ``.pdb``. Every other entry is reported on
        stdout and skipped.
    """
    if directory is None:
        directory, names = pdb_dir, pdb_file_list
    else:
        names = os.listdir(directory)

    # Iterate the listing itself rather than indexing through a separately
    # cached length, which could drift out of sync with the list.
    for name in names:
        pdb_file: str = os.path.join(directory, name)
        if os.path.isfile(pdb_file) and pdb_file.endswith('.pdb'):
            yield pdb_file
        else:
            print(f"{pdb_file} is not a valid pdb file.")




# %%
# 定义pdb文件处理的函数

# Three-letter residue name -> one-letter code. 'ACE' is listed explicitly but
# contributes no letter; any residue name not listed here is skipped as well.
_THREE_TO_ONE = {
    'ALA': 'A',
    'ARG': 'R',
    'ASN': 'N',
    'ASP': 'D',
    'CYS': 'C',
    'GLN': 'Q',
    'GLU': 'E',
    'GLY': 'G',
    'HIS': 'H',
    'ILE': 'I',
    'LEU': 'L',
    'LYS': 'K',
    'MET': 'M',
    'PHE': 'F',
    'PRO': 'P',
    'SER': 'S',
    'THR': 'T',
    'TRP': 'W',
    'TYR': 'Y',
    'VAL': 'V',
    'ACE': '',
}


def _split_chain_ids(raw):
    """Split an index cell such as ``"A, B"`` or ``"A;B"`` into chain ids."""
    # The driver loop splits the same cells on ';' while this function used
    # ',' only; accepting both keeps the two in agreement whichever delimiter
    # the index file uses.
    return raw.replace(' ', '').replace(';', ',').split(',')


def _chain_sequences(structure, chain_ids):
    """Map each requested chain id present in ``structure`` to its sequence."""
    sequences = {}
    for chain_id in chain_ids:
        # Every model is visited, so with several models the chain from the
        # last model wins (same as the original nested loops).
        for model in structure:
            for chain in model:
                if chain.get_id() == chain_id:
                    sequences[chain_id] = ''.join(
                        _THREE_TO_ONE.get(residue.get_resname().strip(), '')
                        for residue in chain
                    )
    return sequences


def pdb_parser(pdb_file: str):
    """Parse a PDB file and extract the receptor and ligand chain sequences.

    The PDB name is the part of the file name before its first dot. The chain
    ids are read from the ``receptor_chain`` and ``ligand_chain`` columns of
    the module-level ``pdb_index`` table.

    Args:
        pdb_file: Path of the PDB file.

    Returns:
        ``pdb_name`` alone (a ``str``) when the name is not in ``pdb_index``;
        otherwise the tuple ``(pdb_name, ligand_seq, receptor_seq)`` where both
        sequence values are ``{chain_id: one_letter_sequence}`` dicts. Callers
        must tell the two shapes apart (e.g. with ``isinstance(result, str)``).
    """
    # os.path.basename instead of splitting on a hard-coded '/'.
    pdb_name = os.path.basename(pdb_file).split('.')[0]

    # Unknown PDB: report it and hand back only the name.
    if pdb_name not in pdb_index.index:
        print(f"{pdb_name} not in pdb_index")
        return pdb_name

    index_row = pdb_index.loc[pdb_name]
    receptor_chain: list = _split_chain_ids(index_row["receptor_chain"])
    ligand_chain: list = _split_chain_ids(index_row["ligand_chain"])

    # PERMISSIVE=1: per the original author's note, some PDB format problems
    # are tolerated, at the cost of possibly dropping atoms and/or residues.
    parser = PDBParser(PERMISSIVE=1)
    structure = parser.get_structure(pdb_name, pdb_file)

    receptor_seq = _chain_sequences(structure, receptor_chain)
    ligand_seq = _chain_sequences(structure, ligand_chain)

    return pdb_name, ligand_seq, receptor_seq


def run_command(command):
    """Run ``command`` through the shell and return its captured stdout text.

    Raises ``subprocess.CalledProcessError`` when the command exits non-zero.
    """
    completed = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    # Same failure behaviour as passing check=True to subprocess.run.
    completed.check_returncode()
    return completed.stdout
    

# %%
def rosetta_score_changed(pdb_file_path, output_dir, receptor_chain, ligand_chain, rosetta_path: dict = None):
    """Score a PDB file with Rosetta, analyse the interface, collect ``.sc`` files.

    Both commands are run from inside the temp folder ``config["temp"]``:
    first the scoring executable on ``pdb_file_path``, then InterfaceAnalyzer
    on every ``.pdb`` file found in the temp folder (each renamed to
    ``*_scored.pdb`` first). Finally every ``.sc`` file in the temp folder is
    copied to ``output_dir``.

    Args:
        pdb_file_path: PDB file to score.
        output_dir: Folder that receives the ``.sc`` files (created if absent).
        receptor_chain: Sequence of receptor chain ids; concatenated without a
            separator for the ``-fixedchains`` option.
        ligand_chain: Sequence of ligand chain ids, used the same way.
        rosetta_path: Mapping with the keys ``score_executable``,
            ``InterfaceAnalyzer`` and ``pack_input_options``. Defaults to
            ``config["rosetta"]``, resolved at call time.

    Raises:
        subprocess.CalledProcessError: If a Rosetta command exits non-zero
            (``run_command`` uses ``check=True``).
        RuntimeError: If scoring left no ``.pdb`` file in the temp folder.
    """
    # Resolve the default lazily: evaluating a module global in the signature
    # fails at definition time whenever that global has not been assigned yet.
    if rosetta_path is None:
        rosetta_path = config["rosetta"]

    # The commands run after a chdir into the temp folder, so every path that
    # is used afterwards must no longer depend on the current directory.
    pdb_file_path = os.path.abspath(pdb_file_path)
    output_dir = os.path.abspath(output_dir)
    rosetta_temp_path = os.path.abspath(config["temp"])

    os.makedirs(output_dir, exist_ok=True)
    print("Running Rosetta scoring")
    os.makedirs(rosetta_temp_path, exist_ok=True)

    # Drop a stale score file left by an earlier run.
    # NOTE(review): this removes "scores.sc" while csv_add reads "score.sc" —
    # confirm which file name the scoring step actually produces.
    scorefile_path = os.path.join(output_dir, "scores.sc")
    if os.path.exists(scorefile_path):
        os.remove(scorefile_path)

    ori_path = os.getcwd()
    print(ori_path)
    # Workaround kept from the original: run Rosetta from the temp folder so
    # its output files land there.
    os.chdir(rosetta_temp_path)
    try:
        # Score the input file.
        score_command = f"{rosetta_path['score_executable']} -s {pdb_file_path} -no_optH false -ignore_unrecognized_res -out:pdb"
        print(score_command)
        run_command(score_command)
        print("Rosetta score_command successful.")

        # Rename every pdb in the temp folder to "<name>_scored.pdb".
        scored_pdb_paths = []
        for pdb_file in [f for f in os.listdir(rosetta_temp_path) if f.endswith(".pdb")]:
            scored_pdb_path = os.path.join(
                rosetta_temp_path, pdb_file.replace(".pdb", "_scored.pdb")
            )
            os.rename(os.path.join(rosetta_temp_path, pdb_file), scored_pdb_path)
            scored_pdb_paths.append(scored_pdb_path)
        print("rename successful.")
        print(output_dir)

        if not scored_pdb_paths:
            raise RuntimeError(
                f"Rosetta scoring produced no .pdb file in {rosetta_temp_path}"
            )

        # Interface analysis for EACH scored file. Previously the command sat
        # outside the loop, so only the last file was ever analysed.
        for scored_pdb_path in scored_pdb_paths:
            analyze_command = f"{rosetta_path['InterfaceAnalyzer']} -s {scored_pdb_path} -fixedchains {''.join(receptor_chain)},{''.join(ligand_chain)} @{rosetta_path['pack_input_options']}"
            run_command(analyze_command)
            print(f"InterfaceAnalyzer successful for {os.path.basename(scored_pdb_path)}.")

        # Copy the score files out of the temp folder.
        for file in os.listdir(rosetta_temp_path):
            if file.endswith(".sc"):
                shutil.copy(os.path.join(rosetta_temp_path, file), os.path.join(output_dir, file))
    finally:
        # Always return to the caller's directory, even when a command failed.
        os.chdir(ori_path)
        print(os.getcwd())

    print("Rosetta scoring successful.")

# %%
def _read_score_table(score_path):
    """Return ``(headers, rows)`` read from a ``.sc`` score file.

    Line 2 holds the column names and every later non-blank line one record;
    the first whitespace-separated token of each of those lines is dropped.

    Raises:
        ValueError: If the file has fewer than three lines or no data row.
    """
    with open(score_path, 'r') as sc_file:
        lines = sc_file.readlines()
    if len(lines) < 3:
        raise ValueError(
            f"{score_path} has no score data (expected at least 3 lines, got {len(lines)})"
        )
    headers = lines[1].strip().split()[1:]
    rows = [line.strip().split()[1:] for line in lines[2:] if line.strip()]
    if not rows:
        raise ValueError(f"{score_path} has a header but no data rows")
    return headers, rows


def csv_add(csv_file_path, score_output_dir, ligand_name, ligand_sequence, receptor_name, receptor_sequence):
    """Append one result row, built from the two score files, to a CSV file.

    ``pack_input_score.sc`` contributes its first data row as ``pack_<column>``
    values. ``score.sc`` contributes, for every column, the list of values of
    all its data rows as ``scores_<column>``.

    When the CSV already starts with a header row (one containing
    ``ligand_name``), the new row is written in that header's column order so
    values always line up with their column names. A missing or empty CSV gets
    a header first. A non-empty CSV without a header is appended to in the
    row's own key order, as before.

    Args:
        csv_file_path: CSV file to append to.
        score_output_dir: Folder containing ``pack_input_score.sc`` and
            ``score.sc``.
        ligand_name: Ligand name.
        ligand_sequence: Ligand sequence.
        receptor_name: Receptor name.
        receptor_sequence: Receptor sequence.

    Raises:
        ValueError: If a score file has no data, or a ``score.sc`` row has a
            different number of fields than its header.
    """
    # Named `timestamp` so the module-level `time` import is not shadowed; the
    # CSV column is still called 'time'.
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Interface-analysis scores: only the first data row is used.
    pack_headers, pack_rows = _read_score_table(
        os.path.join(score_output_dir, "pack_input_score.sc")
    )
    pack_score_data = {
        f"pack_{header}": value for header, value in zip(pack_headers, pack_rows[0])
    }

    # Plain scores: one list per column, holding the value of every data row.
    # NOTE(review): lists are written to the CSV as their repr, e.g. "['-1.0']";
    # kept as-is because downstream readers may rely on it.
    score_headers, score_rows = _read_score_table(
        os.path.join(score_output_dir, "score.sc")
    )
    scores_data = {f"scores_{header}": [] for header in score_headers}
    for data in score_rows:
        if len(data) != len(score_headers):
            raise ValueError(f"数据长度 {len(data)} 与表头长度 {len(score_headers)} 不一致")
        for header, value in zip(score_headers, data):
            scores_data[f"scores_{header}"].append(value)

    row = {
        'ligand_name': ligand_name,
        'ligand_sequence': ligand_sequence,
        'receptor_name': receptor_name,
        'receptor_sequence': receptor_sequence,
        'time': timestamp,
        **scores_data,
        **pack_score_data,  # pack_* wins if a key ever collides
    }

    # Decide the column order. An existing header is authoritative; otherwise
    # fall back to the row's own key order.
    fieldnames = list(row)
    write_header = True
    if os.path.isfile(csv_file_path) and os.path.getsize(csv_file_path) > 0:
        write_header = False
        with open(csv_file_path, newline='', encoding='utf-8') as existing:
            first_line = next(csv.reader(existing), [])
        if 'ligand_name' in first_line:
            fieldnames = first_line
            dropped = [key for key in row if key not in fieldnames]
            if dropped:
                print(f"Fields not present in the header of {csv_file_path} were skipped: {dropped}")

    with open(csv_file_path, mode='a', newline='', encoding='utf-8') as csv_file:
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, restval='', extrasaction='ignore'
        )
        if write_header:
            writer.writeheader()
        writer.writerow(row)

    print(f"Results have been written to {csv_file_path}")

# %%
# Start from an empty temp directory. A directory that does not exist yet is
# fine; any other filesystem problem is reported instead of silently ignored.
try:
    shutil.rmtree(config["temp"])
    print("Cleaned up temp directory.")
except FileNotFoundError:
    pass
except OSError as exc:
    print(f"Could not clean up temp directory: {exc}")

# Create (or truncate) the error_pdb.txt file. Still best-effort: a missing
# config key or an unwritable path is reported but does not stop the run.
try:
    with open(config["error_pdb"], 'w'):
        pass
except (KeyError, OSError) as exc:
    print(f"Could not create the error_pdb file: {exc!r}")

# Write the header of the output CSV only when the file does not exist yet.
if not os.path.isfile(config["output_csv"]):
    with open(config["output_csv"], mode='w', newline='', encoding='utf-8') as csv_file:
        fieldnames = ['ligand_name','ligand_sequence','receptor_name','receptor_sequence','time','scores_total_score','scores_dslf_fa13','scores_fa_atr','scores_fa_dun','scores_fa_elec','scores_fa_intra_rep','scores_fa_intra_sol_xover4','scores_fa_rep','scores_fa_sol','scores_hbond_bb_sc','scores_hbond_lr_bb','scores_hbond_sc','scores_hbond_sr_bb','scores_linear_chainbreak','scores_lk_ball_wtd','scores_omega','scores_overlap_chainbreak','scores_p_aa_pp','scores_pro_close','scores_rama_prepro','scores_ref','scores_yhh_planarity','scores_description','pack_total_score','pack_complex_normalized','pack_dG_cross','pack_dG_cross/dSASAx100','pack_dG_separated','pack_dG_separated/dSASAx100','pack_dSASA_hphobic','pack_dSASA_int','pack_dSASA_polar','pack_delta_unsatHbonds','pack_dslf_fa13','pack_fa_atr','pack_fa_dun','pack_fa_elec','pack_fa_intra_rep','pack_fa_intra_sol_xover4','pack_fa_rep','pack_fa_sol','pack_hbond_E_fraction','pack_hbond_bb_sc','pack_hbond_lr_bb','pack_hbond_sc','pack_hbond_sr_bb','pack_hbonds_int','pack_lk_ball_wtd','pack_nres_all','pack_nres_int','pack_omega','pack_p_aa_pp','pack_packstat','pack_per_residue_energy_int','pack_pro_close','pack_rama_prepro','pack_ref','pack_sc_value','pack_side1_normalized','pack_side1_score','pack_side2_normalized','pack_side2_score','pack_yhh_planarity','pack_description']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()

# %%
# 实例化生成器

# File that collects the names of PDBs missing from the index. Prefer the
# configured file (the one truncated before the run); fall back to the
# previously hard-coded location when the config has no such key.
error_log_path = config.get("error_pdb", '/home/users/hcdai/AI-peptide/RunRosetta/error_pdb.txt')

for pdb_file_path in pdb_generator():
    # Parse once: every call re-reads and re-parses the whole PDB file.
    parsed = pdb_parser(pdb_file_path)
    print(pdb_file_path)
    print(parsed)

    # pdb_parser returns only the PDB name (a str) when the file is not in the
    # index. Checking the type, rather than len(tuple(...)) == 3, also handles
    # names that happen to be three characters long.
    if isinstance(parsed, str):
        with open(error_log_path, 'a') as f:
            f.write(parsed + '\n')
        continue

    pdb_name, ligand_seq, receptor_seq = parsed

    output_path = os.path.join(config['output'], pdb_name)
    os.makedirs(output_path, exist_ok=True)

    # Run the Rosetta programs.
    # NOTE(review): the chain cells are split on ';' here but on ',' inside
    # pdb_parser — confirm which delimiter the index file uses.
    index_row = pdb_index.loc[pdb_name]
    rosetta_score_changed(
        pdb_file_path,
        output_path,
        receptor_chain=index_row["receptor_chain"].split(';'),
        ligand_chain=index_row["ligand_chain"].split(';'),
        rosetta_path=config['rosetta'],
    )

    # Append the scores to the output CSV.
    csv_add(
        csv_file_path=config['output_csv'],
        score_output_dir=output_path,
        ligand_name=pdb_name,
        ligand_sequence=ligand_seq,
        receptor_name=pdb_name,  # NOTE(review): flagged by the author — the PDB name is reused as the receptor name
        receptor_sequence=receptor_seq,
    )

    # Empty the temp folder so the next PDB starts clean.
    try:
        shutil.rmtree(config["temp"])
        print("Cleaned up temp directory.")
    except OSError as exc:
        print(f"Could not clean up temp directory: {exc}")

print('Done!')

# %%





In [None]:
import time
import os
import subprocess
import shutil
import datetime
import csv
import pandas as pd
from Bio.PDB.PDBParser import PDBParser
from Bio.PDB import Selection, PDBIO

# args:
workspace = '/home/users/hcdai/AI-peptide/Seq2Score/Henya/dataset/PPI-Affinity/PDB'
index_csv_path = '/home/users/hcdai/AI-peptide/Seq2Score/Henya/dataset/PPI-Affinity/PDB/PPB-Affinity.csv'
pdb_dir = ''

# vars:
os.chdir(workspace)
index_csv_pd = pd.read_csv(index_csv_path, index_col=0)
# Preview the table that was just loaded. The original printed `pdb_index`,
# a name this cell never defines.
print(index_csv_pd.head())



: 

In [5]:
'''
适应xCAPT的调整
'''
import csv

# Input and output file paths
input_csv = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/finetuning_test/for_compare.csv"
output_fasta = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/finetuning_test/xcapt_forcompare.fasta"
output_tsv = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/finetuning_test/xcapt_forcompare.tsv"

# Running numbers used in the generated sequence names (pep-1/cdr-1, ...).
pep_counter = 1
cdr_counter = 1

# Convert: columns 1-3 of every data row are (peptide, CDR, label). Each row
# yields two FASTA records and one TSV line that links their names.
with open(input_csv, 'r', newline='') as csv_in, \
     open(output_fasta, 'w') as fasta_out, \
     open(output_tsv, 'w', newline='') as tsv_out:

    reader = csv.reader(csv_in)
    next(reader, None)  # skip the header row; tolerate a completely empty file

    tsv_writer = csv.writer(tsv_out, delimiter='\t')

    for row in reader:
        # csv.reader yields [] for blank lines (e.g. a trailing newline);
        # skip them instead of failing on row[0].
        if not any(cell.strip() for cell in row):
            continue
        if len(row) < 3:
            raise ValueError(
                f"{input_csv}, line {reader.line_num}: expected at least 3 columns, got {len(row)}"
            )

        pep_seq, cdr_seq, label = row[:3]

        # Names of the two sequences of this pair
        pep_name = f"pep-{pep_counter}"
        cdr_name = f"cdr-{cdr_counter}"

        # FASTA: one record per sequence
        fasta_out.write(f">{pep_name}\n{pep_seq}\n")
        fasta_out.write(f">{cdr_name}\n{cdr_seq}\n")

        # TSV: pair of names plus the label
        tsv_writer.writerow([pep_name, cdr_name, label])

        pep_counter += 1
        cdr_counter += 1

print("处理完成！生成的文件：")
print(f"FASTA文件: {output_fasta}")
print(f"TSV文件: {output_tsv}")

处理完成！生成的文件：
FASTA文件: /home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/finetuning_test/xcapt_forcompare.fasta
TSV文件: /home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/finetuning_test/xcapt_forcompare.tsv


In [None]:
'''
适应xCAPT的调整2.0，增加了指定列名和默认全1的功能
'''
import csv

# Input and output file paths
input_csv = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/target_pep/peps_fintuning_tset.csv"
output_fasta = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/target_pep/peps_fintuning_tset_forcompare.fasta"
output_tsv = "/home/users/hcdai/AI-peptide/Seq2Score/xCAPT5/models/MCAPST5-X/protT5/seq/test/target_pep/peps_fintuning_tset_forcompare.tsv"

# Column names to read; replace with the names used in the actual file
pep_seq_col = 'Peptide_seq'
cdr_seq_col = 'Receptor_seq'
label_col = 'TF(T=1)'

# Whether to read the label column; when False every label defaults to '1'
read_label = True

# Running numbers used in the generated sequence names (pep-1/cdr-1, ...).
pep_counter = 1
cdr_counter = 1

# 'utf-8-sig' strips a leading byte-order mark (if any), which would otherwise
# become part of the first column name and make it impossible to look up.
with open(input_csv, 'r', newline='', encoding='utf-8-sig') as csv_in, \
     open(output_fasta, 'w') as fasta_out, \
     open(output_tsv, 'w', newline='') as tsv_out:

    reader = csv.DictReader(csv_in)

    # Check the configured column names once, up front, and say which columns
    # the file really has; otherwise a typo only surfaces as a bare KeyError
    # on the first data row.
    required_cols = [pep_seq_col, cdr_seq_col] + ([label_col] if read_label else [])
    available_cols = reader.fieldnames or []
    missing_cols = [col for col in required_cols if col not in available_cols]
    if missing_cols:
        raise KeyError(
            f"Column(s) {missing_cols} not found in {input_csv}; available columns: {available_cols}"
        )

    tsv_writer = csv.writer(tsv_out, delimiter='\t')

    for row in reader:
        # Skip rows whose fields are all empty. DictReader stores None for
        # fields missing from a short row and a list for surplus fields, so
        # only str values are stripped.
        if not any((v.strip() if isinstance(v, str) else v) for v in row.values()):
            continue

        pep_seq = row[pep_seq_col]
        cdr_seq = row[cdr_seq_col]
        label = row[label_col] if read_label else '1'
        if pep_seq is None or cdr_seq is None or label is None:
            raise ValueError(
                f"{input_csv}, line {reader.line_num}: row is missing a required field"
            )

        # Names of the two sequences of this pair
        pep_name = f"pep-{pep_counter}"
        cdr_name = f"cdr-{cdr_counter}"

        # FASTA: one record per sequence
        fasta_out.write(f">{pep_name}\n{pep_seq}\n")
        fasta_out.write(f">{cdr_name}\n{cdr_seq}\n")

        # TSV: pair of names plus the label
        tsv_writer.writerow([pep_name, cdr_name, label])

        pep_counter += 1
        cdr_counter += 1

print("处理完成！生成的文件：")
print(f"FASTA文件: {output_fasta}")
print(f"TSV文件: {output_tsv}")

KeyError: 'rec'