* cd-hit

In [6]:
a = f"./results/1/2"
print(a.split('/')[2])

1


In [11]:
import subprocess
import os
root_path = os.getcwd()
print("当前 root_path:", root_path)

def run_cd_hit(input_fasta, output_prefix, similarity=0.9, threads=4):
    """
    output_prefix: 输出文件前缀
    similarity: 相似性阈值 (0-1)
    threads: 使用的CPU线程数
    """
    
    # 构建cd-hit命令
    cmd = [
        'cd-hit',
        '-i', input_fasta,
        '-o', output_prefix,
        '-c', str(similarity),
        '-T', str(threads),
        '-M', '16000'  # 内存限制16GB
    ]
    
    try:
        # 执行命令
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print("cd-hit运行成功!")
        print("输出文件：")
        print(f"  - 代表序列: {output_prefix}")
        print(f"  - 聚类信息: {output_prefix}.clstr")
        return True
    except subprocess.CalledProcessError as e:
        print(f"cd-hit运行失败: {e}")
        print(f"错误输出: {e.stderr}")
        return False

def get_temp_fasta(input_file:str):
    temp_file: str = input_file.replace('.txt', '.fasta')
    print(f'temp:{temp_file}')
    store_site: dict = dict()
    with open(temp_file, 'w', encoding='utf-8') as writer_file:
        with open(input_file, 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('>'):
                    protein_name = line
                    seq = next(f)
                    sites = next(f)
                    store_site[protein_name] = sites
                    writer_file.writelines([protein_name, seq])
    run_cd_hit(temp_file, "clustered_output", 0.9, 8)
    final_file: str = input_file.replace('.txt', '_final.txt')
    with open(final_file, 'w', encoding='utf-8') as f:
        with open(temp_file, 'r', encoding='utf-8') as reader_file:
            for line in reader_file:
                protein_name = line
                seq = next(reader_file)
                sites = store_site[protein_name]
                f.writelines([protein_name, seq, sites])
    os.remove(temp_file)
    os.remove(os.path.join(root_path, 'clustered_output'))
    os.remove(os.path.join(root_path, 'clustered_output.clstr'))


# 使用示例
if __name__ == "__main__":
    ions = ['zn', 'co', 'cu', 'fe', 'fe2', 'k', 'mg', 'mn', 'na', 'ni', 'zn']
    types = ['train', 'test']
    for ion in ions:
        for train in types:
            input_dir = f'data_make/{ion}'
            file_path = os.path.join(input_dir, f'{train}.txt')
            get_temp_fasta(file_path)

当前 root_path: /data1/ysq/GPU文件/需备份文件/T5模型参数
temp:data_make/zn/train.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/zn/test.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/co/train.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/co/test.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/cu/train.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/cu/test.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/fe/train.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/fe/test.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_output.clstr
temp:data_make/fe2/train.fasta
cd-hit运行成功!
输出文件：
  - 代表序列: clustered_output
  - 聚类信息: clustered_

* 滑动窗口

In [14]:
from typing import Union
import os
root_path = os.getcwd()
print("当前 root_path:", root_path)

def slice_fragment(
    input_file: str, 
    output_dir: str = '.', 
    output_file_profix: str = '',
    focus: Union[list, bool]=None, 
    return_output:bool=False,
    pad_char='-',
    window_size=25
    ) -> Union[None, tuple]:
    """
    DESCRIPTION:
        slice length==25 fragemnt of protein sequence.
        the postive fragment is the central residue is bind site and focus
    :param input_file:
    :param focus:
    :return:
    """
    half_window = window_size // 2
    if not focus:
        focus = ["C", "D", "E", "G", "H", "K", "N", "R", "S"]
        pass
    positive_fragments: str = ''
    negative_fragments: str = ''
    with open(input_file, 'r') as f:
        for line in f:
            if line.startswith('>'):
                name: str = line.strip().split("\t")[0]
                seq: str = next(f).strip()
                site: str = next(f).strip()
                padded_sequence = pad_char * half_window + seq + pad_char * half_window
                fragments = [padded_sequence[i:i+window_size] for i in range((len(site)))]
                positive_fragment = [name+'\n'+fragments[i]+'\n'+'0'*half_window+site[i]+'0'*half_window+'\n' for i in range(len(site)) if site[i] == '1']
                negative_fragment = [name+'\n'+fragments[i]+'\n'+'0'*half_window+site[i]+'0'*half_window+'\n' for i in range(len(site)) if site[i] == '0']
                positive_fragments += ''.join(positive_fragment)
                negative_fragments += ''.join(negative_fragment)


    if return_output:							
        return positive_fragments, negative_fragments
    else:
        with open(f'{output_dir}/{output_file_profix}positive_fragment.txt', 'w', encoding='utf-8') as writer:
            writer.write(positive_fragments)
        with open(f'{output_dir}/{output_file_profix}negative_fragment.txt', 'w', encoding='utf-8') as writer:
            writer.write(negative_fragments)


ions = ['zn', 'co', 'cu', 'fe', 'fe2', 'k', 'mg', 'mn', 'na', 'ni', 'zn']
types = ['train', 'test']
for ion in ions:
    for train in types:
        save_dir = os.path.join(root_path, 'data_make', 'noreduce', train, f'{ion}_{train}')
        os.makedirs(save_dir, exist_ok=True)
        file_path = os.path.join(root_path, 'data_make', ion, f'{train}_final.txt')
        slice_fragment(
            input_file=file_path, 
            output_dir=save_dir,
            output_file_profix=''
        )

当前 root_path: /data1/ysq/GPU文件/需备份文件/T5模型参数


* 约化氨基酸

In [2]:
import os
from typing import Union
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
cpu_count = os.cpu_count()
root_path = os.getcwd()
print("当前 root_path:", root_path)

class IrapSeq:
    def __init__(self) -> None:
        self.irap_path: str = os.path.join(root_path, 'data_make', 'irap.txt')
        self.iraps: dict = self.__readirap__()
    
    def __readirap__(self) -> dict[str, str]:
        irap: dict[str, str] = dict()
        with open(self.irap_path, 'r') as file:
            for line in file:
                line = line.strip().split(' ')
                the_type: str = line[1]
                the_size: str = line[3]
                context: str = line[-1]
                name = f'type_{the_type}+size_{the_size}+{context}'
                assert name not in irap.keys(), f'{name} 重复'
                irap[name] = context
        return irap
        
    def irap_dict(self, type:str, size: str) -> str:
        name: str = f'type:{type}+size:{size}'
        return self.iraps[name]
    
    def irap_dicts(self) -> dict[str, str]:
        return self.iraps
        
    def irap(self, seq:str, type_and_size: Union[bool, str] = None) -> str:
        if type_and_size:
            name: str = type_and_size
        else:
            name: str = f'type:0+size:1'
        irap_context: list = self.iraps[name].split("-")
        return self.__seqtoirap__(seq.upper(), irap_context)
        
    @staticmethod
    def __seqtoirap__(seq: str, irap_list: list) -> str:
        irap_seq: str = ''
        for res in seq:
            if res == '-':  
                irap_seq += '-'
            else:
                matched = False
                for irap_type in irap_list:
                    if res in irap_type:
                        irap_seq += irap_type[0]
                        matched = True
                        break   # 一旦匹配到就退出，避免重复追加
                if not matched:
                    irap_seq += res   # 没匹配到时给默认符号
        return irap_seq


def process_one(input_dir, save_dir, irap, k):
    with open(f'{input_dir}/positive_fragment.txt', 'r') as positive:
        output_path = f'{save_dir}/positive_fragment.txt'
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            pass
        with open(output_path, 'w') as writer:
            for line in positive:
                if line.startswith('>'):
                    protein_name = line.strip()
                    seq = next(positive).strip()
                    sites = next(positive).strip()
                    output = irap.irap(seq=seq, type_and_size=k)
                    assert len(output) == 25, f'len is large, {len(seq)}, {len(output)}, {seq}, {output}, {k}'
                    writer.write('\n'.join([protein_name, output, sites])+'\n')

    with open(f'{input_dir}/negative_fragment.txt', 'r', encoding='utf-8') as negative:
        output_path = f'{save_dir}/negative_fragment.txt'
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            pass
        with open(output_path, 'w') as writer:
            for line in negative:
                if line.startswith('>'):
                    protein_name = line.strip()
                    seq = next(negative).strip()
                    sites = next(negative).strip()
                    output = irap.irap(seq=seq, type_and_size=k)
                    assert len(output) == 25, f'len is large, {seq}, {output}'
                    writer.write('\n'.join([protein_name, output, sites])+'\n')


def select_dataset():
    
    irap = IrapSeq()
    types = irap.irap_dicts()
    # ions = ['zn', 'co', 'cu', 'fe', 'fe2', 'k', 'mg', 'mn', 'na', 'ni', 'zn']
    ions = ['ca']
    train_test = ['train', 'test']
    for ion in ions:
        for train in train_test:
            with ProcessPoolExecutor(max_workers=cpu_count-4) as executor:
                futures = []
                for k, v in types.items():
                    save_dir = '/'.join(
                        [
                        root_path, 
                        'data_make',
                        'reduce', 
                        train, 
                        f'{ion}_{train}', 
                        k]
                    )
                    os.makedirs(save_dir, exist_ok=True)
                    input_dir = os.path.join(root_path, 'data_make', 'noreduce', train, f'{ion}_{train}')
                    futures.append(executor.submit(process_one, input_dir, save_dir, irap, k))
                for f in tqdm(as_completed(futures), total=len(futures), desc="DSSP running', ncols=80"):
                    try:
                        f.result()
                    except Exception as e:
                        print(f"error: {e}")

if __name__ == '__main__':
    select_dataset()
    

当前 root_path: /data1/ysq/GPU文件/需备份文件/T5模型参数


DSSP running', ncols=80: 100%|██████████| 671/671 [03:42<00:00,  3.01it/s]
DSSP running', ncols=80: 100%|██████████| 671/671 [00:26<00:00, 24.90it/s]


* 转化为tfrecord

In [3]:

import tensorflow as tf
from typing import Union
import os
import sys
import random
from  math import ceil

sys.path.append(os.path.dirname(os.getcwd()))
from T5Model.Configs import Config
from data_make.utils import ion_reduce_types
root_path = os.getcwd()
print("当前 root_path:", root_path)


class FullTokenizer(object):


	def __init__(self, vocab_file: str) -> None:
		self.vocab: dict[str, int] = dict()
		with open(vocab_file, 'r') as vocab_file:
			vocab_lines: list[str] = vocab_file.readlines()
			index: int = 0
			for vocab_line in vocab_lines:
				token: str = vocab_line.strip()
				self.vocab[token] = index
				index += 1

	def get_vocab(self) -> dict[str, int]:
		return self.vocab


def loading_data(input_file: str) -> tuple[list, list, list]:

	print(input_file)
	names: list = []
	seqs: list = []
	labels: list = []
	with open(input_file, 'r') as f:
		lines = iter(f.readlines())
		for line in lines:
			if line.startswith('>'):
				name: str = line.strip().split(' ')[0]

				seq: str = next(lines).strip()

				label: str = next(lines).strip()
				label: list = list(map(int, label))

				names.append(name)
				seqs.append(seq)
				labels.append(label)
	return names, seqs, labels


def convert_tokens_to_ids(tokens: list[str], vocab: dict[str, int]) -> list[int]:

	ids: list = []
	for position in range(len(tokens)):
		if tokens[position] in vocab.keys():
			ids += [vocab[tokens[position].upper()]]
		else:
			ids += [vocab['-']]
	return ids


def get_features(
		name: str,
		seq: str,
		label: Union[list, int],
		vocab: dict[str, int],
		max_length: int) -> tuple[bytes, list, list, list]:

	pdb_id: bytes = name.encode('utf-8')
	max_length: int = max_length - 2
	assert len(seq) == max_length, f'seq length:{len(seq)} < max_length {max_length}'

	seq_list = [x if x != '-' else '[PAD]' for x in seq]

	start: int = vocab["[CLS]"]
	end: int = vocab["[SEP]"]

	seq_list: list[int] = convert_tokens_to_ids(seq_list, vocab)

	seq_list: list[int] = [start] + seq_list + [end]

	pad: list[int] = [1] + [1 if x != '-' else 0 for x in seq] + [1]

	labels: list[int] = [0] + label + [0]
	# print(f'pdb_id:{pdb_id}\nseq_id:{len(seq_id)}\npad:{len(pad)}\nlabel:{len(labels)}')
	return pdb_id, seq_list, pad, labels


def tokenize(
		file_path: str,
		output_path: str,
		vocab_file: str,
		) -> None:

	config: Config = Config()
	names, seqs, labels = loading_data(file_path)
	vocabs: dict[str, int] = FullTokenizer(vocab_file).get_vocab()
	with tf.io.TFRecordWriter(output_path) as writer:
		for line_index in range(len(names)):
			pdb_id, seq_id, pad, label = get_features(
				names[line_index],
				seqs[line_index],
				labels[line_index],
				vocabs,
				config.max_seq_length)

			features: dict[str, tf.Tensor] = {
				'protein_name': tf.train.Feature(bytes_list=tf.train.BytesList(value=[pdb_id])),
				'input_ids': tf.train.Feature(int64_list=tf.train.Int64List(value=seq_id)),
				'input_mask': tf.train.Feature(int64_list=tf.train.Int64List(value=pad)),
				'labels': tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
			}
			example = tf.train.Example(features=tf.train.Features(feature=features))
			writer.write(example.SerializeToString())
		writer.close()



def add_negative(
	positive_file: str,
	negative_file: str, 
	output_file: str, 
	scale=False, 
	random_seed=42) -> None:

	if scale:
		random.seed(random_seed)

		with open(positive_file, 'r') as pos_f:
			positive_count = sum(1 for line in pos_f if line.startswith('>'))
		print(f"正样本数量: {positive_count}")
		
		target_negative_count = ceil(positive_count * (10-scale) / scale)
		print(f"目标负样本数量: {target_negative_count}")
		
		# 统计负样本总数
		negative_total = 0
		with open(negative_file, 'r') as neg_f:
			negative_total = sum(1 for line in neg_f if line.startswith('>'))
		print(f"负样本总数: {negative_total}")
		
		actual_negative = min(target_negative_count, negative_total)
		
		if negative_total <= actual_negative:
			selected_indices = list(range(negative_total))
		else:
			selected_indices = random.sample(range(negative_total), actual_negative)
		
		selected_line_ranges = []
		for idx in selected_indices:
			start_line = idx * 3
			selected_line_ranges.append(range(start_line, start_line + 3))
		

		
		with open(positive_file, 'r') as positive:
			positive_content = positive.read()
		
		with open(output_file, 'w') as output:
			output.write(positive_content)
			
			with open(negative_file, 'r') as negative:
				negative_lines = negative.readlines()
			
			for line_range in selected_line_ranges:
				for i in line_range:
					if i < len(negative_lines):
						output.write(negative_lines[i])
		
		print(f"- 正样本: {positive_count}")
		print(f"- 负样本: {actual_negative}")
	else:
		with open(output_file, 'w') as output:
			with open(positive_file, 'r') as positive:
				for line in positive:
					output.write(line)
			with open(negative_file, 'r') as negative:
				for line in negative:

					output.write(line)
					

def reduce() -> None:
	"""convert txt or fasta to tfrecord"""
	train_test: str = ['train', 'test']
	vocab_path = os.path.join(root_path, 'data_make', 'vocab.txt')
	scale = 4
	for train in train_test:
		target_dir = os.path.join(root_path, 'data_make', 'reduce', train)
		targets: list[str] = os.listdir(target_dir)
		for target in targets:
			if 'k' not in target:
				continue
			path_dir: str = f'{target_dir}/{target}'
			files: list = os.listdir(path_dir)
			for file_dir in files:
				if file_dir != ion_reduce_types[target.split('_')[0]]:
				# if file_dir != 'type_0+size_20+A-C-D-E-F-G-H-I-K-L-M-N-P-Q-R-S-T-V-W-Y':
					continue
				print(file_dir)
				file_path: str = f'{path_dir}/{file_dir}'
				positive_file: str = f'{file_path}/positive_fragment.txt'
				negative_file: str = f'{file_path}/negative_fragment.txt'
				output_file = f'{file_path}/{train}_{scale}.txt' if scale else f'{file_path}/{train}.txt'
				tf_file = output_file.replace('txt', 'tfrecord') 
				if os.path.exists(output_file):
					add_negative(positive_file, negative_file, output_file=output_file, scale=scale)
				else:
					add_negative(positive_file, negative_file, output_file=output_file, scale=scale)
				if os.path.exists(f'{file_path}/{train}.tfrecord'):
					tokenize(file_path=output_file, output_path=tf_file, vocab_file=vocab_path)
				else:
					tokenize(file_path=output_file, output_path=tf_file, vocab_file=vocab_path)



def main() -> None:
	"""convert txt or fasta to tfrecord"""
	train: str = 'train'
	operation: str = 'create'
	target_dir: str = f'./noreduce/{train}'
	# os.makedirs(target_dir, exist_ok=True)
	targets: list[str] = os.listdir(target_dir)
	for target in targets:
		path_dir: str = f'{target_dir}/{target}'
		print(path_dir)
		if operation == 'create':
			positive_file: str = f'{path_dir}/positive_fragment.txt'
			negative_file: str = f'{path_dir}/negative_fragment.txt'
			input_path: str = f'{path_dir}/{train}.txt'
			
			if os.path.exists(input_path):
				add_negative(positive_file, negative_file, train=train, output_dir=path_dir)
			else:
				add_negative(positive_file, negative_file, train=train, output_dir=path_dir)
			if os.path.exists(f'{path_dir}/{train}.tfrecord'):
				tokenize(file_path=input_path, output_path=path_dir, vocab_file='./vocab.txt', train=train)
			else:
				tokenize(file_path=input_path, output_path=path_dir, vocab_file='./vocab.txt', train=train)

			
if __name__ == '__main__':
	# main()
	reduce()
	


当前 root_path: /data1/ysq/GPU文件/需备份文件/T5模型参数
type_49+size_17+C-FY-W-ML-IV-G-P-A-T-S-N-H-Q-E-D-R-K
正样本数量: 2407
目标负样本数量: 3611
负样本总数: 93274
- 正样本: 2407
- 负样本: 3611
/data1/ysq/GPU文件/需备份文件/T5模型参数/data_make/reduce/train/k_train/type_49+size_17+C-FY-W-ML-IV-G-P-A-T-S-N-H-Q-E-D-R-K/train_4.txt
type_49+size_17+C-FY-W-ML-IV-G-P-A-T-S-N-H-Q-E-D-R-K
正样本数量: 293
目标负样本数量: 440
负样本总数: 10398
- 正样本: 293
- 负样本: 440
/data1/ysq/GPU文件/需备份文件/T5模型参数/data_make/reduce/test/k_test/type_49+size_17+C-FY-W-ML-IV-G-P-A-T-S-N-H-Q-E-D-R-K/test_4.txt


* 结果统计

In [None]:
import os
import csv

root_dir: str = '..'
target: str = 'fe2'
dataset: str = 'LigBind'
metrics_dir: str = os.path.join(root_dir, f'results_{dataset}_{target}')
files: list = os.listdir(metrics_dir)
with open(f'{root_dir}/totalMetrics_{target}.csv', 'w', encoding='utf-8',newline='') as writer_file: 
    writer = csv.writer(writer_file, delimiter=',')
    writer.writerow(['target_ion', 'reduce_type', 'auc_roc','auc_pr','precision','recall','f1', 'mcc','accuracy'])
    for file in files:
        if file.startswith('type'):
            metrics_path: str = os.path.join(metrics_dir, file)
        assert os.path.isdir(metrics_path), f'error {metrics_path} is not dir'
        metrics_path: str = os.path.join(metrics_path, 'metrics.txt')
        with open(metrics_path, 'r', encoding='utf-8') as reader:
            output = list(map(float, reader.readlines()[-1].split('\t')))
            output.insert(0, file)
            output.insert(0, target)
            writer.writerow(output)




* 数据统计

In [None]:
import os

input_dir = './noreduce/train'

files = os.listdir(input_dir)
for file in files:
    positive_file = os.path.join(input_dir, file, 'positive_fragment.txt')
    negative_file = os.path.join(input_dir, file, 'negative_fragment.txt')
    positive_dict = []
    negative_dict = []
    with open(positive_file, 'r' , encoding='utf-8') as f:
        for line in f:
            if line.startswith('>'):
                positive_dict.append(line.strip())
                # print(line.strip()[0:5])

    with open(negative_file, 'r' , encoding='utf-8') as f:
        for line in f:
            if line.startswith('>'):
                negative_dict.append(line.strip())
    positive_len = len(positive_dict)
    negative_len = len(negative_dict)
    total_len = positive_len+negative_len
    reduce_len = total_len*671
    print(
        file, 
        '\t', 
        f'positive:{positive_len}, negative:{negative_len}, total:{total_len}, reduce_len:{reduce_len}')

In [None]:
import tensorflow as tf

# 1. 定义特征描述
# 关键：这里必须和写入tfrecord文件时的特征结构完全一致
feature_description = {
    'protein_name': tf.io.FixedLenFeature([], tf.string),  # 字节特征
    'center_res': tf.io.FixedLenFeature([], tf.string),   # 整型特征
    'input_ids': tf.io.FixedLenFeature([27], tf.int64), # 浮点特征
    'input_mask': tf.io.FixedLenFeature([27], tf.int64), # 浮点特征
}

# 2. 解析函数
def parse_example(serialized_example):
    parsed_features = tf.io.parse_single_example(serialized_example, feature_description)
    # 如果需要，对解析后的特征进行后续处理，例如解码图像等
    return parsed_features

# 3. 读取并解析文件
dataset = tf.data.TFRecordDataset('../temp.tfrecord')
parsed_dataset = dataset.map(parse_example)

# 4. 迭代查看内容
for i, parsed_record in enumerate(parsed_dataset):
    print(f"Example {i}:")
    for feature_name, feature_value in parsed_record.items():
        print(f"  {feature_name}: {feature_value.numpy()}") # 获取Tensor的numpy值
    print()
    if i >= 4:  # 限制打印前5个样本，避免输出过长
        print("... (and more)")
        break

In [3]:
import os

root_path = os.getcwd()
root_path

'/home/ysq/需备份文件/T5模型参数/data_make'