# Demonstration Notebook for the Kubeflow-enabled dask-kubernetes package

In [1]:
import os
from time import sleep
from datetime import datetime

from dask_kubernetes import KubeCluster
from dask.distributed import Client
from distributed.core import Status
import dask.array as da

## Show versions of Dask-related packages

In [2]:
# Keep only the `pip list` lines that mention "dask" or "distributed".
!pip list | grep "\(dask\|distributed\)"

dask                     2021.12.0
dask-kubernetes          kubeflow-enablement-0.2.0-dev2
dask-labextension        5.1.0
distributed              2021.12.0


## Helper Functions

In [3]:
# Report the directory the notebook is running from (os.getcwd()).
# NOTE(review): the relative path 'worker-spec.yaml' used when the cluster is
# created is presumably resolved against this directory — confirm.
print('working dir: ' + os.getcwd())

def run_test_case(client, array):
    """Time ``array.mean().compute()`` and print a short report.

    Args:
        client: Passed to ``count_ready_workers`` to obtain the number of
            ready workers shown in the report.
        array: Object exposing ``mean().compute()``; the computed value is
            printed as the answer.

    Returns:
        None. The worker count, elapsed time and answer are printed between
        two ``***`` marker lines.
    """
    # Sample the worker count before the computation starts.
    ready_workers = count_ready_workers(client)

    started_at = datetime.now()
    answer = array.mean().compute()
    elapsed_time = datetime.now() - started_at

    summary = (
        f'* number of workers: {ready_workers}, '
        f'elapsed time: {elapsed_time}, answer: {answer}'
    )
    print(f'***\n{summary}\n***')

def count_ready_workers(client):
    """Return the number of workers in the client's cluster that are running.

    Every entry of ``client.cluster.workers`` is inspected and counted when
    its ``status`` equals ``Status.running``.

    Args:
        client: Object whose ``cluster.workers`` can be iterated for keys and
            indexed by those keys.

    Returns:
        int: Count of workers in the running state (0 when there are none).
    """
    workers = client.cluster.workers
    return sum(
        1 for name in workers
        if workers[name].status == Status.running
    )

working dir: /home/jovyan/dask_kubeflow/tests


## Set up the test: start the Dask KubeCluster

Test case based on [`dask-kubernetes` Quickstart Example](https://kubernetes.dask.org/en/latest/kubecluster.html#quickstart)

In [4]:
# Start the cluster with a single worker, using the pod spec in
# 'worker-spec.yaml'. `enable_kubeflow=True` is the new parameter that tells
# Dask the cluster is running inside a Kubeflow cluster.
cluster = KubeCluster('worker-spec.yaml', n_workers=1, enable_kubeflow=True)

# Connect with a longer timeout: the first start may have to download the image.
client = Client(cluster, timeout=60)

# Last expression of the cell: render the client summary.
client

Creating scheduler pod on cluster. This may take some time.



+-------------+-----------+------------------------+---------+
| Package     | client    | scheduler              | workers |
+-------------+-----------+------------------------+---------+
| blosc       | None      | 1.10.2                 | None    |
| cloudpickle | 1.6.0     | 2.0.0                  | None    |
| distributed | 2021.12.0 | 2021.12.0+22.g96ee7f7b | None    |
| lz4         | None      | 3.1.10                 | None    |
| toolz       | 0.11.1    | 0.11.2                 | None    |
+-------------+-----------+------------------------+---------+


0,1
Connection method: Cluster object,Cluster type: dask_kubernetes.KubeCluster
Dashboard: http://dask-jovyan-67869b14-b.kubeflow-user:8787/status,

0,1
Dashboard: http://dask-jovyan-67869b14-b.kubeflow-user:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.42.0.55:8786,Workers: 0
Dashboard: http://10.42.0.55:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [5]:
# Create a large array of ones with shape (10000, 1000, 1000), built with chunks=100.
# Only the array is defined here; its mean is computed later by
# run_test_case via array.mean().compute().
array = da.ones((10000, 1000, 1000), chunks=100)

## Initial test case with 1 worker

In [6]:
%%time
# Baseline: run the mean computation on the single worker the cluster was
# started with. The %%time magic above reports CPU/wall time for the whole
# cell, in addition to the elapsed time printed by run_test_case.
print("test case for one worker")
run_test_case(client, array)

test case for one worker
***
* number of workers: 1, elapsed time: 0:00:26.946223, answer: 1.0
***
CPU times: user 1.19 s, sys: 157 ms, total: 1.35 s
Wall time: 26.9 s


## Test case with 3 workers

In [7]:
# Scale up to three workers, then poll once per second until all are ready.
print("test case for three workers")
cluster.scale(3)
# range(1, 62) allows at most 61 polls, the same limit as "counter > 60".
for counter in range(1, 62):
    ready_worker_count = count_ready_workers(client)
    sleep(1)
    print(f'scaling...ready workers: {ready_worker_count}...waiting {counter} seconds')
    if ready_worker_count == 3:
        break
else:
    # Every poll was used up without seeing three ready workers.
    raise RuntimeError('Scale up operation, did not complete in required time.')

test case for three workers
scaling...ready workers: 1...waiting 1 seconds
scaling...ready workers: 3...waiting 2 seconds


In [8]:
%%time
# Same computation, now that the cluster has been scaled to three workers.
run_test_case(client, array)

***
* number of workers: 3, elapsed time: 0:00:12.532821, answer: 1.0
***
CPU times: user 1.16 s, sys: 56.1 ms, total: 1.21 s
Wall time: 12.5 s


## Test case with 2 workers

In [9]:
# Scale down to two workers, then poll once per second until exactly two
# workers are reported as ready.
print("test case for two workers")
cluster.scale(2)
counter = 0
while True:
    # The count is sampled before the sleep, so each printed line shows the
    # state from the start of that one-second interval.
    ready_worker_count = count_ready_workers(client)
    sleep(1)
    counter += 1
    print(f'scaling...ready workers: {ready_worker_count}...waiting {counter} seconds')
    if ready_worker_count == 2:
        break
    elif counter > 60:
        # Give up after 61 polls. The message names the scale-down operation;
        # it previously said "Scale up", copied from the scale-up cell.
        raise RuntimeError('Scale down operation, did not complete in required time.')

test case for two workers
scaling...ready workers: 3...waiting 1 seconds
scaling...ready workers: 2...waiting 2 seconds


In [10]:
%%time
# Same computation, now that the cluster has been scaled down to two workers.
run_test_case(client, array)

***
* number of workers: 2, elapsed time: 0:00:15.243809, answer: 1.0
***
CPU times: user 1.03 s, sys: 42 ms, total: 1.07 s
Wall time: 15.3 s


## Retrieve log files from Scheduler and Workers

In [11]:
# Fetch the logs from the cluster.
logs = cluster.get_logs()

# Display the names of the available logs (the keys of the returned mapping).
logs.keys()

dict_keys(['Cluster', 'Scheduler', 'tcp://10.42.0.56:44027', 'tcp://10.42.1.61:39267'])

In [12]:
# Display the Scheduler log, stored under the 'Scheduler' key of `logs`.
print(logs['Scheduler'])

distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.42.0.55:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Receive client connection: Client-df626302-6c25-11ec-818b-b69d9dd65938
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.42.0.56:44027', name: 0, status: running, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.0.56:44027
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.42.0.57:38467', name: 2, status: closed, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.0.57:38467
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.42.1.61:39267', name: 1, status: running, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting 

In [13]:
# Display a Worker log file. In the key listing shown earlier, index 2 is the
# first entry after 'Cluster' and 'Scheduler', i.e. a worker address.
# NOTE(review): this relies on that key order and on at least one worker
# log being present — confirm before reusing elsewhere.
print(logs[list(logs.keys())[2]])

distributed.worker - INFO -       Start worker at:     tcp://10.42.0.56:44027
distributed.worker - INFO -          Listening to:     tcp://10.42.0.56:44027
distributed.worker - INFO -          dashboard at:           10.42.0.56:39993
distributed.worker - INFO - Waiting to connect to: tcp://dask-jovyan-67869b14-b.kubeflow-user:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   0.93 GiB
distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-jhmdt6hc
distributed.worker - INFO - -------------------------------------------------

+-------------+------------------------+------------------------+-----------------------------------------+
| Package     | This Worker            | scheduler              | workers                                 |
+-------------+------------------------+------------

## Shutdown cluster

In [14]:
# Close the Dask client connection first.
client.close()

# Then close the KubeCluster that the client was connected to.
cluster.close()

# Marker printed once both close calls have returned.
print('all done')

all done
