# Guide to dask chunking in Spectral-Cube

Dask is a data processing framework integrated into spectral-cube that enables parallel processing of larger-than-memory cubes.

In [None]:
import numpy as np
from astropy.utils.data import download_file
from spectral_cube import SpectralCube

We download a cube from the MAPS survey:

In [None]:
filename = download_file('ftp://ftp.cv.nrao.edu/NRAO-staff/rloomis/MAPS/HD_163296/images/CO/0.15arcsec/HD_163296_CO_220GHz.0.15arcsec.JvMcorr.image.pbcor.fits', cache=True)

We load the cube using the `dask` backend, which allows for some parallelization:

In [None]:
cube = SpectralCube.read(filename, use_dask=True)

In [None]:
cube

# Dask Chunking

In [None]:
# Don't use _data in your own code; it will cause problems!  It is shown here _purely_ for visualization purposes
cube._data

Dask data can be 'chunked' to optimize operations along different directions.

For example, this first 'rechunk' will load full spectra into memory, but will break the cube into sub-cubes:

In [None]:
rechunked_cube_spectrally = cube.rechunk((-1,'auto','auto'))
rechunked_cube_spectrally._data

In contrast, this chunking will grab sub-cubes of size 8 in the spectral direction, but the full image in the spatial directions:

In [None]:
rechunked_cube_spatially = cube.rechunk(('auto',-1,-1))
rechunked_cube_spatially._data
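The meaning of `-1` and `'auto'` can be checked without downloading anything by rechunking a small synthetic `dask.array` directly. This is a minimal sketch, assuming `cube.rechunk` passes its argument through to dask's own `rechunk`; the array shape and the names `data`, `spectral`, and `spatial` are made up for illustration.

```python
import dask.array as da

# Hypothetical stand-in for a cube with (spectral, y, x) axes.
data = da.zeros((64, 256, 256), dtype="float32", chunks=(8, 64, 64))

# -1 means "one chunk spanning the whole axis"; 'auto' lets dask pick a size.
spectral = data.rechunk((-1, "auto", "auto"))
spatial = data.rechunk(("auto", -1, -1))

# Every chunk of `spectral` holds complete spectra (all 64 channels).
print(spectral.chunks[0])
# Every chunk of `spatial` holds complete 256 x 256 images.
print(spatial.chunks[1], spatial.chunks[2])
```

The sizes dask picks for the `'auto'` axes depend on its configured target chunk size, so they will differ between cubes and installations.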

You can also enforce individual small chunks if you want to ensure every operation fits in memory:

In [None]:
rechunked_cube = cube.rechunk((25, 512, 512))
rechunked_cube._data
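To judge whether a fixed chunk shape such as `(25, 512, 512)` is "small", it helps to work out how many bytes one chunk occupies. The sketch below assumes 4-byte (float32) pixels, which may not match your cube; `chunk_nbytes` is a hypothetical helper, not part of spectral-cube or dask.

```python
import math


def chunk_nbytes(chunk_shape, itemsize=4):
    """Bytes needed to hold one chunk; itemsize=4 assumes float32 data."""
    return math.prod(chunk_shape) * itemsize


nbytes = chunk_nbytes((25, 512, 512))
print(f"{nbytes / 2**20:.1f} MiB per chunk")  # prints "25.0 MiB per chunk"
```

Operations usually hold several chunks (plus intermediate results) in memory at once, so the per-chunk size is a lower bound on memory use rather than an estimate of the total.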

You can control dask's behavior using dask directly.  The preferred approach is to use context managers, e.g., for progress bars and schedulers:

In [None]:
from dask.diagnostics import ProgressBar
pbar = ProgressBar()

In [None]:
with pbar:
    cube.mean()

In [None]:
# the synchronous scheduler is the default
# our default chunking uses 16 chunks
with cube.use_dask_scheduler('synchronous'):
    with pbar:
        cube.mean()

In [None]:
# this version is slower than loading the whole cube into memory, but it potentially uses less memory & runs in parallel
with cube.use_dask_scheduler('threads', num_workers=8):
    with pbar:
        cube.rechunk((25,512,512)).mean()
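The same scheduler choice can be tried on a plain `dask.array`, which makes it easy to confirm that the scheduler changes how the work is executed but not the result. This is a minimal sketch, assuming `use_dask_scheduler` maps onto dask's standard `scheduler` and `num_workers` options; the random array is a made-up stand-in for a cube.

```python
import dask
import dask.array as da

# Hypothetical stand-in data, split into 4 x 2 x 2 = 16 chunks.
rng = da.random.RandomState(0)
data = rng.random_sample((32, 128, 128), chunks=(8, 64, 64))

# Single-threaded: chunks are processed one after another.
with dask.config.set(scheduler="synchronous"):
    mean_sync = float(data.mean().compute())

# Thread pool: up to 4 chunks are processed concurrently.
mean_threads = float(data.mean().compute(scheduler="threads", num_workers=4))

print(data.numblocks)
print(abs(mean_sync - mean_threads) < 1e-12)
```

Whether the threaded run is actually faster depends on the chunk size and on how much of the work releases the Python GIL, so it is worth timing both on your own data.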

For some huge cubes and operations, it is necessary to save intermediate steps to disk.

In [None]:
with pbar:
    spectrally_chunked = cube.rechunk((-1,'auto','auto'), save_to_tmp_dir=True)
spectrally_chunked._data

The data behind `_data` are now saved on disk, which can be necessary for huge operations along dimensions that are hard to fit in memory.
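The general pattern (write the chunked array to disk, then reopen it lazily) can be illustrated with plain dask. The sketch below uses `dask.array.to_npy_stack` and `from_npy_stack` as a stand-in; it is not a description of the storage format that `save_to_tmp_dir` uses, and the array is made up.

```python
import tempfile

import dask.array as da

# Hypothetical stand-in data: 4 chunks along the first axis.
data = da.ones((16, 64, 64), dtype="float32", chunks=(4, 32, 32))

with tempfile.TemporaryDirectory() as tmp_dir:
    # Write one .npy file per chunk along axis 0; this materializes the data on disk.
    da.to_npy_stack(tmp_dir, data, axis=0)

    # Reopen lazily: each file is only read when its chunk is computed.
    on_disk = da.from_npy_stack(tmp_dir)
    total = float(on_disk.sum().compute())

print(on_disk.chunks[0], total)
```

Note that the temporary directory is removed at the end of the `with` block, so anything that needs the on-disk data has to be computed before leaving it.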