In [1054]:
from pathlib import Path

import numpy as np
import pandas as pd

In [1055]:
class bcolors:
    """ ANSI escape sequences used to colour and format messages printed by this notebook """

    # foreground colours
    FAIL, OKGREEN, WARNING = '\033[91m', '\033[92m', '\033[93m'
    OKBLUE, HEADER, OKCYAN = '\033[94m', '\033[95m', '\033[96m'

    # text attributes; ENDC resets colour and attributes
    ENDC, BOLD, UNDERLINE = '\033[0m', '\033[1m', '\033[4m'


def get_phrog(row):
    """ Return the numeric PHROG cluster id parsed from row['target'], or 0 for non-PHROG targets """
    target = row['target']

    # anything that does not mention 'phrog_' (other databases, missing values) maps to 0
    if 'phrog_' not in str(target):
        return 0

    # the cluster number is whatever follows the last underscore, e.g. 'phrog_1015' -> 1015
    return int(target.rsplit('_', 1)[-1])
    
    
def curate_columns(row):
    """ Merge 'phrog' and 'alan_profile' into one string: first value that is not '0', else '0' """

    # both columns are read up front; 'phrog' takes precedence over 'alan_profile'
    candidates = [str(row[column]) for column in ('phrog', 'alan_profile')]

    return next((value for value in candidates if value != '0'), '0')

In [1056]:
# Load the per-database hit tables and their annotation metadata, attach the
# metadata to the PHROGS and ALANDB hits, and stack everything into one table `df`.
print('Load tables for testing... ', end='')

# hit tables, one tab-separated file per database
phrogs_df = pd.read_csv('tables/phrogs.tsv', sep='\t')
alan_df = pd.read_csv('tables/alan.tsv', sep='\t')
pfam_df = pd.read_csv('tables/pfam.tsv', sep='\t')
ecod_df = pd.read_csv('tables/ecod.tsv', sep='\t')

# metadata tables

# PHROGS
phrogs_annot_df = pd.read_csv('tables/phrog_annot_v4.tsv', sep='\t')
# every missing value in the annotation table (any column) becomes 'unknown function'
phrogs_annot_df = phrogs_annot_df.fillna('unknown function')

# ALANDB (comma-separated, unlike the other tables)
alan_annot_df = pd.read_csv('tables/alan_annot.tsv', sep=',')
# rename to the column names used for the PHROGS annotations ('annot', 'category') and to the merge key
alan_annot_df = alan_annot_df.rename(columns={'definition': 'annot', 'funct': 'category', 'profile': 'alan_profile'})
# drop the metadata columns that are not used further in this notebook
alan_annot_df = alan_annot_df.drop(['abbrev', 'hmm.name', 'family.name', 'family.name.base', 'where.it.occurs', 'is.custom', 'custom.hmm.exists'], axis=1)

print('Done!')


print('Report hits... ', end='')
print('PHROGS metadata... ', end='')
# numeric PHROG id parsed from the target name (0 when the target is not a PHROG)
phrogs_df['phrog'] = phrogs_df.apply(get_phrog, axis=1)
# left merge keeps every hit, also those without a matching annotation row
phrogs_df = phrogs_df.merge(phrogs_annot_df, on='phrog', how='left')

print('ALANDB metadata... ', end='')
# the ALANDB target name is used directly as the merge key
alan_df['alan_profile'] = alan_df['target']
alan_df = alan_df.merge(alan_annot_df, on='alan_profile', how='left')

print('final table... ', end='')
# stack all hits; the original row labels of each table are kept, so index labels can repeat
df = pd.concat([phrogs_df, alan_df, pfam_df, ecod_df], axis=0)
# rows from tables without these columns get NaN after the concat -> use '0' as "not set"
df[['phrog', 'alan_profile']] = df[['phrog', 'alan_profile']].fillna('0')
# cast to int so that curate_columns sees e.g. '1015' / '0' when it stringifies the value
df['phrog'] = df['phrog'].astype(int)
# collapse both id columns into a single 'phrog/alan_profile' column
df['phrog/alan_profile'] = df.apply(curate_columns, axis=1)
df = df.drop(['phrog', 'alan_profile'], axis=1)
print('Done!')

Load tables for testing... Done!
Report hits... PHROGS metadata... ALANDB metadata... final table... Done!


In [1038]:
def report_phrogs(df, max_evalue=10**-3, nfunc2report=5, min_bits=0):
    """ Summarise the PHROG hits of one protein cluster as a fixed-length list of function strings.

    Steps:
      1. Keep hits with evalue <= max_evalue and bits >= min_bits.
      2. Prefer informative functions. "unknown function" and the generic structural
         annotations listed in `noninformative_functions` are reported only when there
         is nothing better: informative > generic > unknown.
      3. Rank hits by qcov * bits and group them by function; functions are reported
         in order of their best-ranked hit.
      4. Each function is reported as '<annot> (<qcov> & <bits> || <qcov> & <bits> ...)'.

    Parameters
    ----------
    df : pandas.DataFrame with at least the columns 'annot', 'evalue', 'qcov', 'bits'.
    max_evalue : maximal e-value of a hit to be considered.
    nfunc2report : length of the returned list.
    min_bits : minimal bit score of a hit to be considered (default 0).

    Returns
    -------
    list of exactly `nfunc2report` strings, padded with '-' when fewer functions are found.

    TODO (planned in the original notes, not implemented here): report the best
    confidence per function, report max bitscore and max qcov per function
    independently, and write function / parameters into separate genbank fields.
    """

    print('Add mapping of PHROGs to the MGG specific table curated by Rafal and Bogna (somwhare on Dropbox in custom folder).')
    print('Version number 3_12!!! function.org to function.new')

    # generic annotations that carry little information about the protein function
    noninformative_functions = ['lytic tail protein', 'tail protein', 'structural protein', 'virion structural protein', 'minor tail protein']

    # significant hits only; hits without annotation are treated as unknown function
    significant = df.loc[(df['evalue'] <= max_evalue) & (df['bits'] >= min_bits)].copy()
    significant['annot'] = significant['annot'].fillna('unknown function')

    is_unknown = significant['annot'] == 'unknown function'
    is_generic = significant['annot'].isin(noninformative_functions)

    # first non-empty tier wins: informative > generic > unknown
    tiers = (significant.loc[~is_unknown & ~is_generic],
             significant.loc[~is_unknown & is_generic],
             significant.loc[is_unknown])
    hits = next((tier for tier in tiers if not tier.empty), None)

    if hits is None:
        return ['-'] * nfunc2report

    # rank hits by coverage-weighted bit score (stable sort keeps the input order for ties)
    hits = hits.assign(**{'qcov x bits': hits['qcov'] * hits['bits']})
    hits = hits.sort_values('qcov x bits', ascending=False, kind='stable')
    hits = hits.assign(**{'qcov & bits': hits['qcov'].astype(str) + ' & ' + hits['bits'].astype(int).astype(str)})

    # sort=False keeps the functions in order of their best-ranked hit
    per_function = hits.groupby('annot', sort=False)['qcov & bits'].agg(' || '.join)
    functions = [f'{annot} ({qcov_bits})' for annot, qcov_bits in per_function.items()]

    # pad with '-' and cut so that the caller can always unpack nfunc2report values
    return (functions + ['-'] * nfunc2report)[:nfunc2report]


def report_alan(df, min_prob=0.95, nfunc2report=3):
    """ Summarise the ALANDB hits of one protein cluster as a fixed-length list of category strings.

    Hits with prob >= min_prob are sorted by bit score and grouped by 'category'.
    Each category is reported as '<category> (<qcov> & <bits> || <qcov> & <bits> ...)',
    categories being ordered by their best bit score.

    Parameters
    ----------
    df : pandas.DataFrame with at least the columns 'prob', 'bits', 'qcov', 'category'.
    min_prob : minimal probability of a hit to be considered.
    nfunc2report : length of the returned list.

    Returns
    -------
    list of exactly `nfunc2report` strings, padded with '-' when fewer categories are found.

    NOTE(review): the previous version built the label from qcov and int(evalue);
    casting an e-value to int always gives 0, so bits are used here, in line with
    the 'qcov & bits' label of the PHROGS report. Confirm this is the intended pair.
    """

    print("CHECK IF ITS ACCURATE: ERROR IN PC000??1")

    # copy so that adding a column does not write into a view of the caller's frame
    hits = df.query('prob >= @min_prob').sort_values('bits', ascending=False, kind='stable').copy()
    hits['qcov & bits'] = hits['qcov'].astype(str) + ' & ' + hits['bits'].astype(int).astype(str)

    # sort=False keeps the categories in bit-score order instead of alphabetical order,
    # so the slice below keeps the best categories
    per_category = hits.groupby('category', sort=False)['qcov & bits'].agg(' || '.join)
    functions = [f'{cat} ({qcov_bits})' for cat, qcov_bits in per_category.items()]

    # pad with '-' and cut so that the caller can always unpack nfunc2report values
    return (functions + ['-'] * nfunc2report)[:nfunc2report]

def report_pfam(df, eval_intervals=(10**-10, 10**-5, 10**-3, 1)):
    """ Report the best PFAM hit of one protein cluster together with a confidence label.

    The e-value thresholds are tried in the given order (strictest first). For the
    first threshold that has any hit, the hit with the highest bit score is reported as
    '<confidence> <detailed function> [<short function>] (<pfam id>) (<evalue>, <bits>)'.
    The confidence label is a run of '*' (more stars = stricter threshold) and '!' for
    the last, most permissive threshold; labels are left-justified to five characters.

    Parameters
    ----------
    df : pandas.DataFrame with the columns 'name', 'evalue', 'bits'; 'name' is expected
         to hold at least three ';'-separated fields (id; short name; description).
    eval_intervals : e-value thresholds, ordered from the most to the least significant.

    Returns
    -------
    str : the report, or 'no significant hit' when no hit passes any threshold.
    """

    # remove domains of unknown function; rows without a name cannot be reported either
    names = df['name']
    has_function = names.notna() & ~names.astype(str).str.contains("DUF")
    df = df.loc[has_function].copy()

    # confidence indicators, e.g. ['***', '**', '*', '!'] for four thresholds
    stars = ['*' * i for i in reversed(range(len(eval_intervals)))]
    confidence_intervals = [label.ljust(5, ' ') for label in stars[:-1] + ['!']]

    # default also covers an empty `eval_intervals`
    report = 'no significant hit'

    # report best bit score for lowest evalues (stepwise)
    for confidence, significance in zip(confidence_intervals, eval_intervals):
        scored = df.loc[(df['evalue'] <= significance) & df['bits'].notna()]
        if scored.empty:
            continue

        # positional lookup of the first maximum, independent of the index labels
        best = scored.iloc[scored['bits'].to_numpy(dtype=float).argmax()]
        name, evalue, bits = best['name'], best['evalue'], best['bits']

        fields = name.split(';')
        pfamID, func_short, func_detailed = fields[0].strip(), fields[1].strip(), fields[2].strip()
        report = f'{confidence} {func_detailed} [{func_short}] ({pfamID}) ({evalue:.2E}, {int(bits)})'
        break

    return report


def report_ecod(df, eval_intervals=(10**-10, 10**-5, 10**-3, 1)):
    """ Report the best ECOD hit of one protein cluster, from the most to the least significant.

    The e-value thresholds are tried in the given order (strictest first). For the
    first threshold that has any hit, the hit with the highest bit score is reported as
    '<confidence> T: <T group>, F: <F group> (<F index>) (<evalue>, <bits>)'.
    The confidence label is a run of '*' (more stars = stricter threshold) and '!' for
    the last, most permissive threshold; labels are left-justified to five characters.

    Parameters
    ----------
    df : pandas.DataFrame with the columns 'name', 'evalue', 'bits'. 'name' is split on
         '|': field 1 is reported as the F index and field 3 is split on ': ' to obtain
         the T group (item 4) and the F group (item 5).
    eval_intervals : e-value thresholds, ordered from the most to the least significant.

    Returns
    -------
    str : the report, or 'no significant hit' when no hit passes any threshold.
    """

    # confidence indicators, e.g. ['***', '**', '*', '!'] for four thresholds
    stars = ['*' * i for i in reversed(range(len(eval_intervals)))]
    confidence_intervals = [label.ljust(5, ' ') for label in stars[:-1] + ['!']]

    # default also covers an empty `eval_intervals`
    report = 'no significant hit'

    # report best bit score for lowest evalues (stepwise)
    for confidence, significance in zip(confidence_intervals, eval_intervals):
        scored = df.loc[(df['evalue'] <= significance) & df['bits'].notna()]
        if scored.empty:
            continue

        # positional lookup of the first maximum, independent of the index labels
        best = scored.iloc[scored['bits'].to_numpy(dtype=float).argmax()]
        name, evalue, bits = best['name'], best['evalue'], best['bits']

        name_fields = name.split('|')
        F_INDEX, ecod_levels = name_fields[1].strip(), name_fields[3]
        levels = ecod_levels.split(': ')

        # item 4 looks like '<T group>, F': cut exactly that trailing key. str.strip(', F')
        # would strip a character set and eat a leading/trailing 'F' of the group name.
        T = levels[4].strip()
        if T.endswith(', F'):
            T = T[:-len(', F')].strip()
        F = levels[5].strip()

        report = f'{confidence} T: {T}, F: {F} ({F_INDEX}) ({evalue:.2E}, {int(bits)})'
        break

    return report

evalue <= @max_evalue and annot != "unknown function" and ~(annot.isin(@noninformative_functions))


In [1053]:
# Inspect the informative, significant hits of a single protein cluster (PC00002).
# The filter is defined here because `query`, `max_evalue` and
# `noninformative_functions` otherwise exist only as local variables inside
# report_phrogs, so this cell would raise a NameError on a fresh kernel.
max_evalue = 10**-3
noninformative_functions = ['lytic tail protein', 'tail protein', 'structural protein', 'virion structural protein', 'minor tail protein']
query = 'evalue <= @max_evalue and annot != "unknown function" and ~(annot.isin(@noninformative_functions))'

df.query(f'query == "PC00002" and {query}')

Unnamed: 0,query,target,prob,pvalue,ident,qcov,tcov,bits,qstart,qend,...,tstart,tend,tlength,evalue,db,name,color,annot,category,phrog/alan_profile
6,PC00002,phrog_1015,0.993,1.6e-20,0.12,0.89,0.17,120.7,4,115,...,587,710,725,1.4e-16,PHROGS,phrog_1015 ## NC_024213_p98,#07e9a2,tail protein with lysin activity,tail,1015
7,PC00002,phrog_1259,0.992,6.899999999999999e-20,0.18,0.75,0.37,104.0,20,114,...,122,208,238,5.9e-16,PHROGS,phrog_1259 ## HQ641352_p206,#07e9a2,baseplate hub subunit and tail lysozyme,tail,1259
9,PC00002,phrog_998,0.989,3.3e-17,0.11,0.83,0.36,92.5,12,115,...,21,127,296,2.9e-13,PHROGS,phrog_998 ## NC_015274_p59,#f35f49,endolysin,lysis,998
11,PC00002,phrog_8781,0.985,1.8e-15,0.16,0.73,0.57,76.8,2,93,...,12,103,160,1.6e-11,PHROGS,phrog_8781 ## p117526 VI_00306,#f35f49,endolysin,lysis,8781
12,PC00002,phrog_21357,0.985,3.5e-15,0.2,0.69,0.61,74.2,20,106,...,35,128,155,3.1e-11,PHROGS,phrog_21357 ## p368593 VI_04152,#f35f49,endolysin,lysis,21357
13,PC00002,phrog_13463,0.981,8.9e-14,0.18,0.77,0.3,74.1,8,104,...,17,117,335,8e-10,PHROGS,phrog_13463 ## p82052 VI_05313,#f35f49,endolysin,lysis,13463
15,PC00002,phrog_3861,0.979,3.4e-13,0.13,0.83,0.24,73.2,11,114,...,20,126,449,3e-09,PHROGS,phrog_3861 ## p214236 VI_00869,#f35f49,amidase,lysis,3861
18,PC00002,phrog_489,0.972,3.6e-11,0.18,0.8,0.22,64.8,3,103,...,13,114,470,3.1e-07,PHROGS,phrog_489 ## NC_003288_p65,#f35f49,endolysin,lysis,489
19,PC00002,phrog_12367,0.972,3.6e-11,0.23,0.79,0.41,58.6,3,102,...,14,117,252,3.2e-07,PHROGS,phrog_12367 ## p92424 VI_05198,#f35f49,endolysin,lysis,12367
20,PC00002,phrog_7887,0.972,3.6e-11,0.08,0.72,0.38,59.9,2,92,...,9,97,236,3.1e-07,PHROGS,phrog_7887 ## KY303907_p62,#f35f49,endolysin,lysis,7887


In [1028]:
# Collect one report per protein cluster (PC): for every database with hits the
# matching report_* function is called, databases without hits stay at '-'.
pcs, pfams, ecods = [], [], []
phrogs1, phrogs2, phrogs3, phrogs4, phrogs5 = [], [], [], [], []
alans1, alans2, alans3 = [], [], []

for pcid, pc in df.groupby('query'):

    # default: no hit. Every reported value is reset here for each PC, so a PC
    # without hits in a database never inherits the values of the previous PC.
    phrog_func1, phrog_func2, phrog_func3, phrog_func4, phrog_func5 = '-', '-', '-', '-', '-'
    alan_func1, alan_func2, alan_func3 = '-', '-', '-'
    pfam, ecod = '-', '-'

    for dbid, db in pc.groupby('db'):

        # report function
        if dbid == 'PHROGS': phrog_func1, phrog_func2, phrog_func3, phrog_func4, phrog_func5 = report_phrogs(db, min_bits=50, nfunc2report=5)
        elif dbid == 'ALANDB': alan_func1, alan_func2, alan_func3 = report_alan(db, min_prob=0.95)
        elif dbid == 'PFAM': pfam = report_pfam(db)
        elif dbid == 'ECOD': ecod = report_ecod(db)
        else: print('Error')


    pcs.append(pcid)

    phrogs1.append(phrog_func1)
    phrogs2.append(phrog_func2)
    phrogs3.append(phrog_func3)
    phrogs4.append(phrog_func4)
    phrogs5.append(phrog_func5)

    alans1.append(alan_func1)
    alans2.append(alan_func2)
    alans3.append(alan_func3)

    pfams.append(pfam)
    ecods.append(ecod)

In [1029]:
# One row per protein cluster, one column per reported field.
# NOTE(review): the ALANDB columns are named 'alan1', 'alans2', 'alans3' - kept as is; confirm the intended spelling.
results_df = pd.DataFrame(dict(
    pc=pcs,
    pfam=pfams,
    ecod=ecods,
    phrogs1=phrogs1,
    phrogs2=phrogs2,
    phrogs3=phrogs3,
    phrogs4=phrogs4,
    phrogs5=phrogs5,
    alan1=alans1,
    alans2=alans2,
    alans3=alans3,
))

In [1030]:
# Save the report to the Downloads folder of the current user instead of a
# machine-specific absolute path, so the cell also runs on other accounts.
# NOTE(review): '?' is kept as field separator - presumably because the report
# strings contain commas and semicolons; confirm.
results_df.to_csv(Path.home() / 'Downloads' / 'results.csv', sep='?', index=False)

Unnamed: 0,query,target,prob,pvalue,ident,qcov,tcov,bits,qstart,qend,...,tstart,tend,tlength,evalue,db,name,color,annot,category,phrog/alan_profile


('Phage tail proteins', 'COG4379_4th')

In [476]:
def get_row(row_type, PC='PC0'):
    """ Build a one-row placeholder table with the columns of the hits table.

    row_type: 'holder' (generic 'UNK' placeholder) or 'no hit' (placeholder for
              protein cluster `PC` without any hit). Any other value returns None.
    """
    indecies = ['query', 'target', 'prob', 'pvalue', 'ident', 'qcov', 'tcov', 'bits', 'qstart', 'qend', 'qlength', 'tstart', 'tend', 'tlength', 'evalue', 'db', 'name', 'color', 'annot', 'category', 'phrog/alan_profile']

    if row_type not in ('holder', 'no hit'):
        return None

    # 'prob' ... 'evalue': thirteen numeric columns, all zero in both placeholders
    zeros = [0] * 13

    templates = {
        'holder': ['UNK', '0'] + zeros + ['', 'UNK', 'UNK', 'UNK', 'UNK', 'UNK'],
        'no hit': [PC, '0'] + zeros + ['no hit', 'no hit', '#ffffff', 'no hit', 'no hit', 'no hit'],
    }

    # Series -> single-row frame (values as one row, `indecies` as column names)
    placeholder = pd.Series(templates[row_type], index=indecies)
    return placeholder.to_frame().T
    

def get_confidence(prob, qcov, tcov, prob_intervals=(0.95, 0.70, 0.50), cov_intervals=(0.80, 0.60, 0.50)):
    """ Encode hit confidence as a coverage size wrapped in probability brackets.

    Brackets by probability (high / medium / low / below low): [] () {} !!
    Size by coverage, where query AND target coverage must both reach the cutoff
    (high / medium / low / below low): L M S XS

    Example: prob=0.99, qcov=0.9, tcov=0.9 -> '[L]'
    """

    prob_high, prob_medium, prob_low = prob_intervals
    cov_high, cov_medium, cov_low = cov_intervals

    # first probability cutoff that is reached decides the brackets
    prob_levels = zip((prob_high, prob_medium, prob_low), ('[]', '()', '{}'))
    opening, closing = next((pair for cutoff, pair in prob_levels if prob >= cutoff), '!!')

    # first coverage cutoff reached by both coverages decides the size
    cov_levels = zip((cov_high, cov_medium, cov_low), ('L', 'M', 'S'))
    size = next((label for cutoff, label in cov_levels if qcov >= cutoff and tcov >= cutoff), 'XS')

    return f'{opening}{size}{closing}'


def get_hit_and_function2report(df, nfunc2report=1):
    """ Always take the first row as the hit to report; nfunc2report decides the function string.

    Parameters
    ----------
    df : pandas.DataFrame with the columns 'prob', 'qcov', 'tcov', 'annot', already
         ordered so that the best hit is the first row. Must not be empty
         (`df.iloc[0]` raises IndexError otherwise).
    nfunc2report : 1 reports '<confidence> <annot>' of the first row; a value >= 2
         reports up to that many rows as 'F1<confidence>: <annot> || F2<confidence>: <annot> ...'.

    Returns
    -------
    (best_frame, func2report) : the first row as a one-row DataFrame and the function string.

    Raises
    ------
    ValueError : when nfunc2report is smaller than 1 (previously this printed 'Error'
                 and then failed with an UnboundLocalError on return).
    """

    ### only one best hit to report
    best_row = df.iloc[0]
    # convert series to dataframe
    best_frame = best_row.to_frame().T

    if nfunc2report == 1:

        prob, qcov, tcov, annot = best_row['prob'], best_row['qcov'], best_row['tcov'], best_row['annot']
        func2report = f'{get_confidence(prob, qcov, tcov)} {annot}'

    ### report multiple hits
    elif nfunc2report >= 2:

        labelled_functions = []
        hits = df[['prob', 'qcov', 'tcov', 'annot']].itertuples(index=False)
        for rank, (prob, qcov, tcov, annot) in enumerate(hits, start=1):
            labelled_functions.append(f'F{rank}{get_confidence(prob, qcov, tcov)}: {annot}')

            # stop once the requested number of functions is collected
            if rank == nfunc2report: break

        # joined once, after the loop
        func2report = ' || '.join(labelled_functions)

    else:
        raise ValueError(f'nfunc2report has to be at least 1 (got {nfunc2report!r})')

    return best_frame, func2report


def get_func2report(df, nfunc2report=1, verbose=True):
    """ Get the best hit and the annotation string (with confidence) to report.

    Parameters
    ----------
    df : pandas.DataFrame of hits with the columns 'bits', 'annot', 'prob', 'qcov', 'tcov'.
    nfunc2report : 1 reports the single hit with the highest bit score; a value >= 2
         reports up to that many distinct functions, each represented by its
         highest-scoring hit, ordered by bit score.
    verbose : print progress messages and display the intermediate tables
         (uses the notebook's `display`).

    Returns
    -------
    (best_row, func2report) : the best hit as a one-row DataFrame and the function string.

    Raises
    ------
    ValueError : when nfunc2report is smaller than 1 (previously this printed the
                 message and then failed with an UnboundLocalError).
    """

    ### report only first best hit
    if nfunc2report == 1:
        if verbose: print(f'{bcolors.OKGREEN}Reporting first best hit with highest bitscore and single function!{bcolors.ENDC}')

        # sort by best bitscore
        df = df.sort_values('bits', ascending=False)

        # get single bests hit
        best_row, func2report = get_hit_and_function2report(df, nfunc2report=nfunc2report)

    elif nfunc2report >= 2:
        if verbose: print(f'{bcolors.OKCYAN}Reporting first best hit with highest bitscore and mutiple function (n={nfunc2report})!{bcolors.ENDC}')

        # only unique functions with highest bitscore each: after a stable descending
        # sort the first row of every annotation is its best hit. This avoids
        # groupby(...).apply(...), which depends on the grouping column being passed
        # to the applied function (deprecated in recent pandas).
        unique_functions_df = df.dropna(subset=['annot']) \
                                .sort_values('bits', ascending=False, kind='stable') \
                                .drop_duplicates(subset='annot', keep='first') \
                                .reset_index(drop=True)

        if verbose:
            print('\nUNIQUE FUNCTIONS WITH BEST BITSCORES:')
            display(unique_functions_df)

        # report these hits (one entry per unique function)
        best_row, func2report = get_hit_and_function2report(unique_functions_df, nfunc2report=nfunc2report)

    else:
        raise ValueError('Error! You have to have at least one function to report and df cannot be empty!')

    if verbose:
        print('\nPROTEIN FAMILY HITS:')
        display(df)
        print('\nPROTEIN FAMILY BEST HIT:')
        display(best_row)
        print(f'FUNCTION TO REPORT: {func2report}\n\n\n\n')

    return best_row, func2report


def report_phrogs(df, prob_intervals, cov_intervals, verbose=True):
    """ Pick the PHROG hit and the function string to report for one protein cluster.

    Decision order:
      1. exactly one distinct informative function -> best hit, single function
      2. two or more distinct informative functions -> best hit, up to 10 functions
      3. only "unknown function" hits -> best hit, single function
      4. no hits at all -> 'no hit' placeholder row

    Parameters
    ----------
    df : pandas.DataFrame with the PHROG hits of one protein cluster.
    prob_intervals, cov_intervals : accepted but not used in this function; the
         confidence is computed downstream with the defaults of get_confidence.
    verbose : print which case applies and forward verbosity to get_func2report.

    Returns
    -------
    (hit, func2report) : one-row DataFrame and the function string.

    NOTE: this definition shares its name with the earlier
    report_phrogs(df, max_evalue, nfunc2report) in this notebook but has a different
    signature; whichever cell runs last decides which one is active.
    """

    #### informative hits
    informative_df = df.query('annot != "unknown function"')
    n_functions = informative_df['annot'].nunique()

    # reporting one hit
    if n_functions == 1:
        if verbose: print(f'{bcolors.OKGREEN}Unique function informative hits! Reporting... {bcolors.ENDC}')
        return get_func2report(informative_df, nfunc2report=1, verbose=verbose)

    # reporting more hits
    if n_functions >= 2:
        if verbose: print(f'{bcolors.OKCYAN}Nonunique function informative hits! Reporting... {bcolors.ENDC}')
        return get_func2report(informative_df, nfunc2report=10, verbose=verbose)

    ### none-informative hits

    # best unknown function
    if df.shape[0]:
        if verbose: print(f'{bcolors.WARNING}Unknown function hits only! Reporting... {bcolors.ENDC}')
        return get_func2report(df, nfunc2report=1, verbose=verbose)

    # no hits at all (the former trailing 'holder' fallback could never be reached and was removed)
    if verbose: print(f'{bcolors.FAIL}No significant hits! Reporting... {bcolors.ENDC}')
    return get_row(row_type='no hit'), 'no hit'
