# Simple use of `dask` with `submit` and Future objects

This notebook demonstrates the basic mechanism for using `dask` for distributed computation. We will see how to submit tasks for execution on a cluster, and how to get the results back.

**`dask` also provides various higher-level APIs built on top of these foundations, such as distributed data frames and distributed bags. These are great but we won't look at those in this notebook; if people are interested in these, we will have to do them another week.**

In [None]:
from dask.distributed import Client

Using `dask` in a distributed way requires three components:

1. _worker_ processes to do the computation
2. a _schedular_ process to coordinate the computation, by allocating (sub)tasks to workers, and moving data and results around
3. a _client_ to submit tasks to be carried out

For this demo, I have started worker and scheduler processes on an openstack instance; the client will be this notebook!


In [None]:
client = Client('127.0.0.1:8786')
client

Above we see some information about the resources available to the scheduler, and a link to a dashboard page that will allow us to monitor the execution of our tasks in detail.

Here is a naive function for primality testing, that will nevertheless enable us to create non-trivial amounts of CPU work.

In [None]:
from math import sqrt, floor

def is_prime(n):
    
    if n < 2:
        return False
    
    if n == 2:
        return True
    
    for i in range(floor(sqrt(n)) + 1):
        if i > 1 and n % i == 0:
            return False

    return True


def count_primes_less_than(m):  # Count number of primes less than input n
    return sum([is_prime(i) for i in range(m)])


Note that this implementation is stupid enough to generate quite long running times:

In [None]:
%%time
count_primes_less_than(5000000)

Instead of running this on our local machine, we can ask `dask` to compute it on the cluster using the `submit` function:

In [None]:
N = client.submit(count_primes_less_than, 5000000)
N

What we get back is a _Future_, which is an object that represents the result of a task we've launched. Currently the status of the Future is **pending**, meaning that the computation is running. When the task is finished and the result value is ready, the status will become **finished**. Futures can also have the status **error** if something went wrong, plus a couple of others e.g. **cancelled** if you cancel the task before it completes.


Once the Future has the **finished** state, you can call its `result` method to get the actual result value out:

In [None]:
N

In [None]:
N.result()

Unlike `submit`, the `result()` method is _synchronous_: if you call `result` before the result is ready, the call will simply block until it is ready:

In [None]:
M = client.submit(count_primes_less_than, 5000001)
print(M.result())

Note that the `submit` call is _eager_ - the scheduler will start the submitted task running on the cluster as soon it can. It is also possible to start tasks in a _lazy_ way, so that they don't start running until a later processing stage needs their output, but we won't cover that in this notebook.

### Running stuff in parallel

So far you might have been wondering what the point of `dask` is - what does it get us? One answer is that if we have many worker processes, running on a powerful computer or across several powerful computers, we can **run things in parallel**.

The `map` method is like `submit`, but we can give a whole list or iterable of arguments, and one task will be create for each one. The scheduler will run these in parallel where possible:

In [None]:
futures = client.map(count_primes_less_than, range(3000001, 3000010))
futures

As the tasks complete, we'll see that some of these Futures start to get the **finished** status.

In [None]:
futures

This is how we can save a massive amount of time.

Let's do some timing to prove it:

In [None]:
def local_test():
    for i in range(9):
        print(count_primes_less_than(1500000 + i))

In [None]:
%%time
local_test()

In [None]:
def distributed_test():
    futures = client.map(count_primes_less_than, [1500000 + i for i in range(9)])
    print(futures)
    results = client.gather(futures)
    for r in results:
        print(r)

In [None]:
%%time
distributed_test()

The method `gather` used in the above is like `result` but used for getting results from a whole list of Futures as one.

### Chaining together tasks

As well as taking regular value arguments, `.submit` can take Futures as well. In this case the submitted computation will start running as soon as all the required arguments are available.

In [None]:
def add_them(x, y):
    return x + y

In [None]:
M1 = client.submit(count_primes_less_than, 4000000)
M2 = client.submit(count_primes_less_than, 4500000)

Msum = client.submit(add_them, M1, M2)
Msum

In [None]:
Msum.result()

In this way, if we want to we can **build up a whole DAG of computation**:

In [None]:
M3 = client.submit(count_primes_less_than, 4000003)
M4 = client.submit(count_primes_less_than, 4500004)
M5 = client.submit(count_primes_less_than, 4000005)
M6 = client.submit(count_primes_less_than, 4500006)
M7 = client.submit(count_primes_less_than, 4000007)
M8 = client.submit(count_primes_less_than, 4500008)

Msum1 = client.submit(add_them, M3, M4)
Msum2 = client.submit(add_them, M5, M6)
Msum3 = client.submit(add_them, M7, M8)

Msum4 = client.submit(add_them, Msum1, Msum2)
Msum5 = client.submit(add_them, Msum4, Msum3)

Msum6 = client.submit(add_them, M3, Msum3)

K = Msum5.result()
print(K)

### What about when things go wrong?

When an error occurs while executing a task on the cluster, the Future object gets status **error**:

In [None]:
def goes_wrong():
    return 10 / 0

In [None]:
N = client.submit(goes_wrong)

In [None]:
N

In such cases, if we try to get the result value with `.result()` we will get the traceback for the error! Yay, it's just a python traceback of the kind we are used to by now :)

In [None]:
N.result()

### Restarting things

If things somehow go irredeemably pear-shaped, you can get back to a clean state in the scheduler and workers like this:

In [None]:
client.restart()