
Custom Workloads with Dask Delayed
==================================

This notebook shows how to use [dask.delayed](http://dask.pydata.org/en/latest/delayed.html) to parallelize generic Python code.

All the examples are taken from the official Dask examples!

## Starting a Dask local cluster

Dask can run calculations on many kinds of clusters, from the local machine to job-queueing systems such as SLURM or SGE. This example uses only a local client.

In [1]:
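from dask.distributed import Client, progress
# Start a local cluster with one worker process running four threads
client = Client(threads_per_worker=4, n_workers=1)
client  # displaying the client shows the scheduler and dashboard addresses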

Client   Scheduler: tcp://127.0.0.1:36713   Dashboard: http://127.0.0.1:8787/status
Cluster  Workers: 1   Cores: 4   Memory: 8.08 GB


The client provides a dashboard, linked in the output above, that shows the progress of calculations.
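The `Client(...)` call above creates a `LocalCluster` behind the scenes. The following is a minimal sketch of doing the same thing explicitly, assuming a recent `dask.distributed` install. Two settings are assumptions made for the sketch: `processes=False` runs the worker as threads inside the current process, and `dashboard_address=":0"` picks a free port so it does not clash with a dashboard already on 8787. The context managers close the client and cluster when the block exits.

```python
from dask.distributed import Client, LocalCluster

def inc(x):
    return x + 1

# Same shape as the notebook's cluster: one worker running four threads.
# processes=False runs the worker inside this process (an assumption for the sketch);
# dashboard_address=":0" picks any free port instead of the default 8787.
with LocalCluster(n_workers=1, threads_per_worker=4, processes=False,
                  dashboard_address=":0") as cluster, Client(cluster) as client:
    future = client.submit(inc, 1)   # schedule inc(1) on the worker
    result = future.result()         # block until the value is ready

print(result)  # 2
```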

## Create simple functions

These functions do simple operations, like adding two numbers together, but each one sleeps for a random amount of time (up to one second) to simulate real work.

In [2]:
import time
import random

# Each function sleeps for a random time in [0, 1) seconds to simulate real work
def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

We can run them like normal Python functions:

In [3]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

CPU times: user 48.6 ms, sys: 883 µs, total: 49.5 ms
Wall time: 1.94 s


3

These ran one after the other, in sequence. Note, though, that the first two calls, `inc(1)` and `dec(2)`, don't depend on each other, so we *could* have run them in parallel had we been clever.
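To see what "being clever" would mean by hand, here is a stdlib-only sketch (not Dask) that schedules the same three calls with `concurrent.futures`. Fixed 0.5-second sleeps replace the random ones, an assumption made so the timing is predictable: run in sequence the chain takes about 1.5 s, while submitting `inc(1)` and `dec(2)` together should bring it to about 1.0 s, because `add` only has to wait for the slower of its two inputs.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.5  # fixed stand-in for the notebook's random sleeps

def inc(x):
    time.sleep(DELAY)
    return x + 1

def dec(x):
    time.sleep(DELAY)
    return x - 1

def add(x, y):
    time.sleep(DELAY)
    return x + y

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    fx = pool.submit(inc, 1)            # independent of dec(2) ...
    fy = pool.submit(dec, 2)            # ... so both sleep at the same time
    z = add(fx.result(), fy.result())   # add must wait for both inputs
elapsed = time.perf_counter() - start

print(z, round(elapsed, 1))  # z is 3; elapsed should be close to 1.0 s
```

Tracking which calls are independent and wiring up the waits is exactly the bookkeeping that Dask Delayed takes over in the next step.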

## Annotate functions with Dask Delayed to make them lazy

We can call `dask.delayed` on our functions to make them lazy. Rather than computing their results immediately, they record what we want to compute as tasks in a graph that we'll run later on parallel hardware.
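`dask.delayed` can also be used as a decorator when defining a function. The sketch below, with a hypothetical `calls` list used only to record real executions, checks that calling the wrapped function does not run its body; the body runs only when `.compute()` is called. `scheduler="synchronous"` is chosen here so the task runs in the current thread, which keeps the example deterministic.

```python
import dask

calls = []  # hypothetical log: records each real execution of inc

@dask.delayed          # decorator form, equivalent to inc = dask.delayed(inc)
def inc(x):
    calls.append(x)
    return x + 1

lazy = inc(10)         # builds a task; inc's body has not run yet
assert calls == []

value = lazy.compute(scheduler="synchronous")  # now the body runs once
print(value, calls)    # 11 [10]
```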

In [4]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

Calling these lazy functions is now almost free. We're just constructing a graph.

In [5]:
x = inc(1)
y = dec(2)
z = add(x, y)
z

Delayed('add-f1a846ed-623b-4086-ae16-e2ba62728805')
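The `Delayed` object above is only a handle on the graph. As a sketch, the graph can be inspected without Graphviz through the Dask collection-protocol method `__dask_graph__()`, which returns a mapping from task keys to tasks. Sleep-free copies of the three functions are assumed here for brevity.

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def dec(x):
    return x - 1

@dask.delayed
def add(x, y):
    return x + y

x = inc(1)
y = dec(2)
z = add(x, y)

graph = dict(z.__dask_graph__())  # key -> task mapping built so far
print(len(graph))                  # 3: one task per delayed call
for key in graph:
    print(key)                     # keys look like inc-<id>, dec-<id>, add-<id>
```

Literal arguments such as `1` and `2` are embedded in the tasks themselves, so only the three function calls appear as keys.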

## Visualize computation

This requires both the `graphviz` Python package and the Graphviz system library; without them, `visualize` raises the error shown below.

In [6]:
z.visualize(rankdir='LR')

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

## Run in parallel

Call `.compute()` when you want the result as a normal Python object.

In [None]:
z.compute()
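`z.compute()` evaluates a single result. When several results come from the same graph, `dask.compute` evaluates them in one pass, so shared inputs run only once. A small sketch with sleep-free copies of the functions (assumed for brevity) on Dask's local threaded scheduler:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def dec(x):
    return x - 1

@dask.delayed
def add(x, y):
    return x + y

x = inc(1)
y = dec(2)
z = add(x, y)

# One call, one graph traversal: x and y are computed once and reused by z.
results = dask.compute(x, y, z, scheduler="threads")
print(results)  # (2, 1, 3)
```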

## Parallelize Normal Python code

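Now we use Dask in ordinary Python code built around for-loops. Because `inc`, `dec`, and `add` are now delayed, the loop below builds a graph of 768 tasks (three per iteration) instead of doing the computations directly, yet it looks just like the code we had before. This makes Dask a convenient way to add parallelism to existing workflows.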

In [None]:
zs = []

In [None]:
%%time
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)
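The loop leaves `zs` as a list of 256 lazy objects. One way to reduce them to a single value, sketched here with sleep-free versions of the functions (an assumption so it runs quickly), is to wrap the built-in `sum` in `dask.delayed`: Dask finds the delayed objects inside the list argument and makes the sum a task that depends on all of them.

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def dec(x):
    return x - 1

@dask.delayed
def add(x, y):
    return x + y

zs = []
for i in range(256):
    x = inc(i)        # i + 1
    y = dec(x)        # back to i
    z = add(x, y)     # (i + 1) + i = 2*i + 1
    zs.append(z)

total = dask.delayed(sum)(zs)   # one more task that depends on all 256
print(total.compute())          # 65536, i.e. 256**2
```

The sum of the first 256 odd numbers is 256², which gives a quick sanity check on the result.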

In [None]:
zs = dask.persist(*zs)  # trigger computation in the background
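`dask.persist` differs from `dask.compute` in what it returns: `compute` hands back plain Python values, while `persist` returns Dask objects whose results are already computed (and, on a distributed cluster, held in worker memory). A small sketch on the local threaded scheduler, with a sleep-free `add` assumed for brevity:

```python
import dask
from dask.delayed import Delayed

@dask.delayed
def add(x, y):
    return x + y

z = add(1, 2)

(computed,) = dask.compute(z, scheduler="threads")   # a plain int
(persisted,) = dask.persist(z, scheduler="threads")  # still a Delayed

print(type(computed).__name__, isinstance(persisted, Delayed))  # int True
print(persisted.compute())  # 3, read back from the stored result
```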

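To speed up the background computation, add more workers. Because these tasks spend most of their time sleeping rather than computing, extra workers help even on a single machine.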

In [None]:
client.cluster.scale(10)  # ask for ten 4-thread workers

By looking at the Dask dashboard, we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, and so on.
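`scale` asks the cluster for a fixed number of workers, and new workers take a moment to start. The sketch below scales an in-process `LocalCluster` and then blocks with `Client.wait_for_workers` until the new workers have registered; `processes=False` and the worker counts are assumptions chosen to keep it small. For workloads whose size varies over time, `cluster.adapt(minimum=..., maximum=...)` lets Dask add and remove workers automatically instead of using a fixed count.

```python
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=1, threads_per_worker=4, processes=False,
                  dashboard_address=":0") as cluster, Client(cluster) as client:
    cluster.scale(3)                # ask for three workers in total
    client.wait_for_workers(3)      # block until all three have registered
    nthreads = client.nthreads()    # worker address -> threads on that worker
    n_workers = len(nthreads)
    total_threads = sum(nthreads.values())

print(n_workers, total_threads)  # three workers, four threads each
```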