Merged
52 commits
a0bea98
move backend append logic to the prepare_variable methods
Dec 21, 2017
afdb254
deprecate variables/dimensions/attrs properties on AbstractWritableDa…
Dec 22, 2017
cc02150
warnings instead of errors for backend properties
Dec 24, 2017
86240cd
use attrs.update when setting zarr attributes
Dec 26, 2017
9c89ef2
more performance improvements to attributes in zarr backend
Dec 26, 2017
2568d21
Merge branch 'master' of github.com:pydata/xarray into fix/zarr_set_a…
Dec 28, 2017
d459c66
fix typo
Dec 28, 2017
c59ca57
Merge branch 'master' of github.com:pydata/xarray into fix/zarr_set_a…
Dec 28, 2017
2dd186a
Merge branch 'fix/zarr_set_attrs' of github.com:jhamman/xarray into f…
Dec 28, 2017
8f71b31
new set_dimensions method for writable data stores
Jan 2, 2018
07b9c21
Merge branch 'fix/zarr_set_attrs' of github.com:jhamman/xarray into f…
Jan 2, 2018
67fcd92
more fixes for zarr
Jan 2, 2018
b38e1a6
more tests for zarr and remove append logic for zarr
Jan 2, 2018
47ba8b6
more tests for zarr and remove append logic for zarr
Jan 2, 2018
9152b12
Merge branch 'fix/zarr_set_attrs' of github.com:jhamman/xarray into f…
Jan 2, 2018
26b6bcb
a few more tweaks to zarr attrs
Jan 2, 2018
b7681ae
Add encode methods to writable data stores, fixes for Zarr tests
Jan 4, 2018
e084e9e
fix for InMemoryDataStore
Jan 5, 2018
a6aeb36
fix for unlimited dimensions Scipy Datastores
Jan 5, 2018
264b13f
another patch for scipy
Jan 5, 2018
9c03bfc
whatsnew
Jan 6, 2018
4ba6e9c
initial commit returning dask futures from to_netcdf and to_zarr methods
Jan 7, 2018
c92020a
ordereddict
Jan 7, 2018
18434f9
address some of rabernats comments, in particular, this commit remove…
Jan 9, 2018
9f89c7c
stop skipping zero-dim zarr tests
Jan 9, 2018
3590d28
update minimum zarr version for tests
Jan 9, 2018
69cacee
Merge branch 'master' into fix/zarr_set_attrs
Jan 9, 2018
8d744e0
Merge branch 'master' into fix/zarr_set_attrs
Jan 9, 2018
a8dabdf
Merge branch 'master' of github.com:pydata/xarray into fix/zarr_set_a…
Jan 10, 2018
7858db7
Merge branch 'fix/zarr_set_attrs' of github.com:jhamman/xarray into f…
Jan 10, 2018
3d4e25f
Merge branch 'fix/zarr_set_attrs' of github.com:jhamman/xarray into f…
Jan 10, 2018
74c1976
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
Feb 19, 2018
1ab1815
cleanup a bit before adding tests
Feb 19, 2018
fd3af3e
tempoary checkin
Feb 21, 2018
4577cca
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
Mar 11, 2018
3ddf40b
cleanup implementation of compute=False for to_foo functions, still n…
Mar 11, 2018
5f7ded2
docs and more tests, failing tests on h5netcdf backend only
Mar 11, 2018
0a9f55e
skip h5netcdf/netcdf4 tests in certain places
Mar 11, 2018
6081da1
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
Mar 16, 2018
27aec1d
remove spurious returns
Mar 23, 2018
42891bc
finalize stores when compute=False
Mar 23, 2018
b32dfeb
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
Apr 19, 2018
b1e4a3a
more docs, skip h5netcdf netcdf tests, raise informative error for h5…
Apr 19, 2018
11344f7
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
Apr 20, 2018
8f4bce3
cleanup whats-new
Apr 20, 2018
1ffa9e0
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
May 12, 2018
0d08445
reorg dask task graph when using compute=False and save_mfdataset
May 12, 2018
7f427da
move compute_false tests to DaskTests class
May 14, 2018
9152dfb
small doc/style fixes
May 15, 2018
2eb4f53
save api.py
May 15, 2018
db46b87
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
May 16, 2018
75b2431
Merge branch 'master' of github.com:pydata/xarray into feature/store_…
May 16, 2018
15 changes: 15 additions & 0 deletions doc/dask.rst
@@ -100,6 +100,21 @@ Once you've manipulated a dask array, you can still write a dataset too big to
fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the
usual way.

.. ipython:: python

    ds.to_netcdf('manipulated-example-data.nc')

By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf`
will return a dask delayed object that can be computed later.

.. ipython:: python

    from dask.diagnostics import ProgressBar
    # or distributed.progress when using the distributed scheduler
    delayed_obj = ds.to_netcdf('manipulated-example-data.nc', compute=False)
    with ProgressBar():
        results = delayed_obj.compute()

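The ``compute`` argument works the same way for the zarr backend. A minimal
sketch, assuming the zarr package is installed and that the target path does
not already exist (``to_zarr`` uses mode ``'w-'`` by default):

.. ipython:: python

    delayed_zarr = ds.to_zarr('manipulated-example-data.zarr', compute=False)
    with ProgressBar():
        delayed_zarr.compute()
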
.. note::

When using dask's distributed scheduler to write NETCDF4 files,
19 changes: 13 additions & 6 deletions doc/whats-new.rst
@@ -69,6 +69,19 @@ Enhancements
- ``plot.line()`` learned new kwargs: ``xincrease``, ``yincrease`` that change the direction of the respective axes.
By `Deepak Cherian <https://github.com/dcherian>`_.

- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses
``dask.delayed`` to parallelize the open and preprocessing steps within
``open_mfdataset``. This is expected to provide performance improvements when
opening many files, particularly when used in conjunction with dask's
multiprocessing or distributed schedulers (:issue:`1981`).
By `Joe Hamman <https://github.com/jhamman>`_.

- New ``compute`` option in :py:meth:`~xarray.Dataset.to_netcdf`,
:py:meth:`~xarray.Dataset.to_zarr`, and :py:func:`~xarray.save_mfdataset` to
allow for the lazy computation of netCDF and zarr stores. This feature is
currently only supported by the netCDF4 and zarr backends. (:issue:`1784`).
By `Joe Hamman <https://github.com/jhamman>`_.

Bug fixes
~~~~~~~~~

@@ -104,12 +117,6 @@ The minor release includes a number of bug-fixes and backwards compatible enhanc
Enhancements
~~~~~~~~~~~~

- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses
``dask.delayed`` to parallelize the open and preprocessing steps within
``open_mfdataset``. This is expected to provide performance improvements when
opening many files, particularly when used in conjunction with dask's
multiprocessing or distributed schedulers (:issue:`1981`).
By `Joe Hamman <https://github.com/jhamman>`_.
- :py:meth:`~xarray.DataArray.isin` and :py:meth:`~xarray.Dataset.isin` methods,
which test each value in the array for whether it is contained in the
supplied list, returning a bool array. See :ref:`selecting values with isin`
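The two new whats-new entries above pair naturally: ``parallel=True`` speeds up
opening many files, and ``compute=False`` defers the write. A hedged usage
sketch with hypothetical file paths (requires dask):

import xarray as xr

# open and preprocess many files in parallel (dask.delayed under the hood)
ds = xr.open_mfdataset('data/*.nc', parallel=True)

# describe the write without executing it, then trigger it when convenient
delayed_write = ds.to_netcdf('combined.nc', compute=False)
delayed_write.compute()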
42 changes: 33 additions & 9 deletions xarray/backends/api.py
@@ -144,6 +144,13 @@ def _get_lock(engine, scheduler, format, path_or_file):
return lock


def _finalize_store(write, store):
""" Finalize this store by explicitly syncing and closing"""
del write # ensure writing is done first
store.sync()
store.close()


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True, autoclose=False,
concat_characters=True, decode_coords=True, engine=None,
@@ -620,7 +627,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,


def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
engine=None, writer=None, encoding=None, unlimited_dims=None):
engine=None, writer=None, encoding=None, unlimited_dims=None,
compute=True):
"""This function creates an appropriate datastore for writing a dataset to
disk as a netCDF file

@@ -680,19 +688,22 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
unlimited_dims = dataset.encoding.get('unlimited_dims', None)
try:
dataset.dump_to_store(store, sync=sync, encoding=encoding,
unlimited_dims=unlimited_dims)
unlimited_dims=unlimited_dims, compute=compute)
if path_or_file is None:
return target.getvalue()
finally:
if sync and isinstance(path_or_file, basestring):
store.close()

if not compute:
import dask
return dask.delayed(_finalize_store)(store.delayed_store, store)

if not sync:
return store


def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
engine=None):
engine=None, compute=True):
"""Write multiple datasets to disk as netCDF files simultaneously.

This function is intended for use with datasets consisting of dask.array
@@ -742,6 +753,9 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
default engine is chosen based on available dependencies, with a
preference for 'netcdf4' if writing to a file on disk.
See `Dataset.to_netcdf` for additional information.
compute: boolean
If True, compute immediately; otherwise return a
``dask.delayed.Delayed`` object that can be computed later.

Examples
--------
@@ -769,11 +783,17 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
'datasets, paths and groups arguments to '
'save_mfdataset')

writer = ArrayWriter()
stores = [to_netcdf(ds, path, mode, format, group, engine, writer)
writer = ArrayWriter() if compute else None
stores = [to_netcdf(ds, path, mode, format, group, engine, writer,
compute=compute)
for ds, path, group in zip(datasets, paths, groups)]

if not compute:
import dask
return dask.delayed(stores)

try:
writer.sync()
delayed = writer.sync(compute=compute)
for store in stores:
store.sync()
finally:
@@ -782,7 +802,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,


def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
encoding=None):
encoding=None, compute=True):
"""This function creates an appropriate datastore for writing a dataset to
a zarr store

@@ -803,5 +823,9 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,

# I think zarr stores should always be sync'd immediately
# TODO: figure out how to properly handle unlimited_dims
dataset.dump_to_store(store, sync=True, encoding=encoding)
dataset.dump_to_store(store, sync=True, encoding=encoding, compute=compute)

if not compute:
import dask
return dask.delayed(_finalize_store)(store.delayed_store, store)
return store
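The ``_finalize_store`` helper above leans on a standard dask idiom: the
delayed write is passed in as an argument (and immediately discarded), which
makes the finalization task depend on the write, so ``sync()`` and ``close()``
only run once the data has landed. A toy sketch of that ordering, independent
of any real datastore:

import dask

@dask.delayed
def write_data():
    # stands in for the pending array write held in store.delayed_store
    return 'written'

def finalize_store(write, store_name):
    del write  # unused; the argument only creates a dependency on the write
    print('syncing and closing %s' % store_name)

# finalize_store cannot start until write_data has finished, because its
# (discarded) result is an input to the finalization task
delayed = dask.delayed(finalize_store)(write_data(), 'example-store')
delayed.compute()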
12 changes: 8 additions & 4 deletions xarray/backends/common.py
@@ -264,19 +264,23 @@ def add(self, source, target):
else:
target[...] = source

def sync(self):
def sync(self, compute=True):
if self.sources:
import dask.array as da
da.store(self.sources, self.targets, lock=self.lock)
delayed_store = da.store(self.sources, self.targets,
lock=self.lock, compute=compute,
flush=True)
self.sources = []
self.targets = []
return delayed_store


class AbstractWritableDataStore(AbstractDataStore):
def __init__(self, writer=None, lock=HDF5_LOCK):
if writer is None:
writer = ArrayWriter(lock=lock)
self.writer = writer
self.delayed_store = None

def encode(self, variables, attributes):
"""
@@ -318,11 +322,11 @@ def set_attribute(self, k, v): # pragma: no cover
def set_variable(self, k, v): # pragma: no cover
raise NotImplementedError

def sync(self):
def sync(self, compute=True):
if self._isopen and self._autoclose:
# datastore will be reopened during write
self.close()
self.writer.sync()
self.delayed_store = self.writer.sync(compute=compute)

def store_dataset(self, dataset):
"""
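``ArrayWriter.sync`` now forwards ``compute`` to ``dask.array.store``, which
returns a ``Delayed`` object instead of writing straight away when
``compute=False``. A self-contained illustration with an in-memory target
(not xarray code, just the underlying dask behaviour):

import numpy as np
import dask.array as da

source = da.arange(10, chunks=5)
target = np.zeros(10)

# nothing is written yet; a Delayed describing the store operation is returned
delayed_store = da.store(source, target, compute=False)
assert target.sum() == 0

delayed_store.compute()  # now the chunks are written into target
assert target.sum() == 45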
7 changes: 5 additions & 2 deletions xarray/backends/h5netcdf_.py
@@ -212,9 +212,12 @@ def prepare_variable(self, name, variable, check_encoding=False,

return target, variable.data

def sync(self):
def sync(self, compute=True):
if not compute:
raise NotImplementedError(
'compute=False is not supported for the h5netcdf backend yet')
with self.ensure_open(autoclose=True):
super(H5NetCDFStore, self).sync()
super(H5NetCDFStore, self).sync(compute=compute)
self.ds.sync()

def close(self):
4 changes: 2 additions & 2 deletions xarray/backends/netCDF4_.py
@@ -439,9 +439,9 @@ def prepare_variable(self, name, variable, check_encoding=False,

return target, variable.data

def sync(self):
def sync(self, compute=True):
with self.ensure_open(autoclose=True):
super(NetCDF4DataStore, self).sync()
super(NetCDF4DataStore, self).sync(compute=compute)
self.ds.sync()

def close(self):
7 changes: 5 additions & 2 deletions xarray/backends/scipy_.py
@@ -219,9 +219,12 @@ def prepare_variable(self, name, variable, check_encoding=False,

return target, data

def sync(self):
def sync(self, compute=True):
if not compute:
raise NotImplementedError(
'compute=False is not supported for the scipy backend yet')
with self.ensure_open(autoclose=True):
super(ScipyDataStore, self).sync()
super(ScipyDataStore, self).sync(compute=compute)
self.ds.flush()

def close(self):
4 changes: 2 additions & 2 deletions xarray/backends/zarr.py
@@ -342,8 +342,8 @@ def store(self, variables, attributes, *args, **kwargs):
AbstractWritableDataStore.store(self, variables, attributes,
*args, **kwargs)

def sync(self):
self.writer.sync()
def sync(self, compute=True):
self.delayed_store = self.writer.sync(compute=compute)


def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
20 changes: 14 additions & 6 deletions xarray/core/dataset.py
@@ -1055,7 +1055,7 @@ def reset_coords(self, names=None, drop=False, inplace=False):
return obj

def dump_to_store(self, store, encoder=None, sync=True, encoding=None,
unlimited_dims=None):
unlimited_dims=None, compute=True):
"""Store dataset contents to a backends.*DataStore object."""
if encoding is None:
encoding = {}
Expand All @@ -1074,10 +1074,11 @@ def dump_to_store(self, store, encoder=None, sync=True, encoding=None,
store.store(variables, attrs, check_encoding,
unlimited_dims=unlimited_dims)
if sync:
store.sync()
store.sync(compute=compute)

def to_netcdf(self, path=None, mode='w', format=None, group=None,
engine=None, encoding=None, unlimited_dims=None):
engine=None, encoding=None, unlimited_dims=None,
compute=True):
"""Write dataset contents to a netCDF file.

Parameters
@@ -1136,16 +1137,20 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
By default, no dimensions are treated as unlimited dimensions.
Note that unlimited_dims may also be set via
``dataset.encoding['unlimited_dims']``.
compute: boolean
If True, compute immediately; otherwise return a
``dask.delayed.Delayed`` object that can be computed later.
"""
if encoding is None:
encoding = {}
from ..backends.api import to_netcdf
return to_netcdf(self, path, mode, format=format, group=group,
engine=engine, encoding=encoding,
unlimited_dims=unlimited_dims)
unlimited_dims=unlimited_dims,
compute=compute)

def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
encoding=None):
encoding=None, compute=True):
"""Write dataset contents to a zarr group.

.. note:: Experimental
@@ -1167,6 +1172,9 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
Nested dictionary with variable names as keys and dictionaries of
variable specific encodings as values, e.g.,
``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
compute: boolean
If True, compute immediately; otherwise return a
``dask.delayed.Delayed`` object that can be computed later.
"""
if encoding is None:
encoding = {}
@@ -1176,7 +1184,7 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
"and 'w-'.")
from ..backends.api import to_zarr
return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
group=group, encoding=encoding)
group=group, encoding=encoding, compute=compute)

def __unicode__(self):
return formatting.dataset_repr(self)
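The same deferred-write pattern is exposed through ``save_mfdataset`` (see the
api.py changes above), where the per-file delayed stores are wrapped into a
single ``Delayed`` object. A hedged sketch, assuming ``ds`` is an existing
dask-backed dataset with a ``time`` coordinate and that the output paths are
writable:

import xarray as xr
from dask.diagnostics import ProgressBar

# split into one dataset per year, one output file per year
years, datasets = zip(*ds.groupby('time.year'))
paths = ['%s.nc' % y for y in years]

# nothing is written yet; one Delayed object covers all of the stores
delayed = xr.save_mfdataset(datasets, paths, compute=False)
with ProgressBar():
    delayed.compute()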