# Distributed Image Processing in the Cloud 

## Learning objectives

- Become familiar with **lazy, distributed Python image processing** with Dask
- Learn how to start and interact with a **Coiled.io cloud cluster**
- Understand why **consistent software environments** are required and how to create them

## Distributed Python Image Processing with Dask

![Dask overview](https://docs.dask.org/en/latest/_images/dask-overview.svg)

[Dask](https://dask.org/) is *a Python-based, flexible library for parallel computing*. Dask provides dynamic task scheduling optimized for interactive computing, along with parallel collections such as `dask.array`, which mirrors the NumPy array interface.

Dask can run tasks in parallel on a single machine with threads or processes. The same code can also run, largely unchanged, on a set of local workstations connected via SSH, on an [HPC cluster with a traditional job scheduler](https://docs.dask.org/en/latest/setup/hpc.html), or on a managed Kubernetes-based cloud cluster.
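Here is a minimal sketch of what "the same code" means in practice. It assumes only `dask` and `numpy` are installed. The graph below is built once and then executed by different local schedulers, selected with the `scheduler=` argument of `.compute()` or with `dask.config.set`. The array size and chunking are arbitrary illustration values. The `"processes"` scheduler and a `dask.distributed` `Client` (used later for the cloud cluster) plug in the same way.

```python
import numpy as np
import dask
import dask.array as da

# Illustrative lazy computation: nothing is evaluated until .compute() is called
x = da.arange(1_000_000, chunks=100_000, dtype=np.int64)
total = (x ** 2).sum()

# The same task graph, executed by two different local schedulers
threaded = total.compute(scheduler="threads")
single = total.compute(scheduler="synchronous")

# The scheduler can also be set for a whole block of code
with dask.config.set(scheduler="threads"):
    also_threaded = total.compute()

print(threaded, single, also_threaded)
```

The graph does not depend on the executor. Moving from a laptop to a cluster therefore mostly means changing which scheduler is active, not rewriting the computation.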

```python
# Load the OME-Zarr image array
import dask.array as da

arr = da.from_zarr('https://s3.embassy.ebi.ac.uk/idr/zarr/v0.1/6001240.zarr', '0')
arr
```

| | Array | Chunk |
|---|---|---|
| Bytes | 67.09 MiB | 145.56 kiB |
| Shape | (1, 2, 236, 275, 271) | (1, 1, 1, 275, 271) |
| Count | 473 Tasks | 472 Chunks |
| Type | uint16 | numpy.ndarray |
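The figures in this summary follow directly from the array metadata, and reading it does not load any pixel data. The sketch below assumes only `dask` and `numpy`. It builds a lazy stand-in with the same shape, chunks, and dtype (`da.zeros`, not the real image) and reproduces the numbers. The 1 × 2 × 236 × 275 × 271 `uint16` values take 70,351,600 bytes (67.09 MiB). There is one 275 × 271 chunk per channel and z-plane, which gives 2 × 236 = 472 chunks.

```python
import numpy as np
import dask.array as da

# Lazy stand-in with the OME-Zarr array's shape, chunks, and dtype (not the real data).
# da.zeros only builds a task graph; no 67 MiB buffer is allocated here.
stand_in = da.zeros((1, 2, 236, 275, 271),
                    chunks=(1, 1, 1, 275, 271),
                    dtype=np.uint16)

print(stand_in.nbytes / 2**20)  # total size in MiB
print(stand_in.chunksize)       # shape of one chunk
print(stand_in.npartitions)     # total number of chunks
print(stand_in.numblocks)       # number of chunks along each axis
```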


```python
import itk

image = itk.imread('data/6001240.tif')

print(type(image))
print(image.shape)
```

```
<class 'itk.itkImagePython.itkImageUS3'>
(236, 275, 271)
```


```python
import numpy as np
import dask.array as da

arr = np.asarray(image)
arr = da.from_array(arr, chunks=64)

arr
```

| | Array | Chunk |
|---|---|---|
| Bytes | 33.55 MiB | 512.00 kiB |
| Shape | (236, 275, 271) | (64, 64, 64) |
| Count | 100 Tasks | 100 Chunks |
| Type | uint16 | numpy.ndarray |
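With `chunks=64`, Dask tiles each axis into 64-voxel pieces and puts the remainder in a final, smaller chunk. The sketch below assumes only `numpy` and `dask` and uses an all-zero stand-in of the same shape instead of the ITK image. It shows where the 100 chunks in the table come from (4 × 5 × 5). It also shows why a full chunk is 512 KiB: 64³ voxels × 2 bytes per `uint16` voxel.

```python
import numpy as np
import dask.array as da

# Stand-in volume with the ITK image's shape and dtype; the values do not matter here
volume = np.zeros((236, 275, 271), dtype=np.uint16)
chunked = da.from_array(volume, chunks=64)

# Chunk sizes along each axis: full 64-voxel chunks plus a smaller remainder chunk
print(chunked.chunks)
print(chunked.numblocks, chunked.npartitions)

# Size in bytes of one full 64 x 64 x 64 chunk
full_chunk_bytes = chunked.blocks[0, 0, 0].nbytes
print(full_chunk_bytes)
```

Chunk size is a trade-off. Smaller chunks expose more parallelism but add per-task scheduling overhead, and every chunk border is a place where a neighborhood filter needs care.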


```python
from itkwidgets import view
view(arr)
```


```python
def denoise(array_chunk):
    # Currently required for serialization: import and fully load ITK on the worker
    import itk
    itk.force_load()

    # ITK already parallelizes its filters on a single machine. If Dask also
    # runs chunks in parallel on the same machine, uncomment the next line to
    # avoid over-subscribing the CPU cores.
    #
    # itk.set_nthreads(1)

    # Convert to float32 for the curvature flow filter, which works on real-valued pixels
    array_float = array_chunk.astype(np.float32)

    denoised = itk.curvature_flow_image_filter(array_float,
                                               number_of_iterations=10)
    denoised = itk.median_image_filter(denoised, radius=1)
    denoised = itk.smoothing_recursive_gaussian_image_filter(denoised,
                                                             sigma=0.5)

    # Hand the block back to Dask as a NumPy array with the chunk's shape
    return np.asarray(denoised)

denoised = arr.map_blocks(denoise, dtype=np.float32)
```

```python
denoised = denoised.compute()
```

```python
view(denoised)
```


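Because `map_blocks` filters each chunk in isolation, the neighborhood filters in `denoise` treat every chunk border as an image edge, which can leave visible seams between chunks. `map_overlap` instead extends each chunk with `depth` voxels from its neighbors before calling `denoise` and trims that margin from the result afterward.

```python
denoised = arr.map_overlap(denoise, dtype=np.float32,
                           depth=6)
```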

```python
denoised = denoised.compute()
view(denoised)
```
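The following is a small, self-contained illustration of the difference between `map_blocks` and `map_overlap`. It assumes only `numpy` and `dask`. `box3` is a hypothetical stand-in for `denoise`: a 3-point moving average along one axis that has to invent values beyond the edge of its input (here by repeating the edge value). The 1D signal of 20 samples is split into two chunks of 10, which keeps the output easy to inspect.

```python
import numpy as np
import dask.array as da


def box3(block):
    """Hypothetical stand-in filter: 3-point moving average along axis 0,
    padding the block border by repeating its edge values."""
    pad = [(1, 1)] + [(0, 0)] * (block.ndim - 1)
    padded = np.pad(block, pad, mode="edge")
    return (padded[:-2] + padded[1:-1] + padded[2:]) / 3.0


signal = np.arange(20, dtype=np.float64) ** 2
reference = box3(signal)  # filter applied to the whole array at once

x = da.from_array(signal, chunks=10)

# Each chunk filtered in isolation: the chunk border is treated as an image edge
blocked = x.map_blocks(box3, dtype=np.float64).compute()

# Chunks first borrow depth=1 samples from their neighbors; the margin is trimmed afterwards.
# boundary="nearest" repeats edge values at the outer array border, like box3's own padding.
overlapped = x.map_overlap(box3, depth=1, boundary="nearest", dtype=np.float64).compute()

seam = np.flatnonzero(~np.isclose(blocked, reference))
print(seam)                                # indices where map_blocks differs
print(np.allclose(overlapped, reference))  # does map_overlap match the whole-array result?
```

With `map_blocks`, only the two samples on either side of the chunk border differ from filtering the whole signal at once. With a `depth` at least as large as the filter's reach (1 sample for `box3`), `map_overlap` reproduces the whole-array result. The recursive Gaussian in `denoise` has unbounded support, so no finite `depth` is exact there. `depth=6` is presumably a practical compromise between residual seams and redundant work.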


## Cloud cluster

[Coiled.io](https://coiled.io/) is a service that provides a dynamic cloud cluster with minimal configuration.

```python
#ciskip
# Start a new cloud cluster

# You must first log into Coiled.
import coiled
from dask.distributed import Client

# To re-use a running cluster when re-running the notebook, set `name` to
# one of the clusters listed at https://cloud.coiled.io/<username>/clusters.
# name = 'thewtex-dd6121ae-0'
name = None
cluster = coiled.Cluster(n_workers=4,
                         worker_cpu=2,
                         worker_memory='6G',
                         name=name,
                         software='thewtex/coiled-science-thursdays-itk')

client = Client(cluster)
# Click on the *Dashboard* link
client
```


```
Found software environment build
```

| Client | Cluster |
|---|---|
| Scheduler: tls://ec2-35-172-121-209.compute-1.amazonaws.com:8786 | Workers: 2 |
| Dashboard: http://ec2-35-172-121-209.compute-1.amazonaws.com:8787 | Cores: 4 |
| | Memory: 10.00 GiB |
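`coiled.Cluster` requires a Coiled account. To rehearse the same `Client` workflow locally, `dask.distributed.LocalCluster` exposes the same interface. The sketch below uses it as a stand-in, not a Coiled feature, and the worker counts are arbitrary. `processes=False` runs the workers as threads inside the current Python process, which keeps the sketch quick to start.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# Local stand-in for coiled.Cluster: two in-process workers (illustrative sizes)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)
client.wait_for_workers(2)

# Creating a Client makes it the default scheduler for .compute()
x = da.random.random((1000, 1000), chunks=250)
mean = x.mean().compute()
n_workers = len(client.scheduler_info()["workers"])
print(n_workers, mean)

client.close()
cluster.close()
```

Replacing `LocalCluster(...)` with `coiled.Cluster(...)` leaves the computation itself unchanged. What changes is where the workers run, and therefore which software environment they have.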


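*Note:*

It is **critical** that all of the following share a consistent software environment:

- Jupyter client
- Jupyter kernel
- Dask scheduler
- Dask workers

Dask pickles the task graph, including functions such as `denoise`, on the client and unpickles it on the cluster. Differing package versions (for example of `itk`, `numpy`, or `dask` itself) can therefore cause deserialization errors or inconsistent results.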

See [the coiled documentation on how to create a consistent software environment](https://docs.coiled.io/user_guide/software_environment.html).
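One way to check this from the notebook is `Client.get_versions(check=True)`. It gathers package versions from the client, the scheduler, and every worker, and flags mismatches. In this notebook you would call it on the `client` connected to the Coiled cluster. The sketch below uses an in-process client only so that it runs anywhere. Extra packages, such as `itk`, can be added to the report with the `packages=` argument.

```python
from dask.distributed import Client

# In the notebook, `client` would be the Client connected to the Coiled cluster;
# an in-process client is used here only so the sketch runs without a cloud account.
with Client(processes=False) as client:
    # Collect versions from client, scheduler, and workers; check=True flags mismatches
    versions = client.get_versions(check=True)

print(sorted(versions))
```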

```python
%pycat ./create_coiled_software_environment.py
```

```python
# Run on the cloud cluster and keep an eye on the dashboard!
denoised = arr.map_overlap(denoise, dtype=np.float32,
                           depth=6)
denoised = denoised.compute()
```

### Dask Imaging Resources

- [Dask documentation](https://docs.dask.org/en/latest/)
- [Coiled documentation](https://docs.coiled.io/user_guide/index.html)
- [`dask-image` documentation](https://image.dask.org/en/latest/)
- [Bioimaging Image2Knowledge (I2K) 2020 Tutorial](https://github.com/thewtex/modern-insights-from-microscopy-images)
- [SuperComputing 2020 pyHPC Material Science Publication](https://github.com/dani-lbnl/SC20_pyHPC) 