In [None]:
import json
from pathlib import Path
import re

from Bio import SeqIO
import pandas as pd
from tqdm import tqdm


FULLNAMELINEAGE_SQLITE3 = Path('fullnamelineage.sqlite3')
FORMAT = 'qseqid sseqid saccver evalue staxids length qlen slen pident stitle qseq sseq qcovs frames'

In [None]:
isoform_pattern = re.compile(r'(^TRINITY_DN[0-9]+_c[0-9]+_g[0-9]+_i[0-9]+)\.p[0-9]+')

def tdid2tid(tdid: str):
    # transdecoder_id to trinity_id
    if (res := isoform_pattern.search(tdid)) is not None:
        return res.group(1)
    else:
        return None

In [None]:
blast_result_file = Path('./local.blast_result.tsv')
blast_df = pd.read_csv(blast_result_file, delimiter='\t', header=None, names=FORMAT.split(' '))
blast_df = blast_df.astype({'qseqid': str, 'sseqid': str, 'saccver': str, 'evalue': float, 'staxids': str, 'length': int, 'qlen': int, 'slen': int, 'pident': float, 'stitle': str, 'qseq': str, 'sseq': str, 'qcovs': float, 'frames': str})
# blast_df.head()

In [None]:
blast_df['qseqid_trinity'] = [ tdid2tid(x) for x in blast_df['qseqid']]

In [None]:
def select_best_hits(df):
    return_list = []
    grouped = df.groupby('qseqid_trinity')

    for _, group in tqdm(grouped):
        min_evalue_rows = group[group['evalue'] == group['evalue'].min()]
        # if len(min_evalue_rows) > 1:
        max_b_rows = min_evalue_rows[min_evalue_rows['qcovs'] == min_evalue_rows['qcovs'].max()]
        max_pident_rows = max_b_rows[max_b_rows['pident'] == max_b_rows['pident'].max()]

        if len(max_pident_rows) > 1:
            max_pident_rows = max_pident_rows.sample(n=1)
        result = max_pident_rows.to_dict(orient='records')[0]
        return_list.append(result)
    return return_list

df_list = select_best_hits(blast_df)
blast_result_best_evalue_df = pd.DataFrame.from_records(df_list)

In [None]:
def load_IPS(file) -> pd.DataFrame:
    def process_match_and_create_annotations(match):
        signature_db = match['signature']['signatureLibraryRelease']['library']
        if signature_db == 'MOBIDB_LITE':
            return []
        if not match['signature']:
            return []
        entry = match['signature'].get('entry')
        if entry is None:
            return []
        accession = entry["accession"]
        description = entry.get("description") or match.get("name") or ''
        entry_type = entry["type"]

        annotation_entries = []
        for loc in match['locations']:
            location_str = f"{loc['start']}-{loc['end']}"
            evalue_str = str(loc['evalue']) if 'evalue' in loc else ''
            score_str = str(loc['score']) if 'score' in loc else ''

            for _id in ids:
                annotation_dict = {
                    'id': _id,
                    'accession': accession,
                    'evalue': evalue_str,
                    'score': score_str,
                    'description': description,
                    'type': entry_type,
                    'location': location_str,
                    'signature_db': signature_db,
                }
                annotation_entries.append(annotation_dict)

        return annotation_entries

    with open(file, 'r') as json_file:
        annotation_data = json.load(json_file)

    annotations = []
    for result in annotation_data['results']:
        ids = [x['id'] for x in result['xref']]

        for match in result['matches']:
            annotations += process_match_and_create_annotations(match)

    annotation_info = pd.DataFrame(annotations).drop_duplicates()

    return annotation_info


interproscan_df = load_IPS('./interproscan_result.json')

In [None]:
interproscan_df['evalue'] = interproscan_df['evalue'].apply(lambda x: float(x) if x != '' else 1e+100)
interproscan_df['score'] = interproscan_df['score'].apply(lambda x: float(x) if x != '' else 0.0)
interproscan_df['trinity_id'] = [ tdid2tid(x) for x in interproscan_df['id']]

In [None]:
def select_best_hits_inter(df):
    list_ = []
    grouped = df.groupby('trinity_id')

    for _, group in tqdm(grouped):
        min_a_rows = group[group['evalue'] == group['evalue'].min()]
        max_b_rows = min_a_rows[min_a_rows['score'] == min_a_rows['score'].max()]

        if len(max_b_rows) > 1:
            max_b_rows = max_b_rows.sample(n=1)
        result = max_b_rows.to_dict(orient='records')[0]
        list_.append(result)
    return list_

interproscan_best_evalue_list = select_best_hits_inter(interproscan_df)
interproscan_best_evalue_df = pd.DataFrame.from_records(interproscan_best_evalue_list)

In [None]:
blast_join_df = blast_result_best_evalue_df.copy()
blast_join_df['trinity_id'] = blast_join_df['qseqid_trinity'].copy()
blast_join_df.drop(['qseqid_trinity', 'saccver', 'staxids', 'length', 'qlen', 'slen', 'pident', 'qcovs', 'frames'], axis=1, inplace=True)

ipr_join_df = interproscan_best_evalue_df.copy()
ipr_join_df.drop(['accession', 'location', 'signature_db'], axis=1, inplace=True)

merged_df = blast_join_df.merge(ipr_join_df, on='trinity_id', how='outer', suffixes=('_blast', '_ipr'))

In [None]:
def select_row_based_on_evalue(row):
    d_ = {}
    if pd.notnull(row['evalue_blast']) and pd.notnull(row['evalue_ipr']):
        if row['evalue_blast'] <= 1e-40:
            d_ = {'transdecoder_id': row['qseqid'],
                'trinity_id': row['trinity_id'],
                'sseqid': row['sseqid'],
                'stitle': row['stitle'],
                'evalue': row['evalue_blast'],}
        else:
            d_ = {'transdecoder_id': row['id'],
                'trinity_id': row['trinity_id'],
                'sseqid': '',
                'stitle': row['description'],
                'evalue': row['evalue_ipr'],}
    elif pd.notnull(row['evalue_blast']) and pd.isnull(row['evalue_ipr']):
        d_ = {'transdecoder_id': row['qseqid'],
              'trinity_id': row['trinity_id'],
              'sseqid': row['sseqid'],
              'stitle': row['stitle'],
              'evalue': row['evalue_blast'],}
    elif pd.isnull(row['evalue_blast']) and pd.notnull(row['evalue_ipr']):
        d_ = {'transdecoder_id': row['id'],
              'trinity_id': row['trinity_id'],
              'sseqid': '',
              'stitle': row['description'],
              'evalue': row['evalue_ipr'],}
    else:
        raise ValueError('Unexpected value.')
    return d_

d_list = merged_df.apply(select_row_based_on_evalue, axis=1)
selected_df = pd.DataFrame.from_records(d_list)

In [None]:
# Product name
uniprot_pattern = re.compile(r'^sp\|')
uniprot_product_OS_suffix_pattern = re.compile(r'\sOS=.*$')
trichomonas_pattern = re.compile(r'^Trichomonas_vaginalis_G3_PRJNA16084.fasta#')
trepomonas_pattern = re.compile(r'^Trepomonas_sp._PC1_PRJNA288252.fasta#')
giardia_pattern = re.compile(r'^Giardia_intestinalis_PRJNA1439.fasta#')
spironucleus_pattern = re.compile(r'^Spironucleus.salmonicida_PRJNA60811.fasta#')

def processing_blast_stitle(row):
	def remove_saccver_from_stitle(x) -> str:
		return x['stitle'].replace(x['sseqid'], '').strip()
	def extract_product_name(x: str) -> str:
		return x.split(' [')[0].strip()

	if row['sseqid'] != '':  # blast
		product_name = remove_saccver_from_stitle(row)
		if uniprot_pattern.match(row['sseqid']):
			product_name = product_name.split(' OS=')[0]
		else:
			product_name = extract_product_name(product_name)
	else:  # interproscan: row['sseqid'] == ''
		product_name = row['stitle']
	return product_name

selected_df['product_name'] = selected_df.apply(processing_blast_stitle, axis=1)

In [None]:
# For hypothetical proteins
annotated_trinity_id = selected_df['trinity_id'].unique()

trinity_assembly_file = Path('./Trinity.fasta')  # RNA-seq trinity, original
with open(trinity_assembly_file) as f:
    assemblyId2records = SeqIO.to_dict(SeqIO.parse(trinity_assembly_file, 'fasta'))
trinity_assembly_ids = set(assemblyId2records.keys())
unannotated_trinity_ids = trinity_assembly_ids - set(annotated_trinity_id)

In [None]:
pep_file = Path('./longest_orf.pep')
records = list(SeqIO.parse(pep_file, 'fasta'))

In [None]:
unannotated_trinity_id_to_unannotated_transdecoder_record = {}
for record in records:
    trinity_id_ = tdid2tid(record.id)
    if trinity_id_ in unannotated_trinity_ids:
        if unannotated_trinity_id_to_unannotated_transdecoder_record.get(trinity_id_, False):
            unannotated_trinity_id_to_unannotated_transdecoder_record[trinity_id_].append(record)
        else:
            unannotated_trinity_id_to_unannotated_transdecoder_record[trinity_id_] = [record]

In [None]:
min_length = 300
hypo_candidates_transdecoder_ids_300aa = {}
for key_trinity_id, values_td_records in unannotated_trinity_id_to_unannotated_transdecoder_record.items():
	for record in values_td_records:
		len_seq = len(str(record.seq))
		if len_seq >= min_length:
			if not hypo_candidates_transdecoder_ids_300aa.get(key_trinity_id, False):
				hypo_candidates_transdecoder_ids_300aa[key_trinity_id] = record
			else:
				if len_seq > len(str(hypo_candidates_transdecoder_ids_300aa[key_trinity_id].seq)):
					hypo_candidates_transdecoder_ids_300aa[key_trinity_id] = record	
					# print('{} -> {}.'.format(len_seq, len(str(hypo_candidates_transdecoder_ids_300aa[key_trinity_id].seq))))
				else:
					pass
		else:
			pass
hypo_candidates_transdecoder_ids_more_than_300 = {k: v for k, v in sorted(hypo_candidates_transdecoder_ids_300aa.items(), key=lambda item: len(str(item[1].seq)), reverse=True)}

In [None]:
d_list = []
for k, v in hypo_candidates_transdecoder_ids_more_than_300.items():
	d_list.append({'transdecoder_id': v.id, 'trinity_id': k, 'product_name': 'Hypothetical protein', 'evalue': '99.9'})
hyp_df = pd.DataFrame(d_list)

In [None]:
final_df = pd.concat([selected_df, hyp_df], axis=0)
final_df.reset_index(drop=True, inplace=True)
# final_df.head()

In [None]:
product_names_df = final_df[['transdecoder_id', 'trinity_id', 'product_name', 'evalue']]

In [None]:
product_names_df.to_csv('product_names.tsv', sep='\t', index=False)