<img src="IMG/dask_horizontal.svg"  width="30%">



# Introduction to DASK
Dask is a parallel computing library that scales the existing Python ecosystem. This tutorial will introduce Dask and parallel data analysis more generally.

Dask can scale down to your laptop laptop and up to a cluster.


*  **High level Interface:**  Dask provides high-level Array, Bag, and DataFrame
   collections that mimic NumPy, lists, and Pandas but can operate in parallel on
   datasets that don't fit into memory.  Dask's high-level collections are
   alternatives to NumPy and Pandas for large datasets.

*  **Low Level Interface:** Dask provides dynamic task schedulers that
   execute task graphs in parallel.  These execution engines power the
   high-level collections mentioned above but can also power custom,
   user-defined workloads.  These schedulers are low-latency (around 1ms) and
   work hard to run computations in a small memory footprint.  Dask's
   schedulers are an alternative to direct use of `threading` or
   `multiprocessing` libraries in complex cases or other task scheduling
   systems like `Luigi` or `IPython parallel`.
   
## DASK Ressources 
*  [DASK Website](http://dask.org)
*  [Documentation](https://dask.pydata.org/en/latest/)


In [None]:
#%run prep.py

# High Level Interfaces: Parallelizing NumPy and Pandas

## Pandas
Pandas Dataframes are a very popular tool that provide full speadsheet and in-memory database functionality for the ***Python*** ecco system. It has become one of tstandard tools in Data Science and is also widely used for pre-processing / data wrangling tasks in machine learning

* the ***DASK*** API for ***Pandas*** is analog to the one for ***NumPy*** and will not be discussed here, see https://docs.dask.org/en/latest/dataframe.html for details.

## NumPy
As seen before, NumPy arrays are the dominat array and matrix data structure in the ***Python*** data science and machine learning ecco system.
* refresh your NumPy skills with this extra [tutorial](Extra_Material/Numpy.ipynb)

### Functionality of Dask arrays

* Arithmetic and scalar mathematics: +, *, exp, log, ...
* Reductions along axes: sum(), mean(), std(), sum(axis=0), ...
* Tensor contractions / dot products / matrix multiply: tensordot
* Axis reordering / transpose: transpose
* Slicing: x[:100, 500:100:-2]
* Fancy indexing along single axes with lists or NumPy arrays: x[:, [10, 1, 5]]
* Array protocols like __array__ and __array_ufunc__
* Some linear algebra: svd, qr, solve, solve_triangular, lstsq

**Full Array API: https://docs.dask.org/en/latest/array-api.html**

### Limitations
However, Dask Array does not implement the entire NumPy interface. Users expecting this will be disappointed. Notably, Dask Array lacks the following features:

* Much of np.linalg has not been implemented. This has been done by a number of excellent BLAS/LAPACK implementations, and is the focus of numerous ongoing academic research projects
* Arrays with unknown shapes do not support all operations
* Operations like sort which are notoriously difficult to do in parallel, and are of somewhat diminished value on very large data (you rarely actually need a full sort). Often we include parallel-friendly alternatives like topk
* Dask Array doesn’t implement operations like tolist that would be very inefficient for larger datasets. Likewise, it is very inefficient to iterate over a Dask array with for loops
  
**DASK is still evolving ...**


## Just get started!

In [None]:
#let's parallelize a simple array opperations
import numpy as np
np_A = np.random.rand(10000,10000) #create random NumPy array
%time np_A.sum()

In [None]:
#now in parallel 
import dask.array as da
da_A = da.from_array(np_A, chunks=(5000)) #create DASk array from existing NumPy array
res = da_A.sum() #tell dask to compute the sum of A

In [None]:
res

### Where is my result?
In a nutshell: ***DASK*** basically works like this:
* First, it builds an execution graph
* Then, we need to execute it by calling ``compute()``

In [None]:
#call compute
%time res.compute()

### Why is this not faster?
Well, building the graph, parallelizing it and scheduling it's execution causes some overhead...
* probably, our problem was to small :-)


### What is happening at ``compute()`` ?


In [None]:
#need one mor lib
!conda install -y python-graphviz

In [None]:
#have look at the actual compute graph build by DASK
res.visualize(filename='graph.svg')

### Ahh - MapReduce!
* looks like ***DASK*** automatically implemented MapReduce
* but, why did it use 4 parallel paths?

### Block concept of DASK arrays
* ***DASK*** splitts arrays into blocks 
* parallelization (here MapReduce) runs over Blocks
* Blocks sizes are definde by the ``chunks`` 

```
#recall array creation
da_A = da.from_array(np_A, chunks=(5000))
```

<img SRC="IMG/array.png">

In [None]:
#get size of chucks
da_A.chunks

In [None]:
da_A.chunksize

In [None]:
#change chunk size
#WARNING: can be very expensive - better think about the chunk size before ...
B=da_A.rechunk(1000)

In [None]:
B.chunks

In [None]:
res=B.sum()
graph=res.visualize(filename='graph.svg')#saving graph to file

### Is this running in Parallel ?
* open ternminal
* start ``top``
* execute ``compute()``

In [None]:
%%timeit #measure mean of 10 runs
res.compute()

### Memory Usage
By processing each block at a time, ***DASK*** is not only able tu run in parallel, but also needs only tthe memory for the bock size...
* allows easy processing of **larger than memory** data sets

In [None]:
#a larger example in NumPy
%time x = np.random.normal(10, 0.1, size=(20000, 20000)) 
%time y = x.mean(axis=0) #compute mean along first axis


In [None]:
#same in DASK
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0) 
%time res=y.compute() 

# Distributed Execution
## NOTE: the rest of this notebook requires a running DASK Cluster

In [None]:
from dask.distributed import Client #import distributed client

In our cluster enviroment, the ***DASK*** Cluster is automatically started for multi node jobs. See DASK documentation on how to setup a ***DASK*** cluster: http://distributed.dask.org/en/latest/setup.html

In [None]:
client = Client() #start local cluster on CPU cores

In [None]:
#see how many workers we have
workers=client.scheduler_info()['workers']
for w in workers:
    print(w)

Vist [http://localhost:8787](http://localhost:8787) to see your cluster status

## Submit work to the Cluster

In [None]:
#simple remote functions
def square(x):
    return x ** 2

def neg(x):
    return -x

#using the map future, functions will be scheduled for execution - NO graph building and compute()
A = client.map(square, range(1000))
B = client.map(neg, A)

In [None]:
total = client.submit(sum, B)

In [None]:

res = total.result()
print ("done: ",res)

### Arrays on the Cluster
Starting a cluster client automatically cause the high level interfaces to use the cluster

In [None]:
import numpy as np
np_A = np.random.rand(10000,10000) #create random NumPy array
import dask.array as da
cluster_A = da.from_array(np_A, chunks=(5000)) #create DASk array from existing NumPy array
res = cluster_A.sum() #tell dask to compute the sum of A

In [None]:

%time res.compute()

## ***DASK-ML*** Project
Brand new approach to make ***DASK*** a full distributed ML framework.
http://dask-ml.readthedocs.io/en/latest/joblib.html

In [None]:
#example: Clustering
%matplotlib inline

import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt


In [None]:
#generate like in execise 2, but now with dask_mL
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist() #manual executions on X
X

In [None]:
#call skleran k-means
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2000, oversampling_factor=10)
km.fit(X)

In [None]:
plt.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],  cmap='viridis', alpha=0.25);



# Low Level Interfaces: Parallelizing Compute Graphs
Building the Compute-Graph manually

In [None]:
# simple example
def inc(x):
    return x + 1

def double(x):
    return x + 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)


In [None]:
total

### **Now, build a graph:**
```dask.delayed()``` add functions and arguments to graph:

In [None]:
import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)


In [None]:
total.visualize(filename="graph2.svg")

In [None]:
#call copute to execute
total.compute()