# Parallelizing Parcels with Dask

This will show how to distribute Parcels execution (using the Brownian motion example) to a dask cluster.

## Prerequisites

Install Parcels and dask:

```bash
conda create -n py3_parcels_dask -c conda-forge python=3 parcels dask distributed bokeh
```

## First, reproduce the brownian motion example

(Note that we pass the random seed as a keyword arg.)

In [1]:
from parcels import FieldSet, Field, ParticleSet, ScipyParticle, JITParticle, BrownianMotion2D
import numpy as np
from datetime import timedelta as delta
from parcels import random

In [2]:
ptype = {'scipy': ScipyParticle, 'jit': JITParticle}

In [3]:
def zeros_fieldset(xdim=2, ydim=2):
    """Generates a zero velocity field"""
    lon = np.linspace(-20, 20, xdim, dtype=np.float32)
    lat = np.linspace(-20, 20, ydim, dtype=np.float32)

    dimensions = {'lon': lon, 'lat': lat}
    data = {'U': np.zeros((ydim, xdim), dtype=np.float32),
            'V': np.zeros((ydim, xdim), dtype=np.float32)}
    return FieldSet.from_data(data, dimensions, mesh='spherical')

In [4]:
def run_brownian_example(mode='scipy', npart=3000, random_seed=12345, runtime=delta(days=1)):
    fieldset = zeros_fieldset()

    # Set diffusion constants.
    kh_zonal = 100
    kh_meridional = 100

    # Create field of Kh_zonal and Kh_meridional, using same grid as U
    grid = fieldset.U.grid
    fieldset.add_field(Field('Kh_zonal', kh_zonal * np.ones((2, 2)),
                             grid=grid))
    fieldset.add_field(Field('Kh_meridional', kh_meridional * np.ones((2, 2)),
                             grid=grid))

    # Set random seed
    random.seed(random_seed)
    
    pset = ParticleSet(fieldset=fieldset, pclass=ptype[mode],
                       lon=np.zeros(npart), lat=np.zeros(npart))
    pset.execute(pset.Kernel(BrownianMotion2D),
                 runtime=runtime, dt=delta(hours=1))

    return pset

Run the default setting once.

In [5]:
%%time
pset = run_brownian_example(mode='jit')

INFO: Compiled random ==> /tmp/parcels-1000/random.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/8bab2818c41e5b898bc8af669c861c89.so


CPU times: user 748 ms, sys: 28 ms, total: 776 ms
Wall time: 1.88 s


## Next step (no dask yet):  Create a list of realizations

Create a list of seeds and map `run_brownian_motion` to it.

In [6]:
seeds = np.arange(4)
print(seeds)

[0 1 2 3]


In [7]:
%%time
list_of_psets = list(map(lambda s: run_brownian_example(mode='jit', random_seed=s),
                         seeds))

INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/74d9a05b0d6153dad987f4ba8db53401.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/4efabbfec368a792a3d82d635b2bca44.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/330323f6a53c823ba595ac51dfb0cc89.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/c85aaaae4e4face49067612083956ad8.so


CPU times: user 2.25 s, sys: 40 ms, total: 2.29 s
Wall time: 5.85 s


## Now, execute Parcels for a list of realizations using a local dask cluster

In [8]:
import dask.bag as db
from dask.distributed import Client

In [9]:
client = Client(n_workers=2, threads_per_worker=2, processes=False)

In [10]:
client

0,1
Client  Scheduler: inproc://134.245.217.57/19523/1  Dashboard: http://localhost:8787/status,Cluster  Workers: 2  Cores: 4  Memory: 21.06 GB


_Make sure to **click on the link above!**_

In [11]:
seeds_bag = db.from_sequence(seeds, partition_size=1)
psets_jit_bag = db.map(lambda s: run_brownian_example(mode='jit', random_seed=s),
                       seeds_bag)

In [12]:
%%time
psets_jit = psets_jit_bag.compute()

INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/dee94dfa7edd708cf3eee30f83f87732.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/ebe079b32cad3e55cfbbf5a2a80c33f2.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/db1158fbc8444ce392fb212e0bbe7ffe.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/598b1ddd22d171d9594f803f5e0b48dc.so


CPU times: user 2.93 s, sys: 124 ms, total: 3.05 s
Wall time: 3.69 s


## Some longer integration

In [18]:
psets_jit_long_bag = db.map(lambda s: run_brownian_example(mode='jit', random_seed=s,
                                                           runtime=delta(days=10),
                                                           npart=int(1e5)),
                            seeds_bag)

In [19]:
%%time
psets_jit_long = psets_jit_long_bag.compute();

INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/6225d33ce72a0ac407d98209270dff3f.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/3c3b97f9a2d4e6cfdda9911f46fab3c9.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/df8d25e1c966f0fdc79fb89cc8e0e9f8.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/352ebd0c80d39bda328b771ca01ba454.so


CPU times: user 2min 44s, sys: 1min 6s, total: 3min 51s
Wall time: 2min 11s


In [20]:
%%time
pset = run_brownian_example(mode='jit', random_seed=0,
                            runtime=delta(days=10), npart=int(1e5));

INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/c18eedd1070e8f2be92728c96858b77b.so


CPU times: user 20.3 s, sys: 272 ms, total: 20.6 s
Wall time: 20.4 s


In [21]:
%%time
list_of_psets_long = list(map(lambda s: run_brownian_example(mode='jit', random_seed=s,
                                                             runtime=delta(days=10),
                                                             npart=int(1e5)),
                              seeds))

INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/52d6a1186ee413ffc8aa48dda45d74fe.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/2605570d8ae634bc878cedec40dd07b3.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/780253500d442926130d806a0d0e766c.so
INFO: Compiled JITParticleBrownianMotion2D ==> /tmp/parcels-1000/671a3cb47ad9a8e167875b5c56736508.so


CPU times: user 1min 20s, sys: 2.17 s, total: 1min 22s
Wall time: 1min 21s


## Discussion

- Note how we set `processes=False` in creation of the dask cluster.  Running different processes fails because the ctypes used in Parcels cannot be pickled.  (Test by re-running with `processes=True`.)

- For the brownian motion example, we cannot circumvent this pickling issue by using Scipy particles, because `parcels.random` seems to imply some compilation.

- Note that the approach outlined here could also be achived with Python's own `multiprocessing` toolbox.  But as there, inter-process communication is done by pickling name spaces as well, we're down to making sure that the C types can be pickled again.  On the up-side, getting Parcels to work with dask very likely also paves the way for many other (and still to come?) distributed computing tools that rely on some serialization for inter-worker communication.

## Next steps

- _**How does this scale?**_  — We'd probably like to allow for much longer run times and some machine larger than my desktop computer.

- _**How to split tasks across dask resources?**_ — The obvious way is to go for splitting a particle set across different workers and re-merge them after the execution.

## Boilerplate for the sake of (some) reproducibility

In [17]:
!conda list

# packages in environment at /home/wrath/miniconda3/envs/py3_parcels_dask:
#
appdirs                   1.4.3                      py_0    conda-forge
attrs                     17.4.0                     py_0    conda-forge
backcall                  0.1.0                      py_0    conda-forge
binutils_impl_linux-64    2.28.1               had2808c_3  
binutils_linux-64         7.2.0                        26  
bleach                    2.1.3                      py_0    conda-forge
bokeh                     0.12.15                  py36_0    conda-forge
bottleneck                1.2.1                    py36_1    conda-forge
bzip2                     1.0.6                         1    conda-forge
ca-certificates           2018.4.16                     0    conda-forge
cachetools                2.0.1                      py_0    conda-forge
certifi                   2018.4.16                py36_0    conda-forge
cgen                      2017.1                   py36_0    conda-forge
