In [1]:
import dask.distributed as dd
from dask.distributed import Client, LocalCluster, progress
from dask_jobqueue import PBSCluster
from distributed.utils import tmpfile
from dask.distributed import get_worker
import dask.dataframe as dd
import os

  from distributed.utils import tmpfile


In [2]:
# Create the per-user directory under /scratch/vp91 if it doesn't already exist.
# `mkdir -p` succeeds silently when the directory is already there.
! mkdir -p /scratch/vp91/$USER

In [3]:
# Resolve the per-user working directory under the vp91 scratch space.
# $USER is preferred because it is what the `mkdir` shell command expands; if it
# is unset or empty, fall back to the OS login name instead of a placeholder
# string that would produce a non-existent path (e.g. one containing a space).
import getpass

user = os.environ.get('USER') or getpass.getuser()
path = os.path.join('/scratch/vp91', user)
print(path)

/scratch/vp91/jxj900


In [4]:
# The notebook starts in $HOME; switch the working directory to the per-user
# directory under /scratch/vp91 so relative paths resolve there.
target_dir = os.path.expandvars(path)
os.chdir(target_dir)

In [5]:
# Pin the interpreter used to launch Dask workers to the training venv's python3;
# the PBSCluster below reads it back via os.environ["DASK_PYTHON"].
os.environ.update(DASK_PYTHON='/scratch/vp91/Training-Venv/dask/dask-venv/bin/python3')

In [6]:
# Shell commands placed at the top of every worker job script (passed as
# job_script_prologue below). All parts of Dask must use the same Python and
# libraries; if the venv is not activated, workers may pick up different
# library versions than this notebook.
setup_commands = [
    "module load python3/3.11.0",
    "source /scratch/vp91/Training-Venv/dask/dask-venv/bin/activate",
]

In [7]:
# Gadi uses custom PBS directives, so some of the defaults dask-jobqueue writes
# into the generated job script do not work there. Gadi-specific directives go
# here; each entry is emitted as a '#PBS' line in the worker job script.
# Reference: https://opus.nci.org.au/display/Help/Gadi+Quick+Reference+Guide
extra = [
    '-q normal',     # queue the worker jobs are submitted to
    '-P vp91',       # project code the jobs run under
    '-l ncpus=48',   # cores per job (same as cores=48 on the cluster)
    '-l mem=192GB',  # memory per job (same as memory="192GB" on the cluster)
]

In [8]:
# Build a Dask cluster whose workers are launched as PBS batch jobs on Gadi.
# Resource requests for each job are mirrored in `extra`, and generated
# directive lines containing "select" are dropped so that only the Gadi-style
# directives remain in the job script header.
cluster = PBSCluster(
    # --- resources for each worker job ---
    cores=48,                            # total number of cores per job
    memory="192GB",                      # total memory per job, split across its worker processes
    walltime="00:50:00",                 # walltime for each worker job
    # --- job script customisation ---
    shebang='#!/usr/bin/env bash',       # interpreter line of the generated batch script
    job_extra_directives=extra,          # each entry is emitted with the '#PBS' prefix
    job_directives_skip=["select"],      # remove generated directive lines containing "select";
                                         # lines from job_extra_directives are not affected
    job_script_prologue=setup_commands,  # commands run before the worker is launched
    # --- worker runtime ---
    python=os.environ["DASK_PYTHON"],    # Python executable used to launch the Dask workers
    interface="ib0",                     # network interface for both scheduler and workers
    local_directory='$TMPDIR',           # worker spill directory; '$TMPDIR' is expanded by the job's shell
)

In [9]:
# Show the batch script submitted for each worker job, so the PBS directives and
# the worker launch command can be checked before scaling up.
job_script_text = cluster.job_script()
print(job_script_text)

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -l walltime=00:50:00
#PBS -q normal
#PBS -P vp91
#PBS -l ncpus=48
#PBS -l mem=192GB
module load python3/3.11.0
source /scratch/vp91/Training-Venv/dask/dask-venv/bin/activate
/scratch/vp91/Training-Venv/dask/dask-venv/bin/python3 -m distributed.cli.dask_worker tcp://10.6.24.71:35577 --nthreads 6 --nworkers 8 --memory-limit 22.35GiB --name dummy-name --nanny --death-timeout 60 --local-directory $TMPDIR --interface ib0



In [10]:
# Request two PBS worker jobs (one 48-core node each). Per the job script above,
# each job starts 8 worker processes, giving 16 workers in total.
n_worker_jobs = 2
cluster.scale(jobs=n_worker_jobs)

In [16]:
# Verify the worker jobs have been submitted and are running (state "R") as expected.
!qstat

Job id                 Name             User              Time Use S Queue
---------------------  ---------------- ----------------  -------- - -----
95589566.gadi-pbs      sys-dashboard-s* jxj900            00:01:05 R normal-exec     
95589910.gadi-pbs      dask-worker      jxj900            00:02:59 R normal-exec     
95589920.gadi-pbs      dask-worker      jxj900            00:03:18 R normal-exec     


In [17]:
# Display the cluster summary: dashboard link, worker count, total threads and memory.
cluster

0,1
Dashboard: /proxy/8787/status,Workers: 16
Total threads: 96,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.24.71:35577,Workers: 16
Dashboard: /proxy/8787/status,Total threads: 96
Started: 35 minutes ago,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.61.54:37395,Total threads: 6
Dashboard: /proxy/38053/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:35095,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-nhigykmp,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-nhigykmp

0,1
Comm: tcp://10.6.61.54:39407,Total threads: 6
Dashboard: /proxy/39221/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:33887,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-p_uckrvt,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-p_uckrvt

0,1
Comm: tcp://10.6.61.54:34239,Total threads: 6
Dashboard: /proxy/35037/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:43389,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-h_hpo_6h,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-h_hpo_6h

0,1
Comm: tcp://10.6.61.54:38021,Total threads: 6
Dashboard: /proxy/45583/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:38567,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-s6nii7mg,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-s6nii7mg

0,1
Comm: tcp://10.6.61.54:44361,Total threads: 6
Dashboard: /proxy/45759/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:40709,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-j3rh9fln,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-j3rh9fln

0,1
Comm: tcp://10.6.61.54:40877,Total threads: 6
Dashboard: /proxy/36557/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:45895,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-xan08w6k,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-xan08w6k

0,1
Comm: tcp://10.6.61.54:36561,Total threads: 6
Dashboard: /proxy/34999/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:32789,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cngm7qm9,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cngm7qm9

0,1
Comm: tcp://10.6.61.54:36393,Total threads: 6
Dashboard: /proxy/41983/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:32989,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cuktjhzt,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cuktjhzt

0,1
Comm: tcp://10.6.59.71:38039,Total threads: 6
Dashboard: /proxy/40325/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:40341,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-nkde6vd4,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-nkde6vd4

0,1
Comm: tcp://10.6.59.71:45241,Total threads: 6
Dashboard: /proxy/36693/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:37893,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hjjuho4f,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hjjuho4f

0,1
Comm: tcp://10.6.59.71:44999,Total threads: 6
Dashboard: /proxy/41229/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:45945,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-fn54s5wl,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-fn54s5wl

0,1
Comm: tcp://10.6.59.71:42967,Total threads: 6
Dashboard: /proxy/44307/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:33257,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-p2myl7qi,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-p2myl7qi

0,1
Comm: tcp://10.6.59.71:44117,Total threads: 6
Dashboard: /proxy/46699/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:40105,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-l1asbp3u,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-l1asbp3u

0,1
Comm: tcp://10.6.59.71:33283,Total threads: 6
Dashboard: /proxy/43227/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:37091,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-efjw4bz2,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-efjw4bz2

0,1
Comm: tcp://10.6.59.71:41251,Total threads: 6
Dashboard: /proxy/36741/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:35759,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hxupdx0y,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hxupdx0y

0,1
Comm: tcp://10.6.59.71:40561,Total threads: 6
Dashboard: /proxy/42391/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:44129,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-jkc6pfmj,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-jkc6pfmj


In [18]:
# Connect a client to the PBS-backed cluster. Dask work submitted from this
# notebook (including the grid search run under the 'dask' joblib backend below)
# goes through this client.
client = Client(cluster)

# The worker jobs requested by cluster.scale() may still be queued in PBS when
# this cell runs (e.g. on a fresh "Restart & Run All"). Block until at least one
# worker has connected so later cells do not race ahead of the cluster.
client.wait_for_workers(n_workers=1)

In [19]:
# Display the client summary: connection method, cluster type, dashboard and workers.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 16
Total threads: 96,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.24.71:35577,Workers: 16
Dashboard: /proxy/8787/status,Total threads: 96
Started: 35 minutes ago,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.61.54:37395,Total threads: 6
Dashboard: /proxy/38053/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:35095,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-nhigykmp,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-nhigykmp

0,1
Comm: tcp://10.6.61.54:39407,Total threads: 6
Dashboard: /proxy/39221/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:33887,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-p_uckrvt,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-p_uckrvt

0,1
Comm: tcp://10.6.61.54:34239,Total threads: 6
Dashboard: /proxy/35037/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:43389,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-h_hpo_6h,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-h_hpo_6h

0,1
Comm: tcp://10.6.61.54:38021,Total threads: 6
Dashboard: /proxy/45583/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:38567,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-s6nii7mg,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-s6nii7mg

0,1
Comm: tcp://10.6.61.54:44361,Total threads: 6
Dashboard: /proxy/45759/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:40709,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-j3rh9fln,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-j3rh9fln

0,1
Comm: tcp://10.6.61.54:40877,Total threads: 6
Dashboard: /proxy/36557/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:45895,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-xan08w6k,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-xan08w6k

0,1
Comm: tcp://10.6.61.54:36561,Total threads: 6
Dashboard: /proxy/34999/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:32789,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cngm7qm9,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cngm7qm9

0,1
Comm: tcp://10.6.61.54:36393,Total threads: 6
Dashboard: /proxy/41983/status,Memory: 22.35 GiB
Nanny: tcp://10.6.61.54:32989,
Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cuktjhzt,Local directory: /jobfs/95589920.gadi-pbs/dask-scratch-space/worker-cuktjhzt

0,1
Comm: tcp://10.6.59.71:38039,Total threads: 6
Dashboard: /proxy/40325/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:40341,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-nkde6vd4,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-nkde6vd4

0,1
Comm: tcp://10.6.59.71:45241,Total threads: 6
Dashboard: /proxy/36693/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:37893,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hjjuho4f,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hjjuho4f

0,1
Comm: tcp://10.6.59.71:44999,Total threads: 6
Dashboard: /proxy/41229/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:45945,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-fn54s5wl,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-fn54s5wl

0,1
Comm: tcp://10.6.59.71:42967,Total threads: 6
Dashboard: /proxy/44307/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:33257,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-p2myl7qi,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-p2myl7qi

0,1
Comm: tcp://10.6.59.71:44117,Total threads: 6
Dashboard: /proxy/46699/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:40105,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-l1asbp3u,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-l1asbp3u

0,1
Comm: tcp://10.6.59.71:33283,Total threads: 6
Dashboard: /proxy/43227/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:37091,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-efjw4bz2,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-efjw4bz2

0,1
Comm: tcp://10.6.59.71:41251,Total threads: 6
Dashboard: /proxy/36741/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:35759,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hxupdx0y,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-hxupdx0y

0,1
Comm: tcp://10.6.59.71:40561,Total threads: 6
Dashboard: /proxy/42391/status,Memory: 22.35 GiB
Nanny: tcp://10.6.59.71:44129,
Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-jkc6pfmj,Local directory: /jobfs/95589910.gadi-pbs/dask-scratch-space/worker-jkc6pfmj


## Distributed Training

Distributed training is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyperparameters, or when using an [ensemble method](https://scikit-learn.org/stable/modules/ensemble.html) with many individual estimators.


 

In [20]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

We use the 20 newsgroups dataset, which ships with [scikit-learn](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_20newsgroups.html). Its `fetch_20newsgroups` loader returns the filenames and text data of the newsgroup posts together with their category labels.

In [21]:
# Load the training split of the 20 newsgroups dataset across all categories,
# then report how many documents and categories it contains.
data = fetch_20newsgroups(subset='train', categories=None)
n_documents = len(data.filenames)
n_categories = len(data.target_names)
print(f"{n_documents} documents")
print(f"{n_categories} categories")
print()

11314 documents
20 categories



### Create a pipeline:
1. [HashingVectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html): Convert a collection of text documents to a matrix of token occurrences.
2. [TfidfTransformer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfTransformer.html): Transform a count matrix to a normalized tf or tf-idf representation.
3. [SGDClassifier](https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html): This estimator implements regularized linear models with stochastic gradient descent (SGD) learning.


In [41]:
# Text-classification pipeline: hash tokens into a fixed-size feature space,
# reweight the counts with TF-IDF, then fit a linear classifier trained with
# stochastic gradient descent.
# random_state is fixed because SGDClassifier shuffles the training data between
# epochs; without a seed the grid-search scores differ from run to run.
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000, random_state=42)),
])

In [29]:
# Display the pipeline (Jupyter renders the cell's last expression).
pipeline

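[`GridSearchCV`](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) performs hyperparameter tuning. It evaluates every combination of the parameter values below using cross-validation and reports which combination scores best.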

In [30]:
# Hyperparameter grid: 2 x 2 x 2 = 8 candidate combinations. Keys follow the
# '<pipeline step name>__<parameter>' convention to target individual steps.
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': (1e-05, 1e-06),
}

In [31]:
# Exhaustive search over `parameters` with 3-fold cross-validation.
# n_jobs=-1 lets joblib parallelise the individual fits; refit=False skips
# retraining a final model on the full data (best_score_ and best_params_
# are still reported for this single-metric search).
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=parameters,
    cv=3,
    n_jobs=-1,
    refit=False,
    verbose=1,
)

Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.

Alternatively, scikit-learn can use Dask for parallelism by running the fit inside a `joblib.parallel_backend('dask')` block. This lets you train those estimators using all the cores of your *cluster* without significantly changing your code. This is most useful for training large models on medium-sized datasets.

In [32]:
import joblib

# grid_search was built with n_jobs=-1, so its fits are dispatched through
# joblib. Inside this block joblib uses the 'dask' backend, so the fits run on
# the cluster's workers rather than in local processes.
documents, labels = data.data, data.target
with joblib.parallel_backend('dask'):
    grid_search.fit(documents, labels)

Fitting 3 folds for each of 8 candidates, totalling 24 fits


In [33]:
# Mean cross-validated test score of the best parameter combination.
grid_search.best_score_

0.9369934159407846

In [34]:
# Parameter combination that achieved best_score_.
grid_search.best_params_

{'clf__alpha': 1e-06, 'tfidf__norm': 'l1', 'tfidf__use_idf': True}

In [39]:
import pandas as pd

# Tabulate the search: one row per parameter combination, with fit/score
# timings, per-split test scores, the mean score and its rank.
cv_results = pd.DataFrame(data=grid_search.cv_results_)
cv_results.head(5)

Unnamed: 0,mean_fit_time,std_fit_time,mean_score_time,std_score_time,param_clf__alpha,param_tfidf__norm,param_tfidf__use_idf,params,split0_test_score,split1_test_score,split2_test_score,mean_test_score,std_test_score,rank_test_score
0,1.221473,0.160469,0.465106,0.249073,1e-05,l1,True,"{'clf__alpha': 1e-05, 'tfidf__norm': 'l1', 'tf...",0.916084,0.944056,0.940351,0.933497,0.012405,2
1,0.379104,0.151755,0.432076,0.461234,1e-05,l1,False,"{'clf__alpha': 1e-05, 'tfidf__norm': 'l1', 'tf...",0.926573,0.91958,0.891228,0.912461,0.015283,8
2,1.283618,0.72672,0.387383,0.294188,1e-05,l2,True,"{'clf__alpha': 1e-05, 'tfidf__norm': 'l2', 'tf...",0.909091,0.940559,0.919298,0.922983,0.013109,5
3,0.61994,0.569808,0.196498,0.201145,1e-05,l2,False,"{'clf__alpha': 1e-05, 'tfidf__norm': 'l2', 'tf...",0.93007,0.91958,0.891228,0.913626,0.016407,7
4,1.514803,0.492653,0.397726,0.253296,1e-06,l1,True,"{'clf__alpha': 1e-06, 'tfidf__norm': 'l1', 'tf...",0.923077,0.947552,0.940351,0.936993,0.01027,1
