In [1]:
# SRA run accessions to download and align against the reference.
sra_list = """
    SRR3152141 SRR3152151 SRR3152142 SRR3152143 SRR3177945
    SRR3152147 SRR3152152 SRR3152148 SRR3152150 SRR3178075
""".split()
# Reference sequence file; generate_wf indexes it with bowtie2-build.
reference = "crassphage.fna"

In [None]:

'''
Sample Pegasus workflow for searching the SRA database
'''

import argparse
import logging
import os
import pathlib
import shutil
import sys

from Pegasus.api import *

# Configure the root logger at DEBUG level so library log records are shown.
logging.basicConfig(level=logging.DEBUG)
# Directory the notebook runs from; site directories and tool paths are built under it.
BASE_DIR = os.fspath(pathlib.Path.cwd())
# Login name; in this notebook only the commented-out +TargetAnnexName profiles use it.
USERNAME = os.getenv('USER')


def add_merge_jobs(wf, parents, max_parents=25):
    '''
    Add an upside-down triangle of merge jobs that folds the outputs of
    ``parents`` into a single final tarball, results.tar.gz.

    Each merge job consumes the outputs of at most ``max_parents`` jobs
    from the level below. Levels are added until everything fits into a
    single job. That last job writes results.tar.gz and is the only job
    whose output is staged out. Intermediate tarballs are named
    results-l<level>-j<n>.tar.gz.

    A single parent still gets a merge job, so its outputs always end up
    in results.tar.gz.

    :param wf: Workflow the merge jobs are added to
    :param parents: list of jobs; all of their outputs go into the tarball
    :param max_parents: maximum number of parents per merge job (>= 2)
    :return: the final merge job, or None when ``parents`` is empty
    :raises ValueError: if ``max_parents`` is less than 2
    '''
    if max_parents < 2:
        # a fan-in of 1 would never shrink a level and loop forever
        raise ValueError('max_parents must be at least 2')
    if not parents:
        return None

    parents = list(parents)
    level = 1
    while True:
        # once all parents fit into one chunk, this level holds the final job
        final_job = len(parents) <= max_parents
        chunks = [parents[i:i + max_parents] for i in range(0, len(parents), max_parents)]
        children = []
        for job_count, chunk in enumerate(chunks, start=1):
            j = Job('merge')
            wf.add_jobs(j)
            # outputs: only the final tarball is staged out
            if final_job:
                out_file = File('results.tar.gz')
            else:
                out_file = File('results-l{}-j{}.tar.gz'.format(level, job_count))
            j.add_outputs(out_file, stage_out=final_job)
            j.add_args(out_file)
            # inputs and parent deps: every output of every parent in the chunk
            for parent in chunk:
                outputs = parent.get_outputs()
                j.add_inputs(*outputs)
                j.add_args(*outputs)
            wf.add_dependency(j, parents=chunk)
            children.append(j)
        if final_job:
            return children[0]
        # next round: the merge jobs just created become the parents
        level += 1
        parents = children


def _write_properties():
    '''Write the Pegasus properties used by this workflow to the default properties file.'''
    props = Properties()
    # set the concurrency limit for the download jobs; the 'fasterq-dump'
    # part of the key matches the DAGMan 'category' profile given to the
    # fasterq-dump transformation
    props['dagman.fasterq-dump.maxjobs'] = '20'
    # send some extra usage stats to the Pegasus developers
    props['pegasus.catalog.workflow.amqp.url'] = 'amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows'
    props.write()


def _write_site_catalog():
    '''Describe the 'local' site (scratch and storage under BASE_DIR) and write the site catalog.'''
    shared_scratch_dir = "{}/work".format(BASE_DIR)
    local_storage_dir = "{}/storage".format(BASE_DIR)
    local = Site("local")
    local.add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
            FileServer("file://" + shared_scratch_dir, Operation.ALL)),
        Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
            FileServer("file://" + local_storage_dir, Operation.ALL)),
    )
    local.add_pegasus_profile(pegasus_lite_env_source="{}/tools/job-env-setup.sh".format(BASE_DIR))
    sc = SiteCatalog()
    sc.add_sites(local)
    sc.write()


def _build_transformation_catalog():
    '''Return a TransformationCatalog with the sra-search container and the four tools.'''
    tc = TransformationCatalog()
    container = Container(
        'sra-search',
        Container.SINGULARITY,
        'docker://pegasus/sra-search:latest'
    )
    tc.add_containers(container)

    # bowtie2-build is already inside the container image (site 'incontainer'),
    # so it is not staged
    bowtie2_build = Transformation(
        'bowtie2-build',
        site='incontainer',
        container=container,
        pfn='/opt/bowtie2-2.2.9/bowtie2-build',
        is_stageable=False
    )
    bowtie2_build.add_condor_profile(request_memory='1 GB')
    # uncomment to target an HTCondor annex named after USERNAME
    #bowtie2_build.add_profiles(Namespace.CONDOR, key='+TargetAnnexName', value='"{}"'.format(USERNAME))
    tc.add_transformations(bowtie2_build)

    # the remaining tools are wrapper scripts under BASE_DIR/tools, staged to the job
    bowtie2 = Transformation(
        'bowtie2',
        site='local',
        container=container,
        pfn=BASE_DIR + '/tools/bowtie2_wrapper',
        is_stageable=True
    )
    bowtie2.add_condor_profile(request_memory='2 GB')
    #bowtie2.add_profiles(Namespace.CONDOR, key='+TargetAnnexName', value='"{}"'.format(USERNAME))
    tc.add_transformations(bowtie2)

    fasterq_dump = Transformation(
        'fasterq-dump',
        site='local',
        container=container,
        pfn=BASE_DIR + '/tools/fasterq_dump_wrapper',
        is_stageable=True
    )
    fasterq_dump.add_condor_profile(request_memory='1 GB')
    #fasterq_dump.add_profiles(Namespace.CONDOR, key='+TargetAnnexName', value='"{}"'.format(USERNAME))
    # this category is what 'dagman.fasterq-dump.maxjobs' throttles
    fasterq_dump.add_profiles(Namespace.DAGMAN, key='category', value='fasterq-dump')
    tc.add_transformations(fasterq_dump)

    merge = Transformation(
        'merge',
        site='local',
        container=container,
        pfn=BASE_DIR + '/tools/merge',
        is_stageable=True
    )
    merge.add_condor_profile(request_memory='1 GB')
    tc.add_transformations(merge)
    return tc


def _add_sra_jobs(wf, sra_id, ref_files):
    '''Add the fasterq-dump and bowtie2 jobs for one SRA id; return the bowtie2 job.'''
    fastq_1 = File('{}_1.fastq'.format(sra_id))
    fastq_2 = File('{}_2.fastq'.format(sra_id))

    # download job
    download = Job('fasterq-dump')
    download.add_args('--split-files', sra_id)
    download.add_outputs(fastq_1, fastq_2, stage_out=False)
    wf.add_jobs(download)

    # alignment job; no explicit add_dependency: the links to the index and
    # download jobs come from the shared input/output files
    bam = File('{}.bam'.format(sra_id))
    bam_index = File('{}.bam.bai'.format(sra_id))
    align = Job('bowtie2')
    align.add_args(sra_id)
    align.add_inputs(*ref_files, fastq_1, fastq_2)
    align.add_outputs(bam, bam_index, stage_out=False)
    wf.add_jobs(align)
    return align


def generate_wf(sra_list, reference):
    '''
    Build the sra-search Pegasus workflow.

    The workflow indexes ``reference`` with bowtie2-build, downloads each
    SRA run with fasterq-dump and aligns it with bowtie2. A tree of merge
    jobs then collects all alignments into results.tar.gz.

    Side effects: writes the Pegasus properties file and the site catalog
    to their default locations. The transformation and replica catalogs
    are embedded in the returned workflow. Planning and submission are
    left to the caller.

    :param sra_list: iterable of SRA run accessions. Entries are stripped
        of surrounding whitespace, repeated ids are used once, and entries
        shorter than 5 characters are skipped.
    :param reference: path to the reference sequence file; registered in
        the replica catalog as reference.fna
    :return: the (unplanned) Workflow
    '''
    _write_properties()
    _write_site_catalog()
    tc = _build_transformation_catalog()

    wf = Workflow('sra-search')
    rc = ReplicaCatalog()

    # register the reference file and declare the index files bowtie2-build produces
    ref_main = File('reference.fna')
    rc.add_replica('local', 'reference.fna', os.path.abspath(reference))
    ref_files = [File('reference.{}.bt2'.format(suffix))
                 for suffix in ('1', '2', '3', '4', 'rev.1', 'rev.2')]

    # index the reference file
    index_job = Job('bowtie2-build')
    index_job.add_args('reference.fna', 'reference')
    index_job.add_inputs(ref_main)
    index_job.add_outputs(*ref_files, stage_out=False)
    wf.add_jobs(index_job)

    # one download + alignment pair per SRA id. The alignment jobs are kept
    # so their bam files can be merged into a single tarball at the end.
    # dict.fromkeys drops repeated ids while keeping order; a repeated id
    # would create two jobs producing the same files.
    to_merge = []
    for sra_id in dict.fromkeys(s.strip() for s in sra_list):
        if len(sra_id) < 5:
            continue
        to_merge.append(_add_sra_jobs(wf, sra_id, ref_files))

    add_merge_jobs(wf, to_merge)

    wf.add_transformation_catalog(tc)
    wf.add_replica_catalog(rc)
    # NOTE: no wf.plan() here. The caller plans (and submits), so planning
    # here too would run the planner twice for a single submission.
    return wf


wf = generate_wf(sra_list, reference)

try:
    # plan and submit the workflow, then wait while reporting its progress
    wf.plan(submit=True)\
        .wait()
except PegasusClientError as e:
    print(e)
    # e.output holds the captured output of the failed Pegasus command,
    # which is where the planner reports what actually went wrong
    print(e.output)


INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000001, transformation=bowtie2-build)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000002, transformation=fasterq-dump)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000003, transformation=bowtie2)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000004, transformation=fasterq-dump)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000005, transformation=bowtie2)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000006, transformation=fasterq-dump)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000007, transformation=bowtie2)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000008, transformation=fasterq-dump)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000009, transformation=bowtie2)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000010, transformation=fasterq-dump)
INFO:Pegasus.api.workflow:sra-search added Job(_id=ID0000011, transformation=bowtie2)
INFO:Pegasus.api.workfl

[####---------------------]  15.0% ..Running (Unready: 23, Completed: 6, Queued: 0, Running: 11, Failed: 0)