<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">


## Schedulers

In the previous notebooks, we used `dask.delayed` and `dask.dataframe` to parallelize computations.
These work by building a *task graph* instead of executing immediately.
Each *task* represents some function to call on some data, and the full *graph* is the relationship between all the tasks.

When we wanted the actual result, we called `compute`, which handed the task graph off to a *scheduler*.

**Schedulers are responsible for running a task graph and producing a result**.
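Here is a minimal sketch of this split between building a graph and running it. The `inc` and `add` functions are hypothetical examples and are not part of the tutorial data. Calling the delayed functions only records tasks, and nothing executes until `compute` passes the graph to a scheduler. `__dask_graph__` belongs to Dask's collection protocol and returns the graph as a mapping from keys to tasks.

```python
import dask

# Hypothetical example functions, wrapped so that calls build tasks instead of running
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

# Nothing runs here: each call just adds a task to the graph
a = inc(1)
b = inc(2)
total = add(a, b)

print(total)                      # a Delayed object, not a number
graph = dict(total.__dask_graph__())
print(len(graph))                 # one entry per task: inc, inc, add
print(total.compute())            # hands the graph to the default scheduler
```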

![](https://raw.githubusercontent.com/dask/dask-org/master/images/grid_search_schedule.gif)

First, there are the single-machine schedulers, which execute tasks in parallel using threads or processes (or synchronously, for debugging). These are what we've used up until now. Second, there's the `dask.distributed` scheduler, which is newer and has more features than the single-machine schedulers.

In this notebook we'll first talk about the different schedulers. Then we'll use the `dask.distributed` scheduler in more depth.

### Local Schedulers

Dask separates computation description (task graphs) from execution (schedulers). This allows you to write code once, and run it locally or scale it out across a cluster.

Here we discuss the *local* schedulers, which run only on a single machine. The three options are:

- `dask.threaded.get`: uses a local thread pool
- `dask.multiprocessing.get`: uses a local process pool
- `dask.get`: uses only the main thread (useful for debugging)
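To make these entry points concrete, the following sketch builds a task graph by hand and passes it to two of them. The graph is a plain dict that maps each key either to data or to a `(function, *args)` tuple, which is Dask's classic low-level graph format. The `inc` helper and the key names are illustrative only. `dask.multiprocessing.get` accepts the same `(graph, keys)` arguments. It is left out here because starting worker processes from a script requires an `if __name__ == '__main__':` guard.

```python
from operator import add

import dask
import dask.threaded

def inc(x):
    return x + 1

# A raw task graph: keys map either to data or to a task tuple (function, *arguments);
# string arguments that name other keys are treated as dependencies
dsk = {
    'x': 1,
    'y': (inc, 'x'),      # y = inc(x)
    'z': (add, 'y', 10),  # z = add(y, 10)
}

sync_result = dask.get(dsk, 'z')               # runs every task in the main thread
threaded_result = dask.threaded.get(dsk, 'z')  # runs tasks on a local thread pool
print(sync_result, threaded_result)
```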

You can change which scheduler is used in a few different ways:

- By providing a `get=` keyword argument to `compute`:

```python
total.compute(get=dask.multiprocessing.get)
# or 
dask.compute(a, b, get=dask.multiprocessing.get)
```

- Using `dask.set_options`:

```python
# Use multiprocessing in this block
with dask.set_options(get=dask.multiprocessing.get):
    total.compute()
# Use multiprocessing globally
dask.set_options(get=dask.multiprocessing.get)
```

---

*Note: on master, we've also added a `scheduler=` keyword to `compute` that takes in scheduler names instead of scheduler functions. In future releases `dask.compute(..., scheduler='threads')` or `dask.set_options(scheduler='threads')` will be the preferred methods.*

---
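In later Dask releases, the `scheduler=` keyword described in the note became the standard way to choose a scheduler, and `dask.config.set` replaced `dask.set_options`. The notebook cells below already use `scheduler=`. The sketch below shows the per-call, multi-result, and block-scoped forms and assumes a Dask version that includes `dask.config`. The `square` function and this `total` are illustrative stand-ins, not the `total` from the snippets above.

```python
import dask

@dask.delayed
def square(x):
    return x * x

total = dask.delayed(sum)([square(i) for i in range(10)])

# Per call: pick a scheduler by name
r1 = total.compute(scheduler='threads')
r2 = total.compute(scheduler='single-threaded')

# Several results at once, computed in a single scheduler run
a, b = dask.compute(square(3), square(4), scheduler='threads')

# Block-scoped: every compute inside the with-block uses this scheduler
with dask.config.set(scheduler='single-threaded'):
    r3 = total.compute()

print(r1, r2, a, b, r3)
```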

Here we repeat a simple dataframe computation from the previous section using the different schedulers:

In [None]:
import os
import dask.dataframe as dd

In [None]:
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

# Maximum non-cancelled delay
largest_delay = df[~df.Cancelled].DepDelay.max()

In [None]:
largest_delay

In [None]:
%time _ = largest_delay.compute()  # this uses threads by default

In [None]:
import dask.multiprocessing
%time _ = largest_delay.compute(scheduler='processes')  # this uses processes

In [None]:
%time _ = largest_delay.compute(scheduler='single-threaded')  # This uses a single thread

By default, the threaded and multiprocessing schedulers use as many workers as there are cores. You can change this with the `num_workers` keyword, passed the same way as `get` above:

```python
largest_delay.compute(get=dask.multiprocessing.get, num_workers=2)
```
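The next sketch sizes the pool relative to the machine. It uses the `scheduler=` spelling from the cells above. Using half the core count is an arbitrary choice for illustration, and `inc` and `total` are hypothetical.

```python
from multiprocessing import cpu_count

import dask

@dask.delayed
def inc(x):
    return x + 1

total = dask.delayed(sum)([inc(i) for i in range(100)])

# Use half of the available cores, but never fewer than one worker
n_workers = max(1, cpu_count() // 2)
result = total.compute(scheduler='threads', num_workers=n_workers)
print(n_workers, result)
```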

To see how many cores your computer has, use `multiprocessing.cpu_count`:

In [None]:
from multiprocessing import cpu_count
cpu_count()

### Some Questions to Consider:

- How much speedup is possible for this task? (Hint: look at the graph.)
- Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler?
- How much faster was using threads over a single thread? Why does this differ from the optimal speedup?
- Why is the multiprocessing scheduler so much slower here?

---

## In what cases would you want to use one scheduler over another?

See the [single-machine scheduler documentation](http://dask.pydata.org/en/latest/setup/single-machine.html).

---

## Distributed Scheduler

The `dask.distributed` system is composed of a single centralized scheduler and many worker processes. [Deploying](http://dask.pydata.org/en/latest/setup.html) a remote Dask cluster involves some additional effort. Running locally, however, just involves creating a `Client` object, which lets you interact with the "cluster" (local threads or processes on your machine). For more information, see the [single-machine distributed setup docs](http://dask.pydata.org/en/latest/setup/single-distributed.html).

In [None]:
from dask.distributed import Client

# Set up a local cluster.
# By default this sets up 1 worker per core
client = Client()
client.cluster

Be sure to click the `Dashboard` link to open up the diagnostics dashboard.

By default, creating a `Client` makes it the default scheduler. Any calls to `.compute` will use the cluster your `client` is attached to (see the [scheduling documentation](http://dask.pydata.org/en/latest/scheduling.html) for how to specify which scheduler to use).
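The following sketch shows the default-scheduler behavior. It assumes `dask.distributed` is installed. `processes=False`, `n_workers`, and `threads_per_worker` are `LocalCluster` options that `Client` forwards. They are chosen here only to keep the example lightweight, unlike the notebook's plain `Client()`. The `inc` function and `total` are hypothetical.

```python
import dask
from dask.distributed import Client

@dask.delayed
def inc(x):
    return x + 1

total = dask.delayed(sum)([inc(i) for i in range(10)])

# processes=False runs the scheduler and worker inside this Python process
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    on_cluster = total.compute()                        # the client is now the default
    local = total.compute(scheduler='single-threaded')  # an explicit keyword overrides it
    print(on_cluster, local)
```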

In [None]:
%time largest_delay.compute()

#### Some Questions to Consider

- How does this compare to the optimal parallel speedup?
- Why is this faster than the threaded scheduler?

---

### Exercise

Run the following computations while looking at the diagnostics page. In each case, what is taking the most time?

In [None]:
# Number of flights
_ = len(df)

In [None]:
# Number of non-cancelled flights
_ = len(df[~df.Cancelled])

In [None]:
# Number of non-cancelled flights per-airport
_ = df[~df.Cancelled].groupby('Origin').Origin.count().compute()

In [None]:
# Average departure delay from each airport
_ = df[~df.Cancelled].groupby('Origin').DepDelay.mean().compute()

In [None]:
# Average departure delay per day-of-week
_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()

---

### New API

The distributed scheduler is more sophisticated than the single-machine schedulers. It can compute asynchronously and also provides an API similar to that of `concurrent.futures`. This is discussed further [in part 6](06-distributed-advanced.ipynb). For more information, see the [distributed documentation](http://distributed.readthedocs.io/en/latest/).
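The following sketch previews the futures-style interface. It assumes `dask.distributed` is installed and uses a small in-process cluster. `submit` and `result` mirror `Executor.submit` and `Future.result` from `concurrent.futures`, while `map` and `gather` handle many tasks at once. The `square` function is a hypothetical example.

```python
from dask.distributed import Client

def square(x):
    return x ** 2

with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    # submit starts the task right away and returns a Future
    future = client.submit(square, 4)
    single = future.result()           # block until this one result is ready

    # map returns one Future per input
    futures = client.map(square, range(5))

    # Futures passed as arguments are resolved on the cluster before sum runs
    total = client.submit(sum, futures)
    combined = total.result()

    gathered = client.gather(futures)  # pull all results back as a list
    print(single, combined, gathered)
```

Unlike `.compute()`, these calls begin work immediately in the background. That is what allows the distributed scheduler to compute asynchronously.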