# Array multinode

In this last notebook we use several processes to load the data into memory, and then build a single distributed array out of the pieces. We can operate on it with methods, or even slice it, as if it were a `numpy` array: `dask` takes care of all the process-to-process communication for us.

Here we use `dask-distributed`, so the same code could run across many different nodes rather than on a single machine.

In [None]:
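def load_file(filename):
    # import inside the function so it is self-contained when shipped to a worker
    import numpy
    # each file is a comma-separated table of numbers, returned as a 2D array
    data = numpy.loadtxt(fname=filename, delimiter=',')
    return data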
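One detail of `numpy.loadtxt` matters for the stacking step later on: a multi-row CSV comes back as a 2D array, but a file with a single row comes back 1D, which cannot be stacked row-wise with the others. The sketch below uses in-memory strings in place of real files (an assumption for illustration) to show the difference and how the `ndmin=2` argument guarantees a 2D result.

```python
import io
import numpy

multi_row = "0,1,2\n3,4,5\n"
single_row = "6,7,8\n"

# Multi-row input: a 2D array, one row per line
a = numpy.loadtxt(io.StringIO(multi_row), delimiter=',')
# Single-row input: loadtxt squeezes it down to a 1D array
b = numpy.loadtxt(io.StringIO(single_row), delimiter=',')
# ndmin=2 keeps the row dimension even for a single line
c = numpy.loadtxt(io.StringIO(single_row), delimiter=',', ndmin=2)

print(a.shape)  # (2, 3)
print(b.shape)  # (3,)
print(c.shape)  # (1, 3)
```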

In [None]:
from glob import glob

In [None]:
filenames = sorted(glob("data/infl*csv"))  # glob's order is arbitrary; sort so the files are always stacked in the same order
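`glob` makes no promise about the order of its matches, so when the order of the pieces matters for stacking it is safer to sort the list. A minimal, self-contained sketch using throwaway files in a temporary directory (the file names are hypothetical):

```python
import os
import tempfile
from glob import glob, escape

with tempfile.TemporaryDirectory() as tmp:
    # create empty files whose names mimic the notebook's pattern, plus one non-match
    for name in ["inflammation-03.csv", "inflammation-01.csv",
                 "inflammation-02.csv", "notes.txt"]:
        open(os.path.join(tmp, name), "w").close()

    # only names matching the pattern are returned; sorted() fixes their order
    filenames = sorted(glob(os.path.join(escape(tmp), "infl*csv")))
    basenames = [os.path.basename(f) for f in filenames]

print(basenames)  # ['inflammation-01.csv', 'inflammation-02.csv', 'inflammation-03.csv']
```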

In [None]:
# make packages installed with `pip install --user` importable,
# whatever the Python version
import site
import sys
sys.path.append(site.getusersitepackages())

In [None]:
from distributed import Executor

In [None]:
executor = Executor('127.0.0.1:8786')  # connect to a dask scheduler already running locally on its default port

In [None]:
data_futures = executor.map(load_file, filenames)  # returns immediately with one future per file; loading happens on the workers

In [None]:
data_futures

In [None]:
data_futures[0].result()
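The pattern in the last three cells (submit work, get futures back immediately, block on `.result()` only when a value is needed) is the same one offered by Python's standard `concurrent.futures` module. The sketch below is a local, single-machine analogue using threads, not dask; `slow_square` is a hypothetical stand-in for `load_file`.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # stand-in for load_file: pretend each task takes a while
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit returns immediately with one future per input
    futures = [pool.submit(slow_square, x) for x in range(4)]
    print(futures[0])              # a Future object, possibly still pending
    first = futures[0].result()    # blocks until this task has finished
    all_results = [f.result() for f in futures]

print(first)        # 0
print(all_results)  # [0, 1, 4, 9]
```

Note that the standard library's `ThreadPoolExecutor.map` returns an iterator of results rather than futures, which is why this sketch uses `submit` in a list comprehension.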

In [None]:
from distributed.collections import futures_to_dask_arrays
distributed_dask_arrays = futures_to_dask_arrays(data_futures)  # many small dask arrays

In [None]:
import dask.array
all_distributed_data = dask.array.concatenate(distributed_dask_arrays, axis=0)  # one large dask array, stacked along the rows (axis 0)
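Conceptually, `dask.array.concatenate` does not move any data: it builds a larger logical array whose blocks are the inputs and keeps track of where each block starts and ends, so that later operations can work block by block. The `numpy` sketch below mimics that bookkeeping in memory with made-up piece sizes; `locate` is a hypothetical helper, not a dask function.

```python
import numpy

# hypothetical per-file pieces: same number of columns, different numbers of rows
pieces = [numpy.full((rows, 3), fill_value=i, dtype=float)
          for i, rows in enumerate([2, 4, 3])]

stacked = numpy.concatenate(pieces, axis=0)
print(stacked.shape)  # (9, 3): rows add up, columns must match

# row boundaries of each piece, analogous to the row chunks of a dask array
row_chunks = tuple(p.shape[0] for p in pieces)   # (2, 4, 3)
offsets = numpy.cumsum((0,) + row_chunks)        # [0, 2, 6, 9]

def locate(global_row):
    """Return (piece index, row within that piece) for a global row index."""
    piece = int(numpy.searchsorted(offsets, global_row, side='right')) - 1
    return piece, global_row - int(offsets[piece])

print(locate(0))  # (0, 0)
print(locate(5))  # (1, 3)
print(locate(8))  # (2, 2)
```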

In [None]:
all_distributed_data.shape

In [None]:
all_distributed_data.max().compute()

In [None]:
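# compute both reductions in a single call so any shared work is done only once;
# sync=True waits for the answers and returns values instead of futures
executor.compute([all_distributed_data.max(), all_distributed_data[:,1].max()], sync=True)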
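A reduction such as `.max()` on a chunked array can be computed block by block: each worker reduces the blocks it holds, and only the small partial results are combined. The `numpy` sketch below shows this flat two-step version on hypothetical random blocks; dask itself combines partial results in a tree, which this simplifies.

```python
import numpy

rng = numpy.random.default_rng(0)
# hypothetical blocks standing in for the per-file chunks
blocks = [rng.integers(0, 20, size=(rows, 5)) for rows in (3, 4, 2)]

# step 1: reduce each block independently (done on the workers in dask)
partial_max = [b.max() for b in blocks]
partial_col1_max = [b[:, 1].max() for b in blocks]

# step 2: combine the small partial results into the final answers
global_max = max(partial_max)
col1_max = max(partial_col1_max)

# same answers as reducing the fully concatenated array in one go
full = numpy.concatenate(blocks, axis=0)
print(global_max == full.max(), col1_max == full[:, 1].max())  # True True
```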