# Dask Gateway Example Notebook
Advice and suggestions for starting and working with a Dask Gateway cluster on Pangeo. Additional information is available at http://pangeo.io/cloud.html#dask and https://gateway.dask.org/usage.html

## Working with a Dask Gateway client
A `Gateway` object is a client for the Dask Gateway server and is used to create and shut down Dask clusters. Creating a `Gateway` does not itself create a Dask cluster or start any new pods. The required defaults are already configured, so it can usually be created without any parameters.

In [32]:
from dask_gateway import Gateway
gateway = Gateway()

In [21]:
gateway?

Type:        Gateway
String form: Gateway<http://10.1.34.198:8000/services/dask-gateway>
File:        /srv/conda/envs/notebook/lib/python3.7/site-packages/dask_gateway/client.py
Docstring:
A client for a Dask Gateway Server.

Parameters
----------
address : str, optional
    The address to the gateway server.
proxy_address : str, int, optional
    The address of the scheduler proxy server. Defaults to `address` if not
    provided. If an int, it's used as the port, with the host/ip taken from
    ``address``. Provide a full address if a different host/ip should be
    used.
public_address : str, optional
    The address to the gateway server, as accessible from a web browser.
    This will be used as the root of all browser-facing links (e.g. the
    dask dashboard).  Defaults to ``address`` if not provided.
auth : GatewayAuth, optional
    The authentication method to use.
asynchronous : bool, optional
    If true, starts the client in asy

### List the clusters already running
It is a good idea to check for clusters that are already running, because orphaned clusters are sometimes left behind. We can do this by listing the clusters with the `list_clusters()` method. If you run these cells yourself you may not have any running or pending clusters, so your output may differ.

In [11]:
gateway.list_clusters()

[ClusterReport<name=ooi-staging.eed460b981a84aa0b472f2b8570bbf8f, status=PENDING>]

In [17]:
gateway.list_clusters()[0].start_time

datetime.datetime(2020, 11, 3, 20, 42, 56)

In [21]:
type(gateway.list_clusters()[0].scheduler_address)

NoneType

### Stop any clusters if necessary
Listing the clusters shows a pending cluster that was started yesterday and has no scheduler address. I will stop this cluster because it is orphaned and no longer needed. Once that is done, `list_clusters()` returns an empty list.

In [23]:
orphaned_cluster_name = gateway.list_clusters()[0].name
orphaned_cluster_name

'ooi-staging.eed460b981a84aa0b472f2b8570bbf8f'

In [24]:
gateway.stop_cluster(orphaned_cluster_name)

In [25]:
gateway.list_clusters()

[]
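The manual check above can be sketched as a small helper. This is only an illustration: `find_orphaned` is a hypothetical name, and the `SimpleNamespace` objects are stand-ins for the `ClusterReport` objects returned by `list_clusters()`, so the snippet runs without a gateway. It relies only on the `name` and `scheduler_address` attributes used in the cells above.

```python
from types import SimpleNamespace


def find_orphaned(reports):
    """Return the names of clusters that have no scheduler address."""
    return [r.name for r in reports if r.scheduler_address is None]


# Stand-in reports (hypothetical values) mimicking gateway.list_clusters()
reports = [
    SimpleNamespace(name="demo.aaa", scheduler_address=None),
    SimpleNamespace(name="demo.bbb", scheduler_address="gateway://example:80/demo.bbb"),
]

orphaned = find_orphaned(reports)
print(orphaned)
```

With a real gateway, the names from `find_orphaned(gateway.list_clusters())` could be passed to `gateway.stop_cluster()` one at a time. A cluster that was started only seconds ago may also lack a scheduler address, so it is worth checking `start_time` before stopping anything.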

## Create a new Dask cluster with the default options using the Gateway client
Now we will create a new Dask `GatewayCluster` object using the Gateway object's `new_cluster()` method. This command creates a Dask scheduler pod, which is responsible for scheduling jobs on the workers. For now we will use the default worker options. Later we will look at changing the CPU and memory requests. Because this command starts a pod (just the scheduler pod), it can take several seconds or longer to complete.

In [33]:
cluster = gateway.new_cluster()

In [4]:
type(cluster)

dask_gateway.client.GatewayCluster

Now that we have created a cluster, `list_clusters()` returns a running cluster that has a scheduler address.

In [5]:
gateway.list_clusters()

[ClusterReport<name=ooi-staging.94fff0d08f4146a2a3e5852061756d78, status=RUNNING>]

In [6]:
gateway.list_clusters()[0].scheduler_address

'gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80/ooi-staging.94fff0d08f4146a2a3e5852061756d78'

In [7]:
cluster?

Type:        GatewayCluster
String form: GatewayCluster<ooi-staging.94fff0d08f4146a2a3e5852061756d78, status=running>
File:        /srv/conda/envs/notebook/lib/python3.7/site-packages/dask_gateway/client.py
Docstring:
A dask-gateway cluster.

Parameters
----------
address : str, optional
    The address to the gateway server.
proxy_address : str, int, optional
    The address of the scheduler proxy server. If an int, it's used as the
    port, with the host/ip taken from ``address``. Provide a full address
    if a different host/ip should be used.
public_address : str, optional
    The address to the gateway server, as accessible from a web browser.
    This will be used as the root of all browser-facing links (e.g. the
    dask dashboard).  Defaults to ``address`` if not provided.
auth : GatewayAuth, optional
    The authentication method to use.
cluster_options : mapping, optional
    A mapping of cluster options to use to start the clus

### Scaling a cluster explicitly
We now have a Dask cluster we can communicate with, but it has no workers yet. One way to get workers is to scale the cluster explicitly to a fixed number. This command creates a set of worker pods that then need to start. The call itself is non-blocking, but it can take many minutes for workers to become available, depending on whether nodes need to be started or Docker images need to be downloaded.

In [34]:
cluster.scale(4)

### Adaptively scaling a cluster
Coming soon.
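Until this section is written, here is a minimal sketch of what adaptive scaling might look like. It assumes the cluster object exposes an `adapt(minimum=..., maximum=...)` method, as `GatewayCluster` does in the dask-gateway usage documentation linked at the top. `enable_adaptive` and `RecordingCluster` are hypothetical names; the stand-in class only records calls so the snippet runs without a gateway.

```python
def enable_adaptive(cluster, minimum=0, maximum=4):
    """Ask the cluster to scale between `minimum` and `maximum` workers."""
    if minimum < 0 or maximum < minimum:
        raise ValueError("need 0 <= minimum <= maximum")
    cluster.adapt(minimum=minimum, maximum=maximum)


class RecordingCluster:
    """Stand-in (not a real cluster) that records adapt() calls."""

    def __init__(self):
        self.adapt_calls = []

    def adapt(self, minimum=None, maximum=None):
        self.adapt_calls.append((minimum, maximum))


demo = RecordingCluster()
enable_adaptive(demo, minimum=1, maximum=6)
print(demo.adapt_calls)
```

On a real cluster the call would be `enable_adaptive(cluster, minimum=1, maximum=6)`, or simply `cluster.adapt(minimum=1, maximum=6)`. The bounds here are arbitrary examples, not recommended values.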

## Connect to the Dask cluster
To use the cluster we need a `Client` object, obtained with the cluster's `get_client()` method, which lets us submit jobs to the cluster.

In [35]:
client = cluster.get_client()

In [26]:
type(client)

distributed.client.Client
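Because `scale()` is non-blocking, it can help to wait until the workers have actually joined before submitting work. The sketch below polls `client.scheduler_info()` until enough workers are listed. It assumes that `scheduler_info()["workers"]` lists every connected worker, which may not hold in every `distributed` release. `wait_for_worker_count` and `_FakeClient` are hypothetical names, and the fake client exists only so the snippet runs without a cluster. `distributed` also provides a built-in `client.wait_for_workers()` that serves the same purpose.

```python
import time


def wait_for_worker_count(client, n_workers, timeout=600.0, poll=5.0):
    """Block until at least `n_workers` are connected, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        count = len(client.scheduler_info()["workers"])
        if count >= n_workers:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {count} of {n_workers} workers after {timeout} s")
        time.sleep(poll)


class _FakeClient:
    """Stand-in client: one more worker appears on each poll, up to four."""

    def __init__(self):
        self._polls = 0

    def scheduler_info(self):
        self._polls += 1
        n = min(self._polls, 4)
        return {"workers": {f"worker-{i}": {} for i in range(n)}}


ready = wait_for_worker_count(_FakeClient(), 4, timeout=5.0, poll=0.01)
print(ready)
```

With the real objects from this notebook the call would be `wait_for_worker_count(client, 4)`. The ten-minute default timeout is an arbitrary choice reflecting that worker pods can take many minutes to start.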

In [27]:
client?

Type:        Client
String form: <Client: 'tls://10.0.128.255:8786' processes=4 threads=4, memory=5.37 GB>
File:        /srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py
Docstring:
Connect to and submit computation to a Dask cluster

The Client connects users to a Dask cluster.  It provides an asynchronous
user interface around functions and futures.  This class resembles
executors in ``concurrent.futures`` but also allows ``Future`` objects
within ``submit/map`` calls.  When a Client is instantiated it takes over
all ``dask.compute`` and ``dask.persist`` calls by default.

It is also common to create a Client without specifying the scheduler
address , like ``Client()``.  In this case the Client creates a
:class:`LocalCluster` in the background and connects to that.  Any extra
keywords are passed from Client to LocalCluster in this case.  See the
LocalCluster documentation for more information.

Parameters
---------

In [28]:
client

Client
  Scheduler: gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80/ooi-staging.05536b8e4de045a4974e1d64701ae75a
  Dashboard: /services/dask-gateway/clusters/ooi-staging.05536b8e4de045a4974e1d64701ae75a/status
Cluster
  Workers: 4
  Cores: 4
  Memory: 5.37 GB


## Activate the Dask labextension
To use the Dask extension tools we need to populate the URL in the Dask extension pane. Displaying the client shows the dashboard link for the extension. Shift-right-click on it to copy the address.

In [13]:
client

Client
  Scheduler: gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80/ooi-staging.94fff0d08f4146a2a3e5852061756d78
  Dashboard: /services/dask-gateway/clusters/ooi-staging.94fff0d08f4146a2a3e5852061756d78/status
Cluster
  Workers: 4
  Cores: 4
  Memory: 5.37 GB


## Test calculation with the Dask cluster
This is a basic test calculation that would take quite a while to complete on a single machine but can be sped up by submitting it to the Dask cluster.

In [14]:
import dask.array as da

In [15]:
%%time
a = da.random.normal(size=(10000, 40000), chunks=(500, 500))
f = client.submit(a.mean().compute)
f.result()

CPU times: user 192 ms, sys: 34.4 ms, total: 227 ms
Wall time: 36 s


-9.939651340816679e-05
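To get a feel for the size of this test, the chunk arithmetic can be worked out in plain Python. This sketch assumes 8-byte `float64` values, which is the default for `da.random.normal`, and uses the shape and chunk sizes from the cell above.

```python
import math

shape = (10000, 40000)   # array shape from the test calculation
chunks = (500, 500)      # chunk shape from the test calculation
itemsize = 8             # bytes per float64 value (assumed default dtype)

# Number of chunks along each axis, and in total
blocks_per_axis = [math.ceil(s / c) for s, c in zip(shape, chunks)]
n_blocks = blocks_per_axis[0] * blocks_per_axis[1]

chunk_bytes = chunks[0] * chunks[1] * itemsize
total_bytes = shape[0] * shape[1] * itemsize

print(f"{n_blocks} chunks of {chunk_bytes / 1e6:.1f} MB, {total_bytes / 1e9:.1f} GB total")
# prints "1600 chunks of 2.0 MB, 3.2 GB total"
```

The full array is about 3.2 GB, which is below the 5.37 GB of memory reported for the four workers. The mean is reduced chunk by chunk, so the whole array does not need to be held in memory at once.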

## Shut down the Dask cluster
It is very important to shut down your Dask cluster when you are done using it.

In [16]:
cluster.shutdown()

Loop through `list_clusters()` to shut down all clusters in the gateway.

In [None]:
# use a separate loop variable so the `cluster` object above is not overwritten
for report in gateway.list_clusters():
    print('Stopping %s' % report.name)
    gateway.stop_cluster(report.name)
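One way to make sure a cluster is always shut down, even when a cell raises an error, is to wrap its lifetime in a context manager. The sketch below is an illustration rather than part of dask-gateway: `temporary_cluster` is a hypothetical helper that uses only the `new_cluster()`, `scale()`, and `shutdown()` calls shown in this notebook, and the `_Demo*` classes are stand-ins so the snippet runs without a gateway.

```python
from contextlib import contextmanager


@contextmanager
def temporary_cluster(gateway, n_workers):
    """Create a cluster, scale it, and always shut it down on exit."""
    cluster = gateway.new_cluster()
    try:
        cluster.scale(n_workers)
        yield cluster
    finally:
        cluster.shutdown()


class _DemoCluster:
    """Stand-in cluster that records what was asked of it."""

    def __init__(self):
        self.scaled_to = 0
        self.closed = False

    def scale(self, n):
        self.scaled_to = n

    def shutdown(self):
        self.closed = True


class _DemoGateway:
    """Stand-in gateway that hands out a single demo cluster."""

    def __init__(self):
        self.cluster = _DemoCluster()

    def new_cluster(self):
        return self.cluster


demo_gateway = _DemoGateway()
try:
    with temporary_cluster(demo_gateway, n_workers=2) as cluster:
        raise RuntimeError("simulated failure mid-computation")
except RuntimeError:
    pass

print(demo_gateway.cluster.closed)  # prints True: shutdown ran despite the error
```

With a real `Gateway`, the body of the `with` block would call `cluster.get_client()` and run the computation.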

## Starting a Dask cluster directly
Here we create a new Dask GatewayCluster object directly using `GatewayCluster()`. This command creates a Dask scheduler pod which will be responsible for scheduling jobs for workers. Because this command starts a pod, it can take several seconds or longer to complete.

In [58]:
from dask_gateway import GatewayCluster
cluster = GatewayCluster()

In [59]:
cluster.scale(4)

In [60]:
client = cluster.get_client()

In [66]:
client

Client
  Scheduler: gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80/ooi-staging.0baddcd1d51648488b0c0f1832e96107
  Dashboard: /services/dask-gateway/clusters/ooi-staging.0baddcd1d51648488b0c0f1832e96107/status
Cluster
  Workers: 6
  Cores: 6
  Memory: 8.05 GB


In [64]:
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [67]:
cluster.shutdown()

## Create a cluster with modified options

In [1]:
from dask_gateway import Gateway
gateway = Gateway()
options = gateway.cluster_options()

In [2]:
options

VBox(children=(HTML(value='<h2>Cluster Options</h2>'), GridBox(children=(HTML(value="<p style='font-weight: bo…

In [3]:
# set the options programmatically, or through their HTML repr
options.worker_memory = 5
options.worker_cores = 1.6

In [4]:
# Create a cluster with those options
cluster = gateway.new_cluster(options)
cluster.scale(4)

In [5]:
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [42]:
client = cluster.get_client()

In [43]:
client

Client
  Scheduler: gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80/ooi-staging.dae7858a510a4a3f8275010e36b00659
  Dashboard: /services/dask-gateway/clusters/ooi-staging.dae7858a510a4a3f8275010e36b00659/status
Cluster
  Workers: 0
  Cores: 0
  Memory: 0 B


In [None]:
cluster.shutdown()

## Other misc commands
These commands are useful for getting information but aren't normally needed.

In [None]:
# os and dask_gateway are needed by the cells below
import os

import dask
import dask_gateway

In [None]:
# get the dask-gateway version
dask_gateway.__version__

In [8]:
# show the default dask-gateway settings
dask.config.config['gateway']

{'auth': {'type': 'jupyterhub', 'kwargs': {}},
 'cluster': {'options': {'image': '{JUPYTER_IMAGE_SPEC}'}},
 'public_address': '/services/dask-gateway/',
 'address': 'http://10.1.34.198:8000/services/dask-gateway/',
 'proxy_address': 'gateway://traefik-ooi-staging-dask-gateway.ooi-staging:80',
 'http-client': {'proxy': True}}

In [None]:
dask.config.config['distributed']['worker']

In [None]:
# show the current default image to be started on workers
os.environ['JUPYTER_IMAGE_SPEC']

In [None]:
# explicit gateway call
#gateway = Gateway(address = 'https://ooi.pangeo.io/services/dask-gateway/',
#                  proxy_address = 'gateway://traefik-ooi-prod-dask-gateway.ooi-prod:80',
#                  auth = 'jupyterhub')

In [None]:
#default gateway call
gateway = Gateway()

In [None]:
# call new_cluster with explicit image
#cluster = gateway.new_cluster(image=os.environ['JUPYTER_IMAGE'])

In [None]:
# default new_cluster call
cluster = gateway.new_cluster()

In [None]:
gateway.list_clusters()

In [None]:
#gateway.stop_cluster('ooi-prod.c78d1e274be647328e9400d681101616')

In [None]:
# get the name of the new cluster
gateway.list_clusters()[0].name

In [None]:
# connect to cluster that already exists
# this is helpful if the cluster exists but it is disconnected for some reason
#cluster_name = gateway.list_clusters()[0].name
#cluster = gateway.connect(cluster_name)

In [None]:
# the dashboard_link property will show the link that can be pasted into the Dask labextension
cluster.dashboard_link

In [None]:
client.scheduler_info()['services']

In [None]:
gateway.address

In [None]:
cluster.scheduler_address

In [None]:
cluster.scheduler_comm

In [None]:
cluster.scheduler_info