# 13_dramv_functions

This notebook creates and tests functions for summarizing metabolic potential of viral genes identified by DRAM-v.

## Load Packages and example data for testing

In [None]:
import pandas as pd
import math
import glob
import os # these two packages are good for searching and navigating file systems
import os.path as op

# show every column when a table is displayed instead of eliding the middle ones
pd.set_option('display.max_columns', None)

# example DRAM-v output used to try out the functions defined below
ex_file_path = '/Users/melissaherring/Google Drive/My Drive/MH_project/dramv/cv1_AM-654-B02/annotations.tsv'

# the only columns the functions in this notebook look at
columns_to_keep = [
    'Unnamed: 0',
    'rank',
    'kegg_hit',
    'viral_hit',
    'pfam_hits',
    'vogdb_hits',
]

# read the tab-separated file and keep just the columns listed above, in that order
df = pd.read_csv(ex_file_path, sep = "\t")[columns_to_keep]

In [None]:
# preview the first five rows of the example table
df.head(n=5)

## Function 1: get_ann_text

This function formats the text from the following columns: 'viral_hit', 'kegg_hit', 'pfam_hits', 'vogdb_hits'

In [None]:
def get_ann_text(hit_text, column_type = 'viral_hit'): # if column_type isn't specified, the function assumes it is 'viral_hit'
    '''
    Reduce a DRAM-v hit string to just its annotation text.

    args:
        hit_text: value taken from one of the DRAM-v hit columns; anything that is not a
                  string (for example the float NaN that pandas uses for an empty cell)
                  is treated as missing
        column_type: the column hit_text came from: 'viral_hit' (default), 'kegg_hit',
                     'pfam_hits' or 'vogdb_hits'
    returns:
        text string of just annotation information, not organism or hit id;
        a missing hit_text is returned unchanged
    raises:
        ValueError: if hit_text is a string and column_type is not one of the four columns above

    what is removed for each column_type:
        'viral_hit':  everything from the first '[' onward and the first word (the accession id)
        'kegg_hit':   everything from the first '[' onward
        'pfam_hits':  for every ';'-separated entry, everything from the first '[' onward
        'vogdb_hits': everything from the first ';' onward and the first word

    exe_input: YP_004325053.1 hypothetical protein PSSM7_226 [Prochlorococcus phage P-SSM7]
    exe_output: hypothetical protein PSSM7_226
    '''

    # Missing values pass straight through. Checking for "not a string" (rather than only
    # float) means None and pd.NA are handled as well as NaN.
    if not isinstance(hit_text, str):
        return hit_text

    if column_type == 'viral_hit':
        no_org = hit_text.split("[")[0] # keep the text in front of the organism name in brackets
        # split() without an argument ignores the space left in front of the '[', so only the
        # leading accession id is dropped; the last word survives even if there was no '[...]'
        no_acc_id = " ".join(no_org.split()[1:])
        return no_acc_id

    if column_type == 'kegg_hit':
        no_ee = hit_text.split("[")[0].strip() # keep the text in front of the first '[' without surrounding spaces
        return no_ee

    if column_type == 'pfam_hits':
        # there can be several annotations in this one column, separated by ';':
        # trim each of them at its first '[' and join them back together with ';'
        no_pf_ids = ";".join(entry.split("[")[0].strip() for entry in hit_text.split(";"))
        return no_pf_ids

    if column_type == 'vogdb_hits':
        no_code = hit_text.split(";")[0] # keep the text in front of the first ';'
        no_acc = " ".join(no_code.split(" ")[1:]) # drop the first word
        return no_acc

    # an unrecognised column_type used to fall through and return None without any warning
    raise ValueError(f"unknown column_type {column_type!r}; expected 'viral_hit', 'kegg_hit', 'pfam_hits' or 'vogdb_hits'")

In [None]:
# get_ann_text test using cv1_AM-654-B02

# hit column -> name of the new column that receives its cleaned-up annotation text
ann_text_columns = {
    'viral_hit': 'viral_ann_text',
    'kegg_hit': 'kegg_ann_text',
    'pfam_hits': 'pfam_ann_text',
    'vogdb_hits': 'vogdb_ann_text',
}

for hit_col, text_col in ann_text_columns.items():
    # run get_ann_text on every value of the hit column, passing the column name as column_type
    df[text_col] = df[hit_col].apply(get_ann_text, args = (hit_col,))

df

## Function 2: grab_annotation

This function looks at the viral_hit, kegg_hit, pfam_hits, and vogdb_hits columns and decides which annotation to keep moving forward (to avoid overlap when looking at only one column at a time).

In [None]:
def grab_annotation(line):
    '''
    Pick one annotation for a row out of its four hit columns.

    The columns are checked in the order kegg_hit, pfam_hits, viral_hit, vogdb_hits.
    The first one holding text without the word 'hypothetical' is used; if every text
    found contains 'hypothetical', the first column holding any text is used instead.

    args:
        line: one row of the annotation table, indexable by the four column names
    returns:
        (annotation text formatted by get_ann_text, name of the column it came from),
        or (NaN, NaN) when none of the four columns holds text
    '''
    first_text_col = None # first column with any text at all, kept as the fallback choice

    for source in ('kegg_hit', 'pfam_hits', 'viral_hit', 'vogdb_hits'):
        hit = line[source]
        if type(hit) is not str: # empty cells are not strings, so skip them
            continue
        if first_text_col is None:
            first_text_col = source
        if 'hypothetical' not in hit: # an informative annotation wins straight away
            return get_ann_text(hit, column_type = source), source

    if first_text_col is not None: # only 'hypothetical' annotations were found
        return get_ann_text(line[first_text_col], column_type = first_text_col), first_text_col

    return math.nan, math.nan # nothing to report for this row

In [None]:
# grab_annotation test using cv1_AM-654-B02

# grab_annotation returns a pair for every row; result_type='expand' spreads each pair
# over the two new columns named here
annotation_cols = ['annotation', 'annotation_source']
df[annotation_cols] = df.apply(grab_annotation, axis=1, result_type='expand')
df

## Function 3: assign_annot

This function is an alternative to function 2 (grab_annotation). Based on the DRAM-v rank of each gene, it chooses which database annotation column to keep for further analysis.

In [None]:
# Annotation column(s) used for each DRAM-v rank, in order of preference:
# if rank = A, grab kegg_hit
# if rank = B, grab viral_hit
# if rank = C, grab kegg_hit, otherwise viral_hit, otherwise vogdb_hits (don't grab pfam_hits)
# if rank = D, grab pfam_hits
# if rank = E, grab vogdb_hits only (don't grab kegg_hit, viral_hit, or pfam_hits)

def assign_annot(line):
    '''
    Choose the annotation for a row based on its DRAM-v rank.

    args:
        line: one row of the annotation table, indexable by 'rank', 'kegg_hit',
              'viral_hit', 'pfam_hits' and 'vogdb_hits'
    returns:
        (annotation text formatted by get_ann_text, name of the column it came from),
        or (NaN, NaN) when the rank is not A-E or none of the columns allowed for
        that rank holds text
    '''
    # NOTE(review): the rank C order (kegg_hit -> viral_hit -> vogdb_hits) is inferred from
    # the original branches, which tested viral_hit but assigned 'vogdb_hits'; please confirm.
    rank_sources = {
        'A': ['kegg_hit'],
        'B': ['viral_hit'],
        'C': ['kegg_hit', 'viral_hit', 'vogdb_hits'],
        'D': ['pfam_hits'],
        'E': ['vogdb_hits'],
    }

    # an unknown or missing rank has no candidate columns, so it falls through to NaN below
    for annot_source in rank_sources.get(line['rank'], []):
        hit_text = line[annot_source]
        if isinstance(hit_text, str): # empty cells are not strings, so try the next candidate
            keep_annot = get_ann_text(hit_text, column_type = annot_source)
            return keep_annot, annot_source

    return math.nan, math.nan
        


In [None]:
# assign_annot returns a pair for every row; result_type='expand' spreads each pair
# over the two columns named here
annot_cols = ['annotation', 'annotation_source']
df[annot_cols] = df.apply(assign_annot, axis=1, result_type='expand')
df

## 4: For loop for trimming dramv annotation output files

In [None]:
# for loop to trim to include only columns I want to keep, create missing columns with NAs

tsv_pattern = "/Users/melissaherring/Google Drive/My Drive/MH_project/dramv/*/annotations.tsv"  # Replace with your file pattern
tsv_file_paths = glob.glob(tsv_pattern)
columns_to_keep = ['Unnamed: 0', 'rank', 'kegg_hit', 'viral_hit', 'pfam_hits', 'vogdb_hits']
hit_columns = ['kegg_hit', 'viral_hit', 'pfam_hits', 'vogdb_hits'] # columns that are created when a file lacks them
dir_path = '/Users/melissaherring/Google Drive/My Drive/MH_project/dramv_trim/' # where the trimmed files are written

# make sure the output folder exists before anything is written to it
if tsv_file_paths:
    os.makedirs(dir_path, exist_ok=True)

for file_path in tsv_file_paths:
    df = pd.read_csv(file_path, delimiter='\t')

    # the output name is read from the first row, so a file without rows cannot be named; skip it
    if df.empty:
        print(f"Skipping {file_path}: no annotation rows")
        continue

    # add any hit column this file is missing, filled with NA, so every trimmed file has the same columns
    for col in hit_columns:
        if col not in df.columns:
            df[col] = pd.NA

    # the value in the first row of the second column is used as the output file name
    # NOTE(review): presumably this is the sample name; confirm it is unique per input file,
    # otherwise trimmed files overwrite each other
    df_name = df.iloc[0, 1]

    full_path = op.join(dir_path, f"{df_name}.csv")
    df[columns_to_keep].to_csv(full_path, index = False)

In [None]:
# count the columns in each trimmed CSV file (to check that every file has the same number of columns)

tsv_pattern = "/Users/melissaherring/Google Drive/My Drive/MH_project/dramv_trim/*.csv"
tsv_file_paths = glob.glob(tsv_pattern)

# Number of columns for each file, in the same order as tsv_file_paths.
# Only the header row is needed to count columns, so nrows=0 avoids loading every table in full.
num_columns_list = [pd.read_csv(file_path, nrows=0).shape[1] for file_path in tsv_file_paths]

# Create a DataFrame with file paths and the number of columns
result_df = pd.DataFrame({'File': tsv_file_paths, 'Number of Columns': num_columns_list})
#result_df.to_csv('num_cols.csv')
result_df

## 5: For loop to apply all functions on trimmed data

In [None]:
# for loop for applying functions

# List the trimmed files here so this cell does not depend on variables left over from earlier cells
trimmed_pattern = "/Users/melissaherring/Google Drive/My Drive/MH_project/dramv_trim/*.csv"
annotated_tables = {} # trimmed file path -> table with the annotation columns added

for trimmed_path in sorted(glob.glob(trimmed_pattern)):
    tbl = pd.read_csv(trimmed_path)

    # cleaned-up annotation text for each of the four hit columns
    tbl['viral_ann_text'] = tbl['viral_hit'].apply(get_ann_text, args = ('viral_hit',))
    tbl['kegg_ann_text'] = tbl['kegg_hit'].apply(get_ann_text, args = ('kegg_hit',))
    tbl['pfam_ann_text'] = tbl['pfam_hits'].apply(get_ann_text, args = ('pfam_hits',))
    tbl['vogdb_ann_text'] = tbl['vogdb_hits'].apply(get_ann_text, args = ('vogdb_hits',))

    # one annotation per row, chosen from the DRAM-v rank
    tbl[['annotation','annotation_source']] = tbl.apply(assign_annot, axis=1, result_type='expand')

    # keep the finished table so it is not overwritten by the next file
    annotated_tables[trimmed_path] = tbl