原先GO获得是做成了硬编码 太稀疏了
现在在原先得到蛋白质id对应的GO类别的基础上做修改，根据UniProt的GO 注释展示的各小类总括构建mutil-hot encoding
将几千几万的维度降维到几百 并划分了BP MF CC三类

GO类——处理GO官网下载的数据 用于获得每个GO的祖先 ID 便于后续归类查找

In [None]:
import math
import random
from collections import Counter, deque

import numpy as np
import torch
from numpy.random import randint
from sklearn.metrics import (auc, confusion_matrix, precision_recall_curve,
                             roc_curve)
from torch.utils.data import Dataset

BIOLOGICAL_PROCESS = 'GO:0008150'
MOLECULAR_FUNCTION = 'GO:0003674'
CELLULAR_COMPONENT = 'GO:0005575'
FUNC_DICT = {
    'cc': CELLULAR_COMPONENT,
    'mf': MOLECULAR_FUNCTION,
    'bp': BIOLOGICAL_PROCESS
}

NAMESPACES = {
    'cc': 'cellular_component',
    'mf': 'molecular_function',
    'bp': 'biological_process'
}

# ------------------------------------------------------------------------------------------
# Gene Ontology based on .obo File


# Gene Ontology based on .obo File
class Ontology(object):
    def __init__(self,
                 filename=r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go-basic.obo',
                 with_rels=False,
                 include_alt_ids=True):
        super().__init__()
        self.ont, self.format_version, self.data_version = self.load(
            filename, with_rels, include_alt_ids)
        self.ic = None

    # ------------------------------------
    def load(self, filename, with_rels, include_alt_ids):
        '''
        filename: .obo file  GO总文件
        with_rels: 是否包含关系 计算part_of
        include_alt_ids: 是否包含alt_ids
        '''
        ont = dict()
        format_version = []  # 存储格式版本
        data_version = []  # 存储数据版本
        obj = None
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                # 如果是空行 跳过
                if not line:
                    continue
                    # format version line
                # 记录格式版本
                if line.startswith('format-version:'):
                    l = line.split(': ')
                    format_version = l[1]
                # data version line
                # 记录数据版本
                if line.startswith('data-version:'):
                    l = line.split(': ')
                    data_version = l[1]
                # item lines
                # 如果是[Term] 说明是一个新的GO term  把新的GO ID写入字典
                if line == '[Term]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = dict()
                    # 为该GO建立字典，继续存储GO相关信息 主要涉及它相关的五个关系 
                    # four types of relations to others: is a, part of, has part, or regulates
                    obj['is_a'] = list()
                    obj['part_of'] = list()
                    obj['relationship'] = list()
                    # alternative GO term id
                    obj['alt_ids'] = list()  # 替代GO ID
                    # is_obsolete
                    obj['is_obsolete'] = False
                    continue
                # 如果是[Typedef] 说明是一个类型定义的开始
                elif line == '[Typedef]':
                    if obj is not None:
                        ont[obj['id']] = obj
                    obj = None
                # 否则，这一行表示术语的属性
                else:
                    if obj is None:
                        continue
                    l = line.split(': ')
                    if l[0] == 'id':
                        obj['id'] = l[1]
                    elif l[0] == 'alt_id':
                        obj['alt_ids'].append(l[1])
                    elif l[0] == 'namespace':
                        obj['namespace'] = l[1]
                    elif l[0] == 'is_a':
                        obj['is_a'].append(l[1].split(' ! ')[0])
                    elif with_rels and l[0] == 'relationship':
                        it = l[1].split()
                        # add all types of relationships revised
                        if it[0] == 'part_of':
                            obj['part_of'].append(it[1])
                        obj['relationship'].append([it[1], it[0]])
                    elif l[0] == 'name':
                        obj['name'] = l[1]
                    # is_obsolete 过时GO ID
                    elif l[0] == 'is_obsolete' and l[1] == 'true':
                        obj['is_obsolete'] = True
            if obj is not None:
                ont[obj['id']] = obj
        # dealing with alt_ids, why
        for term_id in list(ont.keys()):
            # 如果包含替代GO ID  那么将替代GO ID也加入到字典中
            if include_alt_ids:
                for t_id in ont[term_id]['alt_ids']:
                    ont[t_id] = ont[term_id]
            # 如果GO ID是过时的  那么将该GO ID从字典中删除
            if ont[term_id]['is_obsolete']:
                del ont[term_id]
        # is_a -> children
        # 对于每一个GO ID  如果有children 那么将children加入到字典中
        # 然后把这个GO的所有is_a part of 关系的GO ID加入到children中
        for term_id, val in ont.items():
            if 'children' not in val:
                val['children'] = set()
            for p_id in val['is_a'] + val['part_of']:
                if p_id in ont:
                    if 'children' not in ont[p_id]:
                        ont[p_id]['children'] = set()
                    ont[p_id]['children'].add(term_id)
        return ont, format_version, data_version

    # ------------------------------------
    def has_term(self, term_id):
        return term_id in self.ont

    def get_term(self, term_id):
        if self.has_term(term_id):
            return self.ont[term_id]
        return None

    def calculate_ic(self, annots):
        cnt = Counter()
        for x in annots:
            cnt.update(x)
        self.ic = {}
        for go_id, n in cnt.items():
            parents = self.get_parents(go_id)
            if len(parents) == 0:
                min_n = n
            else:
                min_n = min([cnt[x] for x in parents])

            self.ic[go_id] = math.log(min_n / n, 2)

    def get_ic(self, go_id):
        if self.ic is None:
            raise Exception('Not yet calculated')
        if go_id not in self.ic:
            return 0.0
        return self.ic[go_id]

    # revised 'part_of'
    # 获得GO的所有祖先  这个获取的GO关系更加广泛 获得了GO的父元素，以及父元素的所有父元素 相当于把这个种族的家族都拿遍了
    def get_ancestors(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()  # 双端队列
        q.append(term_id)  # 先将当前的GO加入队列中
        # 当队列不为空时，从队列中弹出一个GO术语，如果这个GO不在term_set中，就加入，并把这个GO的所有父元素加入队列
        while (len(q) > 0):
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for parent_id in (self.ont[t_id]['is_a'] +
                                  self.ont[t_id]['part_of']):
                    if parent_id in self.ont:
                        q.append(parent_id)
        # terms_set.remove(term_id)
        return term_set

    # revised
    # 获得GO的父节点  这个获取是小范围的，只获得与指定GO关系为is_a part of 的GO
    def get_parents(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        # 获得GO ID的所有父元素： 父元素 与这个GO关系为is_a part of的GO
        for parent_id in (self.ont[term_id]['is_a'] +
                          self.ont[term_id]['part_of']):
            if parent_id in self.ont:
                term_set.add(parent_id)
        return term_set

    # get the root terms(only is_a)
    # 获得的是GO的根祖先 即GO的is_a关系以及父节点的所有is_a关系  就是祖先集合的只包含is_a关系的版本 近亲
    def get_root_ancestors(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while (len(q) > 0):
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for parent_id in self.ont[t_id]['is_a']:
                    if parent_id in self.ont:
                        q.append(parent_id)
        # terms_set.remove(term_id)
        return term_set

    # 获得GO的所有根元素
    def get_roots(self, term_id):
        if term_id not in self.ont:
            return set()
        root_set = set()
        for term in self.get_root_ancestors(term_id): # 遍历该GO的所有根祖先
            if term not in self.ont:
                continue
            # 如果该GO的父元素为空 那么就是根元素
            if len(self.get_parents(term)) == 0:
                root_set.add(term)

        return root_set

    def get_namespace_terms(self, namespace):
        terms = set()
        for go_id, obj in self.ont.items():
            if obj['namespace'] == namespace:
                terms.add(go_id)
        return terms

    def get_namespace(self, term_id):
        return self.ont[term_id]['namespace']

    # all children
    # 获得该GO的所有孩子元素 BFS搜索 从term_id开始逐层向下搜索所有的子术语 直到没有  这个获得了所有的孩子 即获得GO的子元素后，又纳入子元素的子元素
    def get_term_set(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        q = deque()
        q.append(term_id)
        while len(q) > 0:
            t_id = q.popleft()
            if t_id not in term_set:
                term_set.add(t_id)
                for ch_id in self.ont[t_id]['children']:
                    q.append(ch_id)
        return term_set

    # only one layer children
    # 获得该GO的一层孩子元素  只获得一层孩子元素
    def get_child_set(self, term_id):
        if term_id not in self.ont:
            return set()
        term_set = set()
        if term_id not in term_set:
            for ch_id in self.ont[term_id]['children']:
                term_set.add(ch_id)
        return term_set


# ------------------------------------------------------------------------------------------
# functions for evaluation
def get_matrix(labels, preds, threshold=0.3):
    preds = preds.flatten()
    preds[preds >= threshold] = 1
    preds = preds.astype('int8')
    tn, fp, fn, tp = confusion_matrix(labels.flatten(), preds).ravel()
    return tn, fp, fn, tp


def get_level_matrix(labels, preds, level, threshold=0.3):
    preds = preds[..., level]
    preds = preds.flatten()
    preds[preds >= threshold] = 1
    preds = preds.astype('int8')
    labels = labels[..., level]
    tn, fp, fn, tp = confusion_matrix(labels.flatten(), preds).ravel()
    return tn, fp, fn, tp


def compute_roc(labels, preds):
    # Compute ROC curve and ROC area for each class
    fpr, tpr, _ = roc_curve(labels.flatten(), preds.flatten())
    roc_auc = auc(fpr, tpr)
    return roc_auc


def compute_aupr_level(labels, preds, level):
    labels = labels[..., level]
    preds = preds[..., level]
    precision, recall, _ = precision_recall_curve(labels.flatten(),
                                                  preds.flatten())
    aupr = auc(recall, precision)
    return aupr


def compute_aupr(labels, preds):
    precision, recall, _ = precision_recall_curve(labels.flatten(),
                                                  preds.flatten())
    aupr = auc(recall, precision)
    return aupr


# set random seed
def set_random_seed(seed=10, deterministic=False, benchmark=False):
    # random.seed(seed)
    # np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    if deterministic:
        torch.backends.cudnn.deterministic = True
    if benchmark:
        torch.backends.cudnn.benchmark = True


# contrast training
class con_pair_dataset(Dataset):
    def __init__(self,
                 con_pair,
                 contrast_dict,
                 terms,
                 terms_dict,
                 neg_num=80,
                 neg=0.5,
                 neg1_len=0.25):
        super().__init__()
        self.len_df = len(con_pair)
        self.n_cc = list(contrast_dict['n_cc'])
        self.n_bp = list(contrast_dict['n_bp'])
        self.n_mf = list(contrast_dict['n_mf'])
        self.terms = terms
        self.contrast_dict = contrast_dict
        self.terms_dict = terms_dict
        self.neg_num = neg_num
        self.con_pair = con_pair
        self.neg = neg
        self.neg1_len = neg1_len

    def __len__(self):
        return self.len_df

    def __getitem__(self, idx):
        terms_list = [self.con_pair[idx][0], self.con_pair[idx][1]]
        negs1 = set()
        neg1_len = min(len(self.con_pair[idx][2][0]),
                       int(self.neg_num * self.neg1_len))
        if neg1_len > 0:
            negs1 = set(random.sample(self.con_pair[idx][2][0], k=neg1_len))
        negs1 = list(negs1)
        random.shuffle(negs1)

        negs2 = set()
        neg2_len = int((self.neg_num - neg1_len) * self.neg)
        if len(self.contrast_dict[self.con_pair[idx][0]]) <= neg2_len:
            negs2 = negs2 | set(
                random.sample(self.contrast_dict[self.con_pair[idx][0]],
                              k=len(
                                  self.contrast_dict[self.con_pair[idx][0]])))
            negs2 = negs2 | set(
                random.sample(self.contrast_dict[self.con_pair[idx][0]],
                              k=neg2_len -
                              len(self.contrast_dict[self.con_pair[idx][0]])))
        else:
            negs2 = negs2 | set(
                random.sample(self.contrast_dict[self.con_pair[idx][0]],
                              k=neg2_len))
        negs2 = list(negs2)
        random.shuffle(negs2)

        neg_len = neg1_len + neg2_len
        neg_num = self.neg_num - neg_len
        negs3 = set()
        if self.contrast_dict[self.terms[terms_list[0]]] == 'GO:0005575':
            while len(negs3) < neg_num // 3:
                m = randint(0, len(self.n_mf) - 1)
                if self.terms_dict[self.n_mf[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_mf[m]])
            while len(negs3) < neg_num:
                m = randint(0, len(self.n_bp) - 1)
                if self.terms_dict[self.n_bp[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_bp[m]])
        elif self.contrast_dict[self.terms[terms_list[0]]] == 'GO:0003674':
            while len(negs3) < neg_num // 5:
                m = randint(0, len(self.n_cc) - 1)
                if self.terms_dict[self.n_cc[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_cc[m]])
            while len(negs3) < neg_num:
                m = randint(0, len(self.n_bp) - 1)
                if self.terms_dict[self.n_bp[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_bp[m]])
        elif self.contrast_dict[self.terms[terms_list[0]]] == 'GO:0008150':
            while len(negs3) < neg_num // 3:
                m = randint(0, len(self.n_cc) - 1)
                if self.terms_dict[self.n_cc[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_cc[m]])
            while len(negs3) < neg_num:
                m = randint(0, len(self.n_mf) - 1)
                if self.terms_dict[self.n_mf[m]] not in negs3:
                    negs3.add(self.terms_dict[self.n_mf[m]])
        negs3 = list(negs3)
        random.shuffle(negs3)

        neg1_num = [neg1_len for i in range(neg1_len)]
        neg2_num = [neg2_len for i in range(neg2_len)]
        neg3_num = [neg_num for i in range(neg_num)]
        neg_num = neg1_num + neg2_num + neg3_num
        neg_num = 1 / np.array(neg_num)
        terms_list = terms_list + negs1 + negs2 + negs3
        return torch.LongTensor(terms_list).view(
            len(terms_list)), torch.from_numpy(neg_num)


调用GO类 计算处理GO官网下载得到的obo数据 用于获得每个GO的祖先 ID

In [None]:
import argparse
import pickle
from collections import defaultdict as ddt


parser = argparse.ArgumentParser(description='extract all terms in a go.obo file.',
                                 add_help=False)
parser.add_argument('--go-file',
                    '-gf',
                    default=r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go-basic.obo',
                    type=str,
                    help='go file downloaded from Gene Ontology website')
parser.add_argument('--terms-file',
                    '-tf',
                    default=r'E:\xj\Paper\PO2GO\protein-annotation-master\data\terms_all.pkl',
                    type=str,
                    help='A DataFrame stored all terms')
parser.add_argument('--out-ancestor',
                    '-oa',
                    default=r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go_ancestor.pkl',
                    type=str,
                    help='output file for ancestor')
parser.add_argument('--out-children',
                    '-oc',
                    default=r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go_children.pkl',
                    type=str,
                    help='output file for children')



def main(go_file, terms_all_file, out_ancestor, out_children):
    # INPUT FILES
    go = Ontology(go_file, with_rels=True, include_alt_ids=False)
    # 读取所有GO ID
    with open(terms_all_file, 'rb') as fd:
        terms = pickle.load(fd)
        terms = list(terms['terms'])
    terms_set = set(terms)  # GO ID 集合
    terms_dict = {v: i for i, v in enumerate(terms)}  # 构建GO映射 如{'GO:0000001': 0, 'GO:0000002': 1, ...}
    # one layer parents, no self
    # parents_dict 获得并存储每个GO术语的父元素—— is_a part of
    parents_dict = ddt(set)  # 创建默认空字典 如果访问key不存在时不报错，返回默认值
    for i in range(len(terms)):
        parents_dict[terms[i]] = terms_set.intersection(go.get_parents(terms[i]))
    
    # all ancestors, no self
    # ancestor_dict 获得每个GO的所有祖先元素 这个获取比父元素更为广泛 获得的是自己的父元素，以及所有父元素的父元素
    ancestor_dict = ddt(set)
    for i in range(len(terms)):
        temp_set = go.get_ancestors(terms[i])
        temp_set.remove(terms[i])
        ancestor_dict[terms[i]] = terms_set.intersection(temp_set)
    
    # 获得所有GO各自的根元素 一个GO有相关的父元素，通过BFS找到这个GO家族的根元素 可能有多个
    root_dict = ddt(set)
    for i in range(len(terms)):
        root_dict[terms[i]] = go.get_roots(terms[i])
    # 这里简化了字典 只保留了一个根元素 如{'GO:0000001': {1,2,3,4,5}} 变为{'GO:0000001': 1}
    for k, v in root_dict.items():
        root_dict[k] = list(v)[0]
    # 获得所有GO各自的所有子元素 即获得该GO的子元素后，继续获得子元素的子元素，直到没有子元素
    child_dict = ddt(set)
    for i in range(len(terms)):
        child_dict[terms[i]] = terms_set.intersection(go.get_term_set(terms[i]))
    # 获得所有GO各自的所有子元素 这个只有一层子元素 只获得该GO的子元素 不往后获得孙子辈的
    child_one_dict = ddt(set)
    for i in range(len(terms)):
        child_one_dict[terms[i]] = terms_set.intersection(
            go.get_child_set(terms[i]))

    go_ancestor = ddt(list)
    go_children = ddt(list)
    count = 0
    for i in terms:  # 遍历所有 GO ID
        temp_anc_set = ancestor_dict[i]  # 获得该GO的所有祖先元素 即该GO的父元素，以及所有父元素的父元素
        temp_child_set = go.get_term_set(i)  # 获得该GO的所有子元素 即该GO的子元素，以及所有子元素的子元素

        go_ancestor[i] = list(temp_anc_set)
        go_children[i] = list(temp_child_set)
        print('{} is ok'.format(count))
        count += 1


    with open(r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go_ancestor.pkl', 'wb') as fd:
        pickle.dump(go_ancestor, fd)

    with open(r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go_children.pkl', 'wb') as fd:
        pickle.dump(go_children, fd)


if __name__ == '__main__':
    # args = parser.parse_args()
    # main(args.go_file, args.terms_file, args.out_ancestor, args.out_children)

    import pickle

    with open(r'E:\xj\Paper\PO2GO\protein-annotation-master\data\go_ancestor.pkl', 'rb') as f:
        ancestor_data = pickle.load(f)
    print(len(ancestor_data))

In [None]:
# 读取pkl文件 查看

import pickle

# 读取pkl文件
with open(r'E:\xj\MyPaper\MyPaper\paper1\GO\go_ancestor.pkl', 'rb') as f:
    go_ancestors = pickle.load(f)
print(go_ancestors['GO:0000001'])

蛋白质的GO信息获取

先通过序列BLAST比对获得uniprot id
方法： 下载uniprot数据库 使用本地BLAST对比 获得blast输出结果

blast输出结果转csv 

In [57]:
import csv

def read_blast_results(blast_file):
    with open(blast_file, 'r') as f:
        # 只保留前三列
        return [line.strip().split('\t')[:3] for line in f]

def write_blast_results_to_csv(blast_results, csv_file):
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        # 写入列名
        writer.writerow(['SampleID', 'MatchID', 'Similarity'])
        # 写入数据
        writer.writerows(blast_results)

# 读取BLAST结果
blast_results = read_blast_results(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_100.blast')

# 将BLAST结果写入CSV文件
write_blast_results_to_csv(blast_results, r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100.csv')

对csv文件进行去重和相似度划分 找到uniprot比对到的蛋白质和没有比对到的蛋白质

In [58]:
import pandas as pd

# 读取CSV文件
df = pd.read_csv(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100.csv')

# 对样本ID进行排序，然后按照样本ID进行分组
groups = df.sort_values('Similarity', ascending=False).groupby('SampleID')

# 对每个组，只保留相似度最高的行
df_unique = groups.first().reset_index()

# 将结果写入新的CSV文件
df_unique.to_csv(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100.csv', index=False)

处理csv文件，获得uniprot.txt文件，用于脚本自动化搜索获得GO

In [59]:
import pandas as pd

# 读取CSV文件
df_100 = pd.read_csv(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100.csv')

# # 选择Similarity列等于100的行
# df_100 = df[df['Similarity'] == 100]

# 从MatchID列切出Uniprot ID
df_100['UniprotID'] = df_100['MatchID'].apply(lambda x: x.split('|')[1])

# 将Uniprot ID写入TXT文件
df_100['UniprotID'].to_csv(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100_uniprot_ids.txt', index=False, header=False)

脚本自动化搜索获得GO
可以直接在uniprot上批量查询下载txt文件

处理text文件 获得对应的GO

In [60]:
import re

def get_go_annotations_from_text(text):
    go_annotations = []
    lines = text.split("\n")
    for line in lines:
        if line.startswith("DR   GO"):
            # Extracting GO annotations from the DR line
            go_info = line.split(";")[1].strip()
            go_annotations.append(go_info)
    return go_annotations

def main(input_file, output_file):
    with open(input_file, "r") as infile, open(output_file, "w") as outfile:
        text = infile.read()
        # 使用正则表达式分割文本，获取每个Uniprot ID的信息
        uniprot_infos = re.split("//\n", text)
        for uniprot_info in uniprot_infos:
            # 获取Uniprot ID
            match = re.search("AC   (\w+);", uniprot_info)  # 修改这里，从"AC"行获取Uniprot ID
            if match:
                uniprot_id = match.group(1)
                # 获取GO注释
                go_annotations = get_go_annotations_from_text(uniprot_info)
                outfile.write(f"{uniprot_id}: {', '.join(go_annotations)}\n")

if __name__ == "__main__":
    input_file_path = r"D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100_uniprot_search_result.txt"  # 替换为实际的输入文件路径
    output_file_path = r"D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_100_uniprot_go_conform.txt"  # 替换为实际的输出文件路径

    main(input_file_path, output_file_path)

然后UniProt ID映射回原样本

In [61]:
import pandas as pd

# 假设原始数据文件和映射文件的路径
original_txt_path = r"D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_100_uniprot_go_conform.txt"
mapping_csv_path = r"D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_blast_100.csv"
updated_txt_path = r"D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_protein2go.txt"

# 读取映射文件
id_mapping_df = pd.read_csv(mapping_csv_path)

# 创建从UniProt ID到新样本ID的映射字典
id_mapping = {}
for _, row in id_mapping_df.iterrows():
    uni_prot_id = row['MatchID'].split('|')[1]  # 提取UniProt ID
    if uni_prot_id not in id_mapping:
        id_mapping[uni_prot_id] = []
    id_mapping[uni_prot_id].append(row['SampleID'])

# 读取原始txt文件
with open(original_txt_path, 'r') as file:
    lines = file.readlines()

# 更新样本ID
updated_lines = []
for line in lines:
    if ': GO' in line:
        parts = line.strip().split(': ', 1)  # 分割UniProt ID和GO信息
    else:
        parts = line.strip().split(':', 1)  # 分割UniProt ID和GO信息
    uni_prot_id = parts[0]
    go_info = parts[1] if len(parts) > 1 else ''
    # if uni_prot_id in id_mapping:
    #     # 对于每个匹配的UniProt ID，为每个对应的样本ID创建一行
    #     for sample_id in id_mapping[uni_prot_id]:
    #         updated_line = f"{sample_id}: {go_info}\n"  # 保持原始格式不变
    #         updated_lines.append(updated_line)
    # else:
    #     # 如果没有映射，保留原行
    #     updated_lines.append(line)
    # 对于每个匹配的UniProt ID，为每个对应的样本ID创建一行
    for sample_id in id_mapping[uni_prot_id]:
        updated_line = f"{sample_id}: {go_info}\n"  # 保持原始格式不变
        updated_lines.append(updated_line)

# 将更新后的数据写入新的txt文件
with open(updated_txt_path, 'w') as file:
    file.writelines(updated_lines)

得到GO数据库的所有GO各自祖先信息，以及蛋白质对应的GO ID
下一步将每个蛋白质的GO归纳到UniProt的GO描述小类中 这里划分了BP MF CC三类

In [62]:
import pickle

# 读取pkl文件
with open(r'D:\MyPaper\paper1\data\go_ancestor.pkl', 'rb') as f:
    go_ancestors = pickle.load(f)

# 读取txt文件
with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_protein2go.txt', 'r') as f:
    lines = f.readlines()

# 以下GO ID 来自UniProtKB GOA的GO ID
mf_keys = ['GO:0001618', 'GO:0003677', 'GO:0003723', 'GO:0003774', 'GO:0003824', 'GO:0003924', 'GO:0005198', 'GO:0005215', 'GO:0008092', 
           'GO:0008289', 'GO:0009975', 'GO:0016209', 'GO:0016491', 'GO:0016740', 'GO:0016787', 'GO:0016829', 'GO:0016853', 'GO:0016874', 
           'GO:0031386', 'GO:0038024', 'GO:0042393', 'GO:0044183', 'GO:0045182', 'GO:0045735', 'GO:0048018', 'GO:0060089', 'GO:0060090',
           'GO:0090729', 'GO:0098631', 'GO:0098772', 'GO:0120274', 'GO:0140096', 'GO:0140097', 'GO:0140098', 'GO:0140104', 'GO:0140110',
           'GO:0140223', 'GO:0140299', 'GO:0140313', 'GO:0140657', 'GO:0003674']

bp_keys = ['GO:0000278', 'GO:0000910', 'GO:0002181', 'GO:0002376', 'GO:0003012', 'GO:0003013', 'GO:0003014', 'GO:0003016', 'GO:0005975',
           'GO:0006091', 'GO:0006260', 'GO:0006281', 'GO:0006310', 'GO:0006325', 'GO:0006351', 'GO:0006355', 'GO:0006399', 'GO:0006457',
           'GO:0006486', 'GO:0006520', 'GO:0006575', 'GO:0006629', 'GO:0006766', 'GO:0006790', 'GO:0006886', 'GO:0006913', 'GO:0006914',
           'GO:0006954', 'GO:0007005', 'GO:0007010', 'GO:0007018', 'GO:0007031', 'GO:0007040', 'GO:0007059', 'GO:0007155', 'GO:0007163',
           'GO:0012501', 'GO:0015979', 'GO:0016071', 'GO:0016073', 'GO:0016192', 'GO:0022414', 'GO:0022600', 'GO:0023052', 'GO:0030154',
           'GO:0030163', 'GO:0030198', 'GO:0031047', 'GO:0032200', 'GO:0034330', 'GO:0042060', 'GO:0044782', 'GO:0048856', 'GO:0048870',
           'GO:0050877', 'GO:0050886', 'GO:0051604', 'GO:0055085', 'GO:0055086', 'GO:0061007', 'GO:0061024', 'GO:0065003', 'GO:0071554',
           'GO:0071941', 'GO:0072659', 'GO:0098542', 'GO:0098754', 'GO:0140013', 'GO:0140014', 'GO:0140053', 'GO:1901135', 'GO:1901135',
           'GO:0008150']

cc_keys = ['GO:0000228', 'GO:0005576', 'GO:0005615', 'GO:0005618', 'GO:0005634', 'GO:0005635', 'GO:0005654', 'GO:0005694', 'GO:0005730',
           'GO:0005739', 'GO:0005764', 'GO:0005768', 'GO:0005773', 'GO:0005777', 'GO:0005783', 'GO:0005794', 'GO:0005811', 'GO:0005815',
           'GO:0005829', 'GO:0005840', 'GO:0005856', 'GO:0005886', 'GO:0005929', 'GO:0009536', 'GO:0009579', 'GO:0030312', 'GO:0031012',
           'GO:0031410', 'GO:0043226', 'GO:0005575']

protein_go = {}  # 创建一个字典 用于存储蛋白质的GO信息

protein_conform_go = {}  

# 创建一个函数来根据One-hot向量的索引找到对应的GO id
def get_go_ids(one_hot_vector, go_ids):
    return [go_ids[i] for i, val in enumerate(one_hot_vector) if val == 1]


# 大循环遍历每一行 每一行包含蛋白质的ID 以及对应的GO list
for line in lines:
    mf_mutil_vec = [0]*len(mf_keys)  # 创建该蛋白质的MF MUTI-HOT VECTOR
    bp_mutil_vec = [0]*len(bp_keys)  # 创建该蛋白质的BP MUTI-HOT VECTOR
    cc_mutil_vec = [0]*len(cc_keys)  # 创建该蛋白质的CC MUTI-HOT VECTOR
    mf_one_vec = [0]*len(mf_keys)  # 创建该蛋白质的MF ONE-HOT VECTOR
    bp_one_vec = [0]*len(bp_keys)  # 创建该蛋白质的BP ONE-HOT VECTOR
    cc_one_vec = [0]*len(cc_keys)  # 创建该蛋白质的CC ONE-HOT VECTOR
    
    # 分割每一行为UniPort ID和GO信息
    parts = line.strip().split(': ')
    # 如果len(parts) < 2 说明这个蛋白质没有GO 构建成全0的向量即可
    if len(parts) < 2:
                # print(f"Invalid line: {parts[0]}")
                id = parts[0][:-1]
                protein_go[id] = {'MF': {'mutil': mf_mutil_vec, 'one': mf_one_vec},
                              'BP': {'mutil': bp_mutil_vec, 'one': bp_one_vec},
                              'CC': {'mutil': cc_mutil_vec, 'one': cc_one_vec}}
                continue
    uniprot_id = parts[0]
    go_info = parts[1]
    # 将GO信息分割为多个GO
    go_list = go_info.split(', ')  # 获得该蛋白质的GO LIST
    

    # 循环遍历该蛋白质的每个GO ID  填充到三个对应的vec中
    for go in go_list:
        # 如果该蛋白质的这个GO id 在GO数据库中 即在go_ancestors中 
        if go in go_ancestors.keys():
            # MF 类填充
            for index, mf_key in enumerate(mf_keys):  # 遍历MF的所有小类的GO id
                if mf_key in  go_ancestors[go]:  # 如果当前遍历的MF小类的GO id 在该蛋白质的这个GO的祖先列表中 说明该GO属于这个小类 is_a / part of回溯到了这个叶子节点
                    mf_mutil_vec[index] += 1  # 所以在mutil_vec中，当前索引位置+1
                    mf_one_vec[index] = 1  # 在one_vec中，当前索引位置修改为1
            # BP 类填充
            for index, bp_key in enumerate(bp_keys):
                if bp_key in go_ancestors[go]:
                    bp_mutil_vec[index] += 1
                    bp_one_vec[index] = 1
            # CC 类填充
            for index, cc_key in enumerate(cc_keys):
                if cc_key in go_ancestors[go]:
                    cc_mutil_vec[index] += 1
                    cc_one_vec[index] = 1
        else:
            print(f"GO not found: {go}")
    
    # 现在获得了这个蛋白质的MF BP CC的MUTIL-HOT VECTOR 和 ONE-HOT VECTOR 下一步要把这三类存储到字典中
    protein_go[uniprot_id] = {'MF': {'mutil': mf_mutil_vec, 'one': mf_one_vec},
                              'BP': {'mutil': bp_mutil_vec, 'one': bp_one_vec},
                              'CC': {'mutil': cc_mutil_vec, 'one': cc_one_vec}}
    # 存储有对应 GO ID 的项
    go_mf_ids = [mf_keys[i] for i in range(len(mf_keys)) if mf_one_vec[i] == 1]
    go_bp_ids = [bp_keys[i] for i in range(len(bp_keys)) if bp_one_vec[i] == 1]
    go_cc_ids = [cc_keys[i] for i in range(len(cc_keys)) if cc_one_vec[i] == 1]
    go_total = go_mf_ids + go_bp_ids + go_cc_ids
    protein_conform_go[uniprot_id] = go_total

with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_100_uniprot_conform_protein2go.txt', 'w') as f:
    for uniprot_id, go_total in protein_conform_go.items():
        f.write(f"{uniprot_id}: {', '.join(go_total)}\n")


# 将protein_go字典存储到pkl文件中
with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_100_uniprot_go_conform.pkl', 'wb') as f:
    pickle.dump(protein_go, f)
    

读取pkl文件检查

In [84]:
with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_test_uniprot_go_conform_mutil_concat.pkl', 'rb') as f:
    protein_go = pickle.load(f)
for key, value in protein_go.items():
    print(f"Key: {key}, Value: {len(value)}")
    break

Key: acpS, Value: 144


拼接成一个GO向量 存成和原先GO_get中的pkl一致的版本 mutil和one分别存两版
如果想要整合直接训 就直接读取直接训
如果想要三类分开学 就用切片切开 
拼接统一按MF+BP+CC 三类都是定长的

In [83]:
import pickle
import numpy as np

with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_train_100_uniprot_go_conform.pkl', 'rb') as f:
    protein_go = pickle.load(f)
protein_go_mutil = {}
protein_go_one = {}
for key, value in protein_go.items():
    protein_go_mutil[key] = []
    protein_go_one[key] = []
    # 按照MF BP CC三类进行拼接
    for go_type in ['MF', 'BP', 'CC']:
        protein_go_mutil[key].extend(value[go_type]['mutil'])  # 将当前蛋白质的MF BP CC的MUTIL-HOT VECTOR拼接到一起
        protein_go_one[key].extend(value[go_type]['one'])  # 将当前蛋白质的MF BP CC的ONE-HOT VECTOR拼接到一起
    # 转换为 NumPy 数组
    protein_go_mutil[key] = np.array(protein_go_mutil[key])
    protein_go_one[key] = np.array(protein_go_one[key])

# 将拼接后的MUTIL-HOT VECTOR 和 ONE-HOT VECTOR 存储到pkl文件中
with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_train_uniprot_go_conform_mutil_concat.pkl', 'wb') as f:
    pickle.dump(protein_go_mutil, f)

with open(r'D:\MyPaper\paper1\data\eSOL\eSOL_go\eSOL_train_uniprot_go_conform_onehot_concat.pkl', 'wb') as f:
    pickle.dump(protein_go_one, f)

接上GO_get的后续ID替换拼接部分

读取pkl文件查看结果

In [80]:
with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_mutil_concat_E.coli_3100_1_900_rowid.pkl', 'rb') as f:
    protein_go_rowid = pickle.load(f)
for key, value in protein_go.items():
    print(f"Key: {key}, Value: {value}")
    break

with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_mutil_concat_E.coli_3100_1_900.pkl', 'rb') as f:
    protein_go = pickle.load(f)
for key, value in protein_go.items():
    print(f"Key: {key}, Value: {value}")
    break

print(protein_go_rowid['BSGC-BSGCAIR30353'] == protein_go['Q50336'])



Key: P07550, Value: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 5, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 33, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 5, 8, 12]
Key: P07550, Value: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 5, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 33, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 5, 8, 12]
True


将pickle文件的id对应到原数据的id 便于下一步和fasta文件对应，concat fea vec

In [66]:
import pandas as pd

# 读取CSV文件
train_df = pd.read_csv(r"D:\MyPaper\paper1\MutSol\train_mut_blast_1.csv")
# test_df = pd.read_csv(r"E:\xj\MyPaper\MyPaper\paper1\data\E.coli\nesg_3100\test_nesg_new_blast_1.csv")
# 创建从Uniprot ID到SampleID的映射
# 创建从Uniprot ID到SampleID的映射
train_id_mapping = {}
# test_id_mapping = {}

# 读取pkl文件
with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_E.coli_3100_1_900.pkl', 'rb') as f:
    train_vectors = pickle.load(f)
# with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_test_nesg.pkl', 'rb') as f:
#     test_vectors = pickle.load(f)
for _, row in train_df.iterrows():
    id = row['MatchID'].split('|')[1]
    if id not in train_id_mapping:
        train_id_mapping[id] = []
    train_id_mapping[id].append(row['SampleID'])

# for _, row in test_df.iterrows():
#     id = row['MatchID'].split('|')[1]
#     if id not in test_id_mapping:
#         test_id_mapping[id] = []
#     test_id_mapping[id].append(row['SampleID'])

# 更新训练向量的ID
train_vectors_updated = {}
for id, vector in train_vectors.items():
    if id in train_id_mapping:
        sample_ids = train_id_mapping[id]
        for sample_id in sample_ids:
            train_vectors_updated[sample_id] = vector

# # 更新测试向量的ID
# test_vectors_updated = {}
# for id, vector in test_vectors.items():
#     if id in test_id_mapping:
#         sample_ids = test_id_mapping[id]
#         for sample_id in sample_ids:
#             test_vectors_updated[sample_id] = vector

# 保存更新后的训练向量
with open(r"E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_E.coli_3100_1_900_rowid.pkl", "wb") as f:
    pickle.dump(train_vectors_updated, f)

# with open(r"E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_test_nesg_rowid.pkl", "wb") as f:
#     pickle.dump(test_vectors_updated, f)


验证 ID 映射是否正确

In [90]:
import pandas as pd

with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_mutil_concat_eSOL_test_rowid.pkl', 'rb') as f:
    protein_go_rowid = pickle.load(f)

with open(r'E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_mutil_concat_eSOL_test.pkl', 'rb') as f:
    protein_go = pickle.load(f)

# 读取CSV文件
df = pd.read_csv(r"E:\xj\MyPaper\MyPaper\paper1\data\temp\eSOL_test.csv")
# 创建从Uniprot ID到SampleID的映射
id_mapping = {}
for _, row in df.iterrows():
    id = row['MatchID'].split('|')[1]
    if id not in id_mapping:
        id_mapping[id] = []
    id_mapping[id].append(row['SampleID'])

for key, value in protein_go.items():
    if protein_go[key] != protein_go_rowid[id_mapping[key][0]]:
        print(f"Key: {key}, Value: {value}, Protein GO: {protein_go[key]}")

    

把GO存到fasta后面

In [71]:
from Bio import SeqIO
import pickle

# 读取pkl文件
with open(r"E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_test_nesg_rowid.pkl", "rb") as f:
    train_vectors = pickle.load(f)

# 读取fasta文件
fasta_sequences = list(SeqIO.parse(r"E:\xj\MyPaper\MyPaper\paper1\data\E.coli\nesg_3100\test_set_nesg_new_fea.fasta", "fasta"))

# 更新fasta文件中的feature
for seq in fasta_sequences:
    id = seq.id.split(' ')[0]  # 获取id
    if id in train_vectors:
        vector = train_vectors[id]  # 获取对应的向量
        go_str = 'GO=[' + ', '.join(map(str, vector)) + ']'  # 创建新的GO字符串
        seq.description = seq.description + ' ' + go_str  # 更新description
    else:
        go_str = 'GO=[' + ', '.join(['0']*144) + ']'  # 创建长度为144的全零向量
        seq.description = seq.description + ' ' + go_str  # 更新description

# 保存更新后的fasta文件
SeqIO.write(fasta_sequences, r"E:\xj\MyPaper\MyPaper\paper1\data\E.coli\nesg_3100\test_set_nesg_new_fea_GO_one_concat.fasta", "fasta")

3100

验证是否匹配正确

In [75]:
from Bio import SeqIO
import re
import pickle

# 读取fasta文件
fasta_sequences = list(SeqIO.parse(r"E:\xj\MyPaper\MyPaper\paper1\data\eSOL\eSOL_train_fea_GO_one_concat.fasta", "fasta"))

with open(r"E:\xj\MyPaper\MyPaper\paper1\data\temp\uniprot_go_one_concat_eSOL_train_rowid.pkl", "rb") as f:
    train_vectors = pickle.load(f)

# 使用正则表达式从description中提取label和label_noise
pattern = re.compile(r'label=(\d+) label_noise=(\d+) feature=\[([^\]]+)\] GO=\[([^\]]+)\]')

# 用于存储提取的信息的列表
fasta_go_list=[]        

for seq in fasta_sequences:
    match = pattern.search(seq.description)
    if match:
        id = seq.id.split(' ')[0]  # 获取id
        fasta_go_list = [int(x) for x in match.group(4).split(',')]  # 获取GO列表

        # 如果id在pkl文件中
        if id in train_vectors:
            pkl_go_list = train_vectors[id]  # 获取pkl文件中的GO列表

            # 如果两个GO列表不一致，打印出来
            if set(fasta_go_list) != set(pkl_go_list):
                print(f"Sample {id} GO list not match:")
                print(f"Fasta GO list: {fasta_go_list}")
                print(f"Pkl GO list: {pkl_go_list}")
            else:
                # print(f"Sample {id} GO list match")
                pass
        else:
            print(f"Sample {id} not found in pkl file")
    else:
        print(f"Invalid description: {seq.description}")

Sample ydfP not found in pkl file
Sample yfjO not found in pkl file
Sample ykiB not found in pkl file
Sample ymfH not found in pkl file
Sample ymfO not found in pkl file
