# Array multinode

In this last notebook we use several processes to load the data into memory, and then build a single distributed matrix from the pieces. We can operate on it with methods, or even slice it, as if it were a `numpy` array; `dask` takes care of all the process-to-process communication for us.

Here we use `dask-distributed`, so the same code could run across many different nodes.

In [1]:
def load_file(filename):
    # numpy is imported inside the function so it is self-contained when sent to a worker
    import numpy
    data = numpy.loadtxt(fname=filename, delimiter=',')
    return data
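
Because `load_file` is shipped to the workers, it is worth checking it locally before mapping it over the real files. The sketch below writes a tiny made-up comma-separated file (2 rows x 3 columns of invented values, not the real inflammation data; the filename `inflammation-sample.csv` is hypothetical) into a temporary directory and loads it the same way.

```python
import os
import tempfile

def load_file(filename):
    # Same body as the notebook cell: numpy is imported inside the function.
    import numpy
    data = numpy.loadtxt(fname=filename, delimiter=',')
    return data

# Hypothetical sample file with invented values, written to a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "inflammation-sample.csv")
    with open(path, "w") as f:
        f.write("0,1,2\n3,4,5\n")
    sample = load_file(path)  # loadtxt reads everything into memory before the dir is removed

print(sample.shape)  # (2, 3)
print(sample.dtype)  # float64
```

`numpy.loadtxt` parses to `float64` by default, which is why the `dtype` declared later for the dask arrays is `"float64"`.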

In [2]:
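import glob
import os
# absolute paths, so the workers can find the files (assuming they share the same filesystem);
# sorted, so the files are concatenated in a predictable order later on
filenames = [os.path.abspath(name) for name in sorted(glob.glob("inflammation-*.csv"))]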

In [4]:
from distributed import Client

In [5]:
# connect to a dask scheduler already running at this address (8786 is the default scheduler port)
executor = Client('127.0.0.1:8786')

In [6]:
data_futures = executor.map(load_file, filenames)
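
`executor.map` submits one `load_file` task per filename and immediately returns a list of futures, one per input, while the work runs in the background on the workers. The standard library's `concurrent.futures` follows the same submit-then-collect pattern on a single machine. The sketch below is only an analogy (a thread pool, with a hypothetical `fake_load` and made-up names standing in for the real files), not how `distributed` is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_load(name):
    # Hypothetical stand-in for load_file: returns something derived from the input.
    return len(name)

names = ["a.csv", "bb.csv", "ccc.csv"]  # made-up inputs

with ThreadPoolExecutor(max_workers=2) as pool:
    # One future per input, returned straight away (like executor.map above).
    futures = [pool.submit(fake_load, n) for n in names]
    # .result() blocks until that particular task has finished.
    results = [fut.result() for fut in futures]

print(results)  # [5, 6, 7]
```

As with `data_futures[0].result()` in the notebook, the results stay on the worker side until `.result()` is called.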

In [7]:
data_futures

[<Future: status: finished, type: ndarray, key: load_file-62a583ce0939eb4b75c532278840c407>,
 <Future: status: finished, type: ndarray, key: load_file-877106e7cb375343642acae9f68bbb36>,
 <Future: status: finished, type: ndarray, key: load_file-c3bf8d890a05ba20db4232a50a3cf6e6>,
 <Future: status: finished, type: ndarray, key: load_file-ece9468203099fd1ec6a8eb89d071b97>,
 <Future: status: finished, type: ndarray, key: load_file-c080e136316e3479c15fd0c9927accd6>,
 <Future: status: finished, type: ndarray, key: load_file-9367dfd7addefcc37cd0db31b99f5324>,
 <Future: status: finished, type: ndarray, key: load_file-d240868f658f6a29ea240703695569b4>,
 <Future: status: finished, type: ndarray, key: load_file-86f96c3309c8563f6b7d1f3f85021a9c>,
 <Future: status: finished, type: ndarray, key: load_file-922ed2fe12f5e3285b8537112d79f1e8>,
 <Future: status: finished, type: ndarray, key: load_file-f6e238163b6b99a3e5c8f51be5899d75>,
 <Future: status: finished, type: ndarray, key: load_file-895eb0218396

In [15]:
data_futures[0].result().size

2400

In [23]:
import dask.array as da
# one small dask array (a single chunk) per file; shape and dtype must be declared up front
distributed_dask_arrays = [da.from_delayed(future, shape=(60, 40), dtype="float64") for future in data_futures]
# one large dask array, stacked row-wise (along axis 0)
all_distributed_data = da.concatenate(distributed_dask_arrays, axis=0)
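
`da.from_delayed` cannot look inside a future without computing it, so the shape `(60, 40)` and dtype have to be given explicitly; each future becomes one chunk, and `da.concatenate` along `axis=0` only records how those chunks stack, without moving any data yet. The plain-`numpy` sketch below models the resulting layout eagerly, assuming 12 files of 60 rows x 40 columns (consistent with the 2400-element first file and the `(720, 40)` shape reported below); the zero-filled blocks are placeholders, not real data.

```python
import numpy as np

n_files, rows, cols = 12, 60, 40  # assumed per-file layout

# Placeholder blocks standing in for the per-file arrays held on the workers.
blocks = [np.zeros((rows, cols)) for _ in range(n_files)]

# Equivalent of da.concatenate(..., axis=0), but done eagerly in local memory.
stacked = np.concatenate(blocks, axis=0)

# Chunk sizes per axis, in the tuple-of-tuples form dask uses for `.chunks`.
chunks = ((rows,) * n_files, (cols,))

print(blocks[0].size)            # 2400 == 60 * 40
print(stacked.shape)             # (720, 40)
print(chunks[0][:3], chunks[1])  # (60, 60, 60) (40,)
```

The dask version never builds `stacked` in one place: it keeps the 12 blocks where they were loaded and only tracks their chunk sizes.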

In [24]:
all_distributed_data.shape

(720, 40)

In [25]:
all_distributed_data.max().compute()

20.0
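
`max()` only builds a task graph; `.compute()` runs it. A reduction like this can be evaluated chunk by chunk: each block's maximum is taken where the block lives, and only those small partial results are combined, which is broadly how `dask.array` reductions avoid gathering the whole matrix in one place. A minimal `numpy` sketch of that two-stage idea, using a made-up random matrix split into 12 row blocks:

```python
import numpy as np

rng = np.random.default_rng(0)
matrix = rng.integers(0, 21, size=(720, 40)).astype("float64")  # made-up values

# Split into 12 row blocks of 60, like the per-file chunks.
blocks = np.split(matrix, 12, axis=0)

# Stage 1: per-chunk partial results (computed where each chunk lives).
partial_maxima = [block.max() for block in blocks]

# Stage 2: combine the 12 scalars into the final answer.
chunked_max = max(partial_maxima)

print(chunked_max == matrix.max())  # True
```

This works because the maximum of the block maxima is always the global maximum, so only 12 numbers, rather than 720 x 40 values, need to travel to the final step.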

In [26]:
executor.compute([all_distributed_data.max(), all_distributed_data[:,1].max()], sync=True)

[20.0, 1.0]
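
Passing both expressions to a single `executor.compute(..., sync=True)` call lets the scheduler merge the two task graphs, so any tasks the expressions have in common run only once, and `sync=True` returns concrete values instead of futures. The toy, single-process model below illustrates that sharing and is not dask's scheduler: a hypothetical dictionary-based task graph (`graph`, `compute` and `load` are all invented for illustration) where a counter records how often the shared `load` task executes.

```python
calls = {"load": 0}

def load():
    # Hypothetical shared task: returns a made-up 2 x 3 matrix.
    calls["load"] += 1
    return [[1.0, 0.0, 3.0], [2.0, 1.0, 0.0]]

# Toy task graph: key -> (function, dependency keys)
graph = {
    "data": (load, []),
    "overall_max": (lambda d: max(max(row) for row in d), ["data"]),
    "col1_max": (lambda d: max(row[1] for row in d), ["data"]),
}

def compute(keys, graph):
    """Evaluate several keys against one shared cache, so common tasks run once."""
    cache = {}

    def run(key):
        if key not in cache:
            func, deps = graph[key]
            cache[key] = func(*(run(dep) for dep in deps))
        return cache[key]

    return [run(k) for k in keys]

# Both results in one call: "data" is computed once.
together = compute(["overall_max", "col1_max"], graph)
loads_together = calls["load"]

# Two separate calls: each has its own cache, so "data" is computed twice.
calls["load"] = 0
separate = compute(["overall_max"], graph) + compute(["col1_max"], graph)
loads_separate = calls["load"]

print(together, loads_together)  # [3.0, 1.0] 1
print(separate, loads_separate)  # [3.0, 1.0] 2
```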