# How do I _parallelize_ and _split_ a workflow?

[Use Case] Collect paired RNA expressions, differentiate normal and tumor

This example will process RNA-seq expression files (bams) and use the [Cufflinks](http://cole-trapnell-lab.github.io/cufflinks/cuffcompare/) software suite to calculate _differential expression_. Conceptually the API will scan all available files, matches _tumor_ and _normal_ samples, adds the reference files, and then start two rounds of _tasks_.

Tasks are broken<sup>1</sup> into a *first\_stage* (_N_ tasks accepting a pair of inputs {Normal,Tumor}) and single *second\_stage* (accepting _N_ x 2 abundances files). After creating the _N_ front-end tasks, the app **waits** for completion, pinging the CGC every 10 minutes. Once all front-end tasks are completed, the outputs are collected and used to create a single back-end task. Results are saved on the CGC
 
This example requires a mix of skills. 

 - Case Explorer Use 
 - Data Browser Use
 - Workflow Modification
 
<sup>1</sup> Note: dynamic batching and splitting is already available on the back-end; we don't **need** to do this manually via API. However, we intend to demonstrate the flexibility you can achieve for your _own ends_.
 
### Prerequisites
 1. You need your _authentication token_ and the API needs to know about it. See <a href="set_AUTH_TOKEN.ipynb">**set_AUTH_TOKEN.ipynb**</a> for details.
 2. You need _TCGA **Controlled** Data_ access
 
  
### WARNING
This will burn through **considerable** processing credits, depending on how many files you pull from _Data Browser_ (about \$2-4 per sample pair plus \$3-5 for the second stage). You can create _DRAFT_ tasks to just see how it works, swap the commenting in **Build and run tasks** to only run: 
```python
myTask = API(method='POST', data=new_task, path='tasks/')        # task created in DRAFT state
```

## Steps on the GUI
  
We will always be working with the [Cancer Genomics Cloud](https://cgc.sbgenomics.com), but will mix _GUI_ and _API_ tasks here. GUI tasks will be descriptive (markdown cells); API tasks will be in an executable cell but preceded with an explanation in a markdown cell. 

### 1) Create a project
Create a project in the GUI. Name it 'Divide and Conquer'. Mark that it will contain _TCGA Controlled Data_

<img src="images/thyroid_1.png"> 

### 2) Screen cases using _Case Explorer_
 - In _Case Explorer_ select Thyroid Cancer (THCA) and click on the BRAF gene
 - In the bottom left, unclick "No" so only cases with a BRAF mutation are included
 - drag a box around all data points
 - click the green _Continue To Data Browser_ button at bottom left
<img src="images/thyroid_2.png" height="780" width="542"> 

### 3) Use _Data Browser_ to get expressions
 - In _Data Browser_ add some nodes to restrict the files.
     - hasSample
         - hasSampleType = {Primary Tumor}
         - hasFile
             - hasExperimentalStrategy = RNA-seq
             - hasDataFormat = BAM
      - hasSample
         - hasSampleType = {Solid Tissue Normal}
         - hasFile
             - hasExperimentalStrategy = RNA-seq
             - hasDataFormat = BAM
<img src="images/thyroid_3.png" height="780" width="497"> 

Export **each branch** of the tree to the project 'Divide and Conquer', each one should add 29 files to the project.

### 3) Add tools to your project
From the _Apps_ tab of the project, click on **Add app**.
<img src="images/thyroid_4.png"> 

Click **Browse Public Apps**, then search for _RNA-Seq Differential Expression_
<img src="images/thyroid_5.png"> 

Click **Copy**, select your project, and rename the app to **rna-seq-diff-expression-first** and add it to your project
<img src="images/thyroid_6.png">

Click **Copy** _again_, select your project, and rename the app to **rna-seq-diff-expression-second** and add it to your project
<img src="images/thyroid_7.png"> 

### 4) Sanity Check
If all has gone well, the _Apps_ panel in the 'Divide and Conquer' project should look like this:
<img src="images/thyroid_8.png" height="780" width="162">
Notice they have the same name, we are going to fix that!

### 5) Hack apart the tools
Open the top app, it should be the _second_ version (you can see this in the bottom right). Go to **Edit** to modify this tool. Below you see the original **RNA-seq Differential Expression** workflow. Notice that there is a _Cuffquant_ tool that deals with **single pairs of inputs** and a _Cuffdiff_ that deals with **all** of the abundances. We are going to break this apart.
<img src="images/thyroid_9.png" height="780" width="342"> 

In the Tool Editor window, click on ADDITIONAL INFO and change the **Label** to _second RNA-seq cuffdiff_. Click Save to close that window. Then delete nodes until your screen looks like this:
<img src="images/thyroid_10.png" height="780" width="349"> 

Click and drag from the second input to Cuffdiff to the left screen edge. This will create an _input node_ named 'sample_files'. 
<img src="images/thyroid_11.png" height="780" width="281"> 

Now we need to _unlock_ some of the configuration parameters so we can modify them at run-time (i.e. they will be task **inputs**). Click on the _Cuffdiff_ node and in the right panel (_PARAMS_ tab) scroll-down to _min\_reps\_for\_js\_test_ and unlock it (click the lock icon so it changes to "open" and is colored blue) as shown here:
<img src="images/thyroid_11a.png" height="780" width="361"> 

Repeat the same unlocking (all on the _Cuffdiff_ node) for
* FDR
* library_type
* dispersion_method
* library_norm_method

Finally, click Save to update the workflow.

Now we need to make the other half. Go to the Apps panel. Click on the workflow still named **RNA-seq Differential Expression** and then click **Edit**. In the Tool Editor window, click on ADDITIONAL INFO and change the **Label** to _first RNA-seq cuffquant_. Click Save to close that window. Then delete nodes until your screen looks like this:
<img src="images/thyroid_12.png"> 

Click and drag from the output of Cuffquant to the right screen edge. This will create an _output node_ named 'abundances'. 
<img src="images/thyroid_13.png" height="780" width="284"> 

A few final issues, the Group input names are wrong here and need to be changed. Click on the top _SBG\_Group\_Input_, then on the far right click the lock icon so this field is _Exposed_. 
<img src="images/thyroid_14.png" height="780" width="263"> 

We will be able to change the name at run time. Do the same for the bottom _SBG\_Group\_Input_. 
<img src="images/thyroid_15.png"> 

Click on the _Cuffquant_ node and in the right panel scroll down. Unlock the _library\_type_ input
<img src="images/thyroid_16.png" height="780" width="395"> 

Let's save some money (at the _expense_ of time), by changing the recommended instance for the workflow. First click the gear icon on the top right, and click _Settings_

<img src="images/thyroid_17.png"> 

Then change the **sbg:AWSInstanceYype** to **c3.2xlarge**
<img src="images/thyroid_18.png" height="780" width="411"> 

Finally, click Save to update the workflow.

### 6) Sanity Check
If all has gone well, your Apps panel should have two workflows:

 - first RNA-seq cuffquant
 - second RNA-seq cuffdiff
     
Now lets get into some iPython!

## Imports and Definitions
We will use a Python class (API) as a wrapper for API calls. All classes and methods defined in <a href="defs/apimethods.py" target="_blank">_defs/apimethods.py_</a>. 

In [None]:
from defs.apimethods import *
import numpy as np

# extra method for organizing the files by metadata
def my_zip_sort(indexList, otherList):
    srt = np.argsort(indexList)
    return [otherList[i] for i in srt]

## User Input
We need to set a few things here, depending on which areas we want to look at. Additionally, this would be the space to set project and tool names.

In [None]:
# Set project and app names:
project_name = 'Divide and Conquer'
app_name_first = 'first RNA-seq cuffquant' 
app_name_second = 'second RNA-seq cuffdiff'                    

# File extensions we will be working with
input_ext = 'bam'
annote_ext = 'gtf'
ref_ext = 'fasta'
abund_ext = 'cxb'

## Find your project
This code searches through all projects in your account and then gets the details of the _project\_name_ to make sure you've properly set the things in the GUI above:

 - All files
 - All apps
     - details of the app matching app_name_first 
     - details of the app matching app_name_second
     
#### PROTIPS
* The recipes involved in this cell are [here](../../Recipes/CGC/projects_detailOne.ipynb), [here](../../Recipes/CGC/files_listAll.ipynb), and [here](../../Recipes/CGC/files_detailOne.ipynb)

In [None]:
# LIST all projects
existing_projects = API(path='projects')  

# DETAIL my_project
p_index = existing_projects.name.index(project_name)
my_project = API(path=('projects/'+ existing_projects.id[p_index]))  

# LIST all files in project
my_files = API(path='files', query={'limit':100, 'project': my_project.id}) 

# LIST all apps in project and make sure they were created
my_apps = API(path='apps', query={'limit':100, 'project': my_project.id}) 
if len(my_apps.id) > 1:
    for ii, a_name in enumerate(my_apps.name):
        if app_name_first == a_name:
            my_inputs_first = API(path=('apps/' + my_apps.id[ii]))
        elif app_name_second == a_name:
            my_inputs_second = API(path=('apps/' + my_apps.id[ii]))
    del ii, a_name
else:
    print "Apps were not created in selected project, cannot continue"
    raise KeyboardInterrupt

## Add reference files
We need our **reference** and **annotation** files, which we will take from the Public Reference files

#### PROTIPS
* The recipe involved in this cell are [here](../../Recipes/CGC/files_copyFromPublicReference.ipynb)

In [None]:
# [USER INPUT] Set project and file names:
p_name = 'admin/sbg-public-data'
f_list = ['ucsc.hg19.fasta', 'human_hg19_genes_2014.gtf']                

# LIST all files in the source and target project
my_files_source = API(path='files', \
                      query={'project':p_name, 'limit':100})
my_files_target = API(path='files', \
                      query={'project': my_project.id})

for f_name in f_list:
    f_index = my_files_source.name.index(f_name)
    if f_name not in my_files_target.name:
        print('File (%s) does not exist in Project (%s); copying now' % \
              (f_name, my_project.id))

        # COPY the selected file from source to target project
        API(path=('files/' + my_files_source.id[f_index] + '/actions/copy'), \
            method='POST', \
            data={'project': my_project.id,\
                  'name': f_name}) 

        # re-list files in target project to verify the copy worked
        my_files_target = API(path='files', \
                              query={'project': my_project.id})

        if f_name in my_files_target.name:
            print('Sucessfully copied one file!')
        else:
            print('Something went wrong...')
            
# We are done copying files, let's clean up a little
del my_files_source, my_files_target
my_files = API(path='files', query={'project': my_project.id})

## Organize files into a **cohort**
The _Data Browser_ is excellent for finding files. However, there are challenges to working with them smoothly, especially as the number of files grows. Specifically

 - File naming ambiguity between patients and centers (related to the change from **TCGA Barcode** to **UUID**)
     - This is not a critical issue here, but as an example we save mulitple metadata before starting tasks
         - CASE_UUID
         - disease_type
         - size (of file)
 - Uncertainty whether samples are matched (e.g. does the index file (BAI) exist for all input files (BAM))
     - clean any unmatched UUIDs
     - sort CASE_IDs

In [None]:
gtf_ind = None
fasta_ind = None
case_ids = {'uuid': [None], 'index': [None], 'type': [None]}

# Collect input file metadata. Saving the case_UUID and SampleType 
for ii, f_name in enumerate(my_files.name):
    if f_name[-len(input_ext):] == input_ext:
        single_file = API(path=('files/' + my_files.id[ii]))
        case_ids['uuid'].append(single_file.metadata['case_uuid'])
        case_ids['index'].append(ii)
        if single_file.metadata['sample_type'] == 'Solid Tissue Normal':
            case_ids['type'].append('Normal')
        else:
            case_ids['type'].append('Tumor')
    elif f_name[-len(annote_ext):] == annote_ext:
        gtf_ind = ii
    elif f_name[-len(ref_ext):] == ref_ext:
        fasta_ind  = ii
        
case_ids['uuid'].pop(0)
case_ids['index'].pop(0)
case_ids['type'].pop(0)

# sort CASE_IDs
case_ids['type'] = my_zip_sort(case_ids['uuid'], case_ids['type'])
case_ids['index'] = my_zip_sort(case_ids['uuid'], case_ids['index'])
case_ids['uuid'] = my_zip_sort(case_ids['uuid'], case_ids['uuid'])

# clean any unmatched UUIDs
to_delete = [None]
ii = 0
while ii<len(case_ids['uuid']):
    if ii==len(case_ids['uuid'])-1:
        if case_ids['uuid'][ii] != case_ids['uuid'][ii-1]:
            to_delete.append(ii)
            ii += 1
    else:
        if case_ids['uuid'][ii] == case_ids['uuid'][ii+1]:
            ii += 2
        else:
            to_delete.append(ii)
            ii += 1
to_delete.pop(0)

for ii in np.sort(to_delete)[::-1]:
    case_ids['index'].pop(ii)
    case_ids['type'].pop(ii)
    case_ids['uuid'].pop(ii)
    
del to_delete, ii

print('We have %i matched Tumor-Normal pairs in this project' % (len(case_ids['index'])/2))

## Build and run _first-stage_ tasks
Here we use the API to create a _new\_task_ dictionary that we will use for each pair of files. Once it connects to the CGC, we will have all of the front-end tasks drafted and starting within seconds.

#### PROTIPS
* Detailed documentation of this particular REST architectural style request is available [here](http://docs.cancergenomicscloud.org/docs/create-a-new-task)

In [None]:
# Format the JSON to pass values to the FRONT-END workflow (frontend RNA-seq cuffquant)
my_task_list = [None]
for ii in range(0,len(case_ids['uuid'])/2):
    new_task = {'description': 'Quantify RNA expression using cuffquant (pair-wise). created with thyroid.ipynb',
        'name': ('first_stage_%i' % ii),
        'app': my_inputs_first.id,                                         
        'project': my_project.id,
        'inputs': {
            'Reference':{                                             
                'class':'File',
                'path': my_files.id[fasta_ind],
                'name': my_files.name[fasta_ind]
            },
            'Annotations': {                                          
                'class': 'File',
                'path': my_files.id[gtf_ind],
                'name': my_files.name[gtf_ind]
            },
            # Define App Settings (these are the things we "unlocked")
            'group_name': 'Normal',
            'group_name_1': 'Tumor',
            'library_type': u'ff-unstranded'
        }
    }
    for jj in range(0,2):
        file_desc = {'class': 'File',
                    'path': my_files.id[case_ids['index'][(ii-1)*2 + jj]],
                    'name': my_files.name[case_ids['index'][(ii-1)*2 + jj]]
        }
        if case_ids['type'][(ii-1)*2 + jj] == 'Tumor':
            new_task['inputs']['Group_ERR315335'] = [file_desc]         # this is an ARRAY input, so must be a list
        else:
            new_task['inputs']['Group_ERR315421'] = [file_desc]
        
    my_task = API(method='POST', data=new_task, path='tasks/', query = {'action': 'run'})
    my_task_list.append(my_task.id)
my_task_list.pop(0)
 
print("""
%i tasks have been created. Enjoy a break, treat yourself to a muffin, 
and come back to us once you've gotten an email that tasks are done.
(alternatively, use the task monitoring cells below)""" % (len(my_task_list)))

## Check task completion
These tasks may take a long time to complete, here are two ways to check in on them:
* Wait for email confirmation <sup>1</sup>
* Ping the task to see it's _status_. Here we use a 10 min interval, adjust it appropriately for longer or shorter workflows

<sup>1</sup> Emails will arrive regardless of whether the task was started by GUI or API

#### PROTIPS
* The closest recipe for _monitoring tasks_ is [here](../../Recipes/CGC/tasks_monitorAndGetResults.ipynb)
* Detailed documentation of this particular REST architectural style request is available [here](http://docs.cancergenomicscloud.org/docs/perform-an-action-on-a-specific-task)

In [None]:
# [USER INPUT] Set loop time (seconds):
loop_time = 600

for t_id in my_task_list:
    # Check on one task at a time, 
    #  if ANY running, we are not done (no sense to query others)
    flag = {'taskRunning': True}
    while flag['taskRunning']:
        task = api_call(('tasks/' + t_id))
        if task['status'] == 'COMPLETED':
            flag['taskRunning'] = False
            print('Task has completed, life is beautiful')
        elif (task['status'] == 'FAILED') or (task['status'] == 'ABORTED'):
            print('Task (%s) failed, check it out' \
                  % (t_id))
            flag['taskRunning'] = False
        else:
            sleep(loop_time) 

## Build and run _second-stage_ task
This is similar to our approach with the front-end task. We first get all the files in the project (there will be more since out front-end tasks have completed<sup>2</sup>). Then we seach for the file extenstion (would be more elegant here to query the outputs of the tasks).
     
<sup>2</sup> Note this means we _ignore_ any failed tasked. You can choose your own adventure here, e.g. re-running  (or QC-ing) failed tasks, all-or-none processing, etc.

#### PROTIPS
* Detailed documentation of this particular REST architectural style request is available [here](http://docs.cancergenomicscloud.org/docs/create-a-new-task)

In [None]:
# recheck the files in the project.
my_files = API(path='files', query={'project': my_project.id})

gtf_ind = None
fasta_ind = None
abund_ind = [None]

for ii,f_name in enumerate(my_files.name):
    if f_name[-len(abund_ext):] == abund_ext:                     # candidate input file
        abund_ind.append(ii)
    elif f_name[-len(annote_ext):] == annote_ext:
        gtf_ind = ii
    elif f_name[-len(ref_ext):] == ref_ext:
        fasta_ind = ii
abund_ind.pop(0)

# 7) Format the JSON to pass values to the BACK-END workflow [for backend RNA-seq cuffquant]
new_task = {'description': 'Quantify RNA expression using cuffdiff. created with thyroid.py',
'name': 'second_stage',
'app': my_inputs_second.id,                                         # little hard-coding
'project': my_project.id,
'inputs': {
    'Reference':{                                               # (.fasta file)
        'class':'File',
        'path': my_files.id[fasta_ind],
        'name': my_files.name[fasta_ind]
    },
    'Annotations': {                                            # (.gtf file)
        'class': 'File',
        'path': my_files.id[gtf_ind],
        'name': my_files.name[gtf_ind]
    },
    'sample_files': [
        {                                                       # .processingLists will be added later
        'class': 'File',
        'path': 'not_real',
        'name': 'not_real'
        }
    ],
    # Define App Settings (these are the things we "unlocked")
    'FDR': 0.05,
    'library_type': u'ff-unstranded',
    'min_reps_for_js_test': 3,
    'library_norm_method': u'classic-fpkm',
    'dispersion_method': u'per-condition'
    }
}

for ii in abund_ind:
    file_desc = {'class': 'File',
                'path': my_files.id[ii],
                'name': my_files.name[ii]
                }
    new_task['inputs']['sample_files'].append(file_desc)
new_task['inputs']['sample_files'].pop(0)

my_task = api_call(method='POST', data=new_task, path='tasks/', query={'action': 'run'})        # task created and run

print("You've got tasks, yaaaaayy!")