In [1]:
import os

import dask
import dask.array as da
import xarray as xr
import numpy as np

In [2]:
data_path = '../data'

# Creating a `dask.array` from a `numpy.array`

In [3]:
# First a simple 3x3 numpy array
np_arr = np.array([
    [1,2,3],
    [4,5,6],
    [7,8,9]
])
np_arr

array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])

In [4]:
# And here using the dask.array.from_array function
# We also declare the shape of the chunks to be 1x1, and thus we will have 9 in total
dask_arr = da.from_array(np_arr, chunks=(1,1))
# Printing gives a nice representation of the current state of the data
dask_arr

Unnamed: 0,Array,Chunk
Bytes,72 B,8 B
Shape,"(3, 3)","(1, 1)"
Count,9 Tasks,9 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 72 B 8 B Shape (3, 3) (1, 1) Count 9 Tasks 9 Chunks Type int64 numpy.ndarray",3  3,

Unnamed: 0,Array,Chunk
Bytes,72 B,8 B
Shape,"(3, 3)","(1, 1)"
Count,9 Tasks,9 Chunks
Type,int64,numpy.ndarray


Taking the mean over all the columns (same as we do with `numpy`), but the result is a Future, not the data itself. It does now show the resulting number of chunks and the dimensionality of the new data.

In [5]:
col_means = dask_arr.mean(axis=0)
col_means

Unnamed: 0,Array,Chunk
Bytes,24 B,8 B
Shape,"(3,)","(1,)"
Count,21 Tasks,3 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 24 B 8 B Shape (3,) (1,) Count 21 Tasks 3 Chunks Type float64 numpy.ndarray",3  1,

Unnamed: 0,Array,Chunk
Bytes,24 B,8 B
Shape,"(3,)","(1,)"
Count,21 Tasks,3 Chunks
Type,float64,numpy.ndarray


Using `compute()`, we can now execute the task graph and get the data.

In [6]:
col_means.compute()

array([4., 5., 6.])

# Using `xarray`
Another common package for data manipulation. Particularly common with spatial data and working with NetCDF files. As it turns out, `xarray` can use either `numpy` or `dask` arrays as its backend. The latter allows us to make use of `dask`'s functionality in a new way.

In [7]:
# Opening multiple files into single object. Opening any file in the data directory with the given pattern
# Here I've decided to chunk the data only along the time dimension, in 365 unit chunks, which in this case is daily so a year per chunk.
precipitation = xr.open_mfdataset(os.path.join(data_path, 'pr*.nc4'), chunks=dict(time=365))

Now the object is an `xarray`

In [8]:
precipitation

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,360.90 MiB
Shape,"(7305, 360, 720)","(365, 360, 720)"
Count,46 Tasks,22 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.05 GiB 360.90 MiB Shape (7305, 360, 720) (365, 360, 720) Count 46 Tasks 22 Chunks Type float32 numpy.ndarray",720  360  7305,

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,360.90 MiB
Shape,"(7305, 360, 720)","(365, 360, 720)"
Count,46 Tasks,22 Chunks
Type,float32,numpy.ndarray


With a `Dask` backend, seen if you look at `.data`, which would traditionally just point to the values of a `numpy.array`

In [9]:
precipitation['pr'].data

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,360.90 MiB
Shape,"(7305, 360, 720)","(365, 360, 720)"
Count,46 Tasks,22 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.05 GiB 360.90 MiB Shape (7305, 360, 720) (365, 360, 720) Count 46 Tasks 22 Chunks Type float32 numpy.ndarray",720  360  7305,

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,360.90 MiB
Shape,"(7305, 360, 720)","(365, 360, 720)"
Count,46 Tasks,22 Chunks
Type,float32,numpy.ndarray


Here we can use `groupby` and `sum`, just as we would with `xarray` normally. Here though, execution won't happen until we call compute. Additionally, if we were using parallelization here,
`Dask` would speed this operation up.

In [10]:
annual_precip = precipitation.groupby('time.year').sum()
annual_precip

Unnamed: 0,Array,Chunk
Bytes,19.78 MiB,0.99 MiB
Shape,"(20, 360, 720)","(1, 360, 720)"
Count,261 Tasks,20 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 19.78 MiB 0.99 MiB Shape (20, 360, 720) (1, 360, 720) Count 261 Tasks 20 Chunks Type float32 numpy.ndarray",720  360  20,

Unnamed: 0,Array,Chunk
Bytes,19.78 MiB,0.99 MiB
Shape,"(20, 360, 720)","(1, 360, 720)"
Count,261 Tasks,20 Chunks
Type,float32,numpy.ndarray


See now that after `compute()`, the precipitation data is available and held in a simple `numpy.array`.

In [11]:
annual_precip.compute()

# Dask Distributed
Contains functions for deploying workers to perform tasks in parallel. Learn more at their docs [here](https://distributed.dask.org/en/stable/).

In [12]:
from dask.distributed import Client
from dask.distributed import LocalCluster

`Client` will fire up workers. It implicitly calls `LocalCluster`, though you can be explicit by first creating a `LocalCluster` object, and supplying it manually, as is done here. This is where you can specify the details about the workers you want.

By default, threading is the preferred, which is set by `processes=False`. Here we've instead chosen to fire up Python processes, 8 in total, each with 1 thread.

Here we also display the `Client` object, which will give us info about our deployment, as well as provide a link to the Dask dashboard.

In [13]:
cluster = LocalCluster(n_workers=8, processes=True, threads_per_worker=1)
client = Client(cluster)
client

2023-01-13 16:04:27,194 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-2xu7liio', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:52534,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:52579,Total threads: 1
Dashboard: http://127.0.0.1:52589/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52537,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-oq43sbga,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-oq43sbga

0,1
Comm: tcp://127.0.0.1:52581,Total threads: 1
Dashboard: http://127.0.0.1:52583/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52539,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-k_no_p42,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-k_no_p42

0,1
Comm: tcp://127.0.0.1:52582,Total threads: 1
Dashboard: http://127.0.0.1:52585/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52538,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-wbx9u8tv,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-wbx9u8tv

0,1
Comm: tcp://127.0.0.1:52575,Total threads: 1
Dashboard: http://127.0.0.1:52588/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52544,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-jc57_9a1,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-jc57_9a1

0,1
Comm: tcp://127.0.0.1:52577,Total threads: 1
Dashboard: http://127.0.0.1:52590/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52543,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-gjtomwrb,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-gjtomwrb

0,1
Comm: tcp://127.0.0.1:52578,Total threads: 1
Dashboard: http://127.0.0.1:52587/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52542,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-acx9aaf7,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-acx9aaf7

0,1
Comm: tcp://127.0.0.1:52580,Total threads: 1
Dashboard: http://127.0.0.1:52586/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52541,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-q3ecuxs2,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-q3ecuxs2

0,1
Comm: tcp://127.0.0.1:52576,Total threads: 1
Dashboard: http://127.0.0.1:52584/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:52540,
Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-eibd2rts,Local directory: /var/folders/03/82f8dprj20zdn3zkgcd_p1280000gn/T/dask-worker-space/worker-eibd2rts


Important to close your client if doing it this way. Below is a preferred method for initiating client.

In [14]:
client.close()

# Map Blocks, Delayed and Other Dask Functions

### Map Blocks

`dask.array.map_blocks()` allows you to perform the same function on each chunk of you data. This is another delayed function, so it adds to the Dask task graph. This means when you call `map_blocks()`, it will add it to the task list, and also test it out with empty data to make sure it is successful, and retrieve things like data type and shape information. Not until you `compute()` will you have access to the resulting data. This is great because again, it makes use of the scheduler to run tasks in parallel, and to manage memory.

In the below example, we compute the element-wise square root of the 3x3 matrix previously created. In this very overkill example, each element is sent to its own worker, in this case a process with 1 thread.
$$
\begin{vmatrix}
    1 & 2 & 3 \\
    4 & 5 & 6 \\
    7 & 8 & 9
\end{vmatrix} \rightarrow
\begin{vmatrix}
    \sqrt{1} & \sqrt{2} & \sqrt{3} \\
    \sqrt{4} & \sqrt{5} & \sqrt{6} \\
    \sqrt{7} & \sqrt{8} & \sqrt{9}
\end{vmatrix}
$$

Note that below we explicitly set processes to false. This isn't necessary but to make a point about when you may want to use threading over processing and vice-versa. For one, this example is tiny, and thus it's not worth the added overhead of firing up a bunch of Python processes. In addition, when the functions you call unlock the GIL, you can actually make use of multi-threading. This happens to be the case with many numpy operations.

In [15]:
# Processes = False, don't need all the overhead for such a small computation, and one that unlocks the GIL
with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    root_arr = da.map_blocks(np.sqrt, dask_arr).compute()

root_arr

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52607 instead


array([[1.        , 1.41421356, 1.73205081],
       [2.        , 2.23606798, 2.44948974],
       [2.64575131, 2.82842712, 3.        ]])

You can also pass multiple dask arrays into map blocks when they have the same number of chunks. In the below example we'll create an array which is the inverse of our previous dask array, and perform element-wise product in parallel to get a matrix of ones.

$$
\begin{vmatrix}
    1 & 2 & 3 \\
    4 & 5 & 6 \\
    7 & 8 & 9
\end{vmatrix} \overset{\text{element wise}}{\times}
\begin{vmatrix}
    \frac{1}{1} & \frac{1}{2} & \frac{1}{3} \\
    \frac{1}{4} & \frac{1}{5} & \frac{1}{6} \\
    \frac{1}{7} & \frac{1}{8} & \frac{1}{9}
\end{vmatrix} =
\begin{vmatrix}
    1 & 1 & 1 \\
    1 & 1 & 1 \\
    1 & 1 & 1
\end{vmatrix}
$$

In [16]:
# Inverse array chunked identically to the original
inv_arr = 1 / np.arange(1,10).reshape(3,3)
inv_arr = da.from_array(inv_arr, chunks=(1,1))

In [17]:
# Product function
def prod(x, y):
    return x * y

In [18]:
# Computing result by supplying both arrays as input
with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    ones = da.map_blocks(prod, dask_arr, inv_arr).compute()

ones

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52611 instead


array([[1., 1., 1.],
       [1., 1., 1.],
       [1., 1., 1.]])

### Delayed
For turning your functions into those which can add to the task graph and be executed lazily. See the docs for delayed [here](https://docs.dask.org/en/stable/delayed.html).

Here's some simple functions written in standard Python

In [19]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

Here's what happens when executing them in standard Python, computing in real time.

In [20]:
a = inc(1)
b = inc(2)
c = add(a, b)
c

5

And here we apply `delayed` and get a `Delayed` object as output. We have not yet computed the result.

In [21]:
a = dask.delayed(inc)(1)
b = dask.delayed(inc)(2)
c = dask.delayed(add)(a, b)
c

Delayed('add-ed93f16d-8aa2-4213-95af-1b1ffdcf6144')

Here we compute the result, and see we get the same answer as with standard Python.

In [22]:
c.compute()

5

We can also visualize the task graph. This requires [graphviz](https://www.graphviz.org/), or [cytoscape](https://pypi.org/project/ipycytoscape/), or some other engine to run.

In [23]:
c.visualize()

RuntimeError: No visualization engine detected, please install graphviz or ipycytoscape

The `delayed` decorator can be used to turn your functions into delayed functions, which can make using them cleaner, and allow for parallel code.

In [24]:
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

In [25]:
data = [1, 2, 3, 4, 5]

output = []
for d in data:
    a = inc(d)
    b = double(d)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

Delayed('sum-8a7a47bc-ebd1-48db-add2-a2ae2d627db8')

In [27]:
with LocalCluster() as cluster, Client(cluster) as client:
    total = total.compute()

total

Perhaps you already have a cluster running?
Hosting the HTTP server on port 53062 instead


50

### Other
I won't go into detail, but there are other functions for different use cases including but not limited to:

* [`blockwise`](https://docs.dask.org/en/latest/generated/dask.array.blockwise.html) - another way for applying functions over dask arrays
* [`apply_ufunc`](https://docs.xarray.dev/en/stable/generated/xarray.apply_ufunc.html) - For use with `xarray`
* [`map_partitions`](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) - The `dask.dataframe` equivalent of `map_blocks`