# Variant Calling Pegasus Workflow

In this notebook, we will create Pegasus Workflow corresponding to the  [Automating a Variant Calling Workflow](https://datacarpentry.org/wrangling-genomics/05-automation.html) 
from Data Carpentry Lesson 
[Data Wrangling and Processing for Genomics](https://datacarpentry.org/wrangling-genomics/).

This workflow downloads and aligns SRA data to the E. coli  REL606 reference genome, and checks what differences exist in our reads versus the genome. The workflow also performs perform  variant calling to see how 
the population changed over time. 

One major change from other examples is that this workflow, uses CARC project storage to do the
data staging. The previous examples, uses the your directory space on CARC which is limited to 100GB.

## Container

All tools required to execute the jobs in the container are all included in
a single Docker container defined in `docker/Dockerfile` and available in the
[Docker Hub](https://hub.docker.com/repository/docker/pegasus/variant-calling) under
`pegasus/variant-calling`. The workflow is setup up to use that container
but execute it via Singularity as that maybe a more common container
runtime on HPC machines. The container runtime used can easily be
changed in the workflow definition.

The container comes with the following tools
* Burrows-Wheeler Aligner (BWA) 0.7.17
* SamTools 1.15.1
* Bcftools 1.15.1
* HTSlib   1.15.1
* SRA Tools 3.0.0

The number of concurrent downloads is limited with a DAGMan
category profile.

## CARC Access

The Center for Advanced Researcfh Computing provides a service intended to facilitate computational research.

If you dont have access to an existing CARC account. Please refer to the instructions on the 
[Accounts and Allocations Page In CARC Guide](https://www.carc.usc.edu/user-information/accounts)

## Workflow

The Pegasus workflow downloads SRA data from NCBI repository using
`fasterq-dump` in the SRA toolkit and aligns it against the reference genome.

Please note, that using more than one thread in `fasterq-dump` will result in your job hanging up due to the filesystem bug.

![Pegasus Variant Calling Workflow for 2 SRA reads](../images/variant-calling-pegasus-workflow.png)

The tools used for various jobs in the worklfow are listed in table below

| Job Label                 | Tool Used        |
| --------------------------|----------------- |
| bwa                       | bwa              |
| fasterq_dump              | fasterq_dump     |
| align_reads               | bwa              |
| sam_2_bam_converter       | samtools         |
| calculate_read_coverage   | bcftools         |
| detect_snv                | bcftools         |
| variant_calling           | vcfutils         |

The bwa invocation as part of the align_reads job is configured to use multiple cores. In this notebook, we set the core usage for that job to 6 and enable use of threads( set to 12 - an oversubscription) on the command line to the bwa job. 


## 0. Set Jupyter Environment

We set some environment variables and set PYTHONPATH for Pegasus libraries to be imported successfully.
This is temporary until the Jupyter Notebook setup is fixed

In [None]:
import sys
sys.path.append("/usr/lib64/python3.6/site-packages")

In [None]:
%env LANG=en_US.utf-8

## 1. Create the Variant Calling Workflow

By now, you have a good idea about the Pegasus Workflow API.
We now create the workflow for the Variant Calling based on the picture above

First step is to identify what SRA we want to pull down from the NCBI database

In [None]:
sra_list = ["SRR2584866",
            "SRR2589044"]
reference_genome="./ref_genome/ecoli_rel606.fasta"

In [None]:
import argparse
import logging
import os
import shutil
import sys

from pathlib import Path
from Pegasus.api import *

logging.basicConfig(level=logging.DEBUG)
BASE_DIR = str(Path(".").resolve())

# need to know where Pegasus is installed for notifications
PEGASUS_HOME = shutil.which('pegasus-version')
PEGASUS_HOME = os.path.dirname(os.path.dirname(PEGASUS_HOME))

wf = Workflow('variant-calling')
tc = TransformationCatalog()
rc = ReplicaCatalog()
sc = SiteCatalog()
    
# --- Properties ----------------------------------------------------------

# set the concurrency limit for the download jobs, and send some extra usage
# data to the Pegasus developers
props = Properties()
props['dagman.fasterq-dump.maxjobs'] = '10'
props['pegasus.catalog.workflow.amqp.url'] = 'amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows'
props['pegasus.data.configuration'] = 'nonsharedfs'
props["pegasus.mode"] = "tutorial"
props.write() 

# --- Event Hooks ---------------------------------------------------------

# get emails on all events at the workflow level
wf.add_shell_hook(EventType.ALL, '{}/share/pegasus/notification/email'.format(PEGASUS_HOME))

# --- Transformations -----------------------------------------------------

container = Container(
               'variant-calling',
               Container.SINGULARITY,
               'docker://pegasus/variant-calling:latest',
               mounts=["{}/SLURM/work:{}/SLURM/work:rw".format(BASE_DIR, BASE_DIR)]
            )
tc.add_containers(container)

fasterq_dump = Transformation(
    'fasterq-dump',
    site='local',
    container=container,
    pfn=BASE_DIR + '/tools/fasterq_dump_wrapper',
    is_stageable=True
)
fasterq_dump.add_profiles(Namespace.CONDOR, key='request_memory', value='6 GB')
# this one is used to limit the number of concurrent downloads
fasterq_dump.add_profiles(Namespace.DAGMAN, key='category', value='fasterq-dump')
tc.add_transformations(fasterq_dump)

bwa = Transformation(
                   'bwa',
                   site='incontainer',
                   container=container,
                   pfn='/opt/software/install/bwa/default/bwa',
                   is_stageable=False
                )
bwa.add_profiles(Namespace.CONDOR, key='request_memory', value='6 GB')
tc.add_transformations(bwa)

# we use the simple bash wrapper to convert to bam,
# sort and index the generated bam file
samtools = Transformation(
    'samtools',
    site='local',
    container=container,
    pfn=BASE_DIR + '/tools/samtools_wrapper',
    is_stageable=True
)
samtools.add_profiles(Namespace.CONDOR, key='request_memory', value='6 GB')
tc.add_transformations(samtools)

bcftools = Transformation(
    'bcftools',
    site='incontainer',
    container=container,
    pfn='/opt/software/install/bcftools/default/bin/bcftools',
    is_stageable=False
)
bcftools.add_profiles(Namespace.CONDOR, key='request_memory', value='6 GB')
tc.add_transformations(bcftools)

vcfutils = Transformation(
    'vcfutils',
    site='incontainer',
    container=container,
    pfn='/opt/software/install/bcftools/default/bin/vcfutils.pl',
    is_stageable=False
)
vcfutils.add_profiles(Namespace.CONDOR, key='request_memory', value='6 GB')
tc.add_transformations(vcfutils)

# --- Site Catalog ------------------------------------------------- 
# add a local site with an optional job env file to use for compute jobs
shared_scratch_dir = "{}/LOCAL/work".format(BASE_DIR)
local_storage_dir = "{}/LOCAL/storage".format(BASE_DIR)

# some variables for slurm cluster. you may wish to update
# them for your needs
slurm_partition="main"
slurm_account="hpcsuppt_613"

local = Site("local") \
    .add_directories(
    Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
        .add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),
    Directory(Directory.LOCAL_STORAGE, local_storage_dir)
        .add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL)))

slurm_scratch_dir = "{}/SLURM/work".format(BASE_DIR)
slurm_storage_dir = "{}/SLURM/storage".format(BASE_DIR)

slurm = Site("slurm")\
    .add_directories(
    Directory(Directory.SHARED_SCRATCH, slurm_scratch_dir)
        .add_file_servers(FileServer("file://" + slurm_scratch_dir, Operation.ALL)),
    Directory(Directory.LOCAL_STORAGE, slurm_storage_dir)
        .add_file_servers(FileServer("file://" + slurm_storage_dir, Operation.ALL)))

slurm.add_pegasus_profile(
                        style="glite",
                        queue=slurm_partition,
                        project=slurm_account,
                        data_configuration="nonsharedfs",
                        auxillary_local="true",
                        nodes=1,
                        ppn=1,
                        runtime=6400,
                        clusters_num=2
                    )
slurm.add_condor_profile(grid_resource="batch slurm")

sc = SiteCatalog()
sc.add_sites(local)
sc.add_sites(slurm)
sc.write() # written to ./sites.yml
# --- Workflow -----------------------------------------------------
# set up the reference genome and what files need to be generated by the index job
ref_genome_lfn=os.path.basename(reference_genome)
ref_genome = File(ref_genome_lfn)
rc.add_replica('local', ref_genome_lfn, os.path.abspath(reference_genome))
index_files = []
for suffix in ['amb', 'ann', 'bwt', 'pac', 'sa']:
    index_files.append(File(ref_genome.lfn + "." + suffix))

# index the reference file
index_job = Job('bwa', node_label="ref_genome_index")
index_job.add_args('index', ref_genome.lfn)
index_job.add_inputs(ref_genome)
index_job.add_outputs(*index_files, stage_out=False)
wf.add_jobs(index_job)

# create jobs for each trimmed fastq trim.sub.fastq
for sra in sra_list:
    sra_id = sra.strip()
    if len(sra_id) < 5:
        continue


    """
    # files for this id
    # commented out as we download files from NCBI as part of fasterq-dump job
    fastq_1 = File('{}_1.trim.sub.fastq'.format(sra_id))
    fastq_2 = File('{}_2.trim.sub.fastq'.format(sra_id))
    rc.add_replica('local', fastq_1, os.path.join(os.path.abspath(args.fastq_dir), fastq_1.lfn))
    rc.add_replica('local', fastq_2, os.path.join(os.path.abspath(args.fastq_dir), fastq_2.lfn))
    """

    sam=File('{}.aligned.sam'.format(sra_id))
    bam=File('{}.aligned.bam'.format(sra_id))
    sorted_bam=File('{}.aligned.sorted.bam'.format(sra_id))

    raw_bcf=File('{}_raw.bcf'.format(sra_id))
    variants=File('{}_variants.bcf'.format(sra_id))
    final_variants=File('{}_final_variants.bcf'.format(sra_id))

    """
    bwa mem $genome $fq1 $fq2 > $sam
    samtools view -S -b $sam > $bam
    samtools sort -o $sorted_bam $bam 
    samtools index $sorted_bam
    bcftools mpileup -O b -o $raw_bcf -f $genome $sorted_bam
    bcftools call --ploidy 1 -m -v -o $variants $raw_bcf 
    vcfutils.pl varFilter $variants > $final_variants
    """

    # files for this id
    fastq_1 = File('{}_1.fastq'.format(sra_id))
    fastq_2 = File('{}_2.fastq'.format(sra_id))

    # download job
    j = Job('fasterq-dump', node_label="fasterq_dump")
    j.add_pegasus_profile(cores=6)
    j.add_args('--split-files', sra_id)
    j.add_args('-e', '6')
    j.add_outputs(fastq_1, fastq_2, stage_out=False)
    wf.add_jobs(j)

    # align reads tp reference genome job
    j = Job('bwa', node_label="align_reads")
    j.add_pegasus_profile(cores=10)
    j.add_args('mem', "-t 10", ref_genome, fastq_1, fastq_2)
    j.add_inputs(*index_files, ref_genome, fastq_1, fastq_2)
    j.set_stdout(sam, stage_out=False)
    wf.add_jobs(j)

    # samtools_wrapper for doing alignment to genome
    j = Job('samtools', node_label="sam_2_bam_converter")
    j.add_args(sra_id)
    j.add_inputs(sam)
    j.add_outputs(bam, sorted_bam, stage_out=False)
    wf.add_jobs(j)

    # Variant calling
    # bcftools for calculating the read coverage of positions in the genome
    j = Job('bcftools', node_label="calculate_read_coverage")
    j.add_pegasus_profile(cores=10)
    j.add_args('mpileup --threads 10 -O b -o', raw_bcf, '-f', ref_genome, sorted_bam)
    j.add_inputs(ref_genome, sorted_bam)
    j.add_outputs(raw_bcf, stage_out=False)
    wf.add_jobs(j)

    # bcftools for Detect the single nucleotide variants (SNVs)
    j = Job('bcftools', node_label="detect_snv")
    j.add_pegasus_profile(cores=10)
    j.add_args('call --threads 10 --ploidy 1 -m -v -o', variants, raw_bcf)
    j.add_inputs(raw_bcf)
    j.add_outputs(variants, stage_out=False)
    wf.add_jobs(j)

    # vcfutils Filter and report the SNV variants in variant calling format (VCF)
    j = Job('vcfutils', node_label="variant_calling")
    j.add_args('varFilter', variants)
    j.add_inputs(variants)
    j.set_stdout(final_variants, stage_out=True)
    wf.add_jobs(j)

wf.add_transformation_catalog(tc)
wf.add_site_catalog(sc)
wf.add_replica_catalog(rc)

## 3. Visualizing the Workflow

Once you have defined your abstract workflow, you can use `pegasus-graphviz` to visualize it. `Workflow.graph()` will invoke `pegasus-graphviz` internally and render your workflow using one of the available formats such as `png`. **Note that Workflow.write() must be invoked before calling Workflow.graph().**

In [None]:
try:
    wf.write()
    wf.graph(include_files=True, label="xform-id", output="graph.png")
except PegasusClientError as e:
    print(e)

In [None]:
# view rendered workflow
from IPython.display import Image
Image(filename='graph.png')

## 4. Plan and Submit the Workflow

We will now plan and submit the workflow for execution. You will notice below that we have added a new option staging_sites to plan the workflow. This option tells Pegasus to use **slurm** site for data staging when running jobs on site **slurm**.

In [None]:
try:
    wf.plan(sites=["slurm"], verbose=1,submit=True)\
        .wait()
except PegasusClientError as e:
    print(e)

Note the line in the output that starts with pegasus-status, contains the command you can use to monitor the status of the workflow. We will cover this command line tool in the next couple of notbooks. The path it contains is the path to the submit directory where all of the files required to submit and monitor the workflow are stored. For now we will just continue to use the Python `Workflow` object.

## 5. Statistics

Depending on if the workflow finished successfully or not, you have options on what to do next. If the workflow failed you can use `wf.analyze()` do get help finding out what went wrong. If the workflow finished successfully, we can pull out some statistcs from the provenance database:

In [None]:
try:
    wf.statistics()
except PegasusClientError as e:
    print(e)

### 6.  What's next

Next Notebook is `06-Summary`, that summarizes what we have learnt so far and how to request further support.