### Chunking a single ~400 MB tif file and writing to zarr file

In [1]:
import dask.array as da
from dask.delayed import delayed
from dask_image.imread import imread
from numcodecs import Blosc
from skimage import io
import zarr

In [2]:
from dask.distributed import Client, progress
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:55989,Cluster  Workers: 4  Cores: 8  Memory: 8.59 GB


In [3]:
import os
os.chdir('/Users/irahorecka/Desktop/Harddrive_Desktop/UCB_ABC/LLS_Pipeline/Exercises/Dask')

In [4]:
large_tif = imread('Scan_Iter_0000_CamA_ch0_CAM1_stack0000_488nm_0000000msec_0016966725msecAbs_000x_000y_000z_0000t.tif')

  warn(message)


In [5]:
import time
import functools

def timer(method):
    """timer wrapper function to calculate method execution
    time"""
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        t0 = time.time()
        method(*args, **kwargs)
        t1 = time.time()
        print(f"{method.__name__} - process time: {'%.4f' % float(t1-t0)} seconds")
    return wrapper

@timer
def sync_write(da_file, zarr_name):
    """synchronously write large tif file to zarr file"""
    da_file.to_zarr(zarr_name, compressor=Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE))
    
@timer
def async_write(da_file, zarr_name):
    """asynchronously write large tif file to zarr file
    using dask"""
    out = da_file.to_zarr(
        zarr_name, compressor=Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE), compute=False
    )
    fut = client.compute(out)


In [6]:
"""Using a simple, synchronous method of writing to zarr.
Performance is hindered here.
Default chunks (401)."""

print(large_tif.shape)
sync_write(large_tif, 'large_tif_sync.zarr')
async_write(large_tif, 'large_tif_async.zarr')
large_tif

(401, 1600, 288)
sync_write - process time: 3.2284 seconds
async_write - process time: 0.1003 seconds


Unnamed: 0,Array,Chunk
Bytes,369.56 MB,921.60 kB
Shape,"(401, 1600, 288)","(1, 1600, 288)"
Count,1203 Tasks,401 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 369.56 MB 921.60 kB Shape (401, 1600, 288) (1, 1600, 288) Count 1203 Tasks 401 Chunks Type uint16 numpy.ndarray",288  1600  401,

Unnamed: 0,Array,Chunk
Bytes,369.56 MB,921.60 kB
Shape,"(401, 1600, 288)","(1, 1600, 288)"
Count,1203 Tasks,401 Chunks
Type,uint16,numpy.ndarray


In [7]:
"""Concurrency (asynchronous) when chunking a singel tif file.
Use dask array to convert to zarr.
Rechunk to 1440 chunks."""

tif_chunk = large_tif.rechunk((401,40,8))
print(tif_chunk.shape)
sync_write(tif_chunk, 'tif_chunk_sync.zarr')
async_write(tif_chunk, 'tif_chunk_async.zarr')
tif_chunk

(401, 1600, 288)
sync_write - process time: 8.4534 seconds
async_write - process time: 0.7291 seconds


Unnamed: 0,Array,Chunk
Bytes,369.56 MB,256.64 kB
Shape,"(401, 1600, 288)","(401, 40, 8)"
Count,6966 Tasks,1440 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 369.56 MB 256.64 kB Shape (401, 1600, 288) (401, 40, 8) Count 6966 Tasks 1440 Chunks Type uint16 numpy.ndarray",288  1600  401,

Unnamed: 0,Array,Chunk
Bytes,369.56 MB,256.64 kB
Shape,"(401, 1600, 288)","(401, 40, 8)"
Count,6966 Tasks,1440 Chunks
Type,uint16,numpy.ndarray


In [8]:
"""Concurrency (asynchronous) when chunking a singel tif file.
Use dask array to convert to zarr.
Rechunk to 4 chunks."""

tif_chunk_1 = large_tif.rechunk((401,400,288))
print(tif_chunk_1.shape)
sync_write(tif_chunk_1, 'tif_chunk_1_sync.zarr')
async_write(tif_chunk_1, 'tif_chunk_1_async.zarr')
tif_chunk_1

(401, 1600, 288)
sync_write - process time: 3.0223 seconds
async_write - process time: 0.0662 seconds


Unnamed: 0,Array,Chunk
Bytes,369.56 MB,92.39 MB
Shape,"(401, 1600, 288)","(401, 400, 288)"
Count,1222 Tasks,4 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 369.56 MB 92.39 MB Shape (401, 1600, 288) (401, 400, 288) Count 1222 Tasks 4 Chunks Type uint16 numpy.ndarray",288  1600  401,

Unnamed: 0,Array,Chunk
Bytes,369.56 MB,92.39 MB
Shape,"(401, 1600, 288)","(401, 400, 288)"
Count,1222 Tasks,4 Chunks
Type,uint16,numpy.ndarray


In [9]:
"""Concurrency (asynchronous) when chunking a singel tif file.
Use dask array to convert to zarr.
Rechunk to 1 chunk."""

tif_chunk_2 = large_tif.rechunk((401,1600,288))
print(tif_chunk_2.shape)
sync_write(tif_chunk_2, 'tif_chunk_2_sync.zarr')
async_write(tif_chunk_2, 'tif_chunk_2_async.zarr')
tif_chunk_2

(401, 1600, 288)
sync_write - process time: 4.6990 seconds
async_write - process time: 0.0611 seconds


Unnamed: 0,Array,Chunk
Bytes,369.56 MB,369.56 MB
Shape,"(401, 1600, 288)","(401, 1600, 288)"
Count,1204 Tasks,1 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 369.56 MB 369.56 MB Shape (401, 1600, 288) (401, 1600, 288) Count 1204 Tasks 1 Chunks Type uint16 numpy.ndarray",288  1600  401,

Unnamed: 0,Array,Chunk
Bytes,369.56 MB,369.56 MB
Shape,"(401, 1600, 288)","(401, 1600, 288)"
Count,1204 Tasks,1 Chunks
Type,uint16,numpy.ndarray


#### Summary:
It appears that a chunk number equal to the number of workers (tif_chunk_1) will yield the best performance when asynchronously writing to zarr. 

### Chunking a N ~20 MB tif files and writing to N zarr files

In [10]:
import dask.array as da
from dask.delayed import delayed
from dask_image.imread import imread
from numcodecs import Blosc
from skimage import io
import zarr

In [11]:
from dask.distributed import Client, progress
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:56034,Cluster  Workers: 4  Cores: 8  Memory: 8.59 GB


In [32]:
def auto_chunk(tif_file: str) -> tuple:  # input filepath to single tif file
    """find optimal rechunking size (tuple of 3 ints).
    optimal rechunk size is having no. chunks equivalent
    to no. workers"""
    nworkers = len(client.ncores())
    tif_load = imread(tif_file)
    orig_chunk = tif_load.shape
    x = int
    while x != 0:
        for index, i in enumerate(orig_chunk):
            x = i % nworkers
            if x == 0:
                break
        if x != 0:
            nworkers += 1
        if nworkers >= orig_chunk[0]:  # i.e. original chunking by dask
            break
    if x == 0:
        # print(i, index, nworkers)
        new_chunk = list(orig_chunk)
        new_chunk[index] = int(new_chunk[index] / nworkers)
        new_chunk = tuple(new_chunk)
        return new_chunk
    return orig_chunk


def opti_chunk(file: str, chunk: tuple):
    """read and rechunk tif file"""
    array = imread(file)  # dask_image.imread
    array_rechunk = array.rechunk(chunk)
    time.sleep(.1)  # introduce latency


@timer
def sync_execute(tif_dir: list):
    """rechunk and imread tif files synchronously"""
    chunk = auto_chunk(tif_dir[0])
    for tif_file in tif_dir:
        opti_chunk(tif_file, chunk)
        

@timer
def async_execute(tif_dir: list):
    """rechunk and imread tif files asynchronously
    using dask futures"""
    chunk = auto_chunk(tif_dir[0])
    futures = []
    for tif_file in tif_dir:
        futures.append(client.submit(opti_chunk, tif_file, chunk))
    result = [future.result() for future in futures]


In [33]:
import os
os.chdir('/Users/irahorecka/Desktop/Harddrive_Desktop/UCB_ABC/demo_napari/GPU')
small_tifs = [f"{os.getcwd()}/{f}" for f in os.listdir() if f[-4:] == '.tif']
os.chdir('/Users/irahorecka/Desktop/Harddrive_Desktop/UCB_ABC/LLS_Pipeline/Exercises/Dask')

In [34]:
"""Compare synchronous vs asynchronous rechunk and imread
of 120 ~20 MB files (small_tifs)"""

sync_execute(small_tifs)
async_execute(small_tifs)

sync_execute - process time: 14.6280 seconds
async_execute - process time: 2.1657 seconds
