diff --git a/doc/dask.rst b/doc/dask.rst index 8fc0f655023..2d4beea4f70 100644 --- a/doc/dask.rst +++ b/doc/dask.rst @@ -100,6 +100,21 @@ Once you've manipulated a dask array, you can still write a dataset too big to fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the usual way. +.. ipython:: python + + ds.to_netcdf('manipulated-example-data.nc') + +By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf` +will return a dask delayed object that can be computed later. + +.. ipython:: python + + from dask.diagnostics import ProgressBar + # or distributed.progress when using the distributed scheduler + delayed_obj = ds.to_netcdf('manipulated-example-data.nc', compute=False) + with ProgressBar(): + results = delayed_obj.compute() + .. note:: When using dask's distributed scheduler to write NETCDF4 files, diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 0d9e75ba940..1b696c4486d 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -69,6 +69,19 @@ Enhancements - ``plot.line()`` learned new kwargs: ``xincrease``, ``yincrease`` that change the direction of the respective axes. By `Deepak Cherian <https://github.com/dcherian>`_. +- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses + ``dask.delayed`` to parallelize the open and preprocessing steps within + ``open_mfdataset``. This is expected to provide performance improvements when + opening many files, particularly when used in conjunction with dask's + multiprocessing or distributed schedulers (:issue:`1981`). + By `Joe Hamman <https://github.com/jhamman>`_. + +- New ``compute`` option in :py:meth:`~xarray.Dataset.to_netcdf`, + :py:meth:`~xarray.Dataset.to_zarr`, and :py:func:`~xarray.save_mfdataset` to + allow for the lazy computation of netCDF and zarr stores. This feature is + currently only supported by the netCDF4 and zarr backends (:issue:`1784`). + By `Joe Hamman <https://github.com/jhamman>`_. 
+ Bug fixes ~~~~~~~~~ @@ -104,12 +117,6 @@ The minor release includes a number of bug-fixes and backwards compatible enhanc Enhancements ~~~~~~~~~~~~ -- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses - ``dask.delayed`` to parallelize the open and preprocessing steps within - ``open_mfdataset``. This is expected to provide performance improvements when - opening many files, particularly when used in conjunction with dask's - multiprocessing or distributed schedulers (:issue:`1981`). - By `Joe Hamman `_. - :py:meth:`~xarray.DataArray.isin` and :py:meth:`~xarray.Dataset.isin` methods, which test each value in the array for whether it is contained in the supplied list, returning a bool array. See :ref:`selecting values with isin` diff --git a/xarray/backends/api.py b/xarray/backends/api.py index b8cfa3c926a..dec63a85d6e 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -144,6 +144,13 @@ def _get_lock(engine, scheduler, format, path_or_file): return lock +def _finalize_store(write, store): + """ Finalize this store by explicitly syncing and closing""" + del write # ensure writing is done first + store.sync() + store.close() + + def open_dataset(filename_or_obj, group=None, decode_cf=True, mask_and_scale=True, decode_times=True, autoclose=False, concat_characters=True, decode_coords=True, engine=None, @@ -620,7 +627,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT, def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None, - engine=None, writer=None, encoding=None, unlimited_dims=None): + engine=None, writer=None, encoding=None, unlimited_dims=None, + compute=True): """This function creates an appropriate datastore for writing a dataset to disk as a netCDF file @@ -680,19 +688,22 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None, unlimited_dims = dataset.encoding.get('unlimited_dims', None) try: dataset.dump_to_store(store, sync=sync, 
encoding=encoding, - unlimited_dims=unlimited_dims) + unlimited_dims=unlimited_dims, compute=compute) if path_or_file is None: return target.getvalue() finally: if sync and isinstance(path_or_file, basestring): store.close() + if not compute: + import dask + return dask.delayed(_finalize_store)(store.delayed_store, store) + if not sync: return store - def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, - engine=None): + engine=None, compute=True): """Write multiple datasets to disk as netCDF files simultaneously. This function is intended for use with datasets consisting of dask.array @@ -742,6 +753,9 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, default engine is chosen based on available dependencies, with a preference for 'netcdf4' if writing to a file on disk. See `Dataset.to_netcdf` for additional information. + compute: boolean + If true compute immediately, otherwise return a + ``dask.delayed.Delayed`` object that can be computed later. 
Examples -------- @@ -769,11 +783,17 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, 'datasets, paths and groups arguments to ' 'save_mfdataset') - writer = ArrayWriter() - stores = [to_netcdf(ds, path, mode, format, group, engine, writer) + writer = ArrayWriter() if compute else None + stores = [to_netcdf(ds, path, mode, format, group, engine, writer, + compute=compute) for ds, path, group in zip(datasets, paths, groups)] + + if not compute: + import dask + return dask.delayed(stores) + try: - writer.sync() + delayed = writer.sync(compute=compute) for store in stores: store.sync() finally: @@ -782,7 +802,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, - encoding=None): + encoding=None, compute=True): """This function creates an appropriate datastore for writing a dataset to a zarr ztore @@ -803,5 +823,9 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, # I think zarr stores should always be sync'd immediately # TODO: figure out how to properly handle unlimited_dims - dataset.dump_to_store(store, sync=True, encoding=encoding) + dataset.dump_to_store(store, sync=True, encoding=encoding, compute=compute) + + if not compute: + import dask + return dask.delayed(_finalize_store)(store.delayed_store, store) return store diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 7d8aa8446a2..2961838e85f 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -264,12 +264,15 @@ def add(self, source, target): else: target[...] 
= source - def sync(self): + def sync(self, compute=True): if self.sources: import dask.array as da - da.store(self.sources, self.targets, lock=self.lock) + delayed_store = da.store(self.sources, self.targets, + lock=self.lock, compute=compute, + flush=True) self.sources = [] self.targets = [] + return delayed_store class AbstractWritableDataStore(AbstractDataStore): @@ -277,6 +280,7 @@ def __init__(self, writer=None, lock=HDF5_LOCK): if writer is None: writer = ArrayWriter(lock=lock) self.writer = writer + self.delayed_store = None def encode(self, variables, attributes): """ @@ -318,11 +322,11 @@ def set_attribute(self, k, v): # pragma: no cover def set_variable(self, k, v): # pragma: no cover raise NotImplementedError - def sync(self): + def sync(self, compute=True): if self._isopen and self._autoclose: # datastore will be reopened during write self.close() - self.writer.sync() + self.delayed_store = self.writer.sync(compute=compute) def store_dataset(self, dataset): """ diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index d34fa2d9267..f9e2b3dece1 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -212,9 +212,12 @@ def prepare_variable(self, name, variable, check_encoding=False, return target, variable.data - def sync(self): + def sync(self, compute=True): + if not compute: + raise NotImplementedError( + 'compute=False is not supported for the h5netcdf backend yet') with self.ensure_open(autoclose=True): - super(H5NetCDFStore, self).sync() + super(H5NetCDFStore, self).sync(compute=compute) self.ds.sync() def close(self): diff --git a/xarray/backends/netCDF4_.py b/xarray/backends/netCDF4_.py index a0f6cbcdd33..14061a0fb08 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -439,9 +439,9 @@ def prepare_variable(self, name, variable, check_encoding=False, return target, variable.data - def sync(self): + def sync(self, compute=True): with self.ensure_open(autoclose=True): - 
super(NetCDF4DataStore, self).sync() + super(NetCDF4DataStore, self).sync(compute=compute) self.ds.sync() def close(self): diff --git a/xarray/backends/scipy_.py b/xarray/backends/scipy_.py index ee2c0fbf106..cd84431f6b7 100644 --- a/xarray/backends/scipy_.py +++ b/xarray/backends/scipy_.py @@ -219,9 +219,12 @@ def prepare_variable(self, name, variable, check_encoding=False, return target, data - def sync(self): + def sync(self, compute=True): + if not compute: + raise NotImplementedError( + 'compute=False is not supported for the scipy backend yet') with self.ensure_open(autoclose=True): - super(ScipyDataStore, self).sync() + super(ScipyDataStore, self).sync(compute=compute) self.ds.flush() def close(self): diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 83dcbd9a172..343690eaabd 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -342,8 +342,8 @@ def store(self, variables, attributes, *args, **kwargs): AbstractWritableDataStore.store(self, variables, attributes, *args, **kwargs) - def sync(self): - self.writer.sync() + def sync(self, compute=True): + self.delayed_store = self.writer.sync(compute=compute) def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index bdb2bf86990..a9ec8c16866 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -1055,7 +1055,7 @@ def reset_coords(self, names=None, drop=False, inplace=False): return obj def dump_to_store(self, store, encoder=None, sync=True, encoding=None, - unlimited_dims=None): + unlimited_dims=None, compute=True): """Store dataset contents to a backends.*DataStore object.""" if encoding is None: encoding = {} @@ -1074,10 +1074,11 @@ def dump_to_store(self, store, encoder=None, sync=True, encoding=None, store.store(variables, attrs, check_encoding, unlimited_dims=unlimited_dims) if sync: - store.sync() + store.sync(compute=compute) def to_netcdf(self, path=None, mode='w', format=None, 
group=None, - engine=None, encoding=None, unlimited_dims=None): + engine=None, encoding=None, unlimited_dims=None, + compute=True): """Write dataset contents to a netCDF file. Parameters @@ -1136,16 +1137,20 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None, By default, no dimensions are treated as unlimited dimensions. Note that unlimited_dims may also be set via ``dataset.encoding['unlimited_dims']``. + compute: boolean + If true compute immediately, otherwise return a + ``dask.delayed.Delayed`` object that can be computed later. """ if encoding is None: encoding = {} from ..backends.api import to_netcdf return to_netcdf(self, path, mode, format=format, group=group, engine=engine, encoding=encoding, - unlimited_dims=unlimited_dims) + unlimited_dims=unlimited_dims, + compute=compute) def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, - encoding=None): + encoding=None, compute=True): """Write dataset contents to a zarr group. .. note:: Experimental @@ -1167,6 +1172,9 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, Nested dictionary with variable names as keys and dictionaries of variable specific encodings as values, e.g., ``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}`` + compute: boolean + If true compute immediately, otherwise return a + ``dask.delayed.Delayed`` object that can be computed later. 
""" if encoding is None: encoding = {} @@ -1176,7 +1184,7 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, "and 'w-'.") from ..backends.api import to_zarr return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer, - group=group, encoding=encoding) + group=group, encoding=encoding, compute=compute) def __unicode__(self): return formatting.dataset_repr(self) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 2d4e5c0f261..95d92cd8b8a 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -159,8 +159,8 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={}, # The save/open methods may be overwritten below def save(self, dataset, path, **kwargs): - dataset.to_netcdf(path, engine=self.engine, format=self.file_format, - **kwargs) + return dataset.to_netcdf(path, engine=self.engine, + format=self.file_format, **kwargs) @contextlib.contextmanager def open(self, path, **kwargs): @@ -709,7 +709,7 @@ def test_roundtrip_endian(self): # should still pass though. 
assert_identical(ds, actual) - if isinstance(self, NetCDF4DataTest): + if self.engine == 'netcdf4': ds['z'].encoding['endian'] = 'big' with pytest.raises(NotImplementedError): with self.roundtrip(ds) as actual: @@ -902,7 +902,8 @@ def test_open_group(self): open_dataset(tmp_file, group=(1, 2, 3)) def test_open_subgroup(self): - # Create a netCDF file with a dataset within a group within a group + # Create a netCDF file with a dataset stored within a group within a + # group with create_tmp_file() as tmp_file: rootgrp = nc4.Dataset(tmp_file, 'w') foogrp = rootgrp.createGroup('foo') @@ -1232,7 +1233,7 @@ def create_store(self): yield backends.ZarrStore.open_group(store_target, mode='w') def save(self, dataset, store_target, **kwargs): - dataset.to_zarr(store=store_target, **kwargs) + return dataset.to_zarr(store=store_target, **kwargs) @contextlib.contextmanager def open(self, store_target, **kwargs): @@ -1419,6 +1420,19 @@ def test_append_overwrite_values(self): def test_append_with_invalid_dim_raises(self): super(CFEncodedDataTest, self).test_append_with_invalid_dim_raises() + def test_to_zarr_compute_false_roundtrip(self): + from dask.delayed import Delayed + + original = create_test_data().chunk() + + with self.create_zarr_target() as store: + delayed_obj = self.save(original, store, compute=False) + assert isinstance(delayed_obj, Delayed) + delayed_obj.compute() + + with self.open(store) as actual: + assert_identical(original, actual) + @requires_zarr class ZarrDictStoreTest(BaseZarrTest, TestCase): @@ -2227,6 +2241,36 @@ def test_dataarray_compute(self): self.assertTrue(computed._in_memory) assert_allclose(actual, computed, decode_bytes=False) + def test_to_netcdf_compute_false_roundtrip(self): + from dask.delayed import Delayed + + original = create_test_data().chunk() + + with create_tmp_file() as tmp_file: + # dataset, path, **kwargs): + delayed_obj = self.save(original, tmp_file, compute=False) + assert isinstance(delayed_obj, Delayed) + 
delayed_obj.compute() + + with self.open(tmp_file) as actual: + assert_identical(original, actual) + + def test_save_mfdataset_compute_false_roundtrip(self): + from dask.delayed import Delayed + + original = Dataset({'foo': ('x', np.random.randn(10))}).chunk() + datasets = [original.isel(x=slice(5)), + original.isel(x=slice(5, 10))] + with create_tmp_file() as tmp1: + with create_tmp_file() as tmp2: + delayed_obj = save_mfdataset(datasets, [tmp1, tmp2], + engine=self.engine, compute=False) + assert isinstance(delayed_obj, Delayed) + delayed_obj.compute() + with open_mfdataset([tmp1, tmp2], + autoclose=self.autoclose) as actual: + assert_identical(actual, original) + class DaskTestAutocloseTrue(DaskTest): autoclose = True @@ -2348,7 +2392,7 @@ def open(self, path, **kwargs): yield ds def save(self, dataset, path, **kwargs): - dataset.to_netcdf(path, engine='scipy', **kwargs) + return dataset.to_netcdf(path, engine='scipy', **kwargs) def test_weakrefs(self): example = Dataset({'foo': ('x', np.arange(5.0))})