Run model(s) in parallel using dask (#119)
* add and document API

* implement run batch in parallel (+ tests)

* simplify model context manager

* fix pickle process classes

See
cloudpipe/cloudpickle#320
python-attrs/attrs#458

* fix zarr in-memory store and dask processes

* disable dask parallel schedulers on CI

Is it really supported?

* get a dask lock for create / resize zarr datasets

* clean-up

* add test for DummyLock

* run model processes in parallel + more docstrings

* update release notes

* docstrings tweaks

* doc: add run parallel section
benbovy committed Apr 6, 2020
1 parent 2c963f8 commit 32bee7b
Showing 20 changed files with 417 additions and 139 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
@@ -48,6 +48,8 @@ jobs:
         shell: bash -l {0}
         run: pytest xsimlab --cov=xsimlab --cov-report term-missing --cov-report xml --verbose --color=yes
         continue-on-error: ${{ matrix.cfg.allow-failure }}
+        env:
+          DASK_SINGLE_THREADED: true
       - name: Codecov
         if: matrix.cfg.codecov
         uses: codecov/codecov-action@v1
1 change: 1 addition & 0 deletions doc/conf.py
@@ -214,6 +214,7 @@
     "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
     "xarray": ("https://xarray.pydata.org/en/stable/", None),
     "zarr": ("https://zarr.readthedocs.io/en/stable/", None),
+    "dask": ("https://docs.dask.org/en/latest/", None),
 }


38 changes: 12 additions & 26 deletions doc/faq.rst
@@ -53,37 +53,23 @@ components (i.e., Python classes) together form.
 Does xarray-simlab support running model(s) in parallel?
 --------------------------------------------------------
 
-There is currently no support for model execution in parallel but it
-is a top priority for the next releases!
+Yes! Three levels of parallelism are possible:
 
-Three levels of parallelism are possible:
-
-- "inter-model" parallelism, i.e., execution of multiple model runs in
+- "multi-models" parallelism, i.e., execution of multiple model runs in
   parallel,
-- "inter-process" parallelism, i.e., execution of multiple processes of
+- "single-model" parallelism, i.e., execution of multiple processes of
   a model in parallel,
-- "intra-process" parallelism, i.e., parallel execution of some code
+- "user-specific" parallelism, i.e., parallel execution of some code
   written in one or more processes.
 
-Note that the notion of process used above is different from
-multiprocessing: a process here corresponds to a component of a model
-(see Section :ref:`framework`).
-
-The first level "inter-model" is an embarrassingly parallel problem.
-Next versions of xarray-simlab will allow to very easily run
-simulations in parallel (e.g., for sensitivity analyses).
-
-It shouldn't be hard to add support for the second level
-"inter-process" given that processes in a model together form a
-directed acyclic graph. However, those processes usually perform most
-of their computation on shared data, which may significantly reduce
-the gain of parallel execution when using multiple OS processes or in
-distributed environments. Using multiple threads is limited by the
-CPython's GIL, unless it is released by the code executed in model
-processes.
-
-The third level "intra-process" is more domain specific. Users are
-free to develop xarray-simlab compatible models with custom code (in
+Note that the notion of process used above is different from multiprocessing: a
+process here corresponds to a component of a model. See Section
+:ref:`framework`.
+
+For the first two levels, see Section :ref:`run_parallel`.
+
+The third level "user-specific" is not part of the xarray-simlab framework.
+Users are free to develop xarray-simlab compatible models with custom code (in
 processes) that is executed either sequentially or in parallel.
 
 Is it possible to use xarray-simlab without xarray?
5 changes: 2 additions & 3 deletions doc/framework.rst
@@ -246,9 +246,8 @@ computationally consistent can then be obtained using topological
 sorting. This is done at Model object creation. The same ordering is
 used at every stage of a model run.
 
-In principle, the DAG structure would also allow running the processes
-in parallel at every stage of a model run. This is not yet
-implemented, though.
+The DAG structure also allows running the processes in parallel at every stage
+of a model run, see Section :ref:`run_parallel_single`.
 
 Model inputs
 ------------
2 changes: 2 additions & 0 deletions doc/index.rst
@@ -36,6 +36,7 @@ Documentation index
 * :doc:`inspect_model`
 * :doc:`run_model`
 * :doc:`io_storage`
+* :doc:`run_parallel`
 * :doc:`monitor`
 * :doc:`testing`
 
@@ -49,6 +50,7 @@ Documentation index
    inspect_model
    run_model
    io_storage
+   run_parallel
    monitor
    testing
 
6 changes: 2 additions & 4 deletions doc/installing.rst
@@ -11,14 +11,12 @@ Required dependencies
 - `numpy <http://www.numpy.org/>`__
 - `xarray <http://xarray.pydata.org>`__ (0.10.0 or later)
 - `zarr <https://zarr.readthedocs.io>`__ (2.3.0 or later)
+- `dask <https://docs.dask.org>`__
 
 Optional dependencies
 ---------------------
 
-For model visualization
-~~~~~~~~~~~~~~~~~~~~~~~
-
-- `graphviz <http://graphviz.readthedocs.io>`__
+- `graphviz <http://graphviz.readthedocs.io>`__ (for model visualization)
 
 Install using conda
 -------------------
4 changes: 4 additions & 0 deletions doc/io_storage.rst
@@ -80,6 +80,8 @@ supported by xarray, e.g.,
     out_ds.to_netcdf("advect_model_run.nc")
 
+.. _io_storage_zarr:
+
 Using zarr
 ----------
 
@@ -202,6 +204,8 @@ to the xarray Dataset or DataArray :meth:`~xarray.Dataset.stack`,
     ...:     .dropna('particles')
     ...:     .to_dataframe())
 
+.. _io_storage_encoding:
+
 Encoding options
 ~~~~~~~~~~~~~~~~
 
6 changes: 5 additions & 1 deletion doc/run_model.rst
@@ -243,13 +243,17 @@ time at a fixed rate:
     @savefig run_advect_model_time.png width=100%
     out_ds5.profile__u.plot(col='otime', figsize=(9, 3));
 
+.. _run_batch:
+
 Run multiple simulations
 ------------------------

 Besides a time dimension, model inputs may also accept another extra dimension
 that is used to run batches of simulations. This is very convenient for
 sensitivity analyses: the inputs and results from all simulations are neatly
-combined into one xarray Dataset object.
+combined into one xarray Dataset object. Another advantage is that those
+simulations can easily be run in parallel, see Section
+:ref:`run_parallel_multi`.

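For instance, a batch dimension can be set on any input variable; a minimal
sketch (``in_ds``, ``advect_model`` and its ``init__loc`` input are reused here
purely for illustration):

.. code:: python

    >>> in_ds_batch = in_ds.xsimlab.update_vars(
    ...     model=advect_model,
    ...     input_vars={"init__loc": ("batch", [0.25, 0.5, 0.75])},
    ... )
    >>> out_ds = in_ds_batch.xsimlab.run(model=advect_model, batch_dim="batch")
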
.. note::

109 changes: 109 additions & 0 deletions doc/run_parallel.rst
@@ -0,0 +1,109 @@
.. _run_parallel:

Run Model(s) in Parallel
========================

xarray-simlab allows running one or more models in parallel via the Dask_
library. There are two parallel modes:

- run one simulation in parallel (single-model parallelism)
- run a batch of simulations in parallel (multi-model parallelism)

.. warning::

   This is an experimental feature.

.. note::

   Dask is a versatile library that provides many ways of executing tasks in
   parallel (i.e., threads vs. processes, single machine vs. distributed
   environments). xarray-simlab lets you choose which alternative best suits
   your needs. Beware, however, that not all alternatives are optimal or
   supported depending on your case. More details below.

.. _Dask: https://docs.dask.org/en/latest/

.. _run_parallel_single:

Single-model parallelism
------------------------

This mode runs each process in a model in parallel.

A :class:`~xsimlab.Model` object can be viewed as a Directed Acyclic Graph (DAG)
built from a collection of processes (i.e., process-decorated classes). At each
simulation stage, a task graph is built from this graph, which is then executed
by one of the schedulers available in Dask.
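
As a purely conceptual sketch (this is not xarray-simlab's actual internal
code), here is how one stage of such a DAG maps onto a Dask task graph: two
independent processes (tasks) feed a third one that depends on both:

.. code:: python

    >>> import dask
    >>> a = dask.delayed(lambda: 1)()               # independent task
    >>> b = dask.delayed(lambda: 2)()               # independent task
    >>> c = dask.delayed(lambda x, y: x + y)(a, b)  # depends on both
    >>> c.compute(scheduler="threads")
    3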

To activate this parallel mode, simply set ``parallel=True`` when calling
:func:`xarray.Dataset.xsimlab.run`:

.. code:: python

    >>> in_ds.xsimlab.run(model=my_model, parallel=True)

The default Dask scheduler used here is ``"threads"`` (this is the one used by
``dask.delayed``). Other schedulers may be selected via the ``scheduler``
argument of :func:`~xarray.Dataset.xsimlab.run`. Dask also supports other ways to
select a scheduler, see `here
<https://docs.dask.org/en/latest/setup/single-machine.html>`_.
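
For example, a scheduler can be picked either via the ``scheduler`` argument or
via Dask's own configuration (a sketch; ``my_model`` and ``in_ds`` are
placeholders for your own model and input dataset):

.. code:: python

    >>> # pass the scheduler directly to run()
    >>> out_ds = in_ds.xsimlab.run(model=my_model, parallel=True, scheduler="threads")

    >>> # or select it via dask, e.g., within a context manager
    >>> import dask
    >>> with dask.config.set(scheduler="threads"):
    ...     out_ds = in_ds.xsimlab.run(model=my_model, parallel=True)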

Note, however, that multi-process schedulers are not supported for this mode,
since active simulation data (shared between all model components) is stored
in a simple Python dictionary.

Note also that the code in the process-decorated classes must be thread-safe
and should release CPython's Global Interpreter Lock (GIL) as much as
possible in order to see a gain in performance. For example, most NumPy
functions release the GIL.

The gain in performance compared to sequential execution of the model processes
will also depend on how the DAG is structured, i.e., how many processes can be
executed in parallel.

.. _run_parallel_multi:

Multi-models parallelism
------------------------

This mode runs multiple simulations in parallel, using the same model but
different input values.

.. note::

   This mode should scale well from a few dozen up to a few thousand
   simulations, but it has not yet been tested beyond that scale.

.. note::

   It may not work well with dynamic-sized arrays.

This parallel mode is automatically selected when a batch dimension label is set
while calling :func:`xarray.Dataset.xsimlab.run` (see Section
:ref:`run_batch`). You still need to explicitly set ``parallel=True``:

.. code:: python

    >>> in_ds.xsimlab.run(model=my_model, batch_dim="batch", parallel=True)

As opposed to single-model parallelism, both multi-threaded and multi-process
Dask schedulers are supported for this embarrassingly parallel problem.

If you use a multi-threaded scheduler, the same precautions apply regarding
thread-safety and CPython's GIL.

If you use a multi-process scheduler, beware of the following:

- The code in the process-decorated classes must be serializable.
- Not all Zarr stores are supported for model outputs, see `Zarr's documentation
  <https://zarr.readthedocs.io/en/stable/api/storage.html>`_. For example, the
  default in-memory store is not supported. See Section :ref:`io_storage_zarr`
  on how to specify an alternative store.
- By default, the chunk size of Zarr datasets along the batch dimension is equal
  to 1 in order to prevent race conditions during parallel writes. This might
  not be optimal for further post-processing, though. It is possible to override
  this default and set larger chunk sizes (via the ``encoding`` parameter of
  :func:`~xarray.Dataset.xsimlab.run`), but then you should also use one of
  Zarr's synchronizers (either :class:`zarr.sync.ThreadSynchronizer` or
  :class:`zarr.sync.ProcessSynchronizer`) so that all output values are
  properly saved, as sketched below.
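
Putting these pieces together, here is a sketch of a multi-process batch run.
The output variable name ``stats__mean``, the chunk size, and passing the
synchronizer through ``encoding`` are all illustrative assumptions; check the
API reference for the exact options:

.. code:: python

    >>> import zarr
    >>> sync = zarr.ProcessSynchronizer("advect_model.sync")
    >>> out_ds = in_ds.xsimlab.run(
    ...     model=my_model,
    ...     batch_dim="batch",
    ...     parallel=True,
    ...     scheduler="processes",
    ...     store="advect_model.zarr",
    ...     encoding={"stats__mean": {"chunks": (10,), "synchronizer": sync}},
    ... )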
1 change: 1 addition & 0 deletions doc/whats_new.rst
@@ -78,6 +78,7 @@ Enhancements
   parameter of :func:`xarray.Dataset.xsimlab.run` (:issue:`115`).
 - Added 'object' variables :func:`~xsimlab.any_object` for sharing arbitrary
   Python objects between processes (:issue:`118`).
+- Run one or multiple simulations in parallel using Dask (:issue:`119`).

Bug fixes
~~~~~~~~~
64 changes: 43 additions & 21 deletions xsimlab/drivers.py
@@ -1,8 +1,7 @@
-import copy
 from enum import Enum
 from typing import Any, Iterator, Mapping
 
-import attr
+import dask
 import pandas as pd
 
 from .hook import flatten_hooks, group_hooks, RuntimeHook
@@ -200,6 +199,8 @@ def __init__(
         check_dims=CheckDimsOption.STRICT,
         validate=ValidateOption.INPUTS,
         hooks=None,
+        parallel=False,
+        scheduler=None,
     ):
         # these are not yet supported with zarr
         self.dataset, self.multi_indexes = _reset_multi_indexes(dataset)
@@ -228,8 +229,21 @@ def __init__(
             hooks = []
         self.hooks = _get_all_active_hooks(hooks)
 
+        self.parallel = parallel
+        self.scheduler = scheduler
+
+        if parallel:
+            lock = dask.utils.get_scheduler_lock(scheduler=scheduler)
+        else:
+            lock = None
+
         self.store = ZarrSimulationStore(
-            self.dataset, model, zobject=store, encoding=encoding, batch_dim=batch_dim
+            self.dataset,
+            model,
+            zobject=store,
+            encoding=encoding,
+            batch_dim=batch_dim,
+            lock=lock,
         )
 
     def _maybe_transpose(self, xr_var, p_name, var_name):
@@ -321,18 +335,28 @@ def run_model(self):
 
         if self.batch_dim is None:
             model = self.model
-            self._run_one_model(self.dataset, model)
+            self._run_one_model(self.dataset, model, parallel=self.parallel)
 
         else:
             ds_gby_batch = self.dataset.groupby(self.batch_dim)
+            futures = []
 
             for batch, (_, ds_batch) in enumerate(ds_gby_batch):
                 model = self.model.clone()
-                self._run_one_model(ds_batch, model, batch=batch)
+
+                if self.parallel:
+                    futures.append(
+                        dask.delayed(self._run_one_model)(ds_batch, model, batch=batch)
+                    )
+                else:
+                    self._run_one_model(ds_batch, model, batch=batch)
+
+            if self.parallel:
+                dask.compute(futures, scheduler=self.scheduler)
 
             self.store.write_index_vars(model=model)
 
-    def _run_one_model(self, dataset, model, batch=-1):
+    def _run_one_model(self, dataset, model, batch=-1, parallel=False):
         """Run one simulation.
 
         - Set model inputs from the input Dataset (update
- Set model inputs from the input Dataset (update
Expand All @@ -345,7 +369,7 @@ def _run_one_model(self, dataset, model, batch=-1):

validate_all = self._validate_option is ValidateOption.ALL

runtime_context = RuntimeContext(
rt_context = RuntimeContext(
batch_size=self.batch_size,
batch=batch,
sim_start=ds_init["_sim_start"].values,
@@ -357,13 +381,18 @@ def _run_one_model(self, dataset, model, batch=-1):
         model.set_inputs(in_vars, ignore_static=True)
         self._maybe_validate_inputs(model, in_vars)
 
-        model.execute(
-            "initialize", runtime_context, hooks=self.hooks, validate=validate_all,
-        )
+        execute_kwargs = {
+            "hooks": self.hooks,
+            "validate": validate_all,
+            "parallel": parallel,
+            "scheduler": self.scheduler,
+        }
+
+        model.execute("initialize", rt_context, **execute_kwargs)
 
         for step, (_, ds_step) in enumerate(ds_gby_steps):
 
-            runtime_context.update(
+            rt_context.update(
                 step=step,
                 step_start=ds_step["_clock_start"].values,
                 step_end=ds_step["_clock_end"].values,
@@ -374,19 +403,12 @@ def _run_one_model(self, dataset, model, batch=-1):
             model.set_inputs(in_vars, ignore_static=False)
             self._maybe_validate_inputs(model, in_vars)
 
-            model.execute(
-                "run_step", runtime_context, hooks=self.hooks, validate=validate_all,
-            )
+            model.execute("run_step", rt_context, **execute_kwargs)
 
             self.store.write_output_vars(batch, step, model=model)
 
-            model.execute(
-                "finalize_step",
-                runtime_context,
-                hooks=self.hooks,
-                validate=validate_all,
-            )
+            model.execute("finalize_step", rt_context, **execute_kwargs)
 
         self.store.write_output_vars(batch, -1, model=model)
 
-        model.execute("finalize", runtime_context, hooks=self.hooks)
+        model.execute("finalize", rt_context, **execute_kwargs)

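For reference, the batch loop above follows Dask's standard delayed
fan-out/compute pattern; a minimal standalone sketch (``run_one`` is a
hypothetical stand-in for ``_run_one_model``):

    import dask

    def run_one(batch):
        # stand-in for SimulationDriver._run_one_model
        return batch ** 2

    # build one lazy task per batch member, then execute them all at once
    futures = [dask.delayed(run_one)(b) for b in range(4)]
    results = dask.compute(futures, scheduler="processes")[0]
    print(results)  # [0, 1, 4, 9]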