In [6]:
from dask_jobqueue import SLURMCluster
from distributed import Client

In [3]:
# Resources requested per SLURM job; the cluster summary further down shows
# 2 workers with 2 cores / 2 GB in total, i.e. one core and 1 GB each.
cluster = SLURMCluster(memory='1GB', cores=1)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39220 instead
  http_address["port"], self.http_server.port


In [44]:
# Ask the cluster for two workers
cluster.scale(n=2)

In [7]:
# Connect a distributed Client to the SLURM cluster's scheduler
client = Client(address=cluster)

In [56]:
# Display the client summary (scheduler/dashboard addresses, workers, cores, memory)
client

0,1
Client  Scheduler: tcp://192.33.123.23:36136  Dashboard: http://192.33.123.23:39220/status,Cluster  Workers: 2  Cores: 2  Memory: 2.00 GB


Use `joblib` with the Dask backend and check whether it can handle nested parallelization while providing a single interface for both distributed and sequential computation.

In [17]:
from joblib import parallel_backend, Parallel, delayed

In [10]:
from tasks import dummy_function

In [25]:
%%time
# Code incapsulated in parallel_backend is scheduled and waited for
# As can be seen in the dashboard, this is correctly spread over the cluster

with parallel_backend('dask', scheduler_host=cluster):
    results = Parallel()(delayed(dummy_function)(n) for n in range(8))

CPU times: user 1.01 s, sys: 80.1 ms, total: 1.09 s
Wall time: 20.1 s


In [27]:
# Values returned by the dask-backed Parallel call above, one per input
results

[0, 1, 4, 9, 16, 25, 36, 49]

In [38]:
%%time
# This isn't parallelized over the cluster

results = Parallel()(delayed(dummy_function)(n) for n in range(8))

CPU times: user 2.07 s, sys: 150 ms, total: 2.22 s
Wall time: 40 s


In [42]:
%%time
# This is correctly parallelized LOCALLY

results = Parallel(n_jobs=8)(delayed(dummy_function)(n) for n in range(8))

CPU times: user 320 ms, sys: 130 ms, total: 451 ms
Wall time: 5.65 s


In [77]:
def nested_function(arg):
    """Apply ``dummy_function`` to ``0 .. arg-1`` through joblib's dask backend.

    Can serve as the inner level of a nested ``Parallel`` call.

    Parameters
    ----------
    arg : int
        Number of inner calls; ``dummy_function`` receives 0 .. arg-1.

    Returns
    -------
    list
        The ``dummy_function`` results, in input order.
    """
    # Deliberately no ``scheduler_host=cluster``: that would capture the
    # notebook-global SLURMCluster in this function (which then has to be
    # serialised whenever the function is shipped to a worker) and make joblib
    # open a fresh Client on every call. Without it, joblib's dask backend looks
    # up the active client itself (distributed.get_client()): the notebook's
    # client here, or the worker's client when running inside a task.
    with parallel_backend('dask'):
        return Parallel()(delayed(dummy_function)(n) for n in range(arg))

def simple_nested_function(arg):
    """Apply ``dummy_function`` to ``0 .. arg-1`` using ``arg`` parallel jobs.

    Parameters
    ----------
    arg : int
        Number of calls, also used as ``n_jobs``.

    Returns
    -------
    list
        The ``dummy_function`` results, in input order (``[]`` when arg <= 0).
    """
    # joblib rejects n_jobs=0, so clamp to at least one job; for arg <= 0 the
    # range is empty and the call simply returns [] instead of raising.
    n_jobs = max(arg, 1)
    return Parallel(n_jobs=n_jobs)(delayed(dummy_function)(n) for n in range(arg))

def simple_nested_function_no_arg(arg):
    """Apply ``dummy_function`` to ``0 .. arg-1`` with a default ``Parallel()``.

    No ``n_jobs`` is passed, so the degree of parallelism (and the backend)
    comes from whatever joblib context is active when this function runs.
    """
    pending = (delayed(dummy_function)(k) for k in range(arg))
    return Parallel()(pending)

In [82]:
%%time

with parallel_backend('dask', scheduler_host=cluster):
    results = Parallel()(delayed(simple_nested_function_no_arg)(n) for n in [4, 2])

CPU times: user 1.69 s, sys: 91.5 ms, total: 1.78 s
Wall time: 15.1 s


In [83]:
# One inner result list per outer argument: [4, 2] -> lists of length 4 and 2
results

[[0, 1, 4, 9], [0, 1]]