# Dask

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ome/EMBL-EBI-imaging-course-04-2024/blob/main/Day_5/Dask.ipynb)


## Learning objectives

* Introduction to Dask Collections
* Introduction to Dask schedulers

## Launch

This notebook uses the ``environment_basic.yml`` file.

## What is Dask

* [Dask](https://dask.org/) is a free and open-source parallel computing library that scales the existing Python ecosystem.
* Dask helps you scale your data science and machine learning workflows. 
* Dask makes it easy to work with NumPy, pandas, and scikit-learn.
* Dask is a framework to build distributed applications.
* Dask can scale down to your laptop and up to a cluster. We will use it today on an environment which you can set up on your computer.


Dask can be split into **two components**:

* **Collections**:  

Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and pandas. The advantage of this is that it can run in parallel on data that cannot fit into memory.

* **Schedulers**:

Dask provides schedulers to run the tasks in parallel.

## Examples
We will go over some concepts of Dask that we will need today.

### Dask Array

Dask arrays combine many [NumPy](https://numpy.org/) arrays, arranged into chunks within a grid.

Create an array of numbers represented by several NumPy arrays of size 10x10 (the arrays will be smaller if they cannot be divided evenly).

In [1]:
import dask.array as da
x = da.random.random((100, 100), chunks=(10, 10))
x

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,800 B
Shape,"(100, 100)","(10, 10)"
Dask graph,100 chunks in 1 graph layer,100 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 78.12 kiB 800 B Shape (100, 100) (10, 10) Dask graph 100 chunks in 1 graph layer Data type float64 numpy.ndarray",100  100,

Unnamed: 0,Array,Chunk
Bytes,78.12 kiB,800 B
Shape,"(100, 100)","(10, 10)"
Dask graph,100 chunks in 1 graph layer,100 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Use NumPy syntax for operations with Dask arrays. See [numpy.array documentation](https://numpy.org/doc/stable/reference/generated/numpy.array.html).

We transform the array `x` and add the values of ``x`` and transformed array ``x.T``.
We divide the size of the array and the chunk by two along the first axis and slice from index 50 to the end along the second axis and calculate the mean

In [2]:
y = x + x.T
print(y[::2].shape)
z = y[::2, 50:].mean(axis=1)
z

(50, 100)


Unnamed: 0,Array,Chunk
Bytes,400 B,40 B
Shape,"(50,)","(5,)"
Dask graph,10 chunks in 7 graph layers,10 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 400 B 40 B Shape (50,) (5,) Dask graph 10 chunks in 7 graph layers Data type float64 numpy.ndarray",50  1,

Unnamed: 0,Array,Chunk
Bytes,400 B,40 B
Shape,"(50,)","(5,)"
Dask graph,10 chunks in 7 graph layers,10 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


To get the result as a NumPy array

In [3]:
z.compute()

array([1.04584071, 1.06998567, 1.03465863, 1.08447454, 1.06892304,
       1.00529962, 0.96518974, 0.99055045, 0.95025888, 0.99138764,
       0.91387502, 1.00480781, 0.96542963, 0.98906569, 1.14737193,
       1.04255659, 0.98838456, 1.02720619, 0.95438894, 0.95277842,
       1.00488341, 1.04957736, 0.96076232, 0.94057531, 1.06248276,
       0.87216446, 0.98535561, 0.92250993, 1.08389769, 0.9774222 ,
       1.07790348, 0.98619689, 1.07160621, 0.9580342 , 1.08155589,
       0.99507228, 1.02934046, 0.97414949, 0.98329756, 0.98839573,
       1.02403876, 0.9445758 , 0.97662979, 1.13811484, 0.94138436,
       0.96498194, 0.9207191 , 0.92042073, 0.96125012, 0.93450164])

Depending on the available RAM, you might want to let the data persist in memory to speed up further computation

In [4]:
y = y.persist()

To find the time it takes to perform an operation 

In [5]:
%time y.sum().compute()

CPU times: user 27.5 ms, sys: 5.5 ms, total: 33 ms
Wall time: 47.1 ms


9995.900369409947

## Dask Delayed 

``for`` loops are often used to parallelize, e.g. iterate over all the 2D-planes of a Z-stack.

Below we show how to parallelize sequential incrementation of each value using ``dask.delayed``.

In [6]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [7]:
from time import sleep

def increment(x):
    sleep(1)
    return x + 1

In [8]:
%%time
# Running without Dask Delayed (the "usual" way)
results = []
for x in data:
    y = increment(x)
    results.append(y)
    
total = sum(results)

print("Compute:", total) 

Compute: 44
CPU times: user 5.3 ms, sys: 3.31 ms, total: 8.61 ms
Wall time: 8.03 s


We will "transform" our function to use ``dask.delayed``. 
The code below will finish **very quickly**. It will record what we want to compute as a task into a graph that will run later on parallel hardware.

In [9]:
from dask import delayed

In [10]:
# No computation happens here
lazy_results = []

for x in data:
    y = delayed(increment)(x)
    lazy_results.append(y)
    
print("Graph to compute", lazy_results) 

Graph to compute [Delayed('increment-13b8edba-3697-4ac5-8205-6f3515c34b56'), Delayed('increment-347b1e21-96cc-44a7-93b3-347fb1c5b0fe'), Delayed('increment-d556c7be-19b8-4fe2-9104-d4fd4a01c3d6'), Delayed('increment-cdb79900-4ce9-4968-b4f4-dd400c52c1ee'), Delayed('increment-b1369ad3-3f1e-4c33-b926-6b1a48884597'), Delayed('increment-e2831373-abe7-46f4-a2bc-46239bcdf4c9'), Delayed('increment-20871b93-bcec-4d5e-a942-3f3da07fb13d'), Delayed('increment-f0176915-0ea2-4426-8f91-4708225ffdd6')]


To get the result, we need to invoke the ``dask.compute`` method.

The ``*`` in front of ``lazy_results`` below will **unpack the sequence/collection into positional arguments**.

In [11]:
import dask
%time result = dask.compute(*lazy_results)
print("Computed the graph :", result)  # Print result after it is computed

CPU times: user 4.89 ms, sys: 1.91 ms, total: 6.79 ms
Wall time: 1.01 s
Computed the graph : (2, 3, 4, 5, 6, 7, 8, 9)


There are few tricks to learn with ``dask.delayed``. Please check [dask.delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html)

### Parallelize the code below using ``dask.delayed``

We create a 3D-numpy array mimicking an image z-stack.
We want to segment the XY-plane using ``dask_image``.
The common way is to do it in ``for`` loop (see below), we want to segment the planes in parallel using ``dask.compute``

```
import dask etc.
# Create an artificial image
planes = da.random.random((10, 100, 100), chunks=(10, 10, 10))
print(planes.shape)
```

```
import dask_image.ndfilters
import dask_image.ndmeasure


for z in range(planes.shape[0]):
    # move the code below in a new function and use dask.delayed
    plane = planes[z, :, :]
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.33 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
```    

### Install dependencies if required

The cell below will install dependencies if you choose to run the notebook in [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb#recent=true). **Do not run the cell if you are not running the notebook in Google Colab**.


If using Google Colab, **do not** use the ``Runtime>Run all`` entry.

In [None]:
%pip install dask-image

In [12]:
planes = da.random.random((10, 100, 100), chunks=(2, 10, 10))
print(planes.shape)

(10, 100, 100)


In [16]:
import dask
import dask_image.ndfilters
import dask_image.ndmeasure

def analyze(z):
    plane = planes[z, :, :]
    smoothed_image = dask_image.ndfilters.gaussian_filter(plane, sigma=[1, 1])
    threshold_value = 0.33 * da.max(smoothed_image).compute()
    threshold_image = smoothed_image > threshold_value
    label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
    name = "z:%s" % (z)
    return label_image, name

Segment every plane across the z-stack:
 * First we prepare the graph
 * The we compute

In [17]:
from dask import delayed

lazy_results = []
for z in range(planes.shape[0]):
    r = delayed(analyze)(z)
    lazy_results.append(r)

In [18]:
%time results = dask.compute(*lazy_results)

CPU times: user 6.17 s, sys: 452 ms, total: 6.62 s
Wall time: 6.36 s


View the results

In [19]:
import matplotlib.pyplot as plt
%matplotlib inline
from ipywidgets import *

def display(i=0):
    r, name = results[i]
    fig = plt.figure(figsize=(10, 10))
    plt.subplot(121)
    plt.imshow(r)
    plt.title(name)
    fig.canvas.flush_events()

interact(display, i= widgets.IntSlider(value=0, min=0, max=len(results)-1, step=1, description="Select Plane", continuous_update=False))


interactive(children=(IntSlider(value=0, continuous_update=False, description='Select Plane', max=9), Output()…

<function __main__.display(i=0)>

### (Advanced) Dask cluster 

Your computer will have multiple cores e.g. 4. When writing regular Python code, you are probably only using 1 of them. If you are using NumPy, e.g. for matrix multiplication, you will be using multiple cores because NumPy knows how to do it but general Python code doesn't.

Dask cluster allows you to use multiple cores on your computer.
Dask has also a dashboard that you can use to monitor your work.

This time we will use a local cluster to increment each value in the ``data`` array defined above.

In [20]:
def prepare_call(client):
    futures = []
    for x in data:
        y = client.submit(increment, x)
        futures.append(y)
    return futures

Create a local cluster

In [21]:
from dask.distributed import Client, LocalCluster

In [22]:
# if you want to specify number of workers etc.
cluster = LocalCluster(n_workers=2, processes=True, threads_per_worker=1)
# or simply 
# cluster = LocalCluster()
with Client(cluster) as client:
    # perform code
    futures = prepare_call(client)
    results = client.gather(futures)

print(results)

[2, 3, 4, 5, 6, 7, 8, 9]


## Parallelize using LocalCluster

Using the 3D data above, parallelize the "segmentation" example this time using a LocalCluster instead of ``dask.delayed``. [Solution](Solutions_Dask.ipynb)

### License (BSD 2-Clause)
Copyright (C) 2024 University of Dundee. All Rights Reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.