## `dask.array`

<img src="assets/array.png" width="25%" align="right">
Dask array provides a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed NumPy.

*  **Parallel**: Uses all of the cores on your computer
*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*  **Blocked Algorithms**:  Perform large computations by performing many smaller computations

**Related Documentation**

* [Documentation](http://dask.readthedocs.io/en/latest/array.html)
* [API reference](http://dask.readthedocs.io/en/latest/array-api.html)

## Blocked Algorithms

A *blocked algorithm* executes on a large dataset by breaking it up into many small blocks.

For example, consider taking the sum of a billion numbers. Rather than summing them all at once, we might break the array into 1,000 chunks of 1,000,000 numbers each, take the sum of each chunk, and then take the sum of the intermediate sums.

We achieve the intended result (one sum over one billion numbers) by performing many smaller computations (one thousand sums over one million numbers each, followed by one more sum over a thousand numbers).

Dask array applies this same strategy automatically in the examples below.
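To make the idea concrete before bringing in Dask, here is a minimal sketch of the chunked sum in plain NumPy. It is scaled down from the billion-number example (10,000,000 numbers in 1,000 chunks of 10,000) so it fits comfortably in memory; the sizes and the seed are illustrative choices, not values from the original notebook.

```python
import numpy as np

# Scaled down from one billion numbers so the array fits easily in memory
n_total = 10_000_000
chunk_size = 10_000

rng = np.random.default_rng(0)
data = rng.random(n_total)

# Step 1: sum each chunk independently (these sums could run in parallel)
intermediate_sums = [data[i:i + chunk_size].sum()
                     for i in range(0, n_total, chunk_size)]

# Step 2: combine the 1,000 intermediate sums into the final result
blocked_total = sum(intermediate_sums)

print(len(intermediate_sums))                 # 1000
print(np.isclose(blocked_total, data.sum()))  # True
```

The blocked total matches the direct sum up to floating-point rounding, which is why `np.isclose` is used rather than `==`.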

## Start Dask Client for Dashboard

Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

In [None]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=2,
                n_workers=1, memory_limit='2GB')
client
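If you lose track of the dashboard link, the client also exposes it as `client.dashboard_link`. The sketch below is standalone: it starts and closes its own in-process client with the same settings as above, assumes `dask.distributed` is installed, and relies on the optional `bokeh` package for the dashboard itself. In the running notebook you would simply evaluate `client.dashboard_link` on the existing client instead.

```python
from dask.distributed import Client

# Standalone: start an in-process client with the same settings as the cell above
with Client(processes=False, threads_per_worker=2,
            n_workers=1, memory_limit='2GB') as demo_client:
    # URL of the diagnostic dashboard (may be empty if the dashboard did not start)
    link = demo_client.dashboard_link
    print(link)
# Leaving the with-block closes the client and the local cluster it started
```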

## Create a Random Array

This creates a 10000x10000 array of random numbers, represented as many NumPy arrays of size 1000x1000 (or smaller if the array cannot be divided evenly). In this case there are 100 (10x10) NumPy arrays of size 1000x1000.

In [None]:
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x
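
The chunk layout described above can be inspected directly on the array's metadata, without computing anything. A small sketch using the `chunksize`, `numblocks`, `npartitions`, and `chunks` attributes of a Dask array; it uses the hypothetical names `arr` and `uneven` so it does not overwrite `x`, and the 10x10 array with 4x4 chunks is an illustrative case of a shape that does not divide evenly.

```python
import dask.array as da

# Same shape and chunking as x above; creating it is lazy and cheap
arr = da.random.random((10000, 10000), chunks=(1000, 1000))
print(arr.chunksize)    # (1000, 1000)
print(arr.numblocks)    # (10, 10)
print(arr.npartitions)  # 100

# When the shape is not a multiple of the chunk size,
# the last chunk along each axis is smaller
uneven = da.ones((10, 10), chunks=(4, 4))
print(uneven.chunks)    # ((4, 4, 2), (4, 4, 2))
```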

The following operations resemble operations from NumPy.

In [None]:
# y is the sum of x with its transpose
y = x + x.T
y

In [None]:
# z is a slice of y (nothing is averaged yet):
# along the first axis we take every second row,
# along the second axis we take the last half of the columns (5000 of them)
z = y[::2, 5000:]
z

In [None]:
# v is the mean of z along axis=1 (across each row), giving 5000 numbers
v = z.mean(axis=1)
v
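
Because these operations mirror NumPy, the same expression can be checked against NumPy directly. A minimal sketch on a small, hypothetical 100x100 array (not part of the original notebook) wrapped with `da.from_array`; note that the result's shape is known before anything is computed.

```python
import numpy as np
import dask.array as da

# Small illustrative array, split into 10x10 chunks
rng = np.random.default_rng(0)
a_np = rng.random((100, 100))
a_da = da.from_array(a_np, chunks=(10, 10))

# The same expression, once eagerly in NumPy and once lazily in Dask
expected = (a_np + a_np.T)[::2, 50:].mean(axis=1)
lazy = (a_da + a_da.T)[::2, 50:].mean(axis=1)

print(lazy.shape)  # (50,), known before computing
small_result = lazy.compute()
print(np.allclose(small_result, expected))  # True
```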

So far nothing has been calculated, but the task graph has been assembled.

This entire sequence of operations could be written as a single line:
```python
v = (x + x.T)[::2, 5000:].mean(axis=1)
```
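
To see that building these expressions only assembles a task graph, and that the step-by-step and one-line forms agree, here is a small standalone sketch. It uses a scaled-down 1000x1000 array and the hypothetical names `a`, `steps`, and `one_line` so it does not overwrite `v`; `__dask_graph__()` is the Dask collection method that returns the task graph, and its length is the number of tasks.

```python
import numpy as np
import dask.array as da

a = da.random.random((1000, 1000), chunks=(100, 100))

# Step-by-step form
b = a + a.T
c = b[::2, 500:]
steps = c.mean(axis=1)

# One-line form
one_line = (a + a.T)[::2, 500:].mean(axis=1)

# Nothing has run yet: both are lazy Dask arrays backed by a task graph
print(type(one_line).__name__)             # Array
print(len(one_line.__dask_graph__()) > 0)  # True

# Computing both gives the same values
print(np.allclose(steps.compute(), one_line.compute()))  # True
```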

We can inspect the task graph for this operation:

In [None]:
v.visualize()

We can now compute the result:

In [None]:
%%time
result = v.compute()

In [None]:
print("Length of results:", len(result))
print("First 5 means:", result[:5])

In [None]:
client.close()

## Try the computation again with different scheduling options

### Distributed: one worker, four threads-per-worker

In [None]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

In [None]:
%%time
result = v.compute()

In [None]:
client.close()

### Distributed: four workers, one thread-per-worker

In [None]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=1,
                n_workers=4, memory_limit='2GB')
client

In [None]:
%%time
result = v.compute()

In [None]:
client.close()

### Distributed: one worker, one thread-per-worker

With a single worker running a single thread, this is effectively serial execution.

In [None]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=1,
                n_workers=1, memory_limit='2GB')
client

In [None]:
%%time
result = v.compute()

In [None]:
client.close()
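
The three distributed runs above can also be scripted in one loop. A minimal sketch, assuming `dask.distributed` is installed; the array is scaled down from the notebook's 10000x10000 so each run finishes quickly, the names `x_small` and `v_small` are hypothetical (chosen so `v` is not overwritten), and timings will vary from machine to machine.

```python
import time

import dask.array as da
from dask.distributed import Client

# Scaled down from the notebook's 10000x10000 array so each run is quick
x_small = da.random.random((2000, 2000), chunks=(500, 500))
v_small = (x_small + x_small.T)[::2, 1000:].mean(axis=1)

# (n_workers, threads_per_worker) pairs matching the three distributed runs above
configs = [(1, 4), (4, 1), (1, 1)]
timings = {}

for n_workers, threads in configs:
    # The client becomes the default scheduler while the with-block is active
    with Client(processes=False, n_workers=n_workers,
                threads_per_worker=threads, memory_limit='2GB'):
        start = time.perf_counter()
        v_small.compute()
        timings[(n_workers, threads)] = time.perf_counter() - start

for (n_workers, threads), seconds in timings.items():
    print(f"{n_workers} worker(s) x {threads} thread(s): {seconds:.3f} s")
```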

### Non-distributed: synchronous scheduler

In [None]:
%%time
result = v.compute(scheduler='synchronous')

### Non-distributed: threaded scheduler

In [None]:
%%time
result = v.compute(scheduler='threads')

### Non-distributed: multiple processes

In [None]:
%%time
result = v.compute(scheduler='processes')
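
Instead of passing `scheduler=` to every `compute()` call, the default scheduler can be set for a block of code with `dask.config.set`. A minimal sketch comparing the synchronous and threaded schedulers on a scaled-down array (the names `x_small`, `v_small`, and `small_result` are hypothetical). The `'processes'` scheduler is left out only to keep the sketch safe to run as a plain script; on platforms that start worker processes with spawn, running it from a script may need an `if __name__ == "__main__":` guard.

```python
import time

import dask
import dask.array as da

# Scaled down from the notebook's 10000x10000 array so the comparison is quick
x_small = da.random.random((2000, 2000), chunks=(500, 500))
v_small = (x_small + x_small.T)[::2, 1000:].mean(axis=1)

timings = {}
for scheduler in ["synchronous", "threads"]:
    # Every compute() inside this block uses the chosen scheduler
    with dask.config.set(scheduler=scheduler):
        start = time.perf_counter()
        small_result = v_small.compute()
        timings[scheduler] = time.perf_counter() - start

for name, seconds in timings.items():
    print(f"{name:>12}: {seconds:.3f} s")
```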