## Basic cluster creation
* The following code will connect to the Dask Gateway server, use jupyterhub OAuth to authenticate and ask the server for a new cluster.
* This will trigger the server to start a Scheduler pod in OKD, which will be exposed via Traefik.
* To connect to the scheduler, use the public route: tls://dask-gateway-tls.fnal.gov:443
* This route is static for now. Functionality is being built to expose it dynamically

In [None]:
import logging
import os
import re
import pprint
import time
import socket
import dask_gateway
from dask_gateway import Gateway

# For debugging mostly. These two variables are used for authenticating
# against JupyterHub. Only report whether the token is present: printing the
# token itself would store a live credential in the saved notebook output.
# `.get` is used so this debugging aid does not abort the cell when a
# variable is missing.
print(
    "JUPYTERHUB_API_TOKEN is set"
    if os.environ.get('JUPYTERHUB_API_TOKEN')
    else "JUPYTERHUB_API_TOKEN is NOT set"
)
print(os.environ.get('JUPYTERHUB_API_URL'))

# These settings are static
gateway = Gateway(
     address="http://172.30.227.32",
     auth='jupyterhub',
)

# Use cluster = gateway.new_cluster(shutdown_on_close=False)
# to keep the cluster running after the client process dies
print("Creating GatewayCluster, this might take up to 10 seconds")
cluster = gateway.new_cluster()
# Enable cluster adaptive scaling
#cluster.adapt(minimum=0, maximum=10)
# Make sure the cluster exists
pprint.pprint(cluster)

## Staging and htcondor worker submit
* In order to have hybrid Dask clusters, we have added a mode of operation to Dask Gateway Server in which external Dask Singularity workers join a containerized Kubernetes scheduler on the fly. 
* The following code will use the Dask Gateway API to interact with the cluster, obtain the information needed to configure workers so that they can connect and authenticate properly, and finally join the scheduler in a hybrid cluster.
* To connect to the scheduler, we use the public route: tls://dask-gateway-tls.fnal.gov:443

In [None]:
import pwd

def _write_private_file(path, text):
    """Write *text* to *path*, leaving the file accessible to its owner only."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(text)
    # The mode given to os.open only applies when the file is created;
    # tighten files left over from an earlier staging run as well.
    os.chmod(path, 0o600)


def prepareWorkerJob(cluster, n_workers, scratch_root="/uscmst1b_scratch/lpc1/3DayLifetime"):
    """Stage the sandbox used to submit HTCondor Dask workers for a cluster.

    Creates ``<scratch_root>/<username>/<cluster name>`` and writes into it:

    * ``dask-credentials/`` with the cluster TLS certificate (``dask.crt``),
      TLS key (``dask.pem``) and the JupyterHub API token (``api-token``),
    * ``htcdask_submitfile.jdl``, a submit file queueing ``n_workers`` jobs,
    * ``start.sh``, the executable script that launches ``dask-worker``
      through ``singularity exec``,
    * empty ``condor/`` and ``dask-worker-space/`` directories.

    Args:
        cluster: Object exposing ``name`` and ``security`` (with ``tls_cert``
            and ``tls_key`` attributes, written as text).
        n_workers: Number of jobs placed on the ``Queue`` line of the JDL.
        scratch_root: Directory under which the per-user, per-cluster sandbox
            is created. Defaults to the path this notebook always used.

    Returns:
        The sandbox directory path as a string.

    Raises:
        KeyError: If ``JUPYTERHUB_API_TOKEN`` is not set in the environment.
    """
    # Take the name from the argument rather than from a notebook-level
    # `cluster_name` global, which only exists once a later cell has run.
    cluster_name = cluster.name
    security = cluster.security
    try:
        username = pwd.getpwuid(os.getuid())[0]
    except KeyError:
        # The uid has no passwd entry; fall back to the numeric id.
        username = str(os.getuid())

    tmproot = f"{scratch_root}/{username}/{cluster_name}"
    condor_logdir = f"{tmproot}/condor"
    credentials_dir = f"{tmproot}/dask-credentials"
    worker_space_dir = f"{tmproot}/dask-worker-space"
    image_name = "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/coffeateam/coffea-dask-cc7-gateway:0.7.12-fastjet-3.3.4.0rc9-g8a990fa"
    for directory in (tmproot, condor_logdir, credentials_dir, worker_space_dir):
        os.makedirs(directory, exist_ok=True)

    # The TLS key and the API token are secrets: keep them owner-only.
    _write_private_file(f"{credentials_dir}/dask.crt", security.tls_cert)
    _write_private_file(f"{credentials_dir}/dask.pem", security.tls_key)
    _write_private_file(f"{credentials_dir}/api-token", os.environ['JUPYTERHUB_API_TOKEN'])

    # Prepare JDL. start.sh receives the cluster name as $1 and the worker
    # name as $2. The file deliberately has no trailing newline, matching
    # what was generated before.
    jdl = "\n".join([
        "executable = start.sh",
        f"arguments = {cluster_name} htcdask-worker_$(Cluster)_$(Process)",
        "output = condor/htcdask-worker$(Cluster)_$(Process).out",
        "error = condor/htcdask-worker$(Cluster)_$(Process).err",
        "log = condor/htcdask-worker$(Cluster)_$(Process).log",
        "request_cpus = 4",
        "request_memory = 2100",
        "should_transfer_files = yes",
        f"transfer_input_files = {credentials_dir}, {worker_space_dir} , {condor_logdir}",
        f"Queue {n_workers}",
    ])
    with open(f"{tmproot}/htcdask_submitfile.jdl", 'w') as f:
        f.write(jdl)

    # Prepare singularity command. The worker invocation is a single shell
    # line: singularity options, the image, then the dask-worker command.
    worker_command = " ".join([
        "singularity exec",
        "-B ${worker_space_dir}:/srv/dask-worker-space",
        "-B dask-credentials:/etc/dask-credentials",
        image_name,
        "dask-worker --name $2",
        "--tls-ca-file /etc/dask-credentials/dask.crt",
        "--tls-cert /etc/dask-credentials/dask.crt",
        "--tls-key /etc/dask-credentials/dask.pem",
        "--worker-port 10000:10070",
        "--no-nanny --no-dashboard",
        "--local-directory /srv",
        "--nthreads 1 --nprocs 1",
        "tls://dask-gateway-tls.fnal.gov:443",
    ])
    sing = "\n".join([
        "#!/bin/bash",
        "export SINGULARITYENV_DASK_GATEWAY_WORKER_NAME=$2",
        'export SINGULARITYENV_DASK_GATEWAY_API_URL="https://dask-gateway-api.fnal.gov/api"',
        "export SINGULARITYENV_DASK_GATEWAY_CLUSTER_NAME=$1",
        "export SINGULARITYENV_DASK_GATEWAY_API_TOKEN=/etc/dask-credentials/api-token",
        'export SINGULARITYENV_DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="debug"',
        "",
        "worker_space_dir=${PWD}/dask-worker-space/$2",
        "mkdir $worker_space_dir",
        "",
        worker_command,
    ])
    with open(f"{tmproot}/start.sh", 'w') as f:
        f.write(sing)
    # The JDL names start.sh as its executable, so it must be executable.
    os.chmod(f"{tmproot}/start.sh", 0o775)
    return tmproot

## Scaling up
* Inform the GatewayServer that the cluster will scale to `n_workers`
* Call the function to stage all relevant files and scripts for HTCondor Dask Workers
* Call `condor_submit` from the command line, to avoid disrupting existing condor wrappers installed by T1 admins

In [None]:
import subprocess
import sys

cluster_name = cluster.name
n_workers = 1

# Announce the target size to the gateway before any worker is staged.
cluster.scale(n_workers)
print(f"Staging {n_workers} batch workers for cluster: {cluster_name}")

# Write credentials, the JDL and start.sh into the sandbox directory.
tmproot = prepareWorkerJob(cluster, n_workers)
print(f"Sandbox folder located at: {tmproot}")

print(f"Submitting HTCondor job(s) for {n_workers} workers")

# We add this to avoid a bug on Farruk's condor_submit wrapper (a fix is in progress)
os.environ['LS_COLORS'] = "ExGxBxDxCxEgEdxbxgxcxd"

# Submit our jdl from inside the sandbox (the JDL uses relative paths),
# print the result and call the cluster widget
result = subprocess.check_output(
    ['sh', '-c', '/usr/local/bin/condor_submit htcdask_submitfile.jdl'],
    cwd=tmproot,
)
pprint.pprint(result)

cluster

In [None]:
# Obtain a client for connecting to your cluster scheduler.
# The cluster is looked up again by name through the gateway, and the
# notebook-level `cluster` variable is rebound to the result.
# NOTE(review): presumably the submitted HTCondor workers must have joined
# before the cluster can take requests — confirm.
cluster = gateway.connect(cluster.name)
client = cluster.get_client()
# Bare expression: as the last statement of the cell, the notebook renders it.
client

In [None]:
# Run computations on the cluster
# At this point you should be able to use normal dask methods to do work.
# For example, here we take the mean of a random array.
import dask.array as da
# A 1000x1000 array of normally distributed samples, split into 500x500 chunks.
a = da.random.normal(size=(1000, 1000), chunks=(500, 500))
# .compute() evaluates the lazy mean and returns the concrete result.
# NOTE(review): presumably this runs on the client created in the previous
# cell — confirm.
a.mean().compute()

In [None]:
# Only run this cell to remove/shutdown the cluster.
# The cluster is looked up again by name through the gateway before it is
# shut down.
cluster = gateway.connect(cluster.name)
# NOTE(review): this bare expression is not the last statement of the cell,
# so it presumably displays nothing — confirm whether it can be dropped.
cluster
cluster.shutdown()