## ## Parallel workloads 

* [Parallel workload tutorial](https://examples.dask.org/applications/embarrassingly-parallel.html) 

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=1)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37781  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 80  Memory: 540.94 GB


In [2]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)

In [3]:
%time costly_simulation([1, 2, 3, 4])

CPU times: user 8.81 ms, sys: 0 ns, total: 8.81 ms
Wall time: 285 ms


10

In [4]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()

Unnamed: 0,param_a,param_b,param_c,param_d
0,0.538072,0.568526,0.670063,0.573377
1,0.051977,0.666182,0.091653,0.851108
2,0.268011,0.061663,0.946579,0.155039
3,0.152447,0.564689,0.787004,0.136737
4,0.161339,0.576635,0.485337,0.74411


In [5]:
results = []

In [6]:
%%time
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)

CPU times: user 127 ms, sys: 21.7 ms, total: 149 ms
Wall time: 5.42 s


In [7]:
results

[2.3500387624175945,
 1.6609192406609088,
 1.4312916802132318,
 1.6408775488406349,
 1.9674208961487865,
 1.8379310473587465,
 1.7325438304601861,
 1.629651763111184,
 1.1595187924746564,
 1.622432664988943]

## ## Use Dask Delayed to make our function lazy

In [8]:
import dask
lazy_results = []

In [9]:
%%time

for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

CPU times: user 3.11 ms, sys: 0 ns, total: 3.11 ms
Wall time: 2.37 ms


In [10]:
lazy_results[0]

Delayed('costly_simulation-fb683acc-d3ca-4d2a-b99d-45f32c90a0fa')

In [None]:
%time dask.compute(*lazy_results)

In [11]:
futures = dask.persist(*lazy_results)

In [12]:
futures

(Delayed('costly_simulation-fb683acc-d3ca-4d2a-b99d-45f32c90a0fa'),
 Delayed('costly_simulation-7d8cd53e-3f68-494f-a761-399c199fc5cb'),
 Delayed('costly_simulation-4884078f-2cab-485e-855c-cd2cc0802a68'),
 Delayed('costly_simulation-ecadca49-0adc-499c-aff0-87f63ceaaec4'),
 Delayed('costly_simulation-84f08f05-7eff-4f01-80d5-64548dbd5a21'),
 Delayed('costly_simulation-449d3f6c-249f-447a-af1b-5787f6208583'),
 Delayed('costly_simulation-a6d73315-2dcd-4c69-a2fc-6e231dda75d2'),
 Delayed('costly_simulation-a6a657fe-77cc-4c30-8384-96c7f2454bc0'),
 Delayed('costly_simulation-e29972f9-0b82-4349-8337-dbf806384934'),
 Delayed('costly_simulation-4e7779a9-0bbb-46a0-b092-fdc79b51819f'))

In [13]:
%time results_dask = dask.compute(*futures)

CPU times: user 5.44 ms, sys: 6.39 ms, total: 11.8 ms
Wall time: 11.2 ms


In [14]:
results_dask

(2.3500387624175945,
 1.6609192406609088,
 1.4312916802132318,
 1.6408775488406349,
 1.9674208961487865,
 1.8379310473587465,
 1.7325438304601861,
 1.629651763111184,
 1.1595187924746564,
 1.622432664988943)

## ## Futures API 

In [15]:
futures_api = []
for parameters in input_params.values[:10]:
    future_api = client.submit(costly_simulation, parameters)
    futures_api.append(future_api)

In [16]:
futures_api

[<Future: finished, type: numpy.float64, key: costly_simulation-c447264f6021516d3cd2df32e72fc2af>,
 <Future: finished, type: numpy.float64, key: costly_simulation-dd697ca1e35862a017f1c9a5cb6e7adf>,
 <Future: finished, type: numpy.float64, key: costly_simulation-ea2d72180e7df9de41c6677c95870118>,
 <Future: finished, type: numpy.float64, key: costly_simulation-dd84f51b0ab705ee6f3d603c8226616d>,
 <Future: finished, type: numpy.float64, key: costly_simulation-03b1154ad6236e4edc00d51a1397980e>,
 <Future: finished, type: numpy.float64, key: costly_simulation-f78b55ebd6d573f68d9ecdfa473baeda>,
 <Future: finished, type: numpy.float64, key: costly_simulation-84ec3f098d7b03b7fe2f64818e4b845c>,
 <Future: finished, type: numpy.float64, key: costly_simulation-5b75bf21aef79988d2391a8704ece11d>,
 <Future: finished, type: numpy.float64, key: costly_simulation-34aaf87355adb779cc5dd13cb189860e>,
 <Future: finished, type: numpy.float64, key: costly_simulation-0c6fe31b3f02f29e94732995b11e03c4>]

In [17]:
results_api = client.gather(futures_api)
results_api[:5]

[2.3500387624175945,
 1.6609192406609088,
 1.4312916802132318,
 1.6408775488406349,
 1.9674208961487865]