In [1]:
from dask.distributed import Client, get_client
import numpy as np
import time
from scipy.interpolate import RegularGridInterpolator

We define a few simple functions and make them artificially slow with `time.sleep`:

In [2]:
def slow_function_for_sqrt(x: float, time_to_sleep: float = 5) -> float:
    time.sleep(time_to_sleep)
    return np.sqrt(x)

def slow_function_for_add(x: float, y: float, time_to_sleep: float = 5) -> float:
    time.sleep(time_to_sleep)
    return x + y

def slow_function_for_sub(x: float, y: float, time_to_sleep: float = 5) -> float:
    time.sleep(time_to_sleep)
    return x - y

def slow_function_for_mul(x: float, y: float, time_to_sleep: float = 5) -> float:
    time.sleep(time_to_sleep)
    return x * y

## Process jobs with `map`

In this scenario, we want to map a function over a list of inputs with dask. We can do this with the `map` method of the client. Calling `Client()` without arguments starts a local cluster on this machine, to which jobs can be submitted, and connects the client to it. It also automatically spawns a status dashboard, which you can view in your browser (by default at `http://localhost:8787/status`).

In [3]:
client = Client()
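If the defaults do not fit, the local cluster can also be configured explicitly when creating the client. The following is a minimal sketch with arbitrary example values, not the configuration used in this notebook: `processes=False` runs the scheduler and worker as threads inside the current Python process, and the sketch uses a separate `demo_client` so that the notebook's `client` is left untouched.

```python
from dask.distributed import Client

# Example values only: one in-process worker with four threads
demo_client = Client(processes=False, n_workers=1, threads_per_worker=4)
demo_client.wait_for_workers(1)  # block until the worker has registered with the scheduler

# Address of the status dashboard for this cluster
print(demo_client.dashboard_link)

# nthreads() maps each worker address to its number of threads
threads_per_worker = demo_client.nthreads()
n_workers = len(threads_per_worker)
n_threads = sum(threads_per_worker.values())

demo_client.close()
```

Running the scheduler and workers in-process avoids starting extra Python processes, which is convenient for small experiments; for CPU-bound pure-Python work, the default process-based workers usually scale better.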

`client.map` takes a function and an iterable of arguments and calls the function once per element. It immediately returns a list of futures, which are objects that represent the (possibly still running) result of each computation. Futures can be used to retrieve the result of the computation or to wait for it to finish.

In [4]:
results = client.map(slow_function_for_sqrt, [1, 2, 3])
results

[<Future: pending, key: slow_function_for_sqrt-4827e73542353c207e6923c087cf87b5>,
 <Future: pending, key: slow_function_for_sqrt-d3cde674842c0dcdb35485f164d0d744>,
 <Future: pending, key: slow_function_for_sqrt-d9b57d474a1081519b039fc997f97bf0>]

If we check again a few seconds later, we see that the computations have finished. The result of a single future can be retrieved with its `result` method.

In [6]:
results

[<Future: finished, type: numpy.float64, key: slow_function_for_sqrt-4827e73542353c207e6923c087cf87b5>,
 <Future: finished, type: numpy.float64, key: slow_function_for_sqrt-d3cde674842c0dcdb35485f164d0d744>,
 <Future: finished, type: numpy.float64, key: slow_function_for_sqrt-d9b57d474a1081519b039fc997f97bf0>]
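Instead of re-running a cell until the status changes, we can also block until the futures are done. A minimal sketch using `dask.distributed.wait` together with `Future.status`, `Future.done()` and `Future.result()`; the function `slow_sqrt` and its short sleep are illustrative assumptions.

```python
import time

from dask.distributed import Client, wait

def slow_sqrt(x: float) -> float:
    # Hypothetical stand-in for slow_function_for_sqrt with a shorter sleep
    time.sleep(0.5)
    return x ** 0.5

demo_client = Client(processes=False)
futures = demo_client.map(slow_sqrt, [1, 4, 9])

# Block until every future has finished (or failed)
wait(futures)
statuses = [future.status for future in futures]
all_done = all(future.done() for future in futures)

# .result() fetches the value of a single future
first_value = futures[0].result()
demo_client.close()
```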

We can retrieve the results of all the futures with the `gather` method of the client. This method takes a list of futures and returns a list of results.

In [7]:
client.gather(results)

[1.0, 1.4142135623730951, 1.7320508075688772]
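When results should be processed as soon as each one is ready rather than all at once, `dask.distributed.as_completed` yields futures in the order in which they finish. A small sketch with a hypothetical `slow_square` function; the completion order is not guaranteed, so only the collected values are checked.

```python
import time

from dask.distributed import Client, as_completed

def slow_square(x: int) -> int:
    # Larger inputs sleep longer, so the jobs tend to finish in a different order
    time.sleep(0.1 * x)
    return x * x

demo_client = Client(processes=False)
futures = demo_client.map(slow_square, [3, 1, 2])

collected = []
for future in as_completed(futures):
    # Each future yielded here has already finished
    collected.append(future.result())

demo_client.close()
```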

## Process jobs with `submit`

The key difference between `submit` and `map` is that `submit` schedules a single function call and returns a single future, while `map` schedules one call per input and returns a list of futures. Both return immediately, so with `submit` we can hand jobs to the cluster one at a time, as soon as their inputs are available, without waiting for previously submitted jobs to finish.

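*Note*: Some of the futures immediately show up as `finished` here. dask derives each task's key from the function and its arguments (compare the keys with those from the `map` call above), so the calls for `x = 1, 2, 3` are recognized as identical to jobs that were already computed and are not run again.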

In [8]:
results = []
for x in range(10):
    results.append(client.submit(slow_function_for_sqrt, x))
results

[<Future: pending, key: slow_function_for_sqrt-d24f4a7f3bcec3d53cd819b65117791e>,
 <Future: finished, type: numpy.float64, key: slow_function_for_sqrt-4827e73542353c207e6923c087cf87b5>,
 <Future: finished, type: numpy.float64, key: slow_function_for_sqrt-d3cde674842c0dcdb35485f164d0d744>,
 <Future: finished, type: numpy.float64, key: slow_function_for_sqrt-d9b57d474a1081519b039fc997f97bf0>,
 <Future: pending, key: slow_function_for_sqrt-ab474183c25d3b7b0abbccb62a550dd7>,
 <Future: pending, key: slow_function_for_sqrt-c5ec33eacea3351318da9538aa8e2ba0>,
 <Future: pending, key: slow_function_for_sqrt-4f609e3b7340ada35ca483fd32f88970>,
 <Future: pending, key: slow_function_for_sqrt-13251970827e547145c30a0a810b2dba>,
 <Future: pending, key: slow_function_for_sqrt-dde763564b9f6bf61a68c23c5fc458e6>,
 <Future: pending, key: slow_function_for_sqrt-5226d671ad1f0d899d6bd01db9b5c7a5>]
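The key reuse described in the note above can be checked directly. In this sketch (the helper `add_one` is hypothetical), two identical `submit` calls receive the same key, while `pure=False` tells dask not to assume that the function is deterministic, so it generates a new key for that call.

```python
from dask.distributed import Client

def add_one(x: int) -> int:
    return x + 1

demo_client = Client(processes=False)

first = demo_client.submit(add_one, 41)
second = demo_client.submit(add_one, 41)
impure = demo_client.submit(add_one, 41, pure=False)

# Same function and arguments -> same key, so the job is only computed once
same_key = first.key == second.key
# pure=False -> a fresh key, so the job is treated as a separate computation
different_key = first.key != impure.key

value = impure.result()
demo_client.close()
```

`pure=False` is the appropriate choice for functions with side effects or randomness, where two calls with the same arguments should not share a result.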

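This is especially helpful if we want to pass the result of one computation to another computation: futures can be passed directly as arguments to `submit`, and dask waits for them to finish before running the dependent job. For example, we can combine the two previous computations to calculate the sum of the square root and the square of each number.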

In [9]:
results = []
for x in range(10):
    
    # calculate square root and square
    square_root = client.submit(slow_function_for_sqrt, x)
    square = client.submit(slow_function_for_mul, x, x)

    # calculate square root + square
    results.append(client.submit(slow_function_for_add, square_root, square))
results

[<Future: pending, key: slow_function_for_add-54fe4748df0218e325300b3d5cfa211a>,
 <Future: pending, key: slow_function_for_add-3f1e322be0116aa85d38c2ecbcd52648>,
 <Future: pending, key: slow_function_for_add-9c665519553c6b36ccc52415d7b5a68b>,
 <Future: pending, key: slow_function_for_add-a92a563c83173603a83973215f30b811>,
 <Future: pending, key: slow_function_for_add-f1471d0df2f606965f22cec45499c711>,
 <Future: pending, key: slow_function_for_add-f10c088c6cf9c7873efe34130f2830c5>,
 <Future: pending, key: slow_function_for_add-921c9e905fca9cb6a9bb508e3197d474>,
 <Future: pending, key: slow_function_for_add-5893cda9d4ce50192fe85cb68c2b811d>,
 <Future: pending, key: slow_function_for_add-8412d555081d8d1ebc78f157f4e7071f>,
 <Future: pending, key: slow_function_for_add-e4d42a85df022613ce669f8224f91b6d>]
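The same dependency chain can be reproduced end to end. This sketch uses hypothetical, non-sleeping stand-ins for the slow functions, gathers the final futures, and compares them with the expected values `sqrt(x) + x * x`.

```python
import numpy as np
from dask.distributed import Client

def square_root(x: float) -> float:
    return float(np.sqrt(x))

def multiply(x: float, y: float) -> float:
    return x * y

def add(x: float, y: float) -> float:
    return x + y

demo_client = Client(processes=False)

futures = []
for x in range(4):
    root_future = demo_client.submit(square_root, x)
    square_future = demo_client.submit(multiply, x, x)
    # Futures passed as arguments are replaced by their results once both have finished
    futures.append(demo_client.submit(add, root_future, square_future))

values = demo_client.gather(futures)
expected = [np.sqrt(x) + x * x for x in range(4)]
demo_client.close()
```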

## Catching errors

A useful feature of processing large amounts of data with dask is that an error in one job does not stop the others: the failing futures (and any futures that depend on them) are marked as `error`, while the rest of the data is processed as usual. To demonstrate this, we pass some negative values to the functions defined above. Since `np.sqrt` returns `nan` (with a warning) for negative inputs instead of raising an error, we first modify the square-root function so that it raises an exception:

In [10]:
def slow_function_for_sqrt(x: float, time_to_sleep: float = 5) -> float:
    time.sleep(time_to_sleep)
    if x < 0:
        # np.sqrt would only return nan here, so raise an explicit error instead
        raise ValueError("x must be non-negative")
    return np.sqrt(x)

In [12]:
results = []
for x in range(-2, 10):
    
    # calculate square root and square
    square_root = client.submit(slow_function_for_sqrt, x)
    square = client.submit(slow_function_for_mul, x, x)

    # calculate square root + square
    results.append(client.submit(slow_function_for_add, square_root, square))

In [14]:
results

[<Future: error, key: slow_function_for_add-d15ecd5f3e59bc81c64e8d8bd54bfcf5>,
 <Future: error, key: slow_function_for_add-f0e21b48504a6b412d640d6a046cbb31>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-757441b4c19ca7a4671e18f2f7804c27>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-de5787ab9e2771dec32a7e1bfe35a34d>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-f1f95fa54c21489b9c10157f2fbd61d8>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-508a80244946e066568a369448c355cc>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-048b854e8162512061216a24f7c41232>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-db8b30ca43a8ccfd9352c69db8672089>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-5a513bf5b05ac474b1f44eb84d627c82>,
 <Future: finished, type: numpy.float64, key: slow_function_for_add-7eea42c67d1f806d5a8fc9496b445951>,
 <Future: finished
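To collect the successful results without failing on the erroneous ones, `client.gather` accepts `errors="skip"`, and each failed future keeps its exception, which can be inspected with `Future.exception()`. A minimal sketch, assuming a hypothetical `checked_sqrt` function analogous to the modified square-root function above:

```python
import numpy as np
from dask.distributed import Client, wait

def checked_sqrt(x: float) -> float:
    if x < 0:
        raise ValueError("x must be non-negative")
    return float(np.sqrt(x))

demo_client = Client(processes=False)
futures = demo_client.map(checked_sqrt, [-1, 0, 4])
wait(futures)  # also returns once futures have failed

statuses = [future.status for future in futures]
error = futures[0].exception()  # the ValueError raised on the worker

# Default errors="raise": the first failure is re-raised locally
try:
    demo_client.gather(futures)
    raised = False
except ValueError:
    raised = True

# errors="skip": failed futures are left out of the returned list
values = demo_client.gather(futures, errors="skip")
demo_client.close()
```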

## Process functions with multiple arguments

`map` and `submit` also work with functions that take multiple arguments. To demonstrate this in a more practical scenario, we interpolate an image with `scipy.interpolate.RegularGridInterpolator`. We first create a function that takes an image and a single coordinate and returns the interpolated value of the image at that coordinate.

In [15]:
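def interpolate_at_point(image: np.ndarray, coordinate: np.ndarray, interpolation_method: str = 'nearest') -> float:
    # Grid of pixel indices along each axis, derived from the image shape
    grid = (np.arange(image.shape[0]), np.arange(image.shape[1]))
    interpolator = RegularGridInterpolator(grid, image, method=interpolation_method)
    time.sleep(2)  # make the function artificially slow
    # The interpolator returns one value per query point; extract the single value
    return float(interpolator(coordinate)[0])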
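Before submitting hundreds of calls, it can help to check the interpolation logic locally on a tiny image. This sketch uses a hypothetical 3x3 image whose value at pixel `(i, j)` is `i + 10 * j`, so the expected results are easy to compute by hand. It also shows that, with the default `bounds_error=True`, `RegularGridInterpolator` raises a `ValueError` for coordinates outside the pixel grid.

```python
import numpy as np
from scipy.interpolate import RegularGridInterpolator

# Pixel indices 0, 1, 2 along each axis
rows = np.arange(3, dtype=float)
cols = np.arange(3, dtype=float)
# Value at pixel (i, j) is i + 10 * j
image = rows[:, None] + 10 * cols[None, :]

nearest = RegularGridInterpolator((rows, cols), image, method="nearest")
linear = RegularGridInterpolator((rows, cols), image, method="linear")

# (0.4, 1.6) is closest to pixel (0, 2), whose value is 0 + 10 * 2
nearest_value = nearest(np.array([0.4, 1.6]))[0]
# The data are linear in i and j, so linear interpolation is exact: 0.5 + 10 * 0.5
linear_value = linear(np.array([0.5, 0.5]))[0]

try:
    linear(np.array([2.5, 0.0]))  # 2.5 lies outside [0, 2] in the first dimension
    out_of_bounds_raised = False
except ValueError:
    out_of_bounds_raised = True
```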

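We also create some random data: a `1000x1000` image with random values between `0` and `1`, and a list of `300` random coordinates at which we want to interpolate the image. The coordinates are scaled to the range `[0, 999]` so that they lie inside the pixel grid; by default, `RegularGridInterpolator` raises a `ValueError` for points outside the grid.

In [16]:
image = np.random.random(size=(1000, 1000))
# Pixel indices run from 0 to 999, so scale by 999 to stay within the grid
sampling_coordinates = np.random.random(size=(300, 2)) * 999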

With `client.submit`, we can submit the interpolation job to the cluster. For each call, we can pass a different set of positional and keyword arguments using the unpacking operators (`*` and `**`, sometimes called "splat" operators):

`client.submit(function, *arguments, **keyword_arguments)`
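The unpacking works exactly as in a plain function call. The hypothetical `describe_job` function below stands in for `interpolate_at_point` and only reports its inputs:

```python
def describe_job(image_name: str, coordinate: tuple, interpolation_method: str = "nearest") -> str:
    # Hypothetical stand-in for interpolate_at_point that only reports its inputs
    return f"{image_name} at {coordinate} using {interpolation_method}"

arguments = ("image", (12.5, 40.0))
keyword_arguments = {"interpolation_method": "linear"}

# *arguments expands the tuple into positional arguments,
# **keyword_arguments expands the dict into keyword arguments
unpacked_call = describe_job(*arguments, **keyword_arguments)
explicit_call = describe_job("image", (12.5, 40.0), interpolation_method="linear")
print(unpacked_call)  # image at (12.5, 40.0) using linear
```

`client.submit(describe_job, *arguments, **keyword_arguments)` would forward the same positional and keyword arguments to the function on the worker.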

In [17]:
results = []
for idx, coordinate in enumerate(sampling_coordinates):
    arguments = (image, coordinate)
    
    # use different interpolation methods for every second point
    if idx % 2 == 0:
        keyword_arguments = {'interpolation_method': 'nearest'}
    else:
        keyword_arguments = {'interpolation_method': 'linear'}

    results.append(client.submit(interpolate_at_point, *arguments, **keyword_arguments))

  [array([[0.36043388, 0.09493973, 0.18830925, ...,  ... 934.52816074])]
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


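We can also use `client.map` to submit jobs with multiple arguments: each positional argument is passed as an iterable, and `map` calls the function once for each set of corresponding elements. Note that we don't need to repeat the keyword arguments n times; we can specify them once and they will be passed to each call of the function.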

In [18]:
# Repeat the image once per coordinate; map pairs the i-th image with the i-th coordinate.
# The keyword arguments are specified only once and are passed to every call.
arguments = ([image] * len(sampling_coordinates), sampling_coordinates)
keyword_arguments = {'interpolation_method': 'nearest'}

# Submit the jobs and collect the results
results = client.map(interpolate_at_point, *arguments, **keyword_arguments)
results = client.gather(results)
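How `map` combines several iterables with shared keyword arguments can be seen in a minimal sketch with a hypothetical `scale_and_shift` function: the positional iterables are paired element-wise, and `factor` is passed unchanged to every call.

```python
from dask.distributed import Client

def scale_and_shift(x: float, shift: float, factor: float = 1.0) -> float:
    return x * factor + shift

demo_client = Client(processes=False)

# Calls: scale_and_shift(1, 10, factor=2), scale_and_shift(2, 20, factor=2), ...
futures = demo_client.map(scale_and_shift, [1, 2, 3], [10, 20, 30], factor=2)
values = demo_client.gather(futures)
print(values)  # [12, 24, 36]

demo_client.close()
```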

## Scattering data

In the cells above, dask warns us that we should use `client.scatter`. This happens when the data we pass to every job is relatively "large": by default, it is serialized into the task graph of each call and sent through the scheduler. With `client.scatter`, we instead send the data to the workers ahead of time. For a single object, `scatter` returns a single future (for a list, one future per element), which we can then pass to `submit` or `map` in place of the data itself. The key difference between using `.scatter` or not is the following:

* With `scatter`: The data is sent to the workers once, and every job only receives a lightweight future that refers to the data already held in the workers' memory.
* Without `scatter`: The data is embedded in every job and transferred through the scheduler each time, which burdens the scheduler and may send the same object over the network many times.

In [19]:
# Scatter the image once, outside the loop, and reuse the resulting future
image_future = client.scatter(image)

results = []
for coordinate in sampling_coordinates:
    results.append(client.submit(interpolate_at_point, image_future, coordinate))
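A minimal self-contained sketch of the scatter pattern, assuming a small hypothetical image and a `row_sum` helper: the image is scattered once, the resulting future is passed to every job, and scattering a list yields one future per element.

```python
import numpy as np
from dask.distributed import Client, Future

def row_sum(img: np.ndarray, row: int) -> float:
    return float(img[row].sum())

demo_client = Client(processes=False)
image = np.arange(16, dtype=float).reshape(4, 4)

# A single object -> a single Future referring to data held on a worker
image_future = demo_client.scatter(image)
# A list -> one Future per element
element_futures = demo_client.scatter([1, 2, 3])

# Every job receives the lightweight future instead of a copy of the image
futures = [demo_client.submit(row_sum, image_future, row) for row in range(4)]
row_sums = demo_client.gather(futures)
print(row_sums)  # [6.0, 22.0, 38.0, 54.0]

demo_client.close()
```

With several workers, `client.scatter(image, broadcast=True)` copies the data to every worker up front instead of letting dask move it on demand.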

## Sending jobs to a client from within a function

In some scenarios, we want to submit jobs from code that does not have access to the `client` object, for example from within a task that is already running on a worker. In this case, we can retrieve the `client` with the `get_client` function (example taken from [here](https://distributed.dask.org/en/stable/task-launch.html?highlight=delayed#dask-delayed)).

In [20]:
def fibonacci(number_of_iterations: int):
    if number_of_iterations < 2:
        return number_of_iterations
    client = get_client()
    a_future = client.submit(fibonacci, number_of_iterations - 1)
    b_future = client.submit(fibonacci, number_of_iterations - 2)
    a, b = client.gather([a_future, b_future])
    time.sleep(5)
    return a + b

In [21]:
future = client.submit(fibonacci, 10)

In [None]:
client.gather(future)

55
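As far as we understand the dask documentation linked above, calling `client.gather` inside a task keeps that task's worker thread occupied while it waits, which can deadlock the cluster if all threads end up waiting for other tasks. The `worker_client` context manager secedes the task from the worker's thread pool while it waits and rejoins afterwards. A sketch of the same recursion with that approach (the name `fibonacci_seceding` is our own):

```python
from dask.distributed import Client, worker_client

def fibonacci_seceding(number_of_iterations: int) -> int:
    if number_of_iterations < 2:
        return number_of_iterations
    # worker_client() releases this task's thread while it waits for the sub-tasks
    with worker_client() as client:
        a_future = client.submit(fibonacci_seceding, number_of_iterations - 1)
        b_future = client.submit(fibonacci_seceding, number_of_iterations - 2)
        a, b = client.gather([a_future, b_future])
    return a + b

demo_client = Client(processes=False)
fib_10 = demo_client.submit(fibonacci_seceding, 10).result()
demo_client.close()
```

Because the keys are derived from the function and its arguments, recursive calls with the same `number_of_iterations` share one task, so only eleven distinct tasks are computed here.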

