<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

Custom Workflows
------------------

This notebook submits tasks directly to the task scheduler, showing how much flexibility the `submit` function and ordinary Python for loops provide.

Later on we map functions across Python queues to construct data processing pipelines.

In [1]:
from dask.distributed import Executor, progress
e = Executor('localhost:8786')
e

<Executor: scheduler="localhost:8786" processes=1 cores=4>

In [2]:
from time import sleep

def inc(x):
    from random import random
    sleep(random())
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x
    
def add(x, y):
    from random import random
    sleep(random())
    return x + y 

In [3]:
inc(1)

2

In [4]:
future = e.submit(inc, 1)  # returns immediately with pending future
future

<Future: status: pending, key: inc-034e352530d02eccdc76c6ce4a799c5c>

In [6]:
future  # scheduler and client talk constantly

<Future: status: finished, type: int, key: inc-034e352530d02eccdc76c6ce4a799c5c>

In [7]:
future.result()

2
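
The submit-then-collect pattern can also be tried without a running scheduler. The sketch below uses the standard library's `concurrent.futures.ThreadPoolExecutor` as a local stand-in; that substitution is an assumption made for illustration and is not the distributed executor used above. The fixed `sleep(0.1)` replaces the random delay so the example runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def inc(x):
    sleep(0.1)  # simulate a slow task
    return x + 1

# Local stand-in for the distributed executor (assumption for illustration)
with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(inc, 1)  # returns a Future without waiting for inc
    result = future.result()      # blocks until the task has finished

print(result)
```

As in the cells above, `submit` hands back a future right away and `result()` waits for the value.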

### Submit many tasks

We submit many tasks that depend on each other in a normal Python for loop. Passing a future as an argument to `submit` makes the new task depend on that future's result.

In [8]:
%%time
zs = []
for i in range(16):
    x = e.submit(inc, i)     # x = inc(i)
    y = e.submit(double, x)  # y = double(x)
    z = e.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 21.1 ms


In [9]:
e.gather(zs)

[3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48]
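
As a sanity check on the gathered values, each `z` is `add(inc(i), double(inc(i)))`, which simplifies to `3 * (i + 1)`. The sketch below recomputes the list locally with sleep-free copies of the three functions; the `_local` names are introduced here for illustration only.

```python
def inc_local(x):
    return x + 1

def double_local(x):
    return 2 * x

def add_local(x, y):
    return x + y

expected = []
for i in range(16):
    x = inc_local(i)      # i + 1
    y = double_local(x)   # 2 * (i + 1)
    z = add_local(x, y)   # 3 * (i + 1)
    expected.append(z)

print(expected)
```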

### Custom computation: Tree summation

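As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a for loop nested inside a while loop and a bit of normal Python logic.  The loop pairs each item with its neighbor, so it assumes the number of inputs is a power of two (16 here).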

```
finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
```

In [10]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = e.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new
    
progress(L)

In [None]:
L

In [None]:
e.gather(L)
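
The pairing loop above relies on every level having an even number of items. A minimal plain-Python sketch of the same reduction that also copes with odd-length levels is shown below. It works on ordinary values rather than futures, and `tree_reduce` is a hypothetical helper name, not part of the library.

```python
def add(x, y):
    return x + y

def tree_reduce(func, items):
    """Combine neighbors level by level until one item remains."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce needs at least one item")
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level) - 1, 2):
            next_level.append(func(level[i], level[i + 1]))  # merge neighbors
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # unpaired item moves up unchanged
        level = next_level
    return level[0]

total = tree_reduce(add, [3 * (i + 1) for i in range(16)])  # same values as zs
odd_total = tree_reduce(add, [1, 2, 3, 4, 5])               # odd-length input
print(total, odd_total)
```

To run the same shape of computation on the cluster, the direct call `func(...)` would be replaced by `e.submit(func, ...)`, as in the cell above.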

Example with data streams
----------------------------

The executor can map functions over lists or queues.  Mapping is nothing more than calling `submit` many times.  We can chain maps on queues together to construct simple data processing pipelines.

All of this logic happens on the client side.  None of it is hard-coded into the scheduler.  This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling.
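
To make the "map over a queue is just many calls to `submit`" idea concrete, here is a minimal local sketch. It assumes the standard library's `ThreadPoolExecutor` as a stand-in for the distributed executor; `map_queue` and the `STOP` sentinel are hypothetical names introduced for this example and do not reflect how the library implements queue mapping.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread

STOP = object()  # hypothetical sentinel that marks the end of the stream

def map_queue(pool, func, in_q):
    """Submit func(item) for every item read from in_q; return a queue of futures."""
    out_q = Queue()

    def pump():
        while True:
            item = in_q.get()
            if item is STOP:
                out_q.put(STOP)  # pass the sentinel downstream and finish
                break
            out_q.put(pool.submit(func, item))  # one submit call per item

    Thread(target=pump, daemon=True).start()
    return out_q

def inc(x):
    return x + 1

pool = ThreadPoolExecutor(max_workers=4)
in_q = Queue()
inc_q = map_queue(pool, inc, in_q)

for i in range(5):
    in_q.put(i)
in_q.put(STOP)

results = []
while True:
    item = inc_q.get()
    if item is STOP:
        break
    results.append(item.result())  # wait for each future in arrival order
pool.shutdown()
print(results)
```

A single background thread reads the input queue, so futures reach the output queue in the same order the inputs arrived.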

In [None]:
from queue import Queue
from threading import Thread

def multiplex(n, q, **kwargs):
    """ Copy every item from one queue into n equivalent queues
    
    >>> q1, q2, q3 = multiplex(3, in_q)
    """
    out_queues = [Queue(**kwargs) for i in range(n)]
    def f():
        while True:
            x = q.get()
            for out_q in out_queues:
                out_q.put(x)
    t = Thread(target=f)
    t.daemon = True
    t.start()
    return out_queues        

```
           ----inc---->
          /            \ 
in_q --> q              \_add__ results
          \             / 
           ---double-->/
```

In [None]:
in_q = Queue()
q = e.scatter(in_q)

In [None]:
in_q.put(1)
q.get()

In [None]:
q_1, q_2 = multiplex(2, q)

inc_q = e.map(inc, q_1)
double_q = e.map(double, q_2)

add_q = e.map(add, inc_q, double_q)

out_q = e.gather(add_q)

In [None]:
in_q.put(10)
out_q.get()

In [None]:
from random import random

def feed(q):
    for i in range(10000):
        sleep(random())
        q.put(i)
        
t = Thread(target=feed, args=(in_q,))  # feed the raw input queue at the head of the pipeline
t.daemon = True
t.start()

In [None]:
out_q.qsize()