In [None]:
%matplotlib inline
import pandas as pd
import dask

#Imports for our little html parser
import requests
from bs4 import BeautifulSoup
import bioservices

# Structure
This notebook goes over three parts of dask, each reflecting my own experience with it:
- **dask basics**: dask.delayed, dask.compute, dask schedulers
- **dask.dataframe**: Caveats & work-arounds, meta keyword
- **dask concurrent-futures interface**: Basic usage

## Dask basics
Dask is a library that makes parallelizing computation in Python easy.
At its core dask really just has three important parts that you need to understand when working with it. 

### dask.delayed
Dask splits parallelization into two components: 
- constructing a task graph
- scheduling the individual tasks of the graph
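To make these two components concrete, here is a toy sketch of a task graph written as a plain dict. It is loosely modeled on dask's graph specification, where each key maps either to a value or to a tuple `(function, *arguments)`, and an argument that names another key refers to that task's output. It is not dask's actual implementation. `toy_get` is a hypothetical, single-threaded stand-in for a scheduler: it walks the graph and evaluates each task once.

```python
from operator import add

# each key maps to a plain value or to a task tuple (function, *arguments);
# string arguments that are keys of the graph refer to other tasks' outputs
graph = {
    'x': 1,
    'y': 2,
    'xy': (add, 'x', 'y'),
    'total': (add, 'xy', 3),
}

def toy_get(graph, key, cache=None):
    '''Evaluate `key` by recursively evaluating its dependencies (no parallelism).'''
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]  # each task is evaluated at most once
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # resolve arguments that name other tasks, keep literal values as they are
        values = [toy_get(graph, a, cache) if isinstance(a, str) and a in graph else a
                  for a in args]
        result = func(*values)
    else:
        result = task
    cache[key] = result
    return result

print(toy_get(graph, 'total'))  # 6
```

Building `graph` does no work at all. Only `toy_get`, the "scheduler", executes anything, which is the same split dask makes between `dask.delayed` and its schedulers.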

`dask.delayed` enables what is called **lazy computation**: the point at which you define a computation and the point at which it is actually performed are separate. This lets you build a **task graph** that defines which functions are executed on which inputs and in which order, and building such graphs is exactly what `dask.delayed` is meant for.

It "remembers" a function and the specified inputs. `dask.delayed` can be used to wrap functions and classes to make their execution "lazy".
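Besides being used as a decorator, `delayed` can wrap an existing function at call time. This is handy for functions you did not write yourself. Here is a minimal sketch with two hypothetical helper functions, `inc` and `double`:

```python
from dask import delayed

def inc(x):
    return x + 1

def double(x):
    return 2 * x

# wrap existing functions at call time instead of decorating them
lazy_inc = delayed(inc)(10)              # remembers inc and the input 10
lazy_result = delayed(double)(lazy_inc)  # depends on the output of lazy_inc

print(lazy_result)            # a Delayed object: nothing has been computed yet
print(lazy_result.compute())  # 22
```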

In [None]:
from dask import delayed

In [None]:
@delayed
def add(x, y):
    return x + y

In [None]:
firstsum = add(add(1, 2), 3) # = 1 + 2 + 3

In [None]:
firstsum

Let's visualize the task graph we just created:

In [None]:
firstsum.visualize() #This requires the system graphviz package and the python-graphviz package

In the task graph above all the circles represent the functions and the rectangles represent the outputs of those functions.

A `Delayed` object by itself is not very useful: it is just a task graph, and no actual computation has taken place yet.

To get the actual result, simply call the compute method.

In [None]:
firstsum.compute()

The task graph above is linear, so there is not much to parallelize. But what if we have a bunch of task graphs like the one above? How can we parallelize the computation of all of them?

The answer is `dask.compute`. It takes any number of Python collections of `Delayed` objects (e.g. a list) or individual `Delayed` objects, computes them all in one go, and avoids computing the same sub-graph more than once.
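The following sketch illustrates both properties. `dask.compute` returns one result per argument, and a task that is shared by several graphs runs only once. The function `load` and the `calls` list are hypothetical and exist only to count executions. The synchronous scheduler is used to keep the sketch deterministic.

```python
import dask
from dask import delayed

calls = []  # records every actual execution of load

@delayed
def load(x):
    calls.append(x)
    return x * 10

shared = load(1)   # a single Delayed object ...
a = shared + 1     # ... used by two different graphs
b = shared * 2

# one call for both graphs: returns a tuple with one result per argument
result_a, result_b = dask.compute(a, b, scheduler='sync')
print(result_a, result_b)  # 11 20
print(calls)               # [1]: the shared task ran only once
```

Calling `a.compute()` and `b.compute()` separately would run `load` twice instead.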

In [None]:
from time import sleep
@delayed
def slow_ass_function(*args, **kwargs):
    sleep(1) #sleeps for 1 second to simulate expensive computation

In [None]:
slow_ass_function().compute()

In [None]:
dask.visualize(*[slow_ass_function(slow_ass_function()) for _ in range(2)])

In [None]:
%%timeit
dask.compute([slow_ass_function(slow_ass_function()) for _ in range(2)]);

In [None]:
%%timeit
[slow_ass_function(slow_ass_function()).compute() for _ in range(2)];

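Calling `dask.compute` once on all `Delayed` objects, instead of calling the compute method of each one, lets the scheduler run independent graphs in parallel. Above, the two chains run side by side, while the list comprehension computes them one after the other. It also computes any task shared between the graphs only once. Note, however, that dask recognizes shared work only by task key, not by comparing functions and inputs, so two separate calls with identical arguments still count as different tasks.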

In [None]:
@delayed
def one_plus_two(*args):
    return 1 + 2

In [None]:
dask.visualize(add(one_plus_two(add(1,2)), one_plus_two(3)), add(add(1, one_plus_two(3)), one_plus_two(3)))

`one_plus_two` is called several times here, and its output does not even depend on its input, yet dask does not merge these calls in either graph. Dask decides whether two tasks are the same purely by their **key**. By default, every call to a `delayed` function receives a fresh random key, so each call is treated as a distinct task.

`dask.optimize`, the function that performs graph optimization under the hood, does not inspect what a function would return either. Repeated calls are only computed once when they share a key: either the very same `Delayed` object is reused, or the function is wrapped with `delayed(..., pure=True)`, which derives the key from the function and its arguments.
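A minimal sketch of the difference between the default behaviour and `pure=True`, reusing the `one_plus_two` function from above. The `calls` list is a hypothetical addition that counts how often the function really runs:

```python
import dask
from dask import delayed

calls = []  # records each real execution

def one_plus_two(*args):
    calls.append(args)
    return 1 + 2

impure = delayed(one_plus_two)           # default: each call gets a fresh random key
pure = delayed(one_plus_two, pure=True)  # key derived from the function and its arguments

print(impure(3).key == impure(3).key)  # False: two distinct tasks
print(pure(3).key == pure(3).key)      # True: the same task

dask.compute(pure(3), pure(3), scheduler='sync')
print(len(calls))  # 1: the two identical pure calls were merged into one task
```

Only mark a function as pure if it really has no side effects and always returns the same output for the same inputs. Otherwise merging calls changes the result.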

### dask schedulers
Constructing a dask task graph is nice and dandy, but somebody has got to do the work. This is where the other core aspect of dask, the scheduler, becomes important. A **scheduler** decides which task is executed where and at what time. 

Parallel computing is also possible in Python without dask, using the `multiprocessing` module from the standard library. What really sets dask apart from multiprocessing are its schedulers. With multiprocessing you have to decide yourself which task is executed where and at what time, whereas dask lifts that burden from you. 

Some thought is still required when using dask, though: choosing the right scheduler for a task graph can make a big difference in performance.

In a standard CPython process only one thread can execute Python code at a time (this is enforced by the global interpreter lock, or GIL). However, all threads within a process have access to all the data inside that process, whereas separate processes do not share memory.

#### threaded scheduler (`dask.threaded.get`)
The threaded scheduler is a good choice when sharing data between tasks is important and the computations are either cheap or release the GIL and therefore run in parallel on their own (as much of NumPy, pandas, scikit-learn and Numba code does).

#### multiprocessing scheduler (`dask.multiprocessing.get`)
The multiprocessing scheduler is a good choice when each task can run by itself and does not need to pass large intermediate results, such as big matrices, between tasks, since any data moving between processes has to be serialized. It works especially well for a large number of tasks that each only communicate a small amount of data.

#### synchronous scheduler  (`dask.get`)
The synchronous scheduler, as the name suggests, runs tasks one after the other and so achieves no parallelism at all. This makes debugging easier, which is its intended use.

#### distributed scheduler  (`distributed.Client.get`)
First and foremost, the distributed scheduler allows for computation spread across multiple machines.

On a single machine it is often a good alternative to the multiprocessing scheduler, because it can make smarter decisions about where to run tasks and when to free intermediate results. When your task graph has a huge number of small tasks, however, this scheduler is a poor choice. Each task carries a scheduling overhead on the order of hundreds of microseconds, and in that case the multiprocessing scheduler is better suited.

Lastly, the distributed scheduler also allows for asynchronous computing; more about that later in this notebook.

More information can be found [here](http://dask.pydata.org/en/latest/scheduler-choice.html).
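In more recent dask versions the scheduler is selected with the `scheduler=` keyword, rather than by passing one of the `get` functions listed above. You can set it either per call or for a whole block via `dask.config.set`. The minimal sketch below uses only the local threaded and synchronous schedulers so that it runs without a cluster. `'processes'` is also accepted but is left out here, because it requires picklable functions and, on some platforms, a `__main__` guard.

```python
import dask
from dask import delayed

@delayed
def square(x):
    return x * x

# sum of the squares of 0..9, built lazily
total = delayed(sum)([square(i) for i in range(10)])

print(total.compute(scheduler='threads'))  # threaded scheduler
print(total.compute(scheduler='sync'))     # synchronous: one task after another, easiest to debug

# set a default scheduler for everything inside the block
with dask.config.set(scheduler='sync'):
    print(dask.compute(total))  # (285,)
```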

## dask.dataframe
Another strength of dask in scientific computing is that it comes ready with parallel implementations of pandas.DataFrame (`dask.dataframe`) and numpy arrays (`dask.array`).
In the following I will go over some caveats of using `dask.dataframe`.

In [None]:
import dask.dataframe as dd

### Caveat #1: No assignment
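`dask.dataframe` does not support pandas-style assignment of computed values. Rows cannot be assigned at all (e.g. via `.loc`), and a column can only be assigned from another lazy dask object or a scalar, not from an in-memory pandas result such as the output of `.compute()`. The reason is that a dask dataframe is not one table held in memory but a lazy task graph over many partitions.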

Work-around: use `map_partitions`.
- It applies a function to each partition of the dataframe; each partition is a pandas DataFrame.
- The function we apply simply returns a new DataFrame with the desired changes.
- We have to supply the **meta** keyword to tell dask the output format. This is typically an empty or small pandas DataFrame or Series with the same columns and dtypes that the function returns for each partition (a dict or tuple of names and dtypes also works).
- Finally, we probably want to write the result to file(s).

Here we take the simple task of correcting the names in the `chr` column of a GTF (Gene Transfer Format, a genome annotation format) file.

In [None]:
#Reading in the file; notice that we supply dtypes for some columns.
#dask infers each column's dtype from a sample at the start of the file. The chr column starts
#with numbers and only later contains strings, so without an explicit dtype the later
#partitions would not match the inferred int dtype.
#'noidea1' and 'noidea2' are the GTF score and frame columns.
gtf = dd.read_table('TB_human_sorted.gtf', dtype={'chr':'str', 'info':'str'}, 
                    names=['chr', 'origin','type', 'start', 'stop', 'noidea1', 'strand', 'noidea2', 'info'], 
                    comment='#', header=None)

In [None]:
gtf.chr.unique().compute() # to get much of anything with dask.dataframe, we usually have to call compute. 
                           #Exceptions are head and any writing to file method.

In [None]:
gtf['chr'] = 'chr' + gtf.chr.compute() #This throws an error: the right-hand side is a computed pandas Series, which cannot be assigned as a column of a dask dataframe

Let's write a function that returns the original DataFrame but corrects the values in the chr column.

In [None]:
def correct_chr(df):
    '''Return a copy of the partition `df` with the chr names translated via gtf_sam_dict.'''
    df = df.copy()  # do not modify the partition in place
    df['chr'] = df['chr'].apply(lambda cell: gtf_sam_dict[cell])
    return df

In [None]:
partition1 = gtf.get_partition(0).compute()

In [None]:
import json
with open('./gtf_sam_dict.json', 'r') as fp:
    gtf_sam_dict = json.load(fp) #dict mapping each chr name in the GTF to its corrected name

In [None]:
#meta: a small pandas DataFrame with the columns and dtypes that correct_chr returns
corrected = gtf.map_partitions(correct_chr, meta=partition1.head())

In [None]:
corrected.chr.unique().compute() #yay, the chr names are corrected! (the actual work only runs in this compute call)

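### Caveat #2: Cannot write to one file
`dask.dataframe` writes one file per partition rather than a single file, at least in the dask version this notebook was written with (newer versions of `to_csv` accept a `single_file=True` argument).
Work-around: if you want a single file, concatenate the part files afterwards, e.g. with `cat` in bash.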

In [None]:
import csv

In [None]:
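#no * in the file name, so this does not produce a single file (depending on the dask
#version it raises an error or writes one part file per partition into a directory of that name)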
corrected.to_csv('TB_human_sorted_corrected.gtf', sep='\t', index=False, header=None, 
                 quoting=csv.QUOTE_NONE #This line ensures that Pandas does not end up 
                                        #putting quotation marks around any line with white spaces in it.
                )

In [None]:
# this works: dask replaces the * with the partition index, writing one file per partition
#corrected.to_csv('TB_human_sorted_corrected*.gtf', sep='\t', index=False, header=None,
#                 quoting=csv.QUOTE_NONE) #QUOTE_NONE stops pandas from quoting lines that contain white space
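Here is a Python sketch of the `cat` work-around. It assumes the part files were written with a `*` pattern as in the cell above. The helper names `concatenate_parts` and `_natural_key` are hypothetical. The parts are sorted by the numbers in their names, so the partition order is preserved even if those numbers are not zero-padded. The output file is skipped in case it happens to match the pattern itself. Because the parts were written with `header=None`, there are no repeated header lines to remove.

```python
import glob
import os
import re
import shutil

def _natural_key(path):
    '''Split digit runs into ints so that "part2" sorts before "part10".'''
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r'(\d+)', path)]

def concatenate_parts(pattern, output_path):
    '''Concatenate all files matching `pattern` into `output_path`, like `cat`.

    Returns the list of part files in the order they were written.
    '''
    out_abs = os.path.abspath(output_path)
    parts = sorted((p for p in glob.glob(pattern) if os.path.abspath(p) != out_abs),
                   key=_natural_key)
    with open(output_path, 'wb') as out:
        for part in parts:
            with open(part, 'rb') as f:
                shutil.copyfileobj(f, out)  # stream bytes instead of loading whole parts
    return parts

# usage (hypothetical output name that does not match the pattern):
# concatenate_parts('TB_human_sorted_corrected*.gtf', 'TB_human_corrected_merged.gtf')
```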

## Concurrent Futures with dask
In Python, concurrent futures allow asynchronous execution of code. The work runs in the background (here, in parallel on the dask workers), while you can keep executing other Python code in the meantime.
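The idea is the same as in the standard library's `concurrent.futures` module, whose interface dask's futures follow. `submit` returns a future immediately, and `result()` blocks until the value is available. Here is a minimal stdlib-only sketch, where `slow_square` is a hypothetical stand-in for slow work:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    time.sleep(0.5)  # stand-in for slow work such as a web request
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(slow_square, 3)     # returns a Future immediately
    print('submitted, doing other work...')  # runs while slow_square is still sleeping
    print(future.result())                   # blocks until the result is ready: 9
```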

In [None]:
from dask.distributed import Client, progress
c = Client()
c

In [None]:
prots = pd.read_csv('./UniProtIDs.csv')

In [None]:
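def get_gene_from_uniprot(url, sleepy=False):
    ''' 
    Takes a UniProt URL and returns the text of its gene name field.
    Returns '' for an empty URL and None if the page cannot be fetched or parsed.
    '''
    if sleepy:
        sleep(25)  # artificial delay so the asynchronous execution is easy to observe
    if not url:
        return ''
    try:
        # the gene name is the first child of the first child of the 'content-gene' div
        # (this relies on the UniProt page layout at the time of writing)
        soup = BeautifulSoup(requests.get(url).content, 'html.parser')
        gene_div = soup.find('div', {'id': 'content-gene'})
        return str(next(next(gene_div.children).children))
    except Exception:
        return None  # request failed or the page did not have the expected layout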
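The nested `next(...children)` calls are easier to follow on a small offline example. The HTML snippet below is hypothetical, shaped only like the structure `get_gene_from_uniprot` expects, and it is parsed without any network request:

```python
from bs4 import BeautifulSoup

# hypothetical HTML shaped like what get_gene_from_uniprot expects
html = '<div id="content-gene"><h2>TP53</h2></div>'
soup = BeautifulSoup(html, 'html.parser')

gene_div = soup.find('div', {'id': 'content-gene'})
first_child = next(gene_div.children)         # the <h2> tag
gene_name = str(next(first_child.children))   # its first child: the text node
print(gene_name)  # TP53
```

On a real page, whitespace between tags also counts as a child, so this positional approach is fragile. Locating the target tag by name or id is usually more robust.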

In [None]:
future = c.submit(get_gene_from_uniprot, prots.UniProtID.iloc[0], sleepy=True)

In [None]:
future

In [None]:
#future.result() blocks until the computation for future has finished and then returns its result

But computing just one future is no fun. The real fun begins when you can just have a ton of futures compute.

The **progress** function from `dask.distributed` can show you real-time progress of your work, and by clicking the dashboard link of your `dask.distributed.Client` you can monitor the work being done on your cores in real time.

In [None]:
genes = [c.submit(get_gene_from_uniprot, url) for url in prots.UniProtID]
progress(genes)

Notice how you can still execute Python code in this Jupyter notebook even though your cores are hard at work on your concurrent futures.
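If you want to handle each result as soon as it is ready, rather than waiting for the whole list, futures can be consumed in completion order. `dask.distributed` provides an `as_completed` function for this. The sketch below shows the same pattern with the standard library's `concurrent.futures`, so it runs without a dask cluster. The function `fake_lookup` and its delays are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_lookup(name, delay):
    time.sleep(delay)  # stand-in for a slow web request
    return name

jobs = {'slow': 0.3, 'fast': 0.05, 'medium': 0.15}
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fake_lookup, name, delay) for name, delay in jobs.items()]
    # results arrive in the order the futures finish, not the order they were submitted
    finished_order = [f.result() for f in as_completed(futures)]

print(finished_order)  # typically ['fast', 'medium', 'slow']
```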

In [None]:
import this

In [None]:
len(genes)

In [None]:
genes[0].result()

In [None]:
prots['genes'] = c.gather(genes) #fetch all results at once; equivalent to [gene.result() for gene in genes]

In [None]:
prots.head()