# Starting Dask cluster and client using dask-hpcconfig on Datarmor

dask-hpcconfig wraps certain configuration for optimam usage of dask on HPC clusters for users, so that each users does not need to specify manually.  

This notebook shows ways to start different size of dask clusters on datarmor using dask-hpcconfg.  

- Example 1: Single node  (in default mode) 
- Example 2: single node  (with specification of size of memory for each worker) 
- Example 3: Starting dask workers on different compute nodes.  (in default mode) 
- Example 4: Starting dask workers on different compute nodes.  (with specification of size of memory for each worker) 
- Example 5: Some advanced use case.

In [None]:
import dask_hpcconfig
from distributed import Client

In [None]:
dask_hpcconfig.print_clusters()

In [None]:
dask_hpcconfig.__version__

## Example 1: Single node

This use case will run 14 workers on your local datarmor nodes.  It will set several optimnisation for you ( spill=false, optimal number of threads for each workers...)

In [None]:
cluster = dask_hpcconfig.cluster("datarmor-local")
client = Client(cluster)
client

### Click here the 'Launch dashboard in JupyterLab to see how your dask workers are allocated. 
You are ready to use dask in your enviroment now.  
Before trying next example, do not forget to close your cluster.  


In [None]:
cluster.close()

##  Example 2: Big-mem single node

This example show the use case that requires max 25GB of memory for each worker

In [None]:
memory_size = 25
n_worker_per_node = 115 // memory_size
n_threads_per_worker = 28 // n_worker_per_node
print(n_worker_per_node, n_threads_per_worker)
overrides = {
    "cluster.threads_per_worker": n_threads_per_worker,
    "cluster.n_workers": n_worker_per_node,
}

cluster = dask_hpcconfig.cluster("datarmor-local", **overrides)
client = Client(cluster)
client

In [None]:
cluster.close()

##  Example 3: Multiple node to run dask workers
This use case will run 14 workers on each node of Datarmor.  It will set several optimnisation for you ( spill=false, optimal number of threads for each workers...)

In [None]:
overrides = {}
# overrides = { "cluster.cores": 28 , "cluster.processes": 6 }

cluster = dask_hpcconfig.cluster("datarmor", **overrides)
client = Client(cluster)

### Here you specify, how many workers you want in total
This example asks 20 workers

In [None]:
cluster.scale(20)

#### You can use qstat to verify the jobs submitted in datarmor
You will see there are two job submitted, and there are in fact 28 workers stated.  It is because we asked for '14 workers' for each node.  Thus to make it 20, it must at least start 2 jobs to have 20, thus you got 28 in total.  


In [None]:
!qstat -u todaka

In [None]:
client

In [None]:
cluster.close()

##  Example 4: Big-mem single node

This example show the use case that requires max 25GB of memory for each worker

In [None]:
memory_size = 25
n_worker_per_node = 120 // memory_size
# n_threads_per_worker=28//n_worker_per_node
print("nb workers we use for each node ", n_worker_per_node)  # ,n_threads_per_worker)
overrides = {
    "cluster.processes": n_worker_per_node
}  # ,"cluster.c": n_worker_per_node }

cluster = dask_hpcconfig.cluster("datarmor", **overrides)
client = Client(cluster)

In [None]:
cluster.scale(8)

#### You can use qstat to verify the jobs submitted in datarmor
You will see there are two job submitted, and there are in fact 28 workers stated.  It is because we asked for '14 workers' for each node.  Thus to make it 20, it must at least start 2 jobs to have 20, thus you got 28 in total.  


In [None]:
!qstat -u todaka

In [None]:
client

In [None]:
cluster.close()

## Example 5: advanced usage.  
For advanced users, you can pass different dask cluster, or worker options 
- 'datarmor-local': https://docs.dask.org/en/stable/deploying-python.html#reference
- 'datarmor': 
-  for all, worker specification: https://distributed.dask.org/en/stable/worker.html#api-documentation

Next cells shows for the case that you want change 'distributed.worker.memory.pause' option to 80%


In [None]:
# one can specify mo
overrides = {"cluster.cores": 3, "distributed.worker.memory.pause": 0.80}
cluster = dask_hpcconfig.cluster("datarmor-local", **overrides)
cluster = dask_hpcconfig.cluster("datarmor-local", **overrides)
client = Client(cluster)