# Introduction to Dask

In [None]:
import dask
import time

In [None]:
def square(n):
    time.sleep(1)
    return n * n
    
def add(m, n):
    time.sleep(1)
    return m + n

In [None]:
%%time 

x = square(1)
y = square(2)
z = add(x, y)

## Building a computational graph
***

In [None]:
x = dask.delayed(square)(1)
y = dask.delayed(square)(2)
z = dask.delayed(add)(x, y)

In [None]:
z.visualize(rankdir='LR')

In [None]:
%%time
z.compute()

***
<mark>Question</mark> Rewrite the following cell so it's executed lazily.
 * Which functions should be delayed? `square`? `sum`? Both of them? Why?
 * Visualize the graph.
 * Compare the execution time with the sequential execution.

In [None]:
x = [square(i) for i in range(10)]
y = sum(x)
y
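
As a hedged sketch of one possible answer to the exercise (not the only one): each `square` call is independent and slow, so delaying it lets Dask run the calls concurrently. Delaying `sum` is optional, but it keeps the reduction inside the same graph. The sleep is shortened to 0.1 s for a quick demo, and the names `lazy_squares` and `lazy_total` are illustrative only.

```python
import time

import dask


def square(n):
    time.sleep(0.1)  # shortened sleep (assumption, to keep the demo quick)
    return n * n


# Delay every call to `square`: the calls do not depend on each other
lazy_squares = [dask.delayed(square)(i) for i in range(10)]

# Delaying `sum` too makes the reduction the final node of the graph
lazy_total = dask.delayed(sum)(lazy_squares)

start = time.perf_counter()
total = lazy_total.compute(scheduler='threads')
elapsed = time.perf_counter() - start

print(total, f"{elapsed:.2f} s")
```

Calling `lazy_total.visualize()` on this object should show ten `square` nodes feeding a single `sum` node.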

# Multithreaded Cityblock distance matrix function with SciPy and Dask's delayed execution

In this notebook we implement a function to compute the cityblock distance matrix using `scipy.spatial.distance.cdist`. Although this function is quite fast, it runs on a single thread. In cases like this one, it can be convenient to implement a multithreaded version of the function by parallelizing the execution over chunks of data. We are going to use `dask.delayed` to do that.

In [None]:
import numpy as np
from scipy.spatial.distance import cdist
from dask import compute, delayed, visualize

In [None]:
nsamples = 12000
nfeat = 50

x = 10. * np.random.random([nsamples, nfeat])

Let's time the `cdist` function and watch the output of the `top` command while it runs.

In [None]:
# note that the `cdist` function used to compute the cityblock distance
# is not multithreaded

%timeit cdist(x, x, 'cityblock')

With the `top` command we see that `cdist` runs in a single thread. In such cases it can be quite simple to write a multithreaded version of the function. We can do this very easily with `dask.delayed`!

## Dask's async delayed execution
A simple multithreaded version of `cdist` can be built as follows:
  * Split the array of vectors into chunks. We can use `np.split(x, num_chunks)`.
  * Compute the partial cityblock distance matrices of the complete array with respect to each of the chunks.
  * Concatenate the resulting list into a single cityblock distance matrix.

Note that concatenation is not a fast operation, so we will probably need to keep improving our function.
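
The steps above can be checked on a tiny array with plain NumPy and SciPy, before Dask comes into play. This is only a sketch: the array `x_small` and its size are made up for illustration. It shows why the partial matrices are joined along `axis=1`: each chunk contributes a block of columns to the full matrix.

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(0)
x_small = 10.0 * rng.random((8, 3))  # hypothetical small array: 8 samples, 3 features

# Step 1: split the samples (rows) into 4 equal chunks of 2 rows each
pieces = np.split(x_small, 4)

# Step 2: distances from the complete array to each chunk
partials = [cdist(x_small, piece, 'cityblock') for piece in pieces]

# Step 3: every partial matrix is a block of columns, so join along axis=1
full = np.concatenate(partials, axis=1)

print(partials[0].shape, full.shape)
print(np.allclose(full, cdist(x_small, x_small, 'cityblock')))
```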

In [None]:
# define the list of operations to be performed asynchronously
chunks = 12  # we choose one chunk per physical CPU core (gpu partition)
partial_distances = [delayed(cdist)(x, xi, 'cityblock') for xi in np.split(x, chunks)]

# at this point nothing is executed

In [None]:
# visualize the computational graph
visualize(partial_distances)

In [None]:
cbdm_dask = delayed(np.concatenate)(partial_distances, axis=1)

In [None]:
# visualize the computational graph
visualize(cbdm_dask)

Let's time the compute step and watch the `top` command again. Now you can see that the computation is executed in parallel, resulting in a shorter execution time.

In [None]:
%timeit compute(cbdm_dask, scheduler='threads')

In [None]:
# check that the resulting matrices are the same
cbdm = compute(cbdm_dask, scheduler='threads')[0]
np.abs(cbdm - cdist(x, x, 'cityblock')).max()

A problem with this solution, as mentioned above, is that `np.concatenate` is not a fast operation.
Let's check how much time it takes without the concatenation part:

In [None]:
%timeit compute(*partial_distances, scheduler='threads')

Let's implement the whole thing as a single function:

In [None]:
def cityblock_dask_concat(x, y, chunks):
    """Implementation using array concatenation"""
    # one delayed `cdist` call per chunk of `y`; each yields a block of columns
    partial_distances = [delayed(cdist)(x, yi, 'cityblock')
                         for yi in np.split(y, chunks)]
    cbdm_dask = delayed(np.concatenate)(partial_distances, axis=1)
    # `compute` returns a tuple, so we take its single element
    return compute(cbdm_dask, scheduler='threads')[0]

In [None]:
print(np.abs(cityblock_dask_concat(x, x, chunks) - cdist(x, x, 'cityblock')).max())
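
One caveat worth sketching: `np.split` raises a `ValueError` when the number of rows is not divisible by the number of chunks, while `np.array_split` accepts uneven chunks. The variant below is a hypothetical helper (the name `cityblock_dask_uneven` and the small test array are illustrative, not part of the notebook) that swaps in `np.array_split` and otherwise follows the same split, compute, concatenate pattern.

```python
import numpy as np
from scipy.spatial.distance import cdist
from dask import compute, delayed


def cityblock_dask_uneven(x, y, chunks):
    """Hypothetical variant that tolerates chunk counts not dividing len(y)."""
    partial = [delayed(cdist)(x, yi, 'cityblock')
               for yi in np.array_split(y, chunks)]
    joined = delayed(np.concatenate)(partial, axis=1)
    return compute(joined, scheduler='threads')[0]


rng = np.random.default_rng(0)
a = rng.random((10, 4))  # 10 rows cannot be split evenly into 3 chunks

try:
    np.split(a, 3)
    split_failed = False
except ValueError:
    split_failed = True

result = cityblock_dask_uneven(a, a, 3)
print(split_failed, result.shape)
```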

# Dask arrays

In [None]:
import numpy as np
import dask.array as da

Let's create a random dask array, do some operation and visualize it:

In [None]:
x = da.random.random((2000, 2000), chunks=(500, 500))
x

In [None]:
y = x.mean(axis=0)
y.visualize(optimize_graph=True)
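
To make the role of chunks more concrete, here is a minimal sketch on a tiny array. It assumes a deterministic NumPy array wrapped with `da.from_array` instead of random data, so that the result can be compared directly with NumPy. The names `data`, `x_da` and `col_means` are illustrative.

```python
import numpy as np
import dask.array as da

data = np.arange(16.0).reshape(4, 4)       # small deterministic array
x_da = da.from_array(data, chunks=(2, 2))  # 2 x 2 grid of blocks

print(x_da.chunks)     # block sizes along each axis
print(x_da.numblocks)  # number of blocks along each axis

# The reduction is lazy: only the graph is built at this point
col_means = x_da.mean(axis=0)

# `compute` runs the graph and returns a NumPy array
result = col_means.compute()
print(np.allclose(result, data.mean(axis=0)))
```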

Let's time the same operation, this time letting Dask choose the chunk sizes automatically.

In [None]:
%%time
x = da.random.random((2000, 2000))
y = x.mean(axis=0)
y.compute().shape

Let's compare the results with NumPy:

In [None]:
%%time 
x = np.random.random((2000, 2000))
y = x.mean(axis=0)
y.shape

Let's consider now the operation `x.dot(x)`. <mark> Question </mark> Could you explain the results of the timings?

In [None]:
%%time 
x = np.random.random((2000, 2000))
y = x.dot(x)
y.shape

In [None]:
%%time
x = da.random.random((2000, 2000))
y = x.dot(x)
y.compute().shape

# Conclusions
The main points to take home from this notebook are:
  * Dask's delayed execution can be used to build multithreaded versions of functions that run on a single thread.
  * Parallelizing functions that already use OpenMP threads in this way might make them slower than the original, since the CPU threads need to be shared between the concurrent executions of the function.