# Profile array copy via dask threaded scheduler

This notebook profiles a very simple array copy operation, using synthetic data.

In [1]:
import sys
sys.path.insert(0, '..')
import zarr
print('zarr', zarr.__version__)
from zarr import blosc
import numpy as np
import h5py
import bcolz
# don't let bcolz use multiple threads internally, we want to 
# see whether dask can make good use of multiple CPUs
bcolz.set_nthreads(1)
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
output_notebook()

zarr 0.4.1.dev94+dirty


In [2]:
import tempfile
import operator
from functools import reduce
from zarr.util import human_readable_size


def h5fmem(**kwargs):
    """Convenience function to create an in-memory HDF5 file."""

    # need a file name even tho nothing is ever written
    fn = tempfile.mktemp()

    # file creation args
    kwargs['mode'] = 'w'
    kwargs['driver'] = 'core'
    kwargs['backing_store'] = False

    # open HDF5 file
    h5f = h5py.File(fn, **kwargs)

    return h5f


def h5d_diagnostics(d):
    """Print some diagnostics on an HDF5 dataset."""
    
    print(d)
    nbytes = reduce(operator.mul, d.shape) * d.dtype.itemsize
    cbytes = d._id.get_storage_size()
    if cbytes > 0:
        ratio = nbytes / cbytes
    else:
        ratio = np.inf
    r = '  compression: %s' % d.compression
    r += '; compression_opts: %s' % d.compression_opts
    r += '; shuffle: %s' % d.shuffle
    r += '\n  nbytes: %s' % human_readable_size(nbytes)
    r += '; nbytes_stored: %s' % human_readable_size(cbytes)
    r += '; ratio: %.1f' % ratio
    r += '; chunks: %s' % str(d.chunks)
    print(r)
 

In [3]:
def profile_dask_copy(src, dst, chunks, num_workers=multiprocessing.cpu_count(), dt=0.1, lock=True):
    dsrc = da.from_array(src, chunks=chunks)
    with Profiler() as prof, ResourceProfiler(dt=dt) as rprof:
        da.store(dsrc, dst, num_workers=num_workers, lock=lock)
    visualize([prof, rprof])
    

## NumPy arrays

In [4]:
a1 = np.arange(400000000, dtype='i4')
a2 = np.empty_like(a1)

In [5]:
chunks = 2**20,  # 4M

In [6]:
profile_dask_copy(a1, a2, chunks, lock=True, dt=.02)

In [7]:
profile_dask_copy(a1, a2, chunks, lock=False, dt=.02)

Interestingly, when the dask lock is disabled, more CPU is utilised, but the overall time taken is longer. Is this due to some contention over memory access?

## Zarr arrays (in-memory)

In [8]:
z1 = zarr.array(a1, chunks=chunks)
z1

zarr.core.Array((400000000,), int32, chunks=(1048576,), order=C)
  compression: blosc; compression_opts: {'shuffle': 1, 'clevel': 5, 'cname': 'blosclz'}
  nbytes: 1.5G; nbytes_stored: 26.9M; ratio: 56.7; initialized: 382/382
  store: builtins.dict

In [9]:
z2 = zarr.empty_like(z1)
z2

zarr.core.Array((400000000,), int32, chunks=(1048576,), order=C)
  compression: blosc; compression_opts: {'shuffle': 1, 'clevel': 5, 'cname': 'blosclz'}
  nbytes: 1.5G; nbytes_stored: 298; ratio: 5369127.5; initialized: 0/382
  store: builtins.dict

In [10]:
with blosc.use_context():
    profile_dask_copy(z1, z2, chunks, lock=True, dt=.02)

In [11]:
with blosc.use_context():
    profile_dask_copy(z1, z2, chunks, lock=False, dt=0.02)

Without the dask lock, we get nearly 400% CPU utilisation. 

## HDF5 datasets (in-memory)

In [12]:
h5f = h5fmem()
h5f

<HDF5 file "tmp051mtwrt" (mode r+)>

In [13]:
h1 = h5f.create_dataset('h1', data=a1, chunks=chunks, compression='lzf', shuffle=True)
h5d_diagnostics(h1)

<HDF5 dataset "h1": shape (400000000,), type "<i4">
  compression: lzf; compression_opts: None; shuffle: True
  nbytes: 1.5G; nbytes_stored: 21.0M; ratio: 72.8; chunks: (1048576,)


In [14]:
h2 = h5f.create_dataset('h2', shape=h1.shape, chunks=h1.chunks, 
                        compression=h1.compression, compression_opts=h1.compression_opts, 
                        shuffle=h1.shuffle)
h5d_diagnostics(h2)

<HDF5 dataset "h2": shape (400000000,), type "<f4">
  compression: lzf; compression_opts: None; shuffle: True
  nbytes: 1.5G; nbytes_stored: 0; ratio: inf; chunks: (1048576,)


In [15]:
profile_dask_copy(h1, h2, chunks, lock=True, dt=0.1)

In [16]:
profile_dask_copy(h1, h2, chunks, lock=False, dt=0.1)

## Bcolz carrays (in-memory)

In [17]:
c1 = bcolz.carray(a1, chunklen=chunks[0])
c1

carray((400000000,), int32)
  nbytes: 1.49 GB; cbytes: 30.93 MB; ratio: 49.33
  cparams := cparams(clevel=5, shuffle=1, cname='blosclz')
[        0         1         2 ..., 399999997 399999998 399999999]

In [18]:
c2 = bcolz.zeros(a1.shape, chunklen=chunks[0], dtype=a1.dtype, 
                 cparams=bcolz.cparams(cname='zlib', clevel=1, shuffle=1))
c2

carray((400000000,), int32)
  nbytes: 1.49 GB; cbytes: 4.07 MB; ratio: 374.80
  cparams := cparams(clevel=1, shuffle=1, cname='zlib')
[0 0 0 ..., 0 0 0]

In [19]:
profile_dask_copy(c1, c2, chunks, lock=True, dt=0.1)

In [20]:
# not sure it's safe to use bcolz without a lock, but what the heck...
profile_dask_copy(c1, c2, chunks, lock=False, dt=0.1)