# Array multinode

In this last notebook we use several processes to load data into memory and then build a single distributed matrix from the pieces. We can operate on it with methods, or even slice it as if it were a `numpy` array; `dask` takes care of all the process-to-process communication for us.

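Here we use `dask-distributed`, so the same code can run across many different nodes. The cells below use the `Executor` class from the `distributed` release this notebook was written with; later releases renamed it `Client`.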

In [14]:
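def load_file(filename):
    # Import inside the function so numpy is imported in the worker process that runs it
    import numpy
    # Each file is comma-separated numeric data; the result is a 2-D numpy array
    data = numpy.loadtxt(fname=filename, delimiter=',')
    return data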
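
To see what `load_file` returns without needing the data files, here is a minimal sketch that makes the same `numpy.loadtxt` call on a small in-memory CSV. The sample values and the name `sample_csv` are made up for illustration.

```python
import io
import numpy

# Hypothetical three-line CSV standing in for one of the data files
sample_csv = io.StringIO("0,1,0\n0,0,2\n0,1,0\n")

# Same call as in load_file, but reading from a file-like object
sample = numpy.loadtxt(fname=sample_csv, delimiter=',')

print(sample.shape)  # (3, 3)
print(sample.dtype)  # float64
```

Each row of the file becomes a row of the array, and the values are parsed as floats by default.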

In [15]:
from glob import glob

In [16]:
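# glob returns matches in arbitrary order; sort so the files are always joined in the same order
filenames = sorted(glob("data/infl*csv"))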

In [17]:
from distributed import Executor

In [18]:
# Connect to a scheduler already running on this machine, port 8786
executor = Executor('127.0.0.1:8786')

In [22]:
# One future per file; each file is loaded on a worker, not in this notebook process
data_futures = executor.map(load_file, filenames)

In [23]:
data_futures

[<Future: status: finished, key: load_file-32bd3771519a30408f1a1e0035a8f641>,
 <Future: status: finished, key: load_file-d2438b67345f617986ddcc68439e1664>,
 <Future: status: finished, key: load_file-49edd16b6453f28b1275851ff95956e5>,
 <Future: status: finished, key: load_file-624642290f1844c39e1f18c719e1a4e0>,
 <Future: status: finished, key: load_file-1035ef9aa9fa5682b05bcbc1bd15f4d8>,
 <Future: status: finished, key: load_file-2cd2e956a4f0aae83df8a4d66ac6c5b3>,
 <Future: status: finished, key: load_file-d0a1a55bbbeb0599d7564a22de320fc0>,
 <Future: status: finished, key: load_file-0a1206a9aee3b22300feac528d666b20>,
 <Future: status: finished, key: load_file-2d881e3f161221ad9d7471dd94cfe4b4>,
 <Future: status: finished, key: load_file-0d2ba654860580c4b69f0a523641685d>,
 <Future: status: finished, key: load_file-2c9c4a468e893731a1b4de3c3fe5409d>,
 <Future: status: finished, key: load_file-0146a49b0d9ad0aef17a29834f50174d>]
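
The future objects above follow the same pattern as the standard library's `concurrent.futures`. As a rough, local-only sketch of that pattern (not the `distributed` API itself), the example below uses `ThreadPoolExecutor`; the function `square` is a hypothetical stand-in for `load_file`. One difference to keep in mind: the standard library's `map` returns results directly, so the sketch uses `submit` to get futures.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    # Hypothetical stand-in for an expensive task such as load_file
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit returns a Future immediately; the work runs in the background
    futures = [pool.submit(square, n) for n in range(4)]
    # result() blocks until that particular task has finished
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9]
```

Calling `result()` on a future is what `data_futures[0].result()` does in the next cell: it brings the computed value back to the calling process.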

In [24]:
data_futures[0].result()

array([[ 0.,  1.,  0., ...,  0.,  2.,  1.],
       [ 0.,  0.,  2., ...,  1.,  0.,  1.],
       [ 0.,  1.,  0., ...,  0.,  0.,  1.],
       ..., 
       [ 0.,  0.,  2., ...,  0.,  1.,  1.],
       [ 0.,  1.,  2., ...,  1.,  2.,  1.],
       [ 0.,  0.,  1., ...,  1.,  0.,  1.]])

In [25]:
from distributed.collections import futures_to_dask_arrays
distributed_dask_arrays = futures_to_dask_arrays(data_futures)  # many small dask arrays

In [26]:
import dask.array
# one large dask array, joined by time
all_distributed_data = dask.array.concatenate(distributed_dask_arrays, axis=0)

In [27]:
all_distributed_data.shape

(720, 40)
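
The steps above depend on `futures_to_dask_arrays`, which belongs to the `distributed` release used here and may not be available in newer ones. As a hedged alternative, the sketch below builds the same kind of array with `dask.delayed` and `dask.array.from_delayed`, running on the local default scheduler instead of a cluster. The function `make_block` and its synthetic values are hypothetical, and the 60 x 40 block size is an assumption (720 rows split evenly over 12 files).

```python
import numpy
import dask
import dask.array

def make_block(seed):
    # Hypothetical stand-in for load_file: a 60 x 40 block of synthetic values in 0..20
    rng = numpy.random.RandomState(seed)
    return rng.randint(0, 21, size=(60, 40)).astype(float)

# Wrap each call lazily; nothing is computed yet
lazy_blocks = [dask.delayed(make_block)(seed) for seed in range(12)]

# from_delayed needs shape and dtype up front because the data does not exist yet
blocks = [dask.array.from_delayed(b, shape=(60, 40), dtype=float)
          for b in lazy_blocks]

# One large dask array, stacked along the first axis
stacked = dask.array.concatenate(blocks, axis=0)
print(stacked.shape)  # (720, 40)

# Evaluate a reduction over everything and over one column in a single call
overall_max, column_max = dask.compute(stacked.max(), stacked[:, 1].max())
```

The shape is known before any block is computed, which is why `all_distributed_data.shape` returns immediately while `max()` needs an explicit `compute()`.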

In [28]:
all_distributed_data.max().compute()

20.0

In [29]:
executor.compute([all_distributed_data.max(), all_distributed_data[:,1].max()], sync=True)

[20.0, 1.0]