# Using Dask to distribute computations

In addition to the parallelization tricks that you have seen so far, Dask has an entire other side devoted to distributed computing. Parallelization and distributed computing are closely linked, because one of the limits on parallelization is the amount of compute that is available. There are many ways to get a cluster of Dask "workers" running, but once you have one, the interface is again remarkably consistent. For the purpose of demonstration, we will use a so-called "local cluster", which runs the workers on the machine you are working on. Calling `Client()` with no arguments starts one.
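The local cluster can also be created explicitly, which lets you choose how many workers it has. The following is a minimal sketch, assuming a recent version of `dask.distributed`. The worker and thread counts are illustrative, not a recommendation. The names `demo_cluster` and `demo_client` are hypothetical and were chosen so they don't clobber the `client` used in the rest of this notebook.

```python
from dask.distributed import Client, LocalCluster

# Illustrative sizing (an assumption): 2 workers with 1 thread each.
# processes=False runs the workers as threads inside this Python process, which
# starts quickly; the default (processes=True) launches separate worker processes.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as demo_cluster:
    with Client(demo_cluster) as demo_client:
        # Client.nthreads() maps each worker's address to its number of threads
        worker_threads = demo_client.nthreads()

print(len(worker_threads))  # 2
```

Both `with` blocks close the client and the cluster on exit. The rest of the notebook uses the default `Client()`, which does not need this.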

In [1]:
from dask.distributed import Client

In [2]:
client = Client()



In [3]:
import time

def inc(x):
    # Sleep for one second to simulate a slow computation
    time.sleep(1)
    return x + 1


There are a few things we can do with the client. The first is to map a function (for example, our increment function) over a sequence of inputs (for example, all the numbers from 0 to 99).

In [4]:
incs = client.map(inc, range(100))
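The following is a small, self-contained sketch of what `map` hands back: one future per input, in input order. It uses `gather` to pull all of the results down as a list. It assumes an in-process Dask cluster (`processes=False`) and redefines `inc` with a shorter sleep so that it runs quickly. `demo_client` is a hypothetical name.

```python
import time
from dask.distributed import Client

def inc(x):
    time.sleep(0.1)  # shorter than the 1-second sleep above, to keep the demo quick
    return x + 1

with Client(processes=False) as demo_client:
    # map returns one future per input, in the same order as the inputs
    futures = demo_client.map(inc, range(5))
    n_futures = len(futures)
    # gather pulls all of the results down from the cluster as a list
    values = demo_client.gather(futures)

print(n_futures)  # 5
print(values)     # [1, 2, 3, 4, 5]
```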

The result is a list of Dask "futures". A future is a Python abstraction for a computation that is happening in parallel and whose result may not be available yet. Dask's futures largely follow the interface of the standard library's `concurrent.futures` module, so familiar methods such as `result` and `done` behave the same way. Each future also carries a status, such as pending or finished. One powerful property of these objects is that the results of the computations are stored on the cluster, and we don't necessarily have to pull them down here. We could: each one of them has a `result` method that pulls its value down from the cluster. Instead, we can pass them on to another operation.
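To make the parallel with the standard library concrete, the following minimal sketch submits the same sleep-free `inc` to a `concurrent.futures.ThreadPoolExecutor` and to a Dask client, and then calls the same methods on both futures. It assumes an in-process Dask cluster (`processes=False`). The variable names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from dask.distributed import Client

def inc(x):
    return x + 1

# Standard library: submit returns a concurrent.futures.Future
with ThreadPoolExecutor(max_workers=2) as executor:
    std_future = executor.submit(inc, 1)
    std_value = std_future.result()  # blocks until the value is ready
    std_done = std_future.done()

# Dask: submit returns a distributed Future with the same methods
with Client(processes=False) as demo_client:
    dask_future = demo_client.submit(inc, 1)
    dask_value = dask_future.result()  # pulls the value down from the cluster
    dask_done = dask_future.done()
    dask_status = dask_future.status   # Dask-specific, e.g. 'pending' or 'finished'

print(std_value, dask_value)  # 2 2
```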

This brings us to the second thing a client can do for us: submit a function to the cluster, together with its inputs. Here, the inputs are the sequence of futures. The function is sent to the cluster, and the computation takes place there, on results that the workers already hold. So, even if the intermediate results are very big, we avoid funneling them through the client, which would create a memory or data-transfer bottleneck. And if the cluster is large (many workers, potentially with lots of memory), we can take advantage of its distributed nature so that no single part of the system is overwhelmed.

In [5]:
total = client.submit(sum, incs)

This is also a future, and it resolves only after all of the futures it depends on have resolved.

In [11]:
total
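The following self-contained sketch shows this chaining on a smaller scale. It assumes an in-process Dask cluster (`processes=False`) and a sleep-free `inc`. `demo_client` and `total_value` are hypothetical names. The list of futures is passed straight to `submit`, and Dask replaces each future with its result on the worker before calling `sum`.

```python
from dask.distributed import Client

def inc(x):
    return x + 1

with Client(processes=False) as demo_client:
    incs = demo_client.map(inc, range(10))
    # Futures inside the list are resolved on the cluster before sum runs,
    # without first pulling the individual values down to the client
    total = demo_client.submit(sum, incs)
    total_value = total.result()  # only the final sum is pulled down

print(total_value)  # 55
```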

Finally, we can send Dask collections, such as Dask arrays, to the cluster to be computed there. For example, consider a situation where we want to summarize a collection of really large data structures, such as arrays with 1000-by-1000-by-1000 elements. At 8 bytes per float64 element, each of these arrays takes up 8 GB, and the ten arrays below take up 80 GB together. We can ask Dask to perform the summary/aggregation on the cluster and then collect only the much smaller result at the end.

In [41]:
import dask.array as da
# Ten lazy 1000x1000x1000 random arrays stacked along the first axis: shape (10000, 1000, 1000)
random_arrays = da.concatenate([da.random.random((1000, 1000, 1000)) for _ in range(10)])

In [42]:
# Mean over the last axis; this is still lazy, and nothing has been computed yet
means = da.mean(random_arrays, axis=-1)
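To see why it matters where the computation runs, you can ask the lazy arrays how large they would be without computing anything. The following sketch rebuilds the same arrays so that it is self-contained. It relies only on the `shape` and `nbytes` attributes of Dask arrays, which are derived from metadata.

```python
import dask.array as da

# Same construction as above: nothing is generated or allocated yet
random_arrays = da.concatenate([da.random.random((1000, 1000, 1000)) for _ in range(10)])
means = da.mean(random_arrays, axis=-1)

# .shape and .nbytes come from metadata, so these lines return instantly
print(random_arrays.shape)         # (10000, 1000, 1000)
print(random_arrays.nbytes / 1e9)  # 80.0  (GB of float64 input)
print(means.shape)                 # (10000, 1000)
print(means.nbytes / 1e6)          # 80.0  (MB of float64 output)
```

The result is a thousand times smaller than the input, which is why it makes sense to reduce on the cluster and move only the means.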

In [43]:
# Start the computation on the cluster; this immediately returns a future
my_means = client.compute(means)

In [44]:
my_means

In [45]:
# Pull the result, a NumPy array of shape (10000, 1000), down from the cluster
client.gather(my_means)

array([[0.50819211, 0.50867339, 0.5012711 , ..., 0.50298155, 0.50676199,
        0.49435751],
       [0.49902563, 0.49384037, 0.4998886 , ..., 0.49611254, 0.4963958 ,
        0.49249021],
       [0.49971012, 0.49499839, 0.50482917, ..., 0.50321588, 0.50323226,
        0.50166189],
       ...,
       [0.50164299, 0.48836632, 0.49430836, ..., 0.51168625, 0.50501338,
        0.49787479],
       [0.49776087, 0.49109849, 0.50324673, ..., 0.49561485, 0.50006045,
        0.50560226],
       [0.5166063 , 0.52233526, 0.50358243, ..., 0.51052183, 0.49485912,
        0.49424677]])
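The same compute-then-collect pattern can be checked on a tiny, deterministic array. The following sketch assumes an in-process Dask cluster (`processes=False`). The array sizes and the names `lazy_means` and `demo_client` are hypothetical. It also shows that calling `result()` on the future returned by `compute` gives the same array as `client.gather`.

```python
import numpy as np
import dask.array as da
from dask.distributed import Client

# A small chunked array of ones, so the expected means are known in advance
x = da.ones((40, 30, 20), chunks=(10, 10, 10))
lazy_means = x.mean(axis=-1)

with Client(processes=False) as demo_client:
    future = demo_client.compute(lazy_means)  # returns a future right away
    via_gather = demo_client.gather(future)   # NumPy array pulled from the cluster
    via_result = future.result()              # equivalent way to get the same value

print(via_gather.shape)  # (30 rows per ... see below)
```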

It's a good idea to close the client cleanly once you are done, so that no cruft from the client is left hanging around. A client and its local cluster hold on to worker processes and memory, and they may leave temporary files (such as a `dask-worker-space` directory) on your file system.

In [None]:
client.close()
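An alternative to calling `close` by hand is to use the client as a context manager, which closes it even if an error is raised inside the block. When the client started its own local cluster, closing the client also shuts that cluster down. The following minimal sketch assumes an in-process cluster (`processes=False`). `demo_client` and `answer` are hypothetical names.

```python
from dask.distributed import Client

def inc(x):
    return x + 1

# Leaving the with-block calls demo_client.close() for us
with Client(processes=False) as demo_client:
    answer = demo_client.submit(inc, 41).result()

print(answer)  # 42
```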