# Array multinode

In this last notebook, several processes load data into memory, and we then build a single distributed array from the pieces. We can operate on it with methods, or even slice it, as if it were a `numpy` array; `dask` takes care of all the process-to-process communication for us.

Here we use `dask-distributed`, so the same code could run on many different nodes.
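
To make the "like a `numpy` array" idea concrete before involving a cluster, here is a minimal sketch that runs on dask's default local scheduler. The three constant-valued blocks and the names `blocks`, `pieces` and `stacked` are assumptions made up for illustration; they are not part of this notebook's data.

```python
import numpy as np
import dask.array as da

# Three hypothetical 60 x 40 blocks filled with 0.0, 1.0 and 2.0.
blocks = [np.full((60, 40), float(i)) for i in range(3)]

# Wrap each block as a single-chunk dask array, then stack them row-wise.
pieces = [da.from_array(block, chunks=(60, 40)) for block in blocks]
stacked = da.concatenate(pieces, axis=0)

# Methods and slices only describe the work; compute() actually runs it.
overall_max = stacked.max().compute()
first_block_column_max = stacked[:60, 1].max().compute()

print(stacked.shape)
print(overall_max, first_block_column_max)
```

The slicing and the `max()` call are written exactly as they would be for a `numpy` array; only the final `compute()` differs.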

In [1]:
def load_file(filename):
    """Load one comma-separated file into a numpy array."""
    import numpy
    data = numpy.loadtxt(fname=filename, delimiter=',')
    return data

In [2]:
import glob
import os
# use absolute paths so the files resolve regardless of a worker's working directory
filenames = [os.path.abspath(name) for name in glob.glob("inflammation-*.csv")]

In [3]:
from distributed import Client

In [4]:
executor = Client('127.0.0.1:8786')

In [5]:
data_futures = executor.map(load_file, filenames)

In [6]:
data_futures

[<Future: finished, type: numpy.ndarray, key: load_file-a19c44bb02b27270fa99fd11a071f4a5>,
 <Future: finished, type: numpy.ndarray, key: load_file-9c09133f6a2da4a40ce76db8e7644380>,
 <Future: finished, type: numpy.ndarray, key: load_file-beb322c8b45abed16d16bc2a68d4d9ea>,
 <Future: finished, type: numpy.ndarray, key: load_file-4cc911d587b46c089616c499e97279a7>,
 <Future: finished, type: numpy.ndarray, key: load_file-6b5870afd97a0ec3f9d51229797aac4e>,
 <Future: finished, type: numpy.ndarray, key: load_file-e9cda123646c2160b3a0aad7c192badf>,
 <Future: finished, type: numpy.ndarray, key: load_file-a34c99b29af4fa7bb902fc7fe37a0f14>,
 <Future: finished, type: numpy.ndarray, key: load_file-0f945183316fd7ed56d6591db5428db6>,
 <Future: finished, type: numpy.ndarray, key: load_file-6797091973318b87b95aece140893aba>,
 <Future: finished, type: numpy.ndarray, key: load_file-74e0e833e424630623b061b5609fd82c>,
 <Future: finished, type: numpy.ndarray, key: load_file-e9f459c9b7f3526c94d94e3342693e38>,

In [7]:
data_futures[0].result().size

2400

In [8]:
import dask.array as da
# many small dask arrays, one per loaded file; each file holds a 60 x 40 array
distributed_dask_arrays = [
    da.from_delayed(future, shape=(60, 40), dtype="float64")
    for future in data_futures
]
# one large dask array, concatenated along the first axis (rows)
all_distributed_data = da.concatenate(distributed_dask_arrays, axis=0)
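
The same `from_delayed` plus `concatenate` pattern can be tried without a running scheduler by swapping the futures for `dask.delayed` objects. This is only a sketch under assumptions: `make_block` is a hypothetical stand-in for `load_file` that fabricates random values instead of reading the CSV files, and the names `lazy_blocks`, `small_arrays` and `combined` are invented here.

```python
import numpy as np
import dask
import dask.array as da

def make_block(seed):
    # Hypothetical stand-in for load_file: returns a 60 x 40 float64 array.
    rng = np.random.default_rng(seed)
    return rng.integers(0, 21, size=(60, 40)).astype("float64")

# Twelve lazy blocks; nothing is generated until something is computed.
lazy_blocks = [dask.delayed(make_block)(seed) for seed in range(12)]

# shape and dtype are declared up front, since dask cannot inspect lazy data.
small_arrays = [
    da.from_delayed(block, shape=(60, 40), dtype="float64")
    for block in lazy_blocks
]
combined = da.concatenate(small_arrays, axis=0)

print(combined.shape)
print(combined.chunks)
```

Because `shape` and `dtype` are declared rather than discovered, they need to match what the loader really returns; each original block stays a separate chunk of the combined array.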

In [9]:
all_distributed_data.shape

(720, 40)

In [10]:
all_distributed_data.max().compute()

20.0
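
Why a reduction like `max()` suits distributed data can be sketched in plain `numpy`: each chunk is reduced on its own, and only the small partial results need to be combined. This is a conceptual illustration with made-up random chunks, not dask's actual implementation, and the names `chunks`, `partial_maxima` and `blockwise_max` are hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)

# Twelve hypothetical 60 x 40 chunks with integer values between 0 and 20.
chunks = [
    rng.integers(0, 21, size=(60, 40)).astype("float64") for _ in range(12)
]

# Step 1: reduce every chunk independently; one number per chunk remains.
partial_maxima = [chunk.max() for chunk in chunks]

# Step 2: combine the partial results into the final answer.
blockwise_max = max(partial_maxima)

# The result matches a reduction over the fully assembled array.
assert blockwise_max == np.concatenate(chunks, axis=0).max()
print(blockwise_max)
```

Only twelve numbers have to travel between processes in this picture, rather than twelve full arrays.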

In [11]:
executor.compute([all_distributed_data.max(), all_distributed_data[:,1].max()], sync=True)

[20.0, 1.0]