# 2.6. Dask Array

![](dask-array-black-text.svg)

*From the Dask documentation:*
> Dask Array implements a subset of the NumPy ndarray interface using 
> blocked algorithms, cutting up the large array into many small arrays. 
> This lets us compute on arrays larger than memory using all of our cores. 
> We coordinate these blocked algorithms using dask graphs.

Dask Arrays provide "a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed Numpy.

- **Parallel:** Uses all of the cores on your computer
- **Larger-than-memory:** Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
- **Blocked Algorithms:** Perform large computations by performing many smaller computations"

*Stolen from the Dask tutorial: https://github.com/dask/dask-tutorial*

In [None]:
from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=4)

client = Client(cluster)
client

In [None]:
import dask
import dask.array as da
import netCDF4 as nc4
import numpy as np
import glob

## Creating Dask Arrays from Numpy Arrays

One of the easiest ways of creating Dask Arrays is directly from Numpy arrays using the `from_array` function of `dask.array`.  This function accepts anything that is "array-like," such as:

- a netCDF4 variable from netCDF4-python,
- an HDF5 field using h5py,
- a Numpy array,

or any other object that can be indexed like a Numpy array.

It takes the "array-like" object and a tuple for the `chunks` parameter, which specifies the size of each chunk of the array along each array axis.

In this example, we'll create a distributed Dask Array from NetCDF file...but first we have to create some NetCDF data.

In [None]:
DIM_SIZE = 2560

for n in range(4):
    i, j = divmod(n, 2)
    ncds = nc4.Dataset('./data/data-{}x{}.nc'.format(i, j), 'w')
    ncds.createDimension('x', DIM_SIZE)
    ncds.createDimension('y', DIM_SIZE)
    x = ncds.createVariable('x', 'd', ('x',))
    y = ncds.createVariable('y', 'd', ('y',))
    v = ncds.createVariable('v', 'f', ('x', 'y'))
    u = ncds.createVariable('u', 'f', ('x', 'y'))
    x[:] = np.arange(i*DIM_SIZE, (i+1)*DIM_SIZE)
    y[:] = np.arange(j*DIM_SIZE, (j+1)*DIM_SIZE)
    v[:] = np.random.random((DIM_SIZE, DIM_SIZE)).astype('f')
    u[:] = np.random.random((DIM_SIZE, DIM_SIZE)).astype('f')
    ncds.close()

In [None]:
ls -lh data*.nc

In [None]:
ncds = nc4.Dataset('./data/data-0x0.nc')

In [None]:
v00 = da.from_array(ncds.variables['v'], chunks=(1280, 1280))
v00

**Now, take a look at the Dashboard, again.**  Look at the *Bytes stored* section of the Dashboard Status page. 

The creation of the Dask Array is a lazy operation, and the data isn't read from disk until you actually call compute!

In [None]:
v00.visualize()

In [None]:
v00.npartitions

In [None]:
v00.numblocks

Notice that slicing is *also* a lazy operation!

In [None]:
v00_5x5 = v00[:5, :5]
v00_5x5

It's only when we `compute` the result that data is read from disk...

In [None]:
v00_5x5.compute()

Now take a look at the *Bytes stored* section of the Dashboard Status page, again.

In [None]:
ncds.close()

In [None]:
client.restart()

## Create Dask Arrays from Delayed Objects

In addition to reading array-like objects as Dasy Arrays, you can construct an array from `Delayed` objects, giving you a little more flexibility in what you can construct an array from.

### Using Delayed to Lazily Read Multiple Files

In [None]:
def readvar(fname, vname):
    ds = nc4.Dataset(fname)
    varray = dask.delayed(ds.variables[vname])[:]
    vshape = ds.variables[vname].shape
    vdtype = ds.variables[vname].dtype
    return varray, vshape, vdtype, '{}-{}'.format(vname, fname)

In [None]:
varrays = [da.from_delayed(*readvar(fname, 'v')) for fname in glob.glob('./data/data-*.nc')]
uarrays = [da.from_delayed(*readvar(fname, 'u')) for fname in glob.glob('./data/data-*.nc')]

In [None]:
varrays[0]

In [None]:
print([v.name for v in varrays])

In [None]:
varrays[0].visualize()

## Concatenate

In [None]:
v = da.concatenate([da.concatenate(varrays[:2], axis=1), da.concatenate(varrays[2:], axis=1)])
v

In [None]:
u = da.concatenate([da.concatenate(uarrays[:2], axis=1), da.concatenate(uarrays[2:], axis=1)])
u

In [None]:
v.shape

In [None]:
v.npartitions

In [None]:
v.numblocks

In [None]:
v.visualize()

## Mathematical Operations

Most of the Numpy API is mirrored in Dask Array.

### Example: Sum
So, you can do operations like sum over an axis:

In [None]:
sum_v = v.sum(axis=1)
sum_v

In [None]:
sum_v.visualize()

In [None]:
sum_v.compute()

### Example: Matrix Multiplation

In [None]:
v_x_u = v @ u

In [None]:
v_x_u

In [None]:
v_x_u.visualize()

In [None]:
v_x_u = v_x_u.persist()
v_x_u

## Rechunking

In [None]:
v_2 = v.rechunk((1280,1280)).persist()
u_2 = v.rechunk((1280,1280)).persist()

In [None]:
v_2.npartitions

In [None]:
v_x_u_2 = v_2 @ u_2

In [None]:
v_x_u_2.compute()

# So much easier with Xarray...

In [None]:
import xarray as xr

In [None]:
ls -lh data-*.nc

In [None]:
_ds = []
for i in range(2):
    _ds.append(xr.open_mfdataset(glob.glob('./data/data-{}*.nc'.format(i)), chunks={'x': 1280, 'y': 1280}, concat_dim='y'))
ds = xr.concat(_ds, dim='x')

ds