<img src="images/dask_horizontal.svg"
     width="45%"
     alt="Dask logo">
     
# Schedulers

So far we've been discussing Dask collections like Dask DataFrames and Dask Delayed objects, which are used to build up task graphs for a computation. After these graphs are generated, they need to be executed (potentially in parallel). This is the job of a Dask task scheduler. This notebook covers the basics of Dask's task schedulers.

Different task schedulers exist within Dask. Each will consume a task graph and compute the same result, but with potentially different performance characteristics. Dask has two different classes of schedulers: single-machine schedulers and a distributed scheduler.

![grid-search](images/grid_search_schedule.gif "grid-search")

# Single Machine Schedulers

Single-machine schedulers provide basic features on a local process or thread pool and require no setup (they only use the Python standard library). The different single-machine schedulers Dask provides are:

- `"threads"`: The threaded scheduler executes computations with a local `multiprocessing.pool.ThreadPool`. The threaded scheduler is the default choice for Dask arrays, Dask DataFrames, and Dask delayed. 

- `"processes"`: The multiprocessing scheduler executes computations with a local `multiprocessing.Pool`.

- `"single-threaded"`: The single-threaded synchronous scheduler executes all computations in the local thread, with no parallelism at all. This is particularly valuable for debugging and profiling, which are more difficult when using threads or processes.
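To make "consuming a task graph" concrete, here is a toy sketch using only the standard library. It is not how Dask's schedulers are implemented: the graph format is an assumed, simplified version of Dask's dict-of-tuples graph spec, and `get_sync` and `get_threaded` are hypothetical helpers that loosely mimic the `"single-threaded"` and `"threads"` schedulers. Both compute the same result from the same graph.

```python
from multiprocessing.pool import ThreadPool
from operator import add, mul

# A toy task graph, loosely modeled on Dask's dict-based graph format:
# each key maps to a literal value or to a tuple (func, *args), and any
# argument that is itself a key in the graph is a dependency.
graph = {
    "x": 1,
    "y": 2,
    "a": (add, "x", "y"),      # a = x + y
    "b": (mul, "x", 10),       # b = x * 10
    "total": (add, "a", "b"),  # total = a + b
}


def is_task(value):
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def dependencies(graph, key):
    value = graph[key]
    if not is_task(value):
        return set()
    return {arg for arg in value[1:] if isinstance(arg, str) and arg in graph}


def evaluate(graph, key, results):
    """Run a single task, substituting already-computed dependency results."""
    value = graph[key]
    if not is_task(value):
        return value
    func, *args = value
    resolved = [results[a] if isinstance(a, str) and a in graph else a for a in args]
    return func(*resolved)


def get_sync(graph, key):
    """'single-threaded' style: run tasks one at a time in the calling thread."""
    results = {}

    def visit(k):
        if k not in results:
            for dep in dependencies(graph, k):
                visit(dep)
            results[k] = evaluate(graph, k, results)
        return results[k]

    return visit(key)


def get_threaded(graph, key, num_workers=4):
    """'threads' style: run every task whose dependencies are done in a thread pool."""
    results = {}
    remaining = set(graph)
    with ThreadPool(num_workers) as pool:
        while remaining:
            ready = [k for k in remaining if dependencies(graph, k).issubset(results)]
            if not ready:
                raise ValueError("graph has a cycle")
            values = pool.map(lambda k: evaluate(graph, k, results), ready)
            results.update(zip(ready, values))
            remaining.difference_update(ready)
    return results[key]


print(get_sync(graph, "total"), get_threaded(graph, "total"))  # 13 13
```

The threaded version runs independent tasks (here `a` and `b`) at the same time, while the synchronous version visits tasks one by one in the calling thread, which is why it is the easier of the two to step through in a debugger.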

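You can configure which scheduler is used in a few different ways. To demonstrate, first create an example computation (hourly means of a synthetic time series DataFrame from `dask.datasets.timeseries`); then you can set the scheduler globally with the `dask.config.set(scheduler=...)` command.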

In [1]:
from dask.datasets import timeseries

df = timeseries(start="2020-11-11", end="2020-11-15", partition_freq="10min")
result = df[["x", "y"]].resample("1h").mean()
result

| npartitions=96 | x | y |
|---|---|---|
| 2020-11-11 00:00:00 | float64 | float64 |
| 2020-11-11 01:00:00 | ... | ... |
| ... | ... | ... |
| 2020-11-14 23:00:00 | ... | ... |
| 2020-11-15 00:00:00 | ... | ... |



In [9]:
import dask

dask.config.set(scheduler='threads')
result.compute(); # Will use the multi-threading scheduler

You can also use `dask.config.set` as a context manager to set the scheduler for a single block of code:

In [10]:
with dask.config.set(scheduler='processes'):
    result.compute()  # Will use the multi-processing scheduler

You can even set the scheduler for a single `compute` call:

In [11]:
result.compute(scheduler='threads');  # Will use the multi-threading scheduler

The `num_workers` argument specifies the number of threads or processes to use:

In [12]:
result.compute(scheduler='threads', num_workers=4);

# Distributed Scheduler

Despite having "distributed" in its name, the distributed scheduler works well on both single and multiple machines. Think of it as the "advanced scheduler".

The Dask distributed cluster is composed of a single centralized scheduler and one or more worker processes. A `Client` object is used as the user-facing entry point to interact with the cluster.

<img src="images/dask-cluster.svg"
     width="85%"
     alt="Dask components">
     

Deploying a remote Dask cluster involves some [additional setup](https://distributed.dask.org/en/latest/setup.html). There are several open source projects for deploying a Dask cluster on commonly used computing resources:

- [Dask-Kubernetes](https://kubernetes.dask.org/en/latest/) for deploying Dask using native Kubernetes APIs
- [Dask-Yarn](https://yarn.dask.org/en/latest/) for deploying Dask on YARN clusters
- [Dask-Jobqueue](https://jobqueue.dask.org/en/latest/) for deploying Dask on job queuing systems (e.g. PBS, Slurm, etc.)
- [Dask Cloud Provider](https://cloudprovider.dask.org/en/latest/) for deploying Dask on cloud-based infrastructure (e.g. AWS Fargate, AzureML)
- [Dask-MPI](http://mpi.dask.org/en/latest/) for deploying Dask on existing MPI environments

Setting up the distributed scheduler locally just involves creating a `Client` object, which lets you interact with the "cluster" (local threads or processes on your machine).

In [13]:
from dask.distributed import Client

In [14]:
client = Client()
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:64879 | Workers: 4 |
| Dashboard: http://127.0.0.1:64878/status | Cores: 8 |
| | Memory: 8.50 GB |


**Note**: when we create a distributed scheduler `Client`, by default it registers itself as the default Dask scheduler. All `.compute()` calls will automatically start using the distributed scheduler unless otherwise told to use a different scheduler. 

The distributed scheduler has many features:

- [Sophisticated memory management](https://distributed.dask.org/en/latest/memory.html)

- [Data locality](https://distributed.dask.org/en/latest/locality.html)

- [Adaptive deployments](https://distributed.dask.org/en/latest/adaptive.html)

- [Cluster resilience](https://distributed.dask.org/en/latest/resilience.html)

- ...

See the [Dask distributed documentation](https://distributed.dask.org) for full details about all the distributed scheduler features.

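For this talk, we'll highlight two of the most commonly used distributed scheduler features: real-time diagnostics and the futures interface. To see the real-time diagnostics, open the dashboard link shown in the `Client` output above while the computation below runs.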

In [15]:
result.compute()

| timestamp | x | y |
|---|---|---|
| 2020-11-11 00:00:00 | -0.006355 | 0.013654 |
| 2020-11-11 01:00:00 | -0.016818 | -0.000378 |
| 2020-11-11 02:00:00 | -0.001648 | -0.003441 |
| 2020-11-11 03:00:00 | 0.010456 | 0.002554 |
| 2020-11-11 04:00:00 | 0.001347 | 0.007363 |
| ... | ... | ... |
| 2020-11-14 20:00:00 | 0.002659 | -0.009587 |
| 2020-11-14 21:00:00 | -0.007473 | 0.019631 |
| 2020-11-14 22:00:00 | -0.010856 | 0.002075 |
| 2020-11-14 23:00:00 | -0.002019 | 0.004531 |


# Futures interface

The Dask distributed scheduler implements a superset of Python's [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) interface that allows for finer control and asynchronous computation.
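For reference, this is what the standard-library interface that Dask extends looks like, using `concurrent.futures.ThreadPoolExecutor` on the local machine. The `inc` here is a stand-in without the sleep. One difference worth noting: `Executor.map` yields results, whereas Dask's `Client.map` returns a list of `Future` objects.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1  # same as the notebook's inc, minus the sleep


with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(inc, 1)            # returns a Future immediately
    print(future.result())                      # 2 (blocks until the task is done)
    mapped = list(executor.map(inc, range(5)))  # map yields results, not futures
    print(mapped)                               # [1, 2, 3, 4, 5]
```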

In [16]:
import time

def inc(x):
    time.sleep(0.5)
    return x + 1

def double(x):
    time.sleep(0.5)
    return 2 * x

def add(x, y):
    time.sleep(0.5)
    return x + y

We can run these functions locally:

In [17]:
inc(1)

2

Or we can submit them to run remotely on a Dask worker node:

In [18]:
future = client.submit(inc, 1)
future

The `Client.submit` method sends a function and its arguments to the distributed scheduler for processing. It returns a `Future` object that refers to remote data on the cluster. `Client.submit` returns immediately while the computation runs remotely in the background, so the local Python session is not blocked.

If you wait a moment, and then check on the future again, you'll see that it has finished.

In [19]:
future

You can retrieve the result of a `Future` by calling the `.result()` method. If the status of the `Future` is "finished", meaning the task has been successfully run on one of the workers, then calling `.result()` will return almost immediately. Conversely, if the `Future` is still "pending" then calling `.result()` will block the current Python process and wait until the task has been run and then return the result. 

In [20]:
future.result()

2
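The "pending" versus "finished" distinction corresponds to `Future.done()` in the standard library (Dask's `Future` also provides a `done()` method). This stdlib sketch uses a `threading.Event` to hold a task in the pending state deterministically, so you can see that `done()` is `False` until the task finishes and that `result()` blocks until it does. `wait_then_inc` is a hypothetical helper for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def wait_then_inc(x):
    release.wait(timeout=5)  # keep the task "pending" until the event is set
    return x + 1


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_then_inc, 1)
    print(future.done())    # False: the task has not finished yet
    release.set()           # allow the task to complete
    print(future.result())  # blocks until the task is done, then prints 2
    print(future.done())    # True
```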

Similar to `Client.submit`, there's also a `Client.map` method for running a function across an iterable of inputs (similar to Python's built-in `map` function).

In [21]:
# Similar to doing: [client.submit(inc, i) for i in range(10)]
futures = client.map(inc, list(range(10)))
futures

[<Future: pending, key: inc-77754b804cae314d87c9bd9bb2dd64b4>,
 <Future: finished, type: builtins.int, key: inc-7a6ce9ae718b79ca6731ba7aa8b176ef>,
 <Future: pending, key: inc-ded0b06942e0857b3d8e0691fbbd29ab>,
 <Future: pending, key: inc-092121e59a00ec7e984bf4b50bb4bb59>,
 <Future: pending, key: inc-829585c25dba6bc175c286188b1bd3dd>,
 <Future: pending, key: inc-c4b109735bf7cf09611f56f47ca0f4b8>,
 <Future: pending, key: inc-072c1f9008f6334505f5daad77150c5e>,
 <Future: pending, key: inc-c999dc54451d9e787a9dc11801bde4b4>,
 <Future: pending, key: inc-53ce3b27578193858a1921c66752cd0f>,
 <Future: pending, key: inc-10f30d21da868a18ccba82d2c8cbbfe1>]

`Client.map` returns a list of `Future` objects, one for each of the inputs being mapped over. To get the results for multiple futures, you can use the `Client.gather` method:

In [22]:
# Similar to doing: [f.result() for f in futures]
results = client.gather(futures)
results

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

`Future`s obey standard Python garbage collection: the data that a `Future` points to stays on a Dask worker as long as some reference to that `Future` exists. Once no references remain, the data is deleted from the cluster.

In [23]:
del futures[3]
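The reference-counting behavior can be modeled with a small toy. `ToyCluster` and `ToyFuture` below are hypothetical classes, not Dask's implementation, and results are computed eagerly to keep the example short. Each handle registers a `weakref.finalize` callback, and a stored result is deleted once the last handle for its key is garbage collected, mirroring what `del futures[3]` does above.

```python
import gc
import itertools
import weakref
from collections import Counter


class ToyCluster:
    """Hypothetical stand-in for a cluster: stores results by key and
    deletes a result once no ToyFuture refers to that key anymore."""

    def __init__(self):
        self.data = {}              # key -> result "held on a worker"
        self.refcounts = Counter()  # key -> number of live ToyFuture handles
        self._ids = itertools.count()

    def submit(self, func, *args):
        key = f"{func.__name__}-{next(self._ids)}"
        self.data[key] = func(*args)  # computed eagerly to keep the sketch short
        return ToyFuture(self, key)

    def release(self, key):
        self.refcounts[key] -= 1
        if self.refcounts[key] == 0:  # last handle is gone: free the data
            del self.refcounts[key]
            del self.data[key]


class ToyFuture:
    """Handle to a result stored in a ToyCluster."""

    def __init__(self, cluster, key):
        self.key = key
        self.cluster = cluster
        cluster.refcounts[key] += 1
        # Call cluster.release(key) when this handle is garbage collected
        weakref.finalize(self, cluster.release, key)

    def result(self):
        return self.cluster.data[self.key]


def inc(x):
    return x + 1


cluster = ToyCluster()
futures = [cluster.submit(inc, i) for i in range(3)]
print(len(cluster.data))  # 3
del futures[1]            # drop the only reference to the second result
gc.collect()              # CPython frees it immediately; this just makes it explicit
print(len(cluster.data))  # 2
```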

### Specifying task dependencies

Much like the `dask.delayed` interface, we can submit new tasks that depend on the results of other futures. This will create a dependency between the inputs and outputs. Dask will track the execution of all tasks and ensure that downstream tasks are run at the proper time and place and with the proper data.

In [24]:
x = client.submit(inc, 20)
y = client.submit(double, 5)
z = client.submit(add, x, y)
z

In [25]:
z.result()

31
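The standard library's `concurrent.futures` does not treat `Future` arguments as dependencies, so a rough equivalent of the example above needs a wrapper. `submit_with_deps` is a hypothetical helper that resolves `Future` arguments inside the worker thread before calling the function. Because a dependency must be submitted before any task that uses it, the executor's first-in, first-out queue starts it first, so this simple version does not deadlock. It is only a sketch: it ties up a worker thread while waiting, whereas Dask's scheduler only starts a task once its inputs are available.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def inc(x):
    return x + 1


def double(x):
    return 2 * x


def add(x, y):
    return x + y


def _call_with_resolved_args(func, *args):
    # Replace each Future argument with its result, waiting if necessary
    resolved = [a.result() if isinstance(a, Future) else a for a in args]
    return func(*resolved)


def submit_with_deps(executor, func, *args):
    """Hypothetical helper: like executor.submit, but Future arguments are
    treated as dependencies and replaced by their results before func runs."""
    return executor.submit(_call_with_resolved_args, func, *args)


with ThreadPoolExecutor(max_workers=2) as executor:
    x = submit_with_deps(executor, inc, 20)
    y = submit_with_deps(executor, double, 5)
    z = submit_with_deps(executor, add, x, y)
    print(z.result())  # 31
```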

## Exercise 1: Parallelize a for-loop

For this exercise, we'll repeat Exercise 1 from the Dask Delayed notebook, but this time instead of using `dask.delayed` use the Distributed scheduler's `Future`s interface to parallelize the `for`-loop below:

In [26]:
%%time

import time

def inc(x):
    time.sleep(0.5)
    return x + 1

def double(x):
    time.sleep(0.5)
    return 2 * x

def add(x, y):
    time.sleep(0.5)
    return x + y

data = list(range(10))

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
total

Wall time: 15.1 s


145

In [39]:
# Your solution here

In [35]:
# %load solutions/schedulers-1.py
import time

def inc(x):
    time.sleep(0.5)
    return x + 1

def double(x):
    time.sleep(0.5)
    return 2 * x

def add(x, y):
    time.sleep(0.5)
    return x + y

data = list(range(10))

output = []
for x in data:
    a = client.submit(inc, x)
    b = client.submit(double, x)
    c = client.submit(add, a, b)
    output.append(c)

total = client.submit(sum, output)
total.result()

# Fun Example: Dynamic computations

In the [Delayed notebook](1-delayed.ipynb), we saw that `Delayed` objects can't be used in control flow (e.g. `if`/`else`), which makes it difficult to create dynamic workflows: for example, a workflow where whether or not subsequent tasks are submitted depends on the output of previous tasks. The `Future`s interface provides more flexibility in these situations, where computations may evolve over time.

For this, we can use operations like [`as_completed()`](https://distributed.dask.org/en/latest/api.html#distributed.as_completed), which returns futures in the order in which they complete.

In [41]:
from dask.distributed import as_completed

# Create some initial set of Futures
futures = []
for i in range(64):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = double(x)
    z = client.submit(add, x, y)  # z = add(x, y)
    futures.append(z)

# Use as_completed to create an iterator which yields
# Futures as they complete
seq = as_completed(futures)
while seq.count() > 1:  # at least two futures left
    a = next(seq)
    b = next(seq)
    new = client.submit(add, a, b)  # add them together
    seq.add(new)                    # add new future back into as_completed iterator

# Only one future is left: the sum of all the others
total = next(seq)
total.result()  # 3 * (1 + 2 + ... + 64) = 6240
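The standard library's `as_completed` cannot accept new futures while iterating, so this stdlib-only sketch of the same dynamic pattern uses `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` over a set of pending futures instead. `tree_sum` and `triple_inc` are hypothetical names; `triple_inc(i)` returns the same value as the `add(inc(i), double(inc(i)))` chain above, namely `3 * (i + 1)`.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def add(x, y):
    return x + y


def triple_inc(i):
    return 3 * (i + 1)  # same value as add(inc(i), double(inc(i)))


def tree_sum(executor, futures):
    """Add results pairwise in completion order, submitting each sum as a
    new task, until a single future (the total) remains."""
    pending = set(futures)  # submitted, possibly still running
    finished = []           # completed, waiting to be paired up
    while len(pending) + len(finished) > 1:
        if len(finished) < 2:
            # Block until at least one more pending future completes
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            finished.extend(done)
        else:
            a, b = finished.pop(), finished.pop()
            pending.add(executor.submit(add, a.result(), b.result()))
    (last,) = finished or pending
    return last.result()


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(triple_inc, i) for i in range(64)]
    total = tree_sum(executor, futures)

print(total)  # 6240
```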

This was a brief demo of the distributed scheduler. It has lots of other cool features not touched on here. For more information, check out the [Distributed documentation](https://distributed.dask.org).

# Additional Resources

- Dask links:
    - Documentation: https://docs.dask.org
    - GitHub repository: https://github.com/dask/dask
    - Dask tutorial repository: https://github.com/dask/dask-tutorial
- If you have Dask usage questions, please ask them on [Stack Overflow with the #dask tag](https://stackoverflow.com/questions/tagged/dask). Dask developers monitor this tag and will answer questions.
- If you run into a bug, feel free to file a report on the [Dask GitHub issue tracker](https://github.com/dask/dask/issues).