Custom Workloads with Futures
=============================

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

Dask futures provide fine-grained real-time execution for custom situations.  This is the foundation for other APIs like Dask arrays and dataframes.

## Start Dask Client

Unlike arrays and dataframes, the Futures interface requires a Dask client. The client also provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard becomes visible when you create the client below. We recommend keeping it open on one side of your screen while using your notebook on the other. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 1, Total threads: 4, Total memory: 16.00 GiB
Status: running, Using processes: True


## Create simple functions

These functions perform simple operations, such as adding two numbers together, but they sleep for a random amount of time to simulate real work.

In [21]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x
    
def add(x, y):
    time.sleep(random.random())
    return x + y 

We can run them locally:

In [14]:
inc(1)

2

Or we can submit them to run remotely with Dask.  This immediately returns a future that points to the ongoing computation, and eventually to the stored result.

In [7]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

If you wait a second and then check on the future again, you'll see that it has finished.

In [9]:
future  # scheduler and client talk constantly

You can block on the computation and gather the result with the `.result()` method.

In [10]:
future.result()

2
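Dask's futures interface extends Python's `concurrent.futures`, so the submit-then-`result()` pattern reads much the same against a local thread pool. A minimal standard-library sketch for comparison (this is not Dask; `inc` is copied from above):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(random.random())  # simulate work, as in the notebook
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(inc, 1)  # returns immediately with a pending Future
    print(future.done())          # usually False while inc is still sleeping
    print(future.result())        # blocks until inc finishes, then prints 2
```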

## Chain dependencies

You can submit tasks on other futures.  This will create a dependency between the inputs and outputs.  Dask will track the execution of all tasks, ensuring that downstream tasks are run at the proper time and place and with the proper data.

In [15]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z

In [16]:
z.result()

6

Note that we never blocked on `x` or `y`, nor did we ever have to move their data back to our notebook.
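For contrast, a standard-library `ThreadPoolExecutor` passes a future argument through unchanged instead of waiting for its value, so chaining `inc`, `double` and `add` there means blocking in the caller and pulling both intermediate results back before submitting `add`. A minimal sketch (stdlib, not Dask; sleep-free copies of the functions):

```python
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

with ThreadPoolExecutor(max_workers=4) as pool:
    x = pool.submit(inc, 1)
    y = pool.submit(double, 2)
    # A stdlib Future is not replaced by its value when passed to submit,
    # so the caller blocks on x and y and ships their results back in.
    z = pool.submit(add, x.result(), y.result())
    print(z.result())  # 6
```

This is exactly the blocking and data movement that `client.submit(add, x, y)` avoids.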

## Submit many tasks

So we've learned how to run Python functions remotely.  This becomes useful when we add two things:

1.  We can submit thousands of tasks per second
2.  Tasks can depend on each other by consuming futures as inputs

We submit many tasks that depend on each other in a normal Python `for` loop:

In [22]:
zs = []

In [23]:
%%time

for i in range(256):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = double(x)
    z = client.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

CPU times: user 773 ms, sys: 34.8 ms, total: 808 ms
Wall time: 798 ms


In [24]:
total = client.submit(sum, zs)

In [40]:
zs[0].result()

3

`zs[0]` is the result of the first iteration (`i = 0`) of `z = x + y`, where `x` is `0 + 1` and `y` is `1 * 2`, so `z` is `3`.
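As a sanity check on these numbers: iteration `i` computes `x = i + 1`, `y = 2 * (i + 1)` and `z = 3 * (i + 1)`, so the 256 values in `zs` sum to `3 * (256 * 257 / 2) = 98688`, which is what `total` should resolve to. A serial, sleep-free sketch of the same loop (the helper `serial_zs` is hypothetical, and the functions are copies without the `time.sleep` calls):

```python
def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

def serial_zs(n=256):
    """Run the same inc -> double -> add chain serially for i in range(n)."""
    zs = []
    for i in range(n):
        x = inc(i)            # i + 1
        y = double(x)         # 2 * (i + 1)
        zs.append(add(x, y))  # 3 * (i + 1)
    return zs

zs = serial_zs()
print(zs[0])    # 3
print(sum(zs))  # 98688
```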

In [28]:
total

To make this go faster, add more workers with more cores.

(Although we're still only working on our local machine, this is more practical when using an actual cluster.)

In [29]:
client.cluster.scale(8)  # ask for eight 4-thread workers

## Custom computation: Tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic.

```
finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
```

In [36]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = client.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new

`L` is now a list containing a single future: the root of the tree, which holds the sum of all the values in `zs`.
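The loop above indexes `L[i + 1]`, so it would raise an `IndexError` on any level with an odd number of futures. With 256 inputs every level has an even length, so the notebook never hits this. Below is a minimal plain-Python sketch of the same pairwise reduction that carries an unpaired element up to the next level. `tree_reduce` is a hypothetical helper, not part of Dask, and it runs serially; in the notebook you could pass something like `lambda a, b: client.submit(add, a, b)` as `combine` to build futures instead.

```python
import operator

def tree_reduce(values, combine):
    """Hypothetical pairwise (tree) reduction; returns (result, number_of_levels)."""
    if not values:
        raise ValueError("tree_reduce needs at least one value")
    level = list(values)
    levels = 0
    while len(level) > 1:
        next_level = []
        # Merge neighbors (0, 1), (2, 3), ... and stop before an unpaired last item.
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # odd one out moves up unchanged
        level = next_level
        levels += 1
    return level[0], levels

print(tree_reduce(list(range(10)), operator.add))  # (45, 4)
```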

In [35]:
L[0]

If you're watching the [dashboard's status page](../proxy/8787/status), you may want to note two things:

1.  The red bars show inter-worker communication. They appear when different workers need to combine their intermediate values.
2.  There is a lot of parallelism at the beginning but less toward the end, as we reach the top of the tree where there is less work to do.
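To put numbers on the second point: each level of a pairwise tree has half as many additions as the level below it. A small sketch with a hypothetical helper, `level_widths`, counts them; for 256 inputs the first level has 128 independent `add` tasks and the last has one, for 255 additions in total.

```python
def level_widths(n):
    """Number of add tasks at each level of a pairwise tree over n inputs."""
    widths = []
    while n > 1:
        widths.append(n // 2)  # tasks that merge a pair at this level
        n = n - n // 2         # each pair collapses to one; an odd leftover carries up
    return widths

print(level_widths(256))       # [128, 64, 32, 16, 8, 4, 2, 1]
print(sum(level_widths(256)))  # 255
```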

Alternatively, you may want to navigate to the [dashboard's graph page](../proxy/8787/graph) and then run the tree-summation cell again. You will see the task graph evolve during the computation.

Building a computation dynamically
----------------------------------

In the examples above, we explicitly specify the task graph ahead of time. We know, for example, that the first two futures in the list `L` will be added together.

Sometimes this isn't best, though; you may want to define a computation dynamically as it runs. For example, we might want to sum these values based on whichever futures finish first, rather than the order in which they were originally placed in the list.

For this, we can use operations like [as_completed](http://dask.pydata.org/en/latest/futures.html#distributed.as_completed).

We recommend watching the dashboard's graph page when running this computation.  You should see the graph construct itself during execution.

`as_completed` returns an iterator that yields futures in the order they complete. Calling `next` on the iterator blocks until the next future completes, regardless of the order in which the futures were submitted.

You can also add more futures to this object during computation using the `.add` method.
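The standard library's `concurrent.futures` has a function of the same name that also yields futures in completion order, though it returns a plain generator with no `.add` method. A minimal stdlib sketch (not Dask), where `slow_identity` is a hypothetical task whose delay we control so that the last task submitted finishes first; it should print `[2, 1, 0]`:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_identity(i, delay):
    """Hypothetical task: sleep for `delay` seconds, then return `i`."""
    time.sleep(delay)
    return i

with ThreadPoolExecutor(max_workers=3) as pool:
    # Task 0 sleeps 0.4 s, task 1 sleeps 0.2 s and task 2 does not sleep,
    # so the tasks finish in the reverse of the order they were submitted.
    futures = [pool.submit(slow_identity, i, 0.2 * (2 - i)) for i in range(3)]
    order = [f.result() for f in as_completed(futures)]

print(order)
```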

In [37]:
del future, L, new_L, total  # clear out some old work

`zs` is a list of futures, each containing the result of `z = x + y`.

Mapping `inc` over this list with `client.map` submits one `inc` task per future, with each future as its input, so each value is incremented by 1.

In [57]:
from dask.distributed import as_completed

zs = client.map(inc, zs)
seq = as_completed(zs)

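# Iterating over seq yields each future as soon as it finishes, so the
# printed order follows completion order rather than submission order.
# The loop below is equivalent to:
# while seq.count() > 0:
#     print(next(seq).result())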
for future in seq:
    print(future.result())

45
84
135
117
90
...
642


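NOTE: `seq` is an iterator that yields futures as they complete, which is what lets us build the computation dynamically. It does not hold the results; those are stored in the futures in `zs`. The `for` loop above has consumed every future in `seq`, so `seq.count()` is now `0`.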

In [58]:
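seq = as_completed(zs)  # fresh iterator: the printing loop above exhausted the old one
while seq.count() > 1:  # at least two futures left
    a = next(seq)
    b = next(seq)
    new = client.submit(add, a, b)  # add them together
    seq.add(new)                    # add new future back into loop
next(seq).result()      # the one remaining future holds the total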

We can use `.add()` to add computations to the task graph dynamically while it is running.
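For comparison, the dynamic reduction above can be sketched with the standard library, assuming a `ThreadPoolExecutor` in place of the Dask client. Because stdlib futures cannot be passed to `submit` in place of their values, and stdlib `as_completed` cannot grow, this version (a hypothetical helper, `reduce_as_completed`) uses `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` and keeps finished futures in a list until it has two to pair. Calling `.result()` on them does not block, since they are already done.

```python
import random
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def add(x, y):
    time.sleep(random.random() / 100)  # short random delay, like the notebook's add
    return x + y

def reduce_as_completed(executor, futures):
    """Hypothetical helper: add whichever two results finish first until one remains.

    Requires at least one input future.
    """
    pending = set(futures)
    ready = []  # finished futures not yet paired
    while len(pending) + len(ready) > 1:
        if len(ready) < 2:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            ready.extend(done)
            continue
        a, b = ready.pop(), ready.pop()
        # Both are finished, so .result() returns immediately; feed the sum back in.
        pending.add(executor.submit(add, a.result(), b.result()))
    return (ready or list(pending))[0].result()

with ThreadPoolExecutor(max_workers=4) as pool:
    inputs = [pool.submit(add, i, 0) for i in range(100)]
    total = reduce_as_completed(pool, inputs)

print(total)  # 4950
```

The Dask version is shorter because the scheduler resolves `a` and `b` on the workers and `as_completed.add` lets the iterator grow, so there is no separate bookkeeping of ready and pending futures.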