In [6]:
from dask.distributed import Client
from dask.diagnostics import ProgressBar
import dask
from dask_jobqueue import SLURMCluster

import time
import pandas as pd
import numpy as np

In [2]:
# Describe your Slurm environment. Every value below is site-specific: adapt the
# account, partition, network interface and scratch path to your own cluster.
SLURM_ACCOUNT = "machnitz"                             # account/project charged for the jobs (often your username)
SLURM_PARTITION = "pAll"                               # partition the worker jobs are submitted to
LOCAL_DIRECTORY = "/gpfs/work/machnitz/dask_storage"   # worker scratch space; important if home/work quota is small
MAX_JOBS = 4                                           # upper bound on Slurm jobs Dask may run at the same time

# NOTE(review): newer dask-jobqueue releases rename `header_skip` -> `job_directives_skip`,
# `job_extra` -> `job_extra_directives` and `project` -> `account`. Switch to the new
# names if your installed version emits deprecation warnings for these keywords.
cluster = SLURMCluster(
    cores=45,                # cores requested per Slurm job (the most you want to / can use per node)
    processes=1,             # Dask worker processes per job; the cores are shared among them (leave at 1 for now)
    memory="100 GB",         # total memory of one job, shared among its worker processes
    header_skip=["--mem"],   # drop the generated `#SBATCH --mem` line, so no explicit memory request reaches Slurm
    interface="ib0",         # network interface used by the workers (InfiniBand); remove if your nodes have no ib0
    project=SLURM_ACCOUNT,
    walltime="02:00:00",     # lifetime of each worker job (not of a single task); shorter jobs get scheduled sooner
    local_directory=LOCAL_DIRECTORY,
    job_extra=[f"--partition={SLURM_PARTITION}"],  # any extra #SBATCH options, e.g. add "--exclusive" here
)

# Deploy an adaptive cluster: Dask decides at every point how many Slurm jobs it
# wants running, never exceeding MAX_JOBS.
cluster.adapt(maximum_jobs=MAX_JOBS)
client = Client(cluster)

In [10]:
# Let's define a toy problem: a "simulation" that is slow (it sleeps for a random
# fraction of `max_sleep` seconds) but trivial to check (it just sums its inputs).
SEED = 42  # fixes the generated parameter table, so Restart & Run All reproduces it


def costly_simulation(list_param, max_sleep=1.0):
    """Stand-in for an expensive simulation.

    Parameters
    ----------
    list_param : iterable of numbers
        One parameter set, e.g. a row of ``input_params.values``.
    max_sleep : float, optional
        Upper bound in seconds of the artificial delay. The actual delay is
        ``max_sleep * np.random.random()``. The default of 1.0 reproduces the
        original behaviour.

    Returns
    -------
    number
        The sum of the parameters.
    """
    # The delay is deliberately left unseeded: it only mimics varying run times.
    time.sleep(max_sleep * np.random.random())
    return sum(list_param)


rng = np.random.default_rng(SEED)
input_params = pd.DataFrame(rng.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])

input_params.head()

Unnamed: 0,param_a,param_b,param_c,param_d
0,0.392774,0.734296,0.274932,0.770398
1,0.750989,0.633406,0.147591,0.083397
2,0.1496,0.436891,0.970219,0.418896
3,0.358504,0.117175,0.887545,0.681357
4,0.976891,0.78431,0.676401,0.60908


In [11]:
# Build the task graph with the dask.delayed API: wrap the function once, then
# record one lazy call per parameter row (first ten rows). Nothing executes yet.
delayed_simulation = dask.delayed(costly_simulation)
lazy_results = [delayed_simulation(row) for row in input_params.values[:10]]

See how the execution of the last cell did not take any time at all?

This is because nothing has been computed yet: Dask only created a list of the
tasks it needs to run. To actually run the computation, you have to trigger it explicitly.
Before you do this, have a look at your Slurm jobs (`squeue -u YourUserName`). You will see that Dask
creates new jobs (at most the number we set with `maximum_jobs`) to execute the given tasks. The execution can take a little
while, especially if no nodes are available:

In [12]:
# Trigger the computation; the results come back as a tuple ordered like `lazy_results`.
simulation_results = dask.compute(*lazy_results)
simulation_results

(2.172400481608721,
 1.6153827148033815,
 1.975606687342601,
 2.0445803546523846,
 3.046681081360251,
 1.699245619327113,
 1.571955574347839,
 1.712072235073311,
 1.7637077992949028,
 1.7472026203913116)