In [1]:
from pathlib import Path
import pandas as pd

from Bio.SeqRecord import SeqRecord
from Bio.SwissProt import FeatureTable

# Directory under the user's home that holds the Swiss-Prot flat files;
# later cells read "<species>.txt" from it.
swiss_file_path = Path.home() / "database" / "uniprot" / "swiss"

In [2]:
df_no_evid = pd.read_table("../analysis/no_evidence_pred_mbps.tsv")

# Each row describes a residue pair. Flatten the pairs into one entry per
# unique (seq_id, position) key; when a key occurs more than once the last
# occurrence wins, while the key keeps its first-seen order.
posi_to_resi_species = {}
pair_columns = (("resi_seq_posi_1", "resi_1"), ("resi_seq_posi_2", "resi_2"))
for _, pair in df_no_evid.iterrows():
    for posi_col, resi_col in pair_columns:
        key = (pair["seq_id"], pair[posi_col])
        posi_to_resi_species[key] = (pair[resi_col], pair["species"])

# One record per unique residue. The position is shifted by +1 to give
# `resi_seq_num` (presumably 0-based position -> 1-based residue number;
# confirm against the TSV).
records = [
    {
        "seq_id": seq_id,
        "species": species,
        "resi_seq_num": posi + 1,
        "resi": resi,
    }
    for (seq_id, posi), (resi, species) in posi_to_resi_species.items()
]
df_no_evid_resi = pd.DataFrame(records)
len(df_no_evid_resi)

22447

### Get annotation info for the predicted residues

In [3]:
from Bio import SwissProt
from Bio.Seq import Seq

def gen_swiss_record(source):
    """Yield a ``SeqRecord`` for every entry ``SwissProt.parse`` reads from ``source``.

    Each yielded record carries the entry's sequence, its first accession as
    ``id``, the entry name as ``name``, plus the entry's description and
    features.

    Cross-references are copied into ``dbxrefs`` as
    ``"database; accession; description"`` strings. The description is empty
    when the cross-reference has only two fields, cross-references with fewer
    than two fields are skipped, and duplicate strings are added only once.

    Args:
        source: Whatever ``SwissProt.parse`` accepts; passed through unchanged.

    Yields:
        One ``SeqRecord`` per parsed Swiss-Prot entry, in file order.
    """
    for entry in SwissProt.parse(source):
        seq_record = SeqRecord(
            Seq(entry.sequence),
            id=entry.accessions[0],
            name=entry.entry_name,
            description=entry.description,
            features=entry.features,
        )
        for xref in entry.cross_references:
            if len(xref) < 2:
                continue
            # Only the first three fields are used; anything after is ignored.
            database, accession, *extra = xref
            description = extra[0] if extra else ""

            label = f"{database}; {accession}; {description}"
            if label in seq_record.dbxrefs:
                continue
            seq_record.dbxrefs.append(label)
        yield seq_record

In [4]:
# Swiss-Prot feature-table keys that count as an existing annotation on a
# residue; `annotate` ignores every feature whose type is not listed here.
# "CARBOHYD" (glycosylation site) was previously misspelled "CBAROHYD" and
# therefore never matched any feature.
anno_types = {
    "ACT_SITE",
    "BINDING",
    "CARBOHYD",
    "CA_BIND",
    "CROSSLNK",
    "DISULFID",
    "DNA_BIND",
    "LIPID",
    "METAL",
    "MOD_RES",
    "NP_BIND",
    "ZN_FING",
}

# Feature keys whose start..end span names two linked partner residues
# rather than a contiguous annotated region.
_BOND_FEATURE_TYPES = frozenset({"DISULFID", "CROSSLNK"})


def _feature_positions(feature: "FeatureTable") -> list:
    """Return the 0-based sequence positions that ``feature`` really annotates.

    Region-like features keep every position of their location. For bond
    features only the two end positions are kept (a single position when start
    and end coincide): the residues lying between two bonded partners are not
    part of the bond and must not inherit its annotation.
    """
    positions = list(feature.location)
    if feature.type in _BOND_FEATURE_TYPES and positions:
        positions = sorted({positions[0], positions[-1]})
    return positions


def annotate(
    swiss_file: str,
    target_uniprots: set,
) -> pd.DataFrame:
    """Collect per-residue annotations for selected Swiss-Prot entries.

    Args:
        swiss_file: Swiss-Prot flat file, handed to ``gen_swiss_record``.
        target_uniprots: Record ids to keep; all other entries are skipped.

    Returns:
        A DataFrame with one row per annotated residue and the columns
        ``seq_id``, ``resi_seq_num`` (feature position + 1), ``resi``,
        ``anno_type`` and ``anno_note``. Only features whose type is in
        ``anno_types`` contribute. The frame has no columns when nothing
        matched.
    """
    records = []
    for r in gen_swiss_record(swiss_file):
        r: SeqRecord
        if r.id not in target_uniprots:
            continue

        for f in r.features:
            f: FeatureTable
            feat_type = f.type
            if feat_type not in anno_types:
                continue

            # A feature without a note gets a single-space placeholder.
            qualifiers = getattr(f, "qualifiers", None) or {}
            note = qualifiers.get("note", " ")

            # Features whose location cannot be enumerated (e.g. an unknown
            # or missing position) are skipped. `Exception` rather than a
            # bare `except` so Ctrl-C and SystemExit still propagate.
            try:
                positions = _feature_positions(f)
            except Exception:
                continue

            for posi in positions:
                # Ignore positions that fall outside the stored sequence.
                try:
                    resi = r.seq[posi]
                except IndexError:
                    continue
                records.append({
                    "seq_id": r.id,
                    "resi_seq_num": posi + 1,
                    "resi": resi,
                    "anno_type": feat_type,
                    "anno_note": note,
                })
    return pd.DataFrame(records)

def get_family(
    swiss_file: str,
    target_uniprots: set,
) -> pd.DataFrame:
    """Look up the Pfam and SUPFAM cross-references of selected entries.

    Args:
        swiss_file: Swiss-Prot flat file, handed to ``gen_swiss_record``.
        target_uniprots: Record ids to keep; all other entries are skipped.

    Returns:
        A DataFrame with one row per kept entry and the columns ``seq_id``,
        ``pfam_id``, ``pfam_desc``, ``supfam_id`` and ``supfam_desc``. A
        database with no cross-reference yields empty strings; when an entry
        has several cross-references to the same database only the last one
        is reported.
    """
    rows = []

    for seq_record in gen_swiss_record(swiss_file):
        accession = seq_record.id
        if accession not in target_uniprots:
            continue

        # (id, description) per family database; later hits overwrite earlier ones.
        family = {"Pfam": ("", ""), "SUPFAM": ("", "")}
        for dbxref in seq_record.dbxrefs:
            # Each dbxref is a "database; accession; description" string.
            database, xref_id, xref_desc = dbxref.split("; ")
            if database in family:
                family[database] = (xref_id, xref_desc)

        pfam_id, pfam_desc = family["Pfam"]
        supfam_id, supfam_desc = family["SUPFAM"]
        rows.append({
            "seq_id": accession,
            "pfam_id": pfam_id,
            "pfam_desc": pfam_desc,
            "supfam_id": supfam_id,
            "supfam_desc": supfam_desc
        })
    return pd.DataFrame(rows)

In [5]:
# Annotate the predicted residues species by species: each species has its own
# Swiss-Prot flat file, "<species>.txt", under `swiss_file_path`.
record_dfs = []
# Group by the bare column name. A one-element list key makes pandas 2.x hand
# back 1-tuples, which would turn the file name into "('<species>',).txt".
for species, df_sp in df_no_evid_resi.groupby('species'):
    swiss_file = swiss_file_path / f"{species}.txt"
    uniprots = set(df_sp['seq_id'])

    record_dfs.append(annotate(swiss_file, uniprots))
df_anno = pd.concat(record_dfs)
# Left merge keeps every predicted residue; `anno_type` stays NaN where no
# annotation matched on sequence id, residue number and residue letter.
df = pd.merge(df_no_evid_resi, df_anno, on=["seq_id", "resi_seq_num", "resi"], how="left")

  for species, df_sp in df_no_evid_resi.groupby(['species']):


In [6]:
# Predicted residues may already carry other annotations (heme, disulfide
# bond, etc.). Any sequence with at least one annotated predicted residue is
# dropped entirely from the no-evidence predictions.
has_anno = df['anno_type'].notna()
anno_seqs = set(df.loc[has_anno, 'seq_id'])
df_filtered = df_no_evid[~df_no_evid['seq_id'].isin(anno_seqs)]

### Get filtered results (with protein family)

In [7]:
record_dfs = []
for species, df_sp in df_filtered.groupby(['species']):
    swiss_file = swiss_file_path / f"{species}.txt"
    uniprots = set(df_sp['seq_id'])

    record_dfs.append(get_family(swiss_file, uniprots))

  for species, df_sp in df_filtered.groupby(['species']):


In [8]:
# Write both result tables as tab-separated files without the index column.
family_table = pd.concat(record_dfs)
family_table.to_csv("potential_mbps_family.tsv", sep="\t", index=False)
df_filtered.to_csv("potential_mbps.tsv", sep="\t", index=False)