# Using Dask on an HPC cluster through the PBS Pro batch queue system
To use Pangeo (http://pangeo.io), you need to be familiar with a few of the Python packages it builds on. Dask (http://dask.org) is one of its key components: it lets you parallelize your computation. This notebook uses the PBSCluster class of the dask_jobqueue package to show how to run Dask workers on a PBS cluster.

***For MPI users: this is like combining two steps, writing code with mpi_init and submitting an mpirun script to the cluster, except that you reserve the resources without knowing in advance what code you will run on them.***

---

## 1. Set up the Python environment

In [1]:
import os, sys
import dask
import xarray as xr

---
## 2. Set up the Dask worker configuration
This configuration is based on hal (a CNES HPC cluster), which uses PBS Pro as its batch scheduler.

In [2]:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=6,memory='30 gb', walltime='1:00:00')

- If you need smaller worker jobs, use the following setup instead.

*cluster = PBSCluster(cores=1,memory='5 gb', walltime='1:00:00')*

Attention: choose these job sizes so that they fit your cluster's PBS configuration. This example uses a walltime of 1 hour because that is the maximum time limit of the short job queue.

**On an HPC cluster, short and small jobs generally fit into gaps of unused resources, so they may start sooner. But they also fragment the used resources, which makes it harder for other users to run big jobs. It is therefore important to consult your HPC cluster managers.**
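To make the sizing advice concrete, here is a minimal sketch that computes how many identical worker jobs fit on one node. The node size (24 cores, 120 GB) is a hypothetical value chosen for illustration, not the real hal specification, and `jobs_per_node` is a helper name introduced here.

```python
def jobs_per_node(node_cores, node_mem_gb, job_cores, job_mem_gb):
    """Return how many identical worker jobs fit on one node."""
    by_cores = node_cores // job_cores          # limit imposed by the core count
    by_memory = int(node_mem_gb // job_mem_gb)  # limit imposed by the memory
    return min(by_cores, by_memory)


# Hypothetical node: 24 cores and 120 GB of memory (an assumption for illustration).
NODE_CORES, NODE_MEM_GB = 24, 120

# Compare the two worker sizes used in this notebook.
for cores, mem in [(6, 30), (1, 5)]:
    n = jobs_per_node(NODE_CORES, NODE_MEM_GB, cores, mem)
    idle = NODE_CORES - n * cores
    print(f"cores={cores}, memory={mem} GB: {n} jobs per node, {idle} cores left idle")
```

Replace the node values with the ones your cluster managers give you. A job size that leaves many cores or much memory idle on each node is usually a sign that the size should be adjusted.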

---

## 3. Start the Dask workers

***Attention: from this step on, you start to occupy cluster resources.***
***Because you occupy cluster resources, do not forget to release them when you are done: kill your Dask worker jobs with the 'qdel' command, or call 'cluster.close()' from the notebook.***

In [3]:
cluster.scale(10)  # request 10 workers; the return value is not needed

---

## 4. Check your batch jobs

From a terminal, run the following command to see whether your PBS jobs are running and, if so, on which nodes. You can then connect to those nodes with ssh and use commands such as 'top' or 'ps' to examine how your Dask workers are running.

qstat -u your-login -n -1

In [4]:
!qstat -u odakat -n -1


admin01: 
                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
4398868.admin01 odakat   qdev     jupyterhub  72639   1  16   61gb 12:00 R 05:20 node558/0*16
4410568.admin01 odakat   qt1h     dask-worke   9146   1   6   28gb 01:00 R 00:00 node089/3*6
4410569.admin01 odakat   qt1h     dask-worke  22172   1   6   28gb 01:00 R 00:00 node098/1*6
4410570.admin01 odakat   qt1h     dask-worke  22174   1   6   28gb 01:00 R 00:00 node098/2*6
4410571.admin01 odakat   qt1h     dask-worke  23137   1   6   28gb 01:00 R 00:00 node099/1*6
4410572.admin01 odakat   qt1h     dask-worke  23152   1   6   28gb 01:00 R 00:00 node099/2*6
4410573.admin01 odakat   qt1h     dask-worke   4493   1   6   28gb 01:00 R 00:00 node104/1*6
4410574.admin01 odakat   qt1h     dask-worke  17044   1   6   28gb 01:00 R 00:00 nod

The following command shows the job script that is submitted to PBS for each worker job.


In [5]:
print(cluster.job_script())

#!/bin/bash

#!/usr/bin/env bash
#PBS -N dask-worker
#PBS -l select=1:ncpus=6:mem=28GB
#PBS -l walltime=1:00:00
JOB_ID=${PBS_JOBID%.*}



/home/mp/odakat/miniconda3/envs/equinox/bin/python -m distributed.cli.dask_worker tcp://10.120.43.58:57642 --nthreads 6 --memory-limit 30.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60
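The job script above requests `mem=28GB` from PBS, while the worker itself is started with `--memory-limit 30.00GB`. One plausible reading, which is an assumption and not something this notebook states, is that the decimal value (30 × 10^9 bytes) is converted to binary gigabytes (2^30 bytes) and rounded up. The sketch below only checks that arithmetic; `decimal_gb_to_binary_gb_ceil` is a name introduced here for illustration.

```python
import math


def decimal_gb_to_binary_gb_ceil(gb):
    """Convert decimal gigabytes (10**9 bytes) to whole binary gigabytes (2**30 bytes), rounding up."""
    n_bytes = gb * 10**9
    return math.ceil(n_bytes / 2**30)


# 30 decimal GB, as passed in memory='30 gb'
print(decimal_gb_to_binary_gb_ceil(30))
```

For 30 GB this gives 28, which matches the `mem=28GB` line in the job script and the `28gb` column in the qstat output.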



The following commands connect a Dask client to the cluster and display its status.  
The ***dashboard*** link lets you monitor your Dask cluster.  
- The 'Workers' tab of the Dask dashboard shows graphically how each worker uses memory and CPU.
- The 'System' tab shows the resource usage of the Jupyter notebook process, which hosts the Dask scheduler.

Other tabs are also useful for understanding how the parallel processes are working. Attention: the dashboard uses the CPU and memory of your Jupyter notebook node, so displaying very complex views may slow down the notebook itself.

In [6]:
from dask.distributed import Client
client=Client(cluster)
client

Client   Scheduler: tcp://10.120.43.58:57642  Dashboard: http://10.120.43.58:8787/status
Cluster  Workers: 10  Cores: 60  Memory: 300.00 GB
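The totals in this summary are simply the per-worker settings multiplied by the number of workers (10 × 6 cores, 10 × 30 GB). The sketch below recomputes them from a dictionary shaped like the one returned by `client.scheduler_info()`. The key names `workers`, `nthreads` and `memory_limit` are assumptions based on recent versions of distributed and may differ in yours; the data here is a stand-in, not real scheduler output.

```python
def summarize_workers(info):
    """Return worker count, total threads and total memory (bytes) from a scheduler-info dict."""
    workers = info.get("workers", {})
    return {
        "workers": len(workers),
        "threads": sum(w.get("nthreads", 0) for w in workers.values()),
        "memory_bytes": sum(w.get("memory_limit", 0) for w in workers.values()),
    }


# Stand-in data: 10 workers with 6 threads and 30 GB each (hypothetical addresses).
fake_info = {
    "workers": {
        f"tcp://worker-{i}": {"nthreads": 6, "memory_limit": 30 * 10**9}
        for i in range(10)
    }
}

print(summarize_workers(fake_info))
```

With a live client, you could try `summarize_workers(client.scheduler_info())` and compare the result with what the dashboard shows.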


---

## 5. Increase (or decrease) the number of workers

In [7]:
cluster.scale_up(11)

In [8]:
!qstat -u odakat


admin01: 
                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
4398868.admin01 odakat   qdev     jupyterhub  72639   1  16   61gb 12:00 R 05:20
4410568.admin01 odakat   qt1h     dask-worke   9146   1   6   28gb 01:00 R 00:00
4410569.admin01 odakat   qt1h     dask-worke  22172   1   6   28gb 01:00 R 00:00
4410570.admin01 odakat   qt1h     dask-worke  22174   1   6   28gb 01:00 R 00:00
4410571.admin01 odakat   qt1h     dask-worke  23137   1   6   28gb 01:00 R 00:00
4410572.admin01 odakat   qt1h     dask-worke  23152   1   6   28gb 01:00 R 00:00
4410573.admin01 odakat   qt1h     dask-worke   4493   1   6   28gb 01:00 R 00:00
4410574.admin01 odakat   qt1h     dask-worke  17044   1   6   28gb 01:00 R 00:00
4410575.admin01 odakat   qt1h     dask-worke  17579   1   6   28gb 01:00 R 00:00
44105

In [9]:
!qstat -u odakat |grep dask-work |wc -l

11
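The `grep | wc -l` count can also be done in Python, which is handy if you want the job IDs as well. This is a minimal sketch that assumes the column layout of the qstat output shown in this notebook (job ID in the first column, job name in the fourth); `dask_worker_job_ids` is a helper name introduced here. Unlike `grep`, it matches only on the job-name column.

```python
def dask_worker_job_ids(qstat_text, name_prefix="dask-work"):
    """Return the job IDs of lines whose job-name column starts with name_prefix."""
    job_ids = []
    for line in qstat_text.splitlines():
        fields = line.split()
        # Job lines have the job ID first and the (truncated) job name fourth.
        if len(fields) >= 4 and fields[3].startswith(name_prefix):
            job_ids.append(fields[0])
    return job_ids


# A short excerpt of the qstat output shown earlier in this notebook.
sample = """\
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
4398868.admin01 odakat   qdev     jupyterhub  72639   1  16   61gb 12:00 R 05:20
4410568.admin01 odakat   qt1h     dask-worke   9146   1   6   28gb 01:00 R 00:00
4410569.admin01 odakat   qt1h     dask-worke  22172   1   6   28gb 01:00 R 00:00
"""

ids = dask_worker_job_ids(sample)
print(len(ids), ids)
```

To run it against the live queue, you could pass the text captured from `subprocess.run(["qstat", "-u", "your-login"], capture_output=True, text=True).stdout`.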


As you can see, the number of Dask workers has increased: you now have 11 of them. You could also use *cluster.scale(11)* instead of *cluster.scale_up(11)*.

You can also decrease the number of workers.

In [10]:
cluster.scale(5)

In [11]:
!qstat -u odakat 


admin01: 
                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
4398868.admin01 odakat   qdev     jupyterhub  72639   1  16   61gb 12:00 R 05:20
4410573.admin01 odakat   qt1h     dask-worke   4493   1   6   28gb 01:00 R 00:00
4410574.admin01 odakat   qt1h     dask-worke  17044   1   6   28gb 01:00 R 00:00
4410575.admin01 odakat   qt1h     dask-worke  17579   1   6   28gb 01:00 R 00:00
4410576.admin01 odakat   qt1h     dask-worke   2706   1   6   28gb 01:00 R 00:00
4410577.admin01 odakat   qt1h     dask-worke  17619   1   6   28gb 01:00 R 00:00


Now suppose your Dask workers have been killed for some reason. Here we simulate this by deleting all the worker jobs with qdel.

In [12]:
!qstat -u odakat |grep dask-work |awk '{print "qdel " $1 }' >./del-daskworker 
!chmod +x ./del-daskworker
!./del-daskworker

In [13]:
client

Client   Scheduler: tcp://10.120.43.58:57642  Dashboard: http://10.120.43.58:8787/status
Cluster  Workers: 0  Cores: 0  Memory: 0 B


You can re-scale your cluster and get back to your parallel computation. 

In [14]:
cluster.scale(8)

In [15]:
!qstat -u odakat


admin01: 
                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
4398868.admin01 odakat   qdev     jupyterhub  72639   1  16   61gb 12:00 R 05:21
4410601.admin01 odakat   qt1h     dask-worke   9444   1   6   28gb 01:00 R 00:00
4410602.admin01 odakat   qt1h     dask-worke  22503   1   6   28gb 01:00 R 00:00
4410603.admin01 odakat   qt1h     dask-worke  22505   1   6   28gb 01:00 R 00:00
4410604.admin01 odakat   qt1h     dask-worke  23454   1   6   28gb 01:00 R 00:00
4410605.admin01 odakat   qt1h     dask-worke  23455   1   6   28gb 01:00 R 00:00
4410606.admin01 odakat   qt1h     dask-worke   4765   1   6   28gb 01:00 R 00:00
4410607.admin01 odakat   qt1h     dask-worke  17752   1   6   28gb 01:00 R 00:00
4410608.admin01 odakat   qt1h     dask-worke  18295   1   6   28gb 01:00 R 00:00
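After re-scaling, the jobs may wait in the queue, and workers need a moment to register with the scheduler, so it can be useful to wait before resuming a computation. The following is a generic polling sketch; `wait_until` is a helper introduced here, not part of Dask, and the timeout and interval values are arbitrary.

```python
import time


def wait_until(predicate, timeout=60.0, interval=1.0):
    """Poll predicate() until it returns True or `timeout` seconds pass.

    Returns True if the condition was met, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Demonstration with a stand-in condition that becomes true on the third check.
checks = {"n": 0}


def ready():
    checks["n"] += 1
    return checks["n"] >= 3


print(wait_until(ready, timeout=5.0, interval=0.01))
```

With a live client, the predicate could be something like `lambda: len(client.scheduler_info()["workers"]) >= 8`, assuming the `workers` key is present in your version of distributed. Recent versions also offer `client.wait_for_workers(8)`, which serves the same purpose.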


---
## 6. Stop your Dask workers

Once you finish your computation, do not forget to stop your Dask workers with the following command.


In [16]:
cluster.close()
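Because it is easy to forget this step, especially when a computation fails halfway, one option is to wrap the scale and close calls in a context manager so that the cluster is always closed. This is a minimal sketch: `scaled_cluster` is a helper introduced here, and `FakeCluster` is a stand-in used only so that the example runs without a batch system. It relies only on the `scale()` and `close()` methods used in this notebook.

```python
from contextlib import contextmanager


@contextmanager
def scaled_cluster(cluster, n_workers):
    """Scale `cluster` to n_workers and always close it on exit, even after an error."""
    cluster.scale(n_workers)
    try:
        yield cluster
    finally:
        cluster.close()


class FakeCluster:
    """Stand-in with the same two methods, for demonstration only."""

    def __init__(self):
        self.requested = 0
        self.closed = False

    def scale(self, n):
        self.requested = n

    def close(self):
        self.closed = True


demo = FakeCluster()
try:
    with scaled_cluster(demo, 10):
        raise RuntimeError("simulated failure during the computation")
except RuntimeError:
    pass

# The cluster was scaled, and closed despite the error.
print(demo.requested, demo.closed)
```

In the notebook, you would pass the real `cluster` object instead of `FakeCluster()` and put your computation inside the `with` block.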