
Custom Workloads with Dask Delayed
==================================

This notebook shows how to use [dask.delayed](http://dask.pydata.org/en/latest/delayed.html) to parallelize generic Python code.

All the examples are taken from the official Dask examples.

## Create simple functions

These functions do simple operations, such as adding two numbers together, but each one sleeps for half a second to simulate real work.

In [None]:
import time
import random

def inc(x):
    time.sleep(0.5)
    return x + 1

def dec(x):
    time.sleep(0.5)
    return x - 1
    
def add(x, y):
    time.sleep(0.5)
    return x + y 

We can run them like normal Python functions:

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

These ran one after the other, in sequence. Note, though, that the first two lines, `inc(1)` and `dec(2)`, don't depend on each other. We *could* have called them in parallel had we been clever.
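
To make the missed opportunity concrete, here is a minimal sketch that runs the two independent calls on a thread pool from the standard library's `concurrent.futures`. The names `slow_inc`, `slow_dec`, and `slow_add` and the shortened `DELAY` are stand-ins chosen for illustration, so the notebook's own `inc`, `dec`, and `add` are left untouched. This is manual scheduling written by hand, not a description of what Dask does internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.1  # shorter than the notebook's 0.5 s so the sketch runs quickly


def slow_inc(x):
    time.sleep(DELAY)
    return x + 1


def slow_dec(x):
    time.sleep(DELAY)
    return x - 1


def slow_add(x, y):
    time.sleep(DELAY)
    return x + y


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # The two calls do not depend on each other, so submit both at once.
    fut_x = pool.submit(slow_inc, 1)
    fut_y = pool.submit(slow_dec, 2)
    # slow_add needs both results, so it has to wait for them.
    z_manual = slow_add(fut_x.result(), fut_y.result())
elapsed = time.perf_counter() - start

print(z_manual, f"{elapsed:.2f}s")
```

Run one after the other, the three calls would take roughly three delays. Here the first two overlap, so the total should be closer to two.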

## Annotate functions with Dask Delayed to make them lazy

We can call `dask.delayed` on our functions to make them lazy. Rather than computing their results immediately, they record what we want to compute as a task in a graph that we'll run later on parallel hardware.

In [None]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

Calling these lazy functions is now almost free. We're just constructing a graph.

In [None]:
x = inc(1)
y = dec(2)
z = add(x, y)
z
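
The idea of recording a call instead of running it can be illustrated with a toy wrapper in plain Python. The `LazyCall` class, the `lazy` decorator, and the `toy_*` functions below are hypothetical names invented for this sketch. It evaluates tasks serially and recomputes shared inputs, so it is only a mental model and not how `dask.delayed` is implemented.

```python
class LazyCall:
    """Toy stand-in for a lazy task: stores a function and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate dependencies first, then apply the stored function.
        values = [a.compute() if isinstance(a, LazyCall) else a for a in self.args]
        return self.func(*values)


def lazy(func):
    """Wrap func so that calling it records a LazyCall instead of running."""
    def wrapper(*args):
        return LazyCall(func, *args)
    return wrapper


calls = []  # records which functions actually ran, and in what order


@lazy
def toy_inc(x):
    calls.append("inc")
    return x + 1


@lazy
def toy_dec(x):
    calls.append("dec")
    return x - 1


@lazy
def toy_add(x, y):
    calls.append("add")
    return x + y


node = toy_add(toy_inc(1), toy_dec(2))  # builds the graph only
ran_before = list(calls)                # snapshot: nothing has executed yet
value = node.compute()                  # now the recorded calls run
print(ran_before, value, calls)
```

Building `node` appends nothing to `calls`. The functions only run once `compute()` walks the recorded arguments.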

## Visualize computation

You will need Graphviz installed for this to work.

In [None]:
z.visualize(rankdir='LR')

## Run in parallel

Call `.compute()` when you want your result as a normal Python object.


In [None]:
%%timeit
z.compute()

## Decorate functions to make them lazy

In [None]:
from dask import delayed

@delayed
def inc(x):
    time.sleep(0.5)
    return x + 1

@delayed
def double(x):
    time.sleep(0.5)
    return 2 * x

@delayed
def add(x, y):
    time.sleep(0.5)
    return x + y

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [None]:
total.visualize()

In [None]:
%%time
total.compute()
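
As a sanity check on what `total.compute()` should return, the same arithmetic can be done eagerly without any sleeping. The sketch also estimates run time under two assumptions that are not measured here: every sleeping task takes exactly 0.5 s, and the final `sum` is instantaneous. The variable names are illustrative.

```python
check_data = [1, 2, 3, 4]

# Each element contributes inc(x) + double(x) = (x + 1) + 2 * x = 3 * x + 1.
per_element = [(x + 1) + 2 * x for x in check_data]
expected_total = sum(per_element)

# Rough timing estimate, assuming 0.5 s per sleeping task.
n_sleeping_tasks = 3 * len(check_data)        # inc, double, add per element
sequential_seconds = 0.5 * n_sleeping_tasks   # one task at a time
critical_path_seconds = 0.5 * 2               # inc/double overlap, then add

print(per_element, expected_total)
print(sequential_seconds, critical_path_seconds)
```

The critical-path figure is a best case that needs enough workers to run all independent tasks at once. With fewer workers, the wall time lands somewhere between the two estimates.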

## Make it a bit more complicated

In [None]:
import numpy as np
import random

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
a = b = c = 1

for i in range(20):
    ind1 = np.random.randint(0, 4)
    rn = np.random.randint(0, 3)
    if rn == 0:
        a = inc(data[ind1])
        c = add(a, b)
    elif rn == 1:
        b = double(data[ind1])
        c = add(a, b)
    else:
        c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [None]:
total.visualize()

## Starting a Dask local cluster

Dask allows the creation of clusters to run calculations. They can be anything from the local machine to queueing systems such as SLURM or SGE. For this example, a local cluster on this machine is used.

In [None]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=1, n_workers=2)
client

The client provides a dashboard which can show the progress of calculations.

## Parallelize Normal Python code

Now we use Dask in normal for-loopy Python code.  This generates graphs instead of doing computations directly, but still looks like the code we had before.  Dask is a convenient way to add parallelism to existing workflows.

In [None]:
zs = []

In [None]:
%%time
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

In [None]:
zs = dask.persist(*zs)  # trigger computation in the background

To make this go faster, add additional workers.

In [None]:
client.cluster.scale(4)  # ask for four workers in total

In [None]:
zs[10].compute()

Looking at the Dask dashboard, we can see that Dask spreads this work across the cluster, managing load balancing, dependencies, and so on.

## Dask arrays

Depending on the focus of your work, Dask Array is likely to be the first Dask interface you use after DataFrame, or perhaps simply the first one you use (e.g., if you work primarily with NumPy).

Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.

Dask arrays coordinate many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.

<img src="images/dask-array-black-text.svg">

- Dask arrays are chunked, n-dimensional arrays
- A Dask array can be thought of as a collection of NumPy `ndarray` objects
- Dask arrays implement a large subset of the NumPy API using blocked algorithms
- For many purposes Dask arrays can serve as drop-in replacements for NumPy arrays

In [None]:
import numpy as np
import dask.array as da

In [None]:
a_np = np.arange(1, 50, 1)
a_np

In [None]:
a_da = da.arange(1, 50, 1, chunks=5)
a_da

In [None]:
print(a_da.dtype)
print(a_da.shape)

In [None]:
print(a_da.chunks)
print(a_da.chunksize)
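
The chunk layout printed above follows from simple arithmetic: 49 elements split into blocks of 5 leave a shorter final block. The helper `chunk_sizes` below is a hypothetical function written for this sketch. It mirrors the uniform one-dimensional case only, and `a_da.chunks` holds one such tuple per dimension.

```python
def chunk_sizes(n, chunk):
    """Split n elements into blocks of `chunk`, with a shorter final block if needed."""
    full, rest = divmod(n, chunk)
    return (chunk,) * full + ((rest,) if rest else ())


n_elements = len(range(1, 50, 1))  # same start/stop/step as the arange calls above
sizes = chunk_sizes(n_elements, 5)
print(n_elements, sizes)
```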

In [None]:
a_da.visualize()

In [None]:
(a_da ** 2).visualize()

In [None]:
(a_da ** 2).compute()

Dask arrays support a large portion of the NumPy interface:

- Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...

- Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...

- Tensor contractions / dot products / matrix multiply: `tensordot`

- Axis reordering / transpose: `transpose`

- Slicing: `x[:100, 500:100:-2]`

- Fancy indexing along single axes with lists or NumPy arrays: `x[:, [10, 1, 5]]`

- Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`, ...

- ...

See the [Dask array API docs](http://docs.dask.org/en/latest/array-api.html) for full details about what portion of the NumPy API is implemented for Dask arrays.

### Blocked Algorithms

Dask arrays are implemented using _blocked algorithms_. These algorithms break up a computation on a large array into many computations on smaller pieces of the array. This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel.

In [None]:
x = da.random.random(20, chunks=5)
x

In [None]:
result = x.sum()
result

In [None]:
result.visualize()

In [None]:
result.compute()
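
The blocked sum above can be imitated by hand in plain Python: sum each block independently, then sum the partial results. `blocked_sum` is a hypothetical helper for this sketch, and a deterministic list stands in for the random array so the numbers are reproducible. Dask's actual reduction is more elaborate than this.

```python
def blocked_sum(values, chunk):
    """Sum `values` by summing fixed-size blocks, then summing the partial results."""
    blocks = [values[i:i + chunk] for i in range(0, len(values), chunk)]
    partials = [sum(block) for block in blocks]  # independent; could run in parallel
    return partials, sum(partials)


numbers = list(range(20))  # deterministic stand-in for the 20-element array
partial_sums, blocked_total = blocked_sum(numbers, 5)
print(partial_sums, blocked_total)
```

Only one block needs to be held in memory while its partial sum is computed, which is what makes the approach work for arrays that do not fit in RAM.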

Dask supports a large portion of the NumPy API. This can be used to build up more complex computations using the familiar NumPy operations you're used to.

In [None]:
x = da.random.random(size=(15, 15), chunks=(10, 5))
x

In [None]:
result = (x + x.T).sum()
result

In [None]:
result.visualize()

In [None]:
result.compute()

The same code scales to much larger arrays, including ones that do not fit in memory, because only a few chunks need to be held in RAM at any one time.

In [None]:
x = da.random.random(size=(15000, 15000), chunks=(1500, 1500))
x

In [None]:
result = (x + x.T).sum()
result

In [None]:
result.compute()
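
Whether this array exceeds memory depends on the machine. A quick estimate follows, assuming the elements are 8-byte `float64` values and using the shape and chunk shape from the cell above.

```python
itemsize = 8                  # bytes per float64 element (assumed dtype)
shape = (15000, 15000)
chunk_shape = (1500, 1500)

total_bytes = shape[0] * shape[1] * itemsize
chunk_bytes = chunk_shape[0] * chunk_shape[1] * itemsize
n_chunks = (shape[0] // chunk_shape[0]) * (shape[1] // chunk_shape[1])

print(total_bytes / 1e9, "GB in total")
print(chunk_bytes / 1e6, "MB per chunk")
print(n_chunks, "chunks")
```

The full array is a little under 2 GB, while each of the 100 chunks is only 18 MB, so the working set stays small even though the full array is large.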

## Dataframes

In [None]:
import dask.dataframe

In [None]:
ddf = dask.dataframe.read_csv("data/pte.csv", blocksize=1e4)

In [None]:
ddf

In [None]:
ddf.map_partitions(type).compute()

In [None]:
ddf.head()

In [None]:
mean_phase_tm = ddf.groupby('Phase').MeltingPoint.agg(["mean", "std", "count"])

In [None]:
mean_phase_tm

In [None]:
mean_phase_tm.compute()

In [None]:
client.close()