#### Kraken2


In [None]:
import pysam
import os
from hops import hdfs
import utils
import sys
from pyspark import SparkContext
import subprocess
import stat
import gzip
import shutil

#### Load arguments

In [None]:
# Load the run configuration and keep only its 'Kraken' section.
# For interactive use the settings file can be passed explicitly, e.g.:
# args=utils.load_arguments([0,'hdfs:///Projects/TCGA_viruses/Jupyter/settings_DJ.yml'])
args = utils.load_arguments(sys.argv)
if args is None:
    # No configuration could be loaded: stop with the shared error message.
    sys.exit(utils.NO_CONFIG_ERR)
args = args['Kraken']

sc = SparkContext.getOrCreate()

# Input and output roots.
inputRoot = args['INPUT_ROOT']
outputRoot = args['OUTPUT_ROOT']

# Kraken tool location; `tool` is the base name of that path.
kraken_path = args['KRAKEN_PATH']
tool = os.path.basename(kraken_path)

# Kraken database location and run options.
kk_db_path = args['KRAKEN_DB_PATH']
is_save_all_outputs = args['SAVE_FULL_OUTPUT']
threads = args['THREADS']

#### Helper functions

In [None]:
# install kraken from hdfs source
def load_kraken(kraken_path):
    """Copy the Kraken tool from HDFS and mark its binaries as executable.

    kraken_path: HDFS path of the Kraken tool. Its base name is the local
        directory in which the ``kraken2`` and ``classify`` files are looked
        up after the copy.
    """
    local_dir = os.path.basename(kraken_path)

    hdfs.copy_to_local(kraken_path)

    # Add the owner-execute bit on top of each file's current mode.
    for binary in ('kraken2', 'classify'):
        target = local_dir + '/' + binary
        current_mode = os.stat(target).st_mode
        os.chmod(target, current_mode | stat.S_IEXEC)



def compress_file(file):
    """Gzip ``file`` into ``<file>.gz`` and return the path of the archive.

    The source file is left in place; compression level 1 is used.
    """
    gz_path = file + '.gz'
    with open(file, 'rb') as src, gzip.open(gz_path, 'wb', compresslevel=1) as dst:
        shutil.copyfileobj(src, dst)
    return gz_path

#### Map function


In [None]:


def _remove_local(*paths):
    """Delete every given local path that exists; missing paths are ignored."""
    for path in paths:
        if os.path.exists(path):
            os.remove(path)


def apply_kraken_single(file_path,kk_db_path):
    """Run kraken2 on a single input file via subprocess.

    Kraken is first installed locally by copying the tool from HDFS; the
    database (if not already present locally) and the input file are copied
    as well, and the outputs are copied back to HDFS. If the report for this
    sample is already present in the output directory, the file is skipped so
    that resubmitting a failed run does not process the same file again.

    Reads the module-level settings ``outputRoot``, ``kraken_path``, ``tool``,
    ``is_save_all_outputs`` and ``threads``.

    file_path: HDFS path of the input file. The sample name is its base name
        with the last two extensions stripped.
    kk_db_path: HDFS path of the Kraken database.

    Returns the input file name when the file was handled (also when kraken2
    failed; a message is printed in that case and nothing is uploaded), or
    None when it was skipped because its report already exists.
    """
    file = os.path.split(file_path)[1]
    sample = os.path.splitext(os.path.splitext(file)[0])[0]
    report = sample + '_report.txt'

    # Guard: an existing report means this sample was already processed.
    if hdfs.exists(os.path.join(outputRoot, 'report', report)):
        print('skipping existing file: ', file)
        return None

    load_kraken(kraken_path)  # install kraken

    # The database is only fetched when it is not already available locally.
    kk_db = os.path.split(kk_db_path)[1]
    if not os.path.exists(kk_db):
        hdfs.copy_to_local(kk_db_path)

    output = sample + '_out.txt'
    unclassified = sample + '_unclassified.txt'

    try:
        hdfs.copy_to_local(file_path)

        # Full outputs are either written to local files or discarded.
        unclassified_dest = unclassified if is_save_all_outputs else '/dev/null'
        output_dest = output if is_save_all_outputs else '/dev/null'
        # Key order is kept as-is because it determines the command built below.
        params = {
            '--db': kk_db,
            '--threads': threads,
            '--report': report,
            '--report-minimizer-data': '',
            '--report-zero-counts': '',
            '--unclassified-out': unclassified_dest,
            file: '',
            '--output': output_dest,
        }
        cmd = utils.build_command(tool + '/kraken2', params)
        print(cmd)
        # NOTE(review): cmd.split() breaks on paths that contain whitespace.
        result = subprocess.run(cmd.split(), stdout=subprocess.PIPE)

        if result.returncode != 0:
            # Do not upload anything from a failed run: a partial report in
            # HDFS would make later resubmits skip this file.
            print('kraken2 failed for file: ', file, ' exit code: ', result.returncode)
        elif os.path.exists(report):
            hdfs.copy_to_hdfs(report, os.path.join(outputRoot, 'report'), overwrite=True)

            if is_save_all_outputs:
                # compress
                c_output = compress_file(output)
                c_unclassified = compress_file(unclassified)
                # copy to hdfs
                hdfs.copy_to_hdfs(c_unclassified, os.path.join(outputRoot, 'unclassified'), overwrite=True)
                hdfs.copy_to_hdfs(c_output, os.path.join(outputRoot, 'output'), overwrite=True)
    finally:
        # Always clean up local files, including after an exception, so a
        # retried task does not find stale inputs or outputs.
        _remove_local(
            file,
            report,
            output,
            unclassified,
            output + '.gz',
            unclassified + '.gz',
        )

    return file

#### Get all input file paths

In [None]:


# Collect the inputs to process from the configured input root.
# NOTE(review): presumably a list of HDFS file paths — confirm against utils.load_file_names.
inputFiles=utils.load_file_names(inputRoot)


#### Run in parallel

In [None]:
# Distribute the input paths as an RDD.
rdd = sc.parallelize(inputFiles)


def _run_kraken(path):
    """Apply kraken to one input path using the configured database path."""
    return apply_kraken_single(path, kk_db_path)


# Run kraken on every path and gather the per-file results on the driver
# (the file name for processed files, None for skipped ones).
final = rdd.map(_run_kraken).collect()