Installations:

`conda install dask`

`pip install "dask-jobqueue>=0.7.1"`

`pip install ipywidgets`

The jobqueue version matters: 0.7.0 had a bug that affected passing in `scheduler_options`, such as the dashboard port.
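To confirm that the environment meets this requirement, a minimal sketch like the one below can compare the installed version against 0.7.1. The helper names `version_tuple` and `jobqueue_is_recent_enough` are made up for this illustration, and the comparison only looks at the leading numeric parts of the version string.

```python
from importlib.metadata import PackageNotFoundError, version


def version_tuple(text):
    """Turn '0.7.1' into (0, 7, 1); stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def jobqueue_is_recent_enough(installed, minimum="0.7.1"):
    """True when the installed version is at least the minimum."""
    return version_tuple(installed) >= version_tuple(minimum)


try:
    installed = version("dask-jobqueue")
    status = "ok" if jobqueue_is_recent_enough(installed) else "too old"
    print(installed, status)
except PackageNotFoundError:
    print("dask-jobqueue is not installed")
```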

In [1]:
from cmldask import CMLDask
from dask.distributed import wait, as_completed, progress

In [2]:
client = CMLDask.new_dask_client_slurm("test_dask", "1GB")

Unique port for jrudoler is 51360
{'dashboard_address': ':51360'}
To view the dashboard, run: 
`ssh -fN jrudoler@rhino2.psych.upenn.edu -L 8000:192.168.86.142:51360` in your local computer's terminal (NOT rhino)
and then navigate to localhost:8000 in your browser
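
The printed command is an ordinary SSH local port forward. As a rough sketch of how its pieces fit together, the hypothetical helper `tunnel_command` below assembles the same kind of command from its parts; the user and host names in the example call are placeholders, not real accounts.

```python
def tunnel_command(user, login_host, scheduler_ip, dashboard_port, local_port=8000):
    """Build an ssh port-forwarding command for the dashboard.

    -f sends ssh to the background, -N skips running a remote command, and
    -L forwards local_port on your machine to scheduler_ip:dashboard_port.
    """
    return (
        f"ssh -fN {user}@{login_host} "
        f"-L {local_port}:{scheduler_ip}:{dashboard_port}"
    )


# Placeholder user and host; substitute your own login details.
print(tunnel_command("someuser", "login.example.edu", "192.168.86.142", 51360))
```

After running the resulting command locally, the dashboard would be reachable at `localhost:<local_port>`.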


In [3]:
import numpy as np
from time import sleep

def add(a, b):
    sleep(np.random.randint(0, 10))
    return a + b

def error_add(a, b):
    sleep(np.random.randint(0, 10))
    if a % 2:
        raise ValueError
    return a + b

In [4]:
futures = client.map(add, range(40), range(40))
progress(futures)

VBox()

In [5]:
results = client.gather(futures)
np.array(results)

array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
       34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66,
       68, 70, 72, 74, 76, 78])

Now repeat the run with a function that raises errors.

In [6]:
futures = client.map(error_add, range(40), range(40))

Wait for all jobs to finish, then check for errors.

In [7]:
wait(futures)
errors = CMLDask.get_exceptions(futures, range(40))
errors

index  param  exception     traceback_obj
1      1      ValueError()  <traceback object at 0x2adc15985a80>
3      3      ValueError()  <traceback object at 0x2adc15bdd380>
5      5      ValueError()  <traceback object at 0x2adc15b41800>
7      7      ValueError()  <traceback object at 0x2adc159951c0>
9      9      ValueError()  <traceback object at 0x2adc15a1d2c0>
11     11     ValueError()  <traceback object at 0x2adc15bd0540>
13     13     ValueError()  <traceback object at 0x2adc15b03380>
15     15     ValueError()  <traceback object at 0x2adc15b3ccc0>
17     17     ValueError()  <traceback object at 0x2adc15b56a40>
19     19     ValueError()  <traceback object at 0x2adc15b5dec0>
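
The idea of pairing each failed future with the parameter that produced it can be sketched without a cluster. The example below is not the implementation of `CMLDask.get_exceptions`; it uses the standard library's `concurrent.futures` as a local stand-in for a Dask client, and `collect_exceptions` is a hypothetical helper. Dask futures expose a similar `exception()` method, so the same loop should carry over, though that is an assumption worth checking against your installed version.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def error_add(a, b):
    # Same failure rule as the notebook's error_add, minus the sleep.
    if a % 2:
        raise ValueError
    return a + b


def collect_exceptions(futures, params):
    """Map each failed future's parameter to the exception it raised."""
    failures = {}
    for param, future in zip(params, futures):
        exc = future.exception()  # None when the task succeeded
        if exc is not None:
            failures[param] = exc
    return failures


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(error_add, i, i) for i in range(6)]
    wait(futures)
    failures = collect_exceptions(futures, range(6))

print(failures)  # {1: ValueError(), 3: ValueError(), 5: ValueError()}
```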


Pick the index of the error whose traceback you want to view.

In [8]:
CMLDask.print_traceback(errors, 5)

  File "/tmp/2714072.1.jupyter.q/ipykernel_33629/1260408039.py", line 11, in error_add
    raise ValueError
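
For readers curious how a stored traceback object becomes readable text, here is a small sketch using only the standard library's `traceback` module. It is not how `CMLDask.print_traceback` is written; `format_failure` is a hypothetical helper, and the exception is raised locally rather than on a worker.

```python
import traceback


def error_add(a, b):
    # Same failure rule as the notebook's error_add, minus the sleep.
    if a % 2:
        raise ValueError
    return a + b


def format_failure(exc):
    """Return the stack frames stored on an exception as one string."""
    return "".join(traceback.format_tb(exc.__traceback__))


try:
    error_add(5, 5)
except ValueError as err:
    # Format inside the except block, while the exception is still bound.
    report = format_failure(err)

print(report)
```

The output lists one `File "...", line N, in <function>` entry per stack frame, ending in `error_add`.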


Note that gathering these futures fails because some of them raised errors: `client.gather` re-raises the exception from a failed future.

In [9]:
client.gather(futures)

ValueError: 

Instead, filter for the futures that completed successfully.

In [10]:
good_futures = CMLDask.filter_futures(futures)
client.gather(good_futures)

[0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76]
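
The filtering step amounts to keeping only the futures that finished without an exception. A minimal local sketch follows, again with `concurrent.futures` standing in for the Dask client; the helper `successful` is hypothetical and is not the code behind `CMLDask.filter_futures`.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def error_add(a, b):
    # Same failure rule as the notebook's error_add, minus the sleep.
    if a % 2:
        raise ValueError
    return a + b


def successful(futures):
    """Keep only the futures that completed without raising."""
    return [f for f in futures if f.exception() is None]


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(error_add, i, i) for i in range(6)]
    wait(futures)

# Calling result() is safe here because failed futures were dropped.
good_results = [f.result() for f in successful(futures)]
print(good_results)  # [0, 4, 8]
```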

## IMPORTANT: Shut down your client (or restart your kernel, which will do so automatically)

In [11]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
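
One way to avoid forgetting the shutdown step, even when a cell fails partway through, is to wrap the client in a context manager. The sketch below assumes only that the client object has a `shutdown()` method, as used above; `shutting_down` and `FakeClient` are hypothetical names for this illustration, and `FakeClient` replaces a real cluster connection.

```python
from contextlib import contextmanager


@contextmanager
def shutting_down(client):
    """Yield the client and always call its shutdown() afterwards."""
    try:
        yield client
    finally:
        client.shutdown()


class FakeClient:
    """Hypothetical stand-in that only records whether shutdown() ran."""

    def __init__(self):
        self.closed = False

    def shutdown(self):
        self.closed = True


fake = FakeClient()
try:
    with shutting_down(fake):
        raise RuntimeError("simulated failure mid-computation")
except RuntimeError:
    pass

print(fake.closed)  # True
```

With a real client, the body of the `with` block would hold the `map` and `gather` calls.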


Happy parallel computing!