# Distributed Image Processing in the Cloud 

## Learning objectives

- Become familiar with **lazy, distributed Python image processing** with Dask
- Learn how to start and interact with a **Coiled.io cloud cluster**
- Understand why **consistent software environments** are required and how to create them

## Distributed Python Image Processing with Dask

![Dask overview](https://docs.dask.org/en/latest/_images/dask-overview.svg)

[Dask](https://dask.org/) is *a Python-based, flexible library for parallel computing*. It provides dynamic task scheduling optimized for interactive computing, and parallel-friendly collections such as `dask.array`.

Dask can run tasks in parallel on a single machine via threads or processes. The same code can also run, with little or no change, on a set of workstations connected via SSH, on an [HPC cluster with a traditional job scheduler](https://docs.dask.org/en/latest/setup/hpc.html), or on a managed, Kubernetes-based cloud cluster.
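The separation between *describing* a computation and *executing* it is what makes this portability possible. The sketch below uses a small synthetic array, not the image data: it builds one lazy task graph and runs it with two of Dask's local schedulers, selected through the `scheduler=` keyword of `compute()`. Once a `dask.distributed.Client` is created (as in the cloud and local-cluster cells further down), that client becomes the default scheduler instead.

```python
import dask.array as da

# A synthetic 1D array split into 4 chunks of 250,000 elements each
x = da.arange(1_000_000, chunks=250_000)
total = x.sum()  # lazy: only a task graph is built here

# Execute the same graph with different schedulers
threaded = total.compute(scheduler="threads")        # thread pool (the dask.array default)
sequential = total.compute(scheduler="synchronous")  # single thread, useful for debugging

print(x.npartitions, threaded, sequential)
```

Because both schedulers execute the identical graph, they return the same result; only where and how the tasks run differs.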

```python
# Load the OME-Zarr image array
import dask.array as da

arr = da.from_zarr('https://s3.embassy.ebi.ac.uk/idr/zarr/v0.1/6001240.zarr', '0')
arr
```

| | Array | Chunk |
|---|---|---|
| Bytes | 70.35 MB | 149.05 kB |
| Shape | (1, 2, 236, 275, 271) | (1, 1, 1, 275, 271) |
| Count | 473 Tasks | 472 Chunks |
| Type | >u2 | numpy.ndarray |
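The sizes in this summary follow directly from the shape and the dtype: `>u2` is a big-endian unsigned 16-bit integer (2 bytes per voxel), and the table uses decimal units (1 MB = 10^6 bytes). A short check of that arithmetic, using only the shape and chunk shape reported above:

```python
import math

import numpy as np

shape = (1, 2, 236, 275, 271)   # array shape reported above
chunks = (1, 1, 1, 275, 271)    # chunk shape: one 2D plane per chunk
itemsize = np.dtype(">u2").itemsize  # 2 bytes per voxel

total_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize
n_chunks = math.prod(s // c for s, c in zip(shape, chunks))

print(f"{total_bytes / 1e6:.2f} MB")  # 70.35 MB
print(f"{chunk_bytes / 1e3:.2f} kB")  # 149.05 kB
print(n_chunks, "chunks")             # 472 chunks
```

Each chunk is a single 275 x 271 plane, so the 2 channels x 236 z-planes give 472 chunks.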


```python
vol = arr[0,0,:,:,:]
vol
```

| | Array | Chunk |
|---|---|---|
| Bytes | 35.18 MB | 149.05 kB |
| Shape | (236, 275, 271) | (1, 275, 271) |
| Count | 709 Tasks | 236 Chunks |
| Type | >u2 | numpy.ndarray |
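Slicing is lazy as well: selecting the first timepoint and channel keeps 236 of the 472 chunks, and the task count in the output above rises from 473 to 709, i.e. by 236, consistent with one slicing task per remaining chunk. The sketch below reproduces the shape and chunk bookkeeping without downloading anything, using a `da.zeros` stand-in that is assumed to match `arr` only in shape, chunking, and dtype (not in data):

```python
import dask.array as da

# Stand-in for `arr`: same shape, chunks, and dtype, filled with zeros
arr_like = da.zeros((1, 2, 236, 275, 271), chunks=(1, 1, 1, 275, 271), dtype=">u2")

# Same selection as `vol`: first timepoint, first channel; nothing is read or computed
vol_like = arr_like[0, 0, :, :, :]

print(vol_like.shape)        # (236, 275, 271)
print(vol_like.chunksize)    # (1, 275, 271)
print(vol_like.npartitions)  # 236
```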


```python
from itkwidgets import view
view(vol, gradient_opacity=0.9)
```


```python
# We can downsample the Dask array with `coarsen`
import numpy as np

factor = 4
# Average non-overlapping factor x factor x factor blocks;
# trim_excess=True drops trailing voxels that do not fill a whole block
down = da.coarsen(np.mean, vol, {0: factor, 1: factor, 2: factor}, trim_excess=True)
down
```

| | Array | Chunk |
|---|---|---|
| Bytes | 2.15 MB | 36.45 kB |
| Shape | (59, 68, 67) | (1, 68, 67) |
| Count | 2125 Tasks | 236 Chunks |
| Type | float64 | numpy.ndarray |
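`da.coarsen` applies the reduction (here `np.mean`) to each non-overlapping block of `factor` elements along every listed axis. With `factor = 4`, the z axis goes from 236 to 59 planes, while 275 = 4 x 68 + 3 and 271 = 4 x 67 + 3, so `trim_excess=True` drops 3 voxels in y and in x. The result is `float64` because NumPy's mean of integers is computed in floating point. The same block-mean reduction can be sketched with plain NumPy; `block_mean` is a hypothetical helper, not part of Dask, and random data stands in for `vol`:

```python
import numpy as np


def block_mean(volume: np.ndarray, factor: int) -> np.ndarray:
    """Downsample a 3D array by averaging non-overlapping factor**3 blocks.

    As with trim_excess=True, trailing voxels that do not fill a whole block are dropped.
    """
    nz, ny, nx = (s // factor for s in volume.shape)
    trimmed = volume[: nz * factor, : ny * factor, : nx * factor]
    # Split each axis into (n_blocks, factor) and average over the factor axes
    blocks = trimmed.reshape(nz, factor, ny, factor, nx, factor)
    return blocks.mean(axis=(1, 3, 5))


# Random 16-bit stand-in with the same shape as `vol`
rng = np.random.default_rng(0)
volume = rng.integers(0, 4096, size=(236, 275, 271), dtype=np.uint16)

small = block_mean(volume, 4)
print(small.shape, small.dtype)  # (59, 68, 67) float64
```

Dask performs the equivalent reshape-and-reduce on each chunk independently, which is why the chunk layout matters for this operation.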


```python
# Computation is lazy by default; we can trigger it with .compute()
import time

start = time.time()

down_arr = down.compute()

elapsed = time.time() - start
print(elapsed, 'seconds')
```

```
7.793061017990112 seconds
```
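Note that this timing includes reading all the chunks from the remote S3 store, not only the averaging. Because the same start/stop pattern is repeated below for each cluster, a small helper can reduce repetition. The sketch below defines a hypothetical `timed` function (not part of Dask) and uses `time.perf_counter()`, a high-resolution monotonic clock intended for measuring durations, which unlike `time.time()` is not affected by system clock updates:

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[[], Any]) -> Tuple[Any, float]:
    """Call fn() and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()  # monotonic clock meant for intervals
    result = fn()
    return result, time.perf_counter() - start


# With the notebook's array this would be: down_arr, elapsed = timed(down.compute)
result, elapsed = timed(lambda: sum(range(1_000_000)))
print(f"{elapsed:.3f} seconds")
```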


```python
print(type(down_arr))
print(down_arr.shape)
view(down_arr)
```

```
<class 'numpy.ndarray'>
(59, 68, 67)
```



## Cloud cluster

[Coiled.io](https://coiled.io/) is a service that launches Dask clusters on cloud infrastructure on demand, with minimal configuration.

```python
#ciskip
# Start a new cloud cluster.

# You must first log in to Coiled (for example, with `coiled login`).
import coiled
from dask.distributed import Client

# To re-use a running cluster when re-running the notebook, set `name` to its name.
# Running clusters are listed at https://cloud.coiled.io/<username>/clusters.
name = None
cluster = coiled.Cluster(n_workers=4,
                         worker_cpu=2,
                         worker_memory='6G',
                         name=name,
                         software='thewtex/i2k-2020-mi2')

client = Client(cluster)
# Click on the *Dashboard* link in the output
client
```
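Cloud workers keep running until the cluster is shut down, so it is worth closing the client and cluster when you are done. Both `Client` and `LocalCluster` can be used as context managers, which closes them automatically on exit. The sketch below uses a `LocalCluster` as a stand-in so it runs without a cloud account; the assumption is that a `coiled.Cluster`, being a Dask cluster object, can be closed the same way with `cluster.close()`, but check the Coiled documentation for your version.

```python
from dask.distributed import Client, LocalCluster

# LocalCluster stands in for coiled.Cluster so this runs without a cloud account
with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
    # Work submitted inside the block runs on the cluster's workers
    total = client.submit(sum, range(10)).result()

# Leaving the `with` block closes the client and shuts the cluster down
print(total)  # 45
```

Outside a `with` block, the equivalent is calling `client.close()` followed by `cluster.close()`.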

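*Note:*

It is **critical** that all of the following have a consistent software environment:

- Jupyter client
- Jupyter kernel
- Dask scheduler
- Dask workers

Dask serializes functions and data in the client and deserializes them on the scheduler and workers, so mismatched package versions (for example of `dask`, `distributed`, or `numpy`) can cause deserialization errors or subtly different behavior. Likewise, interactive widgets such as the `itkwidgets` viewer need compatible versions in the Jupyter front end and kernel.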

See [the coiled documentation on how to create a consistent software environment](https://docs.coiled.io/user_guide/software_environment.html).
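One way to verify consistency from a running session is `Client.get_versions(check=True)`, which collects package versions from the client, the scheduler, and every worker, and raises an error (or warns, for less critical packages) when they disagree. The sketch below runs the check against an in-process `LocalCluster`, where all parts trivially share one environment; with the cloud cluster you would call the same method on the `client` created above.

```python
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
    # check=True flags package-version mismatches between client, scheduler, and workers
    versions = client.get_versions(check=True)

print(sorted(versions))  # ['client', 'scheduler', 'workers']
```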

```python
# Display the script used to create the Coiled software environment
%pycat ./create_coiled_software_environment.py
```

```python
# Run on the cloud cluster; keep an eye on the dashboard!
start = time.time()

down_arr = down.compute()

elapsed = time.time() - start
print(elapsed, 'seconds')
```

```
9.133583784103394 seconds
```


## Exercises

*Change the data chunk size.*

*How does chunk size impact downsampling performance?*
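A possible starting point for the exercise: the sketch below times the same `coarsen` downsampling for several z-chunk sizes on a random 16-bit stand-in with the shape of `vol`, and forces the local threaded scheduler so the stand-in data is not uploaded to a remote cluster. The chunk sizes are arbitrary choices, picked as multiples of the coarsening factor so that every block divides evenly along z. To experiment with the real data instead, rechunk it, for example `vol.rechunk((16, 275, 271))`, and compute with the cluster of your choice.

```python
import time

import dask.array as da
import numpy as np

# Random stand-in for `vol`: same shape and 16-bit integer dtype, arbitrary values
rng = np.random.default_rng(0)
vol_np = rng.integers(0, 4096, size=(236, 275, 271), dtype=np.uint16)

coarsen_factor = 4
results = {}
# z-chunk sizes are multiples of the coarsening factor (236 = one chunk)
for z_chunk in (4, 16, 64, 236):
    candidate = da.from_array(vol_np, chunks=(z_chunk, 275, 271))
    coarse = da.coarsen(np.mean, candidate,
                        {0: coarsen_factor, 1: coarsen_factor, 2: coarsen_factor},
                        trim_excess=True)
    start = time.perf_counter()
    results[z_chunk] = coarse.compute(scheduler="threads")
    elapsed = time.perf_counter() - start
    print(f"z-chunk {z_chunk:>3}: {candidate.npartitions:>3} chunks, {elapsed:.3f} s")
```

Smaller chunks expose more parallelism but add per-task scheduling overhead; larger chunks reduce overhead but limit how many workers can be kept busy.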

```python
# Run on a local "cluster": how does performance compare? Why?
from dask.distributed import Client, LocalCluster

local_cluster = LocalCluster(n_workers=2, processes=False, memory_limit='6G')
client = Client(local_cluster)
client
```

| Client | Cluster |
|---|---|
| Scheduler: `inproc://10.10.10.247/3140440/1` | Workers: 2 |
| Dashboard: http://10.10.10.247:8787/status | Cores: 16 |
| | Memory: 12.00 GB |


```python
start = time.time()

down_arr = down.compute()

elapsed = time.time() - start
print(elapsed, 'seconds')
```

```
3.965486526489258 seconds
```
