<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_icon.svg"
     width="20%"
     align="right"
     alt="Dask logo" />
          
# Schedulers

---

So far you have seen the power of Dask collections and become familiar with the idea of task graphs. You learned that these task graphs need to be executed to get the results of a computation. But what does it mean "to be executed", and who takes care of it? As you might have guessed from the title of this notebook, this is the job of the Dask task scheduler.


<img src="https://raw.githubusercontent.com/coiled/pydata-global-dask/master/images/grid_search_schedule.gif"
     width="95%"
     alt="Grid search schedule" />


Dask has several task schedulers. They all compute the same result, but their performance can differ. They fall into two classes: single-machine schedulers and distributed schedulers.


## Single Machine Schedulers

Single-machine schedulers require no setup, use only the Python standard library, and provide basic features on a local process or thread pool. Dask provides the following single-machine schedulers:


- "threads": The threaded scheduler executes computations with a local `concurrent.futures.ThreadPoolExecutor`. The threaded scheduler is the default choice for Dask Array, Dask DataFrame, and Dask Delayed.

- "processes": The multiprocessing scheduler executes computations with a local `concurrent.futures.ProcessPoolExecutor`. The multiprocessing scheduler is the default choice for Dask Bag.

- "single-threaded": The single-threaded synchronous scheduler executes all computations in the local thread, with no parallelism at all. This is particularly valuable for debugging and profiling, which are more difficult when using threads or processes.
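To get a feel for the difference between running tasks one after another and handing them to a thread pool, here is a minimal standard-library sketch. It does not use Dask at all: `slow_inc` and the 0.05-second delay are illustrative assumptions, and the pool of 4 threads is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def slow_inc(x, delay=0.05):
    """Hypothetical sleep-bound task: wait, then return x + 1."""
    sleep(delay)
    return x + 1


data = list(range(8))

# One task after another in the calling thread,
# similar in spirit to the "single-threaded" scheduler.
start = perf_counter()
serial = [slow_inc(x) for x in data]
serial_seconds = perf_counter() - start

# The same tasks handed to a local thread pool,
# similar in spirit to the "threads" scheduler.
start = perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(slow_inc, data))
threaded_seconds = perf_counter() - start

print(serial == threaded)  # both approaches give the same values
print(f"serial: {serial_seconds:.2f} s, threaded: {threaded_seconds:.2f} s")
```

Because the tasks spend their time sleeping rather than computing, the threaded version should finish noticeably faster while returning the same values.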

### Single machine schedulers in action

Using the same examples we used in the Delayed lesson, let's see how we can modify the scheduler and how this affects the performance of our computations. 

In [1]:
import dask
from dask import delayed
from time import sleep

In [2]:
@delayed
def inc(x):
    """Increments x by one"""
    sleep(1)
    return x + 1

In [3]:
data = list(range(8))

results = []
for i in data:
    y = inc(i)         
    results.append(y)
    
total = delayed(sum)(results)
total

Delayed('sum-8e410cdf-693a-4102-94e6-fc293b30b4dd')

### The multi-threading scheduler (default)

In [4]:
%%time 
dask.config.set(scheduler='threads')
total.compute()

CPU times: user 4.05 ms, sys: 2.44 ms, total: 6.49 ms
Wall time: 1.01 s


36

In [5]:
%%time 
dask.config.set(scheduler='threads', num_workers=4)  # limit the thread pool to 4 workers
total.compute()

CPU times: user 3.41 ms, sys: 1.84 ms, total: 5.25 ms
Wall time: 2.01 s


36
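The wall times in this lesson follow a simple pattern: eight independent one-second tasks run in "waves" of at most one task per worker. The sketch below is an idealized model, not something Dask computes; `ideal_wall_time` is a hypothetical helper that ignores scheduling, process start-up, and serialization overhead.

```python
import math


def ideal_wall_time(n_tasks, task_seconds, n_workers):
    """Idealized wall time for equal-length independent tasks (overhead ignored)."""
    waves = math.ceil(n_tasks / n_workers)  # rounds of tasks run side by side
    return waves * task_seconds


for workers in (1, 4, 8):
    print(workers, ideal_wall_time(8, 1.0, workers))
```

With 4 workers the model gives 2 seconds, in line with the 2.01 s measured above. With 8 or more workers it gives 1 second, and with a single worker 8 seconds, which is what the single-threaded scheduler shows later in this notebook.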

### The multi-process scheduler 

Notice that we can also set the scheduler as a context manager. 

In [6]:
%%time
with dask.config.set(scheduler='processes'): 
    total.compute()   

CPU times: user 13.9 ms, sys: 16.9 ms, total: 30.8 ms
Wall time: 6.31 s


### The single-threaded scheduler 

Tools like `pdb` do not work well with multiple threads or processes, but you can work around this by using the single-threaded scheduler when debugging.

In [7]:
%%time
total.compute(scheduler="single-threaded")  

CPU times: user 3.68 ms, sys: 1.44 ms, total: 5.11 ms
Wall time: 8.02 s


36
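The cells above selected a scheduler in three ways: globally with `dask.config.set(...)`, temporarily with `dask.config.set(...)` as a context manager, and for a single call with the `scheduler=` keyword of `.compute()`. The short sketch below puts the two scoped forms side by side. The `double` function is a hypothetical stand-in without `sleep`, so the example finishes quickly.

```python
import dask
from dask import delayed


@delayed
def double(x):
    """Hypothetical fast task: return twice x."""
    return 2 * x


total_doubled = delayed(sum)([double(i) for i in range(4)])

# Per call: only this compute() uses the single-threaded scheduler.
per_call = total_doubled.compute(scheduler="single-threaded")

# Context manager: every compute() inside the block uses threads.
with dask.config.set(scheduler="threads"):
    in_block = total_doubled.compute()

print(per_call, in_block)
```

Both forms leave the global configuration untouched once they finish, which makes them convenient for trying a scheduler on one computation without affecting the rest of the notebook.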

For more information about single-machine schedulers and how to choose between them, see the Dask documentation on [single-machine schedulers](https://docs.dask.org/en/latest/setup/single-machine.html).

## Distributed Scheduler

The Dask distributed scheduler, despite having "distributed" in its name, also works well on a single machine. **We recommend using the distributed scheduler as it offers more features and diagnostics. You can think of the distributed scheduler as an "advanced scheduler".** 

The distributed scheduler can be used on a cluster as well as locally. Deploying a remote Dask cluster involves additional setup that you can read more about in the Dask [setup documentation](https://docs.dask.org/en/latest/setup.html).

For now, we will set up the scheduler locally. To do so, we create a `Client` object, which lets us interact with the "cluster" (local threads or processes on our machine).

In [8]:
from dask.distributed import Client

In [9]:
client = Client(n_workers=4)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status
Workers: 4
Total threads: 12
Total memory: 16.00 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:49216
Workers: 4
Dashboard: http://127.0.0.1:8787/status
Total threads: 12
Started: Just now
Total memory: 16.00 GiB

Comm: tcp://127.0.0.1:49231
Total threads: 3
Dashboard: http://127.0.0.1:49232/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:49220
Local directory: /Users/pavithraes/Developer/Dask/dask-mini-tutorial/notebooks/dask-worker-space/worker-hrth59ak

Comm: tcp://127.0.0.1:49237
Total threads: 3
Dashboard: http://127.0.0.1:49238/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:49221
Local directory: /Users/pavithraes/Developer/Dask/dask-mini-tutorial/notebooks/dask-worker-space/worker-u0cn_ne2

Comm: tcp://127.0.0.1:49234
Total threads: 3
Dashboard: http://127.0.0.1:49235/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:49219
Local directory: /Users/pavithraes/Developer/Dask/dask-mini-tutorial/notebooks/dask-worker-space/worker-e82p0q7_

Comm: tcp://127.0.0.1:49240
Total threads: 3
Dashboard: http://127.0.0.1:49241/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:49222
Local directory: /Users/pavithraes/Developer/Dask/dask-mini-tutorial/notebooks/dask-worker-space/worker-9udyrcif


When we create a distributed scheduler `Client`, it registers itself as the default Dask scheduler. From then on, all `.compute()` calls use the distributed scheduler unless otherwise specified.

The distributed scheduler has many features that you can learn more about in the [Dask Distributed documentation](https://distributed.dask.org/en/latest/). One feature worth exploring is the diagnostic dashboard. We looked at the dashboard as we performed computations; for a brief overview of its main components, see the Dask documentation on [diagnosing performance](https://distributed.dask.org/en/latest/diagnosing-performance.html).

If you click the dashboard link in the output of the cell above and then run the computation of `total` as we did before, you will see activity on the dashboard.

In [10]:
total.compute()

36

In [11]:
client.close()
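The create, compute, and close steps above can also be written with `Client` as a context manager, which closes the client (and the local cluster it started) when the block ends. This is a minimal sketch under a few assumptions: `processes=False` keeps the workers as threads inside the current process so that it starts quickly, whereas the lesson's `Client(n_workers=4)` uses processes, and `inc_fast` is a hypothetical version of `inc` without the `sleep`.

```python
from dask import delayed
from dask.distributed import Client


@delayed
def inc_fast(x):
    """Hypothetical variant of inc without the sleep."""
    return x + 1


total_fast = delayed(sum)([inc_fast(i) for i in range(8)])

# Assumption for this sketch: thread-based workers in the current process.
with Client(processes=False, n_workers=1, threads_per_worker=2) as local_client:
    # While the client is open it is the default scheduler for compute().
    result = total_fast.compute()

print(result)
```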

## Resources to learn more

- [Dask documentation on scheduling](https://docs.dask.org/en/latest/scheduling.html)
- Example of dynamic computations using Futures: [PyData Global Dask tutorial - schedulers](https://github.com/coiled/pydata-global-dask/blob/master/3-schedulers.ipynb)
- Advanced Delayed with the distributed scheduler: [Dask tutorial - Advanced delayed](https://github.com/dask/dask-tutorial/blob/main/06_distributed_advanced.ipynb)