# out-of-core image analysis with dask

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
%gui qt

In [None]:
# needed for %gui qt to work in jupyter book

import time
time.sleep(5)
del time

In [None]:
import dask.array as da

## dask.distributed

A first life hack: in general, avoid using bare dask, and instead create a `dask.distributed` Client as in the cell below. What this buys you:

- memory management: the distributed scheduler that is automatically created with the default client will launch processes with maximum memory limits. If they exceed those limits, the scheduler will stop sending tasks to them and eventually kill them. In contrast, without `distributed`, you are subject to the same issues as bare Python. It is very easy to freeze your machine.
- a [diagnostics dashboard](https://docs.dask.org/en/latest/diagnostics-distributed.html). This can be invaluable in helping to understand performance in your application. We'll see a live example below.
- seamless scaling. Whether the scheduler is using local workers or connected to [your institution's HPC](https://jobqueue.dask.org/en/latest/), or [cloud compute](https://docs.dask.org/en/latest/setup/cloud.html), the API is the same — you just change the scheduler and connect the Client to it.

In [None]:
from dask import distributed
client = distributed.Client()
print(client.dashboard_link)

In [None]:
random_image = np.random.random((512, 512))

import napari

napari.view_image(random_image)

In [None]:
impossible_image = da.random.random(
    (40_000, 2_000, 2_000),
    chunks=(1, 1_000, 1_000),
)

print(impossible_image.nbytes / 1e9)

In [None]:
napari.view_image(impossible_image)

In [None]:
from dask_image.imread import imread

In [None]:
embryo = imread('/Users/jni/data/Fluo-N3DH-CE/01/t*.tif')

In [None]:
type(embryo)

In [None]:
embryo.shape

In [None]:
embryo.dtype

In [None]:
embryo.nbytes / 1e9

In [None]:
embryo.chunksize

In [None]:
viewer = napari.view_image(
    embryo,
    scale=[10, 1, 1],
)

In [None]:
import dask

In [None]:
from dask_image import ndfilters

smoothed = ndfilters.gaussian_filter(
    embryo,
    sigma=[0, 1, 10, 10],
)

smoothed_layer = viewer.add_image(
    smoothed,
    scale=[10, 1, 1],
)

In [None]:
from skimage.morphology import h_maxima

In [None]:
maxes = dask.compute(np.max(smoothed, axis=(1, 2, 3)))[0]  # takes a minute or so, check dashboard!

In [None]:
plt.plot(maxes)

In [None]:
result0 = np.transpose(np.nonzero(h_maxima(np.array(smoothed[0]), 90)))
result0.shape

In [None]:
# this function needs dramatic improvement

def peaks_from_dask(idx, volume, min_height=90):
    arr = np.asarray(volume)
    maxima = h_maxima(arr, min_height)
    coords = np.transpose(np.nonzero(maxima))
    ncoord = coords.shape[0]
    coords = np.concatenate(
        (np.full((ncoord, 1), idx), coords),
        axis=1,
    )
    return coords

In [None]:
futures = client.map(
    peaks_from_dask,
    np.arange(smoothed.shape[0]),
    smoothed,
)

In [None]:
for f in futures:
    f.cancel()

In [None]:
futures_small = client.map(peaks_from_dask, np.arange(10), smoothed[:10])

In [None]:
coords = np.concatenate(
    [f.result() for f in futures_small],
    axis=0,
)

In [None]:
viewer.add_points(
    coords * [1, 10, 1, 1],
    name='coords',
)

In [None]:
coords[:10]