In [1]:
# Imports for dask
from dask.distributed import Client
from dask import delayed

def run_cmd(cmd):
    import subprocess
    pp = subprocess.run(cmd,shell=True,stdout=subprocess.PIPE,stderr= subprocess.PIPE)
    return pp

client_with_foo = Client(processes = False,
    n_workers= 2,
    threads_per_worker=10,
    resources = {'foo':1}
               )

client = Client(processes = False,
    n_workers= 2,
    threads_per_worker=10,
               )


In [2]:
client


0,1
Client  Scheduler: inproc://137.187.30.241/31193/8  Dashboard: http://localhost:55199/status,Cluster  Workers: 2  Cores: 20  Memory: 34.36 GB


In [3]:
client_with_foo

0,1
Client  Scheduler: inproc://137.187.30.241/31193/1  Dashboard: http://localhost:8787/status,Cluster  Workers: 2  Cores: 20  Memory: 34.36 GB


I want to define a function that will take 2 seconds to run. This is a function that I'm going to pretend has resource needs that I can't easily control. For example a function that uses openmp internally and grabs every cpu it can find on its host. I have defined a resource foo that I can run computations with while restricting the resources used. I will run this with and without this magical resource specified. In this case it won't crash my laptop because it isn't really using the cpu for each thread it runs.

In [4]:
@delayed
def do_work(cmd=None, interval=2):
    import os
    import time
    time.sleep(interval)
    return os.getpid()

In [5]:
import time

In [6]:
task_graph = []
for i in range(10):
    task_graph.append(do_work())

In [7]:
task_graph

[Delayed('do_work-6c0435d3-e7df-45fd-a007-aaebf49989f5'),
 Delayed('do_work-2582e7dd-2fcd-49fb-a009-a3b1409a94d7'),
 Delayed('do_work-4b67cf7f-5ce3-4d09-8e87-d05e19277a13'),
 Delayed('do_work-9dbf8582-bbcb-46ed-ae6b-70aaed2f35d6'),
 Delayed('do_work-91be5834-e9b6-4e11-8a6d-e6ef54012987'),
 Delayed('do_work-b0bda2ba-db76-4088-bcc9-2a5af2c772cd'),
 Delayed('do_work-d2098536-4811-494e-a94a-a4f5153eea04'),
 Delayed('do_work-4e11d515-59dd-4a2a-8493-29a2098c7a31'),
 Delayed('do_work-17f48a79-cb3f-49c0-a3b1-2d08289fba1a'),
 Delayed('do_work-203aedd4-14b0-47cb-bf1a-0bfecd3788ca')]

# Not using foo

In [8]:
start = time.time()
result = client_with_foo.compute(task_graph)
output = client_with_foo.gather(result)
end = time.time()
print(end - start)

2.0266599655151367


In [9]:
start = time.time()
result = client.compute(task_graph)
output = client.gather(result)
end = time.time()
print(end - start)

2.0279300212860107


# Using foo

### And finally correctly using our foo

Ten 2-second tasks distributed across two workers takes 10 seconds to execute

In [10]:
start = time.time()
result = client_with_foo.compute(task_graph, resources = {'foo':1})
output = client_with_foo.gather(result)
end = time.time()
print(end - start)

10.064136981964111


### This does not result in a happy situtation

We hang our process with  the following:

In [11]:
start = time.time()
result = client.compute(task_graph, resources = {'foo':1})
output = client.gather(result)
end = time.time()
print(end - start)

KeyboardInterrupt: 