# Using Dask distributed

In [1]:
from dask_jobqueue import PBSCluster

In [2]:
cluster = PBSCluster(project='p06010014')

Scale the cluster to request 10 (or 2) workers

In [3]:
#cluster.scale(10)
cluster.scale(2)

In [4]:
from dask.distributed import Client

In [5]:
client = Client(cluster)
client

Client
  Scheduler: tcp://128.117.181.200:37691
  Dashboard: http://128.117.181.200:8787/status
Cluster
  Workers: 0
  Cores: 0
  Memory: 0 B
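Workers: 0 right after scaling is most likely just the batch jobs still sitting in the queue. A minimal sketch of blocking until workers have registered, using `Client.wait_for_workers`. A `LocalCluster` stands in for `PBSCluster` here (an assumption, so the example runs without a scheduler like PBS), and `dashboard_address=":0"` is only there to avoid clashing with a running dashboard.

```python
from dask.distributed import Client, LocalCluster

# LocalCluster is a stand-in for PBSCluster (assumption, for portability).
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                       dashboard_address=":0")
client = Client(cluster)

# Block until at least 2 workers have registered with the scheduler.
client.wait_for_workers(2)
n_workers = len(client.scheduler_info()["workers"])
print(n_workers)

client.close()
cluster.close()
```

On a real queue the wait can be long, so this is probably best placed just before the first computation.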


In [6]:
client.scheduler_info()['services']

{'dashboard': 8787}

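Update the default dashboard link so it goes through the notebook server's proxy path (/proxy/{port}/status) instead of pointing directly at the scheduler host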

In [6]:
import dask

In [8]:
dask.config.get('distributed.dashboard.link')

'{scheme}://{host}:{port}/status'

In [7]:
dask.config.set({'distributed.dashboard.link': "/proxy/{port}/status"});

In [10]:
dask.config.get('distributed.dashboard.link')

'/proxy/{port}/status'
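`dask.config.set` can also be used as a context manager, which restores the previous value on exit. A small sketch, assuming only that dask is installed; `default=None` is passed because the key may be absent until `distributed` has been imported.

```python
import dask

KEY = "distributed.dashboard.link"

before = dask.config.get(KEY, default=None)

# Temporarily override the link template; the old value returns on exit.
with dask.config.set({KEY: "/proxy/{port}/status"}):
    inside = dask.config.get(KEY)

after = dask.config.get(KEY, default=None)
print(inside)
print(after == before)
```

This form may be handy when the proxy path should apply to one cluster only rather than to the whole session.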

In [8]:
client

Client
  Scheduler: tcp://128.117.181.200:37691
  Dashboard: /proxy/8787/status
Cluster
  Workers: 1
  Cores: 36
  Memory: 109.00 GB
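The dashboard link is a format template. As a rough illustration (plain `str.format`, not necessarily the exact code path inside distributed), the two dashboard strings seen in the Client outputs can be reproduced from the same scheduler details; fields the template does not mention are simply ignored.

```python
# Scheduler details taken from the Client output in this notebook.
fields = {"scheme": "http", "host": "128.117.181.200", "port": 8787}

# Default template: points straight at the scheduler host.
default_link = "{scheme}://{host}:{port}/status".format(**fields)

# Proxy template: relative path, only the port is used.
proxy_link = "/proxy/{port}/status".format(**fields)

print(default_link)
print(proxy_link)
```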


In [9]:
dask.config.get('temporary-directory')

Create a test dataframe

In [10]:
import dask.dataframe as dd
df = dd.demo.make_timeseries()
df

                   id    name        x        y
npartitions=11
2000-01-31      int64  object  float64  float64
2000-02-29        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-11-30        ...     ...      ...      ...
2000-12-31        ...     ...      ...      ...
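The index column of this repr lists the partition boundaries (divisions), and n + 1 boundaries delimit n partitions. A quick sanity check of npartitions=11, assuming the elided rows are the remaining month ends of 2000 (an assumption, since the repr hides them):

```python
import calendar
from datetime import date

# Assumed divisions: the last day of each month in 2000.
divisions = [date(2000, m, calendar.monthrange(2000, m)[1]) for m in range(1, 13)]

# n + 1 division boundaries delimit n partitions.
npartitions = len(divisions) - 1
print(divisions[0], divisions[-1], npartitions)
```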


Run persist to load the data into worker memory; watch the dashboard to see the computation live

In [11]:
df = df.persist()

### Question: how do you release workers (i.e. cancel jobs)?

In [12]:
client.close()  # This works without error, but doesn't seem to do anything (the jobs keep running)?

In [13]:
cluster.close()  # This is the step that cancels the jobs. It only works without error if client.close() is run FIRST.
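One way to get the close order right without thinking about it is to use both objects as context managers: Python exits them in reverse order, so the client closes before the cluster. A minimal sketch, with `LocalCluster` standing in for `PBSCluster` (an assumption so it runs anywhere; the same `with` pattern should apply to jobqueue clusters).

```python
from dask.distributed import Client, LocalCluster

# LocalCluster is a stand-in for PBSCluster (assumption, for portability).
with LocalCluster(n_workers=1, threads_per_worker=1, processes=False,
                  dashboard_address=":0") as cluster, Client(cluster) as client:
    total = client.submit(sum, [1, 2, 3]).result()

# Both objects are closed here: the client first, then the cluster.
print(total, client.status)
```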

## Try the NCARCluster functionality

https://github.com/NCAR/ncar-jobqueue

In [1]:
from ncar_jobqueue import NCARCluster

In [2]:
cluster = NCARCluster(project='p06010014')

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


### Question: is there a way to get a random dashboard port each time I start a cluster? Starting a second cluster throws a warning because it tries to use port 8787 by default.
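A possible answer: a dashboard address of ":0" asks the operating system for any free port. The sketch below shows this with two `LocalCluster` instances (stand-ins, an assumption so it runs without a batch system). For jobqueue clusters, recent dask-jobqueue releases document a `scheduler_options` dictionary, e.g. `scheduler_options={"dashboard_address": ":0"}`; whether the installed version and NCARCluster accept it would need checking.

```python
from dask.distributed import Client, LocalCluster


def dashboard_port(cluster):
    """Return the dashboard port reported by the scheduler (None if absent)."""
    with Client(cluster) as client:
        return client.scheduler_info()["services"].get("dashboard")


# ":0" requests any free port instead of the default 8787.
kwargs = dict(n_workers=1, threads_per_worker=1, processes=False,
              dashboard_address=":0")

with LocalCluster(**kwargs) as first, LocalCluster(**kwargs) as second:
    port_first = dashboard_port(first)
    port_second = dashboard_port(second)

print(port_first, port_second)
```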

In [3]:
cluster.scale(5)

In [6]:
from dask.distributed import Client
client = Client(cluster)

In [7]:
client

Client
  Scheduler: tcp://128.117.181.199:37321
  Dashboard: https://jupyterhub.ucar.edu/ch/user/kdagon/proxy/37165/status
Cluster
  Workers: 4
  Cores: 144
  Memory: 436.00 GB


The default dashboard URL only works on JupyterHub (possibly set by the NCARCluster configuration?)

In [10]:
import dask
dask.config.set({'distributed.dashboard.link': "/proxy/{port}/status"});

In [11]:
client

Client
  Scheduler: tcp://128.117.181.199:37321
  Dashboard: /proxy/37165/status
Cluster
  Workers: 5
  Cores: 180
  Memory: 545.00 GB


In [37]:
# check versions (optional - for debugging)
#client.get_versions(check=True)['scheduler']

Create a test Dask array; watch the dashboard to see the various computations

In [38]:
import dask.array as da

In [39]:
x = da.random.random((5000,5000), chunks=(500,500))

In [40]:
x = x.persist()

In [43]:
x.nbytes / 1e9

0.2
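The 0.2 GB figure follows directly from the shape and dtype (`da.random.random` produces float64, 8 bytes per element). The same arithmetic gives the size and number of chunks, which is what the dashboard shows as individual tasks:

```python
shape = (5000, 5000)
chunks = (500, 500)
itemsize = 8  # bytes per float64 element

total_gb = shape[0] * shape[1] * itemsize / 1e9
chunk_mb = chunks[0] * chunks[1] * itemsize / 1e6
n_chunks = (shape[0] // chunks[0]) * (shape[1] // chunks[1])

print(total_gb, chunk_mb, n_chunks)
```

So the array is split into 100 chunks of 2 MB each.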

In [44]:
y = (x + x.T) - x.mean(axis=0)

In [45]:
y = y.persist()

In [46]:
y.sum().compute()

12499424.928402135
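A result near 12.5 million is what one would expect. The transpose has the same total as x, and subtracting the column means (broadcast over every row) removes exactly one total of x, so y.sum() should equal x.sum(), roughly 5000 * 5000 * 0.5. A small-scale check of that identity, run on the default local scheduler rather than the cluster (the 50 x 50 shape is just an illustrative choice):

```python
import math

import dask
import dask.array as da

# Small version of the array above so it runs quickly without a cluster.
x = da.random.random((50, 50), chunks=(10, 10))
y = (x + x.T) - x.mean(axis=0)

# Compute both sums in one call so they share the same random data.
sum_x, sum_y = dask.compute(x.sum(), y.sum())
print(sum_x, sum_y)
```

The two printed values should agree up to floating-point rounding.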

In [12]:
client.close()
cluster.close()