# What is Dask

* [Dask](https://dask.org/) is a free and open-source parallel computing library that scales the existing Python ecosystem.
* Dask helps you scale your data science and machine learning workflows. 
* Dask makes it easy to work with NumPy, pandas, scikit-learn, and similar libraries.
* Dask is a framework to build distributed applications.
* Dask can scale down to your laptop and up to a cluster. Today we will use it in an environment you can set up on your own computer.


Dask can be split into **two components**:

* **Collections**: Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy arrays, Python lists, and pandas DataFrames. The advantage is that they can run in parallel on data that does not fit in memory.
* **Schedulers**: Dask provides schedulers that run the tasks in parallel.
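
As a minimal sketch of how the two components fit together, the snippet below builds a small Array collection and runs the same task graph with two different schedulers. The array size and chunking are illustrative choices, not taken from this notebook; the scheduler is selected with the `scheduler` keyword of `compute()`.

```python
import dask.array as da

# Collection: a 1000-element array of ones, split into 4 chunks of 250.
x = da.ones(1000, chunks=250)
total = x.sum()  # lazy: only a task graph is built here

# Schedulers: the same graph can be executed in different ways.
threaded = total.compute(scheduler="threads")      # local thread pool
single = total.compute(scheduler="synchronous")    # one thread, handy for debugging

print(threaded, single)
```

Both calls should return the same value; only the way the tasks are executed differs.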

## Examples
We will go over some concepts of Dask that we will need today.
This notebook contains a few exercises; the solutions are available in a separate [notebook]().

### Dask Array

Dask arrays combine many [NumPy](https://numpy.org/) arrays, arranged into chunks within a grid.

Create an array of numbers represented by several NumPy arrays of size 10x10 (the arrays will be smaller if they cannot be divided evenly).

In [85]:
import dask.array as da
x = da.random.random((100, 100), chunks=(10, 10))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 78.12 kiB | 800 B |
| Shape | (100, 100) | (10, 10) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |
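
The remark above about chunks being smaller when the shape cannot be divided evenly can be checked with the `chunks` and `numblocks` attributes. This is a small sketch using an illustrative 25x25 array, which is not part of the original notebook.

```python
import dask.array as da

# 25 is not a multiple of 10, so the last chunk along each axis is smaller.
uneven = da.ones((25, 25), chunks=(10, 10))

print(uneven.chunks)     # chunk sizes along each axis
print(uneven.numblocks)  # number of chunks along each axis
```

Here each axis is split into chunks of 10, 10, and 5 elements, giving a 3x3 grid of NumPy arrays.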


Use NumPy syntax for operations with Dask arrays. See [numpy.array documentation](https://numpy.org/doc/stable/reference/generated/numpy.array.html).

We transpose the array ``x`` and add ``x`` to its transpose ``x.T``.
We then take every second row (halving the size of the array and of the chunks along the first axis), slice from index 50 to the end along the second axis, and calculate the mean along that axis.

In [89]:
y = x + x.T
print(y[::2].shape)
z = y[::2, 50:].mean(axis=1)
z

(50, 100)


| | Array | Chunk |
|---|---|---|
| Bytes | 400 B | 40 B |
| Shape | (50,) | (5,) |
| Count | 430 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |
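
To make the claim about NumPy syntax concrete, the sketch below applies the same expression to a plain NumPy array and to a Dask array built from it, then compares the results. The 4x4 input and the slice bounds are illustrative assumptions chosen to keep the example small.

```python
import numpy as np
import dask.array as da

a = np.arange(16.0).reshape(4, 4)       # small, deterministic input
x_small = da.from_array(a, chunks=(2, 2))

# Identical expression on the Dask array and on the NumPy array.
z_dask = (x_small + x_small.T)[::2, 2:].mean(axis=1)
z_numpy = (a + a.T)[::2, 2:].mean(axis=1)

print(z_dask.compute())
print(z_numpy)
```

The Dask version stays lazy until `compute()` is called, at which point it should match the NumPy result.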


To get the result as a NumPy array, call ``compute()``:

In [71]:
z.compute()

array([1.10313273, 0.98110019, 1.03912925, 0.99845679, 0.98786335,
       1.0195118 , 1.10746836, 0.96389864, 1.03843629, 0.99158417,
       1.06553231, 0.97886679, 1.04023854, 1.05131277, 0.93531925,
       1.03594913, 0.95938288, 0.94416435, 0.99124613, 0.83566694,
       0.92394539, 1.00713178, 1.0164884 , 0.97781701, 1.0488211 ,
       1.0492441 , 0.9714299 , 0.95540818, 0.9509488 , 0.96784203,
       1.00755896, 1.0128567 , 1.06842598, 1.04205964, 1.03330535,
       1.05690754, 1.03416651, 0.92930565, 1.02043432, 1.12277939,
       0.99712537, 0.9846522 , 0.99679312, 1.02142025, 1.13188632,
       1.05243094, 1.036054  , 1.05240528, 1.00092722, 1.00831373])

Depending on the available RAM, you might want to keep the data in memory with ``persist()`` to speed up further computation:

In [72]:
y = y.persist()
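
As a rough illustration of what `persist()` gives back, the sketch below uses an array of ones (an assumption made so that the result is predictable). The persisted object is still a Dask array, but its chunks have already been computed and are held in memory, so later operations start from those results.

```python
import dask.array as da

ones = da.ones((100, 100), chunks=(10, 10))
summed = (ones + ones.T).persist()  # chunks are computed and kept in memory

print(type(summed).__name__)        # still a Dask Array, not a NumPy array
print(summed.sum().compute())       # reuses the in-memory chunks
```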

To find the time it takes to perform an operation, use the IPython ``%time`` magic:

In [73]:
%time y.sum().compute()

CPU times: user 20.9 ms, sys: 3.76 ms, total: 24.6 ms
Wall time: 26.7 ms


9966.715115575867

## Dask Delayed 

``for`` loops are often good candidates for parallelization, e.g. when iterating over all the 2D-planes of a Z-stack.

Below we show how to parallelize sequential incrementation of each value using ``dask.delayed``.

In [74]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [75]:
from time import sleep

def increment(x):
    sleep(1)
    return x + 1

In [76]:
%%time
# Running without Dask Delayed (the "usual" way)
results = []
for x in data:
    y = increment(x)
    results.append(y)
    
total = sum(results)

print("Compute:", total) 

Compute: 44
CPU times: user 466 ms, sys: 115 ms, total: 582 ms
Wall time: 8.03 s


We will "transform" our function to use ``dask.delayed``. 
The code below will finish **very quickly**. It will record what we want to compute as a task into a graph that will run later on parallel hardware.

In [79]:
from dask import delayed

In [81]:
# No computation happens here
lazy_results = []

for x in data:
    y = delayed(increment)(x)
    lazy_results.append(y)
    
print("Graph to compute", lazy_results) 

Graph to compute [Delayed('increment-5c3d63ee-ceb7-4bb0-88de-4e3420fc4a5c'), Delayed('increment-576fffff-9b76-44c7-8c8f-33663129269a'), Delayed('increment-ad501ae8-2ac8-4917-9a3a-06e47e09cb66'), Delayed('increment-cd40f504-d066-4bb8-be46-6ea6316f0138'), Delayed('increment-54e30c2a-6a5c-4cfa-89e4-e3864167f265'), Delayed('increment-11dbacc8-d788-472c-bf7a-73dd8310ff20'), Delayed('increment-fc012e05-e4e3-4435-8052-3543b80d36a9'), Delayed('increment-5eea6523-27ae-46f4-87cf-dd7dba0450b6')]
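
One way to convince yourself that building the graph does not run the function is to record each call. The sketch below is an illustration only: the `calls` list and the `counted_increment` helper are hypothetical names, and the synchronous scheduler is chosen so that everything runs in the current process.

```python
from dask import delayed

calls = []  # hypothetical helper list recording every real call

def counted_increment(x):
    calls.append(x)
    return x + 1

# Building the graph: the function is wrapped, not called.
lazy = [delayed(counted_increment)(x) for x in [1, 2, 3]]
calls_before = len(calls)

# Executing the graph: now the function really runs.
values = [item.compute(scheduler="synchronous") for item in lazy]
calls_after = len(calls)

print(calls_before, calls_after, values)
```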


To get the result, we need to call the ``dask.compute`` function.

The ``*`` in front of ``lazy_results`` below will **unpack the sequence/collection into positional arguments**.

In [84]:
import dask
%time result = dask.compute(*lazy_results)
print("Computed the graph :", result)  # Print result after it is computed

CPU times: user 53.5 ms, sys: 13.3 ms, total: 66.8 ms
Wall time: 1.02 s
Computed the graph : (2, 3, 4, 5, 6, 7, 8, 9)
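
The effect of the ``*`` can be seen by comparing both call styles on a shorter list. This is a small sketch with a three-element input chosen for brevity: with unpacking, each delayed object is a separate argument and the result is a flat tuple; without it, the whole list is a single argument and the result is a one-element tuple containing a list.

```python
import dask
from dask import delayed

def increment(x):
    return x + 1

lazy_results = [delayed(increment)(x) for x in [1, 2, 3]]

unpacked = dask.compute(*lazy_results)  # three positional arguments
as_list = dask.compute(lazy_results)    # one argument: the list itself

print(unpacked)  # (2, 3, 4)
print(as_list)   # ([2, 3, 4],)
```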


There are a few tricks to learn with ``dask.delayed``. Please check the [dask.delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html).
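
One such pattern, sketched here as an illustration rather than as the notebook's own solution, is to keep the final reduction inside the graph as well. The sequential example earlier summed the incremented values; wrapping ``sum`` with ``delayed`` lets Dask schedule the increments and the sum together, with a single ``compute()`` call at the end. The ``sleep`` is omitted to keep the example fast.

```python
from dask import delayed

def increment(x):
    return x + 1

data = [1, 2, 3, 4, 5, 6, 7, 8]

lazy_results = [delayed(increment)(x) for x in data]
lazy_total = delayed(sum)(lazy_results)  # the sum is also a task in the graph

total = lazy_total.compute()
print("Compute:", total)  # Compute: 44
```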

### Parallelize the code below using ``dask.delayed``

We create a 3D NumPy-like array mimicking an image z-stack.
We want to segment each XY-plane using ``dask_image``.

Run the segmentation code below in parallel.
Hint:
* First define a function named ``analyze``, using the suggestion in the code cell below.
* The function should return the label image and the z-section index so that we can use them later for identification.
* In the ``for`` loop, collect the elements of the graph, then compute the graph using ``dask.compute``.

```
import dask
import dask.array as da

# Create an artificial image: 10 z-sections of 100x100 pixels
planes = da.random.random((10, 100, 100), chunks=(10, 10, 10))
print(planes.shape)
```

```
import dask_image.ndfilters
import dask_image.ndmeasure


for z in range(planes.shape[0]):
    # move the code below in a new function and use dask.delayed
    plane = planes[z, :, :]
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.33 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
```    

### (Advanced) Dask cluster 

Your computer probably has multiple cores, e.g. 4. When writing regular Python code, you are probably only using one of them. If you are using NumPy, e.g. for matrix multiplication, you may be using multiple cores because NumPy knows how to do it, but general Python code doesn't.

A Dask cluster allows you to use multiple cores on your computer.
Dask also has a dashboard that you can use to monitor your work.

This time we will use a local cluster to increment each value in the ``data`` list defined above.

In [41]:
def prepare_call(client):
    futures = []
    for x in data:
        y = client.submit(increment, x)
        futures.append(y)
    return futures

Create a local cluster

In [42]:
from dask.distributed import Client, LocalCluster

In [43]:
# if you want to specify number of workers etc.
cluster = LocalCluster(n_workers=2, processes=True, threads_per_worker=1)
# or simply 
# cluster = LocalCluster()
with Client(cluster) as client:
    # perform code
    futures = prepare_call(client)
    results = client.gather(futures)

print(results)

[2, 3, 4, 5, 6, 7, 8, 9]
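
The explicit ``submit`` loop above can also be written with ``client.map``, which submits one task per input value. The sketch below is a variant under a few assumptions that are not in the original: ``processes=False`` runs the workers as threads inside the current process (convenient for a quick test), ``dashboard_address=":0"`` lets Dask pick a free port for the dashboard, and ``increment`` is redefined without the ``sleep`` so the snippet is self-contained.

```python
from dask.distributed import Client, LocalCluster

def increment(x):
    return x + 1

data = [1, 2, 3, 4, 5, 6, 7, 8]

# Assumptions: thread-based workers and a dashboard on any free port.
with LocalCluster(n_workers=2, threads_per_worker=1,
                  processes=False, dashboard_address=":0") as cluster:
    with Client(cluster) as client:
        print(client.dashboard_link)           # URL of the monitoring dashboard
        futures = client.map(increment, data)  # one future per input value
        results = client.gather(futures)       # wait for and collect the results

print(results)
```

Using both objects as context managers closes the client and shuts down the cluster when the block ends.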


## Parallelize using LocalCluster

Using the 3D data above, parallelize the "segmentation" example this time using a LocalCluster instead of ``dask.delayed``.

### License (BSD 2-Clause)
Copyright (C) 2022 University of Dundee. All Rights Reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.