From cbeb70d99f391b015724fa75411e6860ed097a54 Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sun, 30 Aug 2015 16:53:06 +0200
Subject: [PATCH 1/2] Use deterministic names for dask arrays from open_dataset

This will allow xray users to take advantage of dask's nascent support for
caching intermediate results (https://github.com/blaze/dask/pull/502).

For example:

In [1]: import xray

In [2]: from dask.diagnostics.cache import Cache

In [3]: c = Cache(5e7)

In [4]: c.register()

In [5]: ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/2014-*.nc', engine='scipy')

In [6]: %time ds.sum().load()
CPU times: user 2.72 s, sys: 2.7 s, total: 5.41 s
Wall time: 3.85 s
Out[6]:
<xray.Dataset>
Dimensions:  ()
Coordinates:
    *empty*
Data variables:
    t2m      float64 5.338e+10

In [7]: %time ds.mean().load()
CPU times: user 5.31 s, sys: 1.86 s, total: 7.17 s
Wall time: 1.81 s
Out[7]:
<xray.Dataset>
Dimensions:  ()
Coordinates:
    *empty*
Data variables:
    t2m      float64 279.0

In [8]: %time ds.mean().load()
CPU times: user 7.73 ms, sys: 2.73 ms, total: 10.5 ms
Wall time: 8.45 ms
Out[8]:
<xray.Dataset>
Dimensions:  ()
Coordinates:
    *empty*
Data variables:
    t2m      float64 279.0
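Because the new names are derived from ``dask.base.tokenize`` (applied to the
file's mtime and the other ``open_dataset`` arguments) rather than a global
counter, opening the same data with the same arguments always produces the
same graph keys, which is what lets the cache recognize repeated work above.
A minimal illustration, assuming dask >= 0.6; the literal arguments are
arbitrary stand-ins:

    from dask.base import tokenize

    # tokenize hashes its arguments into a deterministic string, so equal
    # inputs always map to equal tokens (and hence equal dask graph keys):
    assert tokenize('t2m', 1234567890.0) == tokenize('t2m', 1234567890.0)
    print(tokenize('t2m', 1234567890.0))  # a stable hex digest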
---
 xray/backends/api.py       | 29 +++++++++++++++++++++++------
 xray/core/dataset.py       | 12 ++++++++++--
 xray/core/variable.py      |  9 ++-------
 xray/test/test_backends.py | 13 +++++++++++++
 xray/test/test_dask.py     |  6 +++++-
 5 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/xray/backends/api.py b/xray/backends/api.py
index 7d7d33d8079..9b405cb6da1 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -1,5 +1,6 @@
 import sys
 import gzip
+import os.path
 import threading
 from glob import glob
 from io import BytesIO
@@ -114,10 +115,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         used when reading data from netCDF files with the netcdf4 and
         h5netcdf engines to avoid issues with concurrent access when using
         dask's multithreaded backend.
-    drop_variables: string or iterable, optional
+    drop_variables: string or iterable, optional
         A variable or list of variables to exclude from being parsed from the
         dataset. This may be useful to drop variables with problems or
-        inconsistent values.
+        inconsistent values.

     Returns
     -------
@@ -139,9 +140,25 @@ def maybe_decode_store(store, lock=False):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords,
             drop_variables=drop_variables)
+
         if chunks is not None:
-            ds = ds.chunk(chunks, lock=lock)
-        return ds
+            from dask.base import tokenize
+            if isinstance(filename_or_obj, basestring):
+                file_arg = os.path.getmtime(filename_or_obj)
+            else:
+                file_arg = filename_or_obj
+            token = tokenize(file_arg, group, decode_cf, mask_and_scale,
+                             decode_times, concat_characters,
+                             decode_coords, engine, chunks, lock,
+                             drop_variables)
+            name_prefix = '%s:%s/' % (filename_or_obj, group or '')
+            ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token,
+                           lock=lock)
+            ds2._file_obj = ds._file_obj
+        else:
+            ds2 = ds
+
+        return ds2

     if isinstance(filename_or_obj, backends.AbstractDataStore):
         store = filename_or_obj
@@ -252,11 +269,11 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
     if not paths:
         raise IOError('no files to open')

-    datasets = [open_dataset(p, engine=engine, **kwargs) for p in paths]
     if lock is None:
         lock = _default_lock(paths[0], engine)
+    datasets = [open_dataset(p, engine=engine, chunks=chunks or {}, lock=lock,
+                             **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]

     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
diff --git a/xray/core/dataset.py b/xray/core/dataset.py
index 6b340e43da2..cd0ec1050f1 100644
--- a/xray/core/dataset.py
+++ b/xray/core/dataset.py
@@ -880,7 +880,7 @@ def chunks(self):
             chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))

-    def chunk(self, chunks=None, lock=False):
+    def chunk(self, chunks=None, name_prefix='xray-', token=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.

@@ -896,6 +896,10 @@ def chunk(self, chunks=None, lock=False):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        name_prefix : str, optional
+            Prefix for the name of any new dask arrays.
+        token : str, optional
+            Token uniquely identifying this dataset.
         lock : optional
             Passed on to :py:func:`dask.array.from_array`, if the array is
             not already as dask array.
@@ -904,6 +908,8 @@ def chunk(self, chunks=None, lock=False):
         -------
         chunked : xray.Dataset
         """
+        from dask.base import tokenize
+
         if isinstance(chunks, Number):
             chunks = dict.fromkeys(self.dims, chunks)

@@ -923,7 +929,9 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name, lock=lock)
+                token2 = tokenize(name, token if token else var._data)
+                name2 = '%s%s-%s' % (name_prefix, name, token2)
+                return var.chunk(chunks, name=name2, lock=lock)
             else:
                 return var

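Taken together, ``maybe_decode_store`` and ``maybe_chunk`` yield names of the
form ``filename:group/variable-token``. A sketch of the scheme in isolation,
assuming dask >= 0.6 (the file name, variable name, and mtime below are
hypothetical):

    from dask.base import tokenize

    filename, group, var_name = 'example.nc', None, 't2m'
    mtime = 1234567890.0  # stand-in for os.path.getmtime(filename)

    token = tokenize(mtime)             # per-file token (the real call also
                                        # hashes the open_dataset arguments)
    name_prefix = '%s:%s/' % (filename, group or '')
    token2 = tokenize(var_name, token)  # per-variable token, as in maybe_chunk
    name2 = '%s%s-%s' % (name_prefix, var_name, token2)
    # e.g. 'example.nc:/t2m-<hash>', identical every time the file is reopened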
diff --git a/xray/core/variable.py b/xray/core/variable.py
index f2612f81de8..570fc1a13a9 100644
--- a/xray/core/variable.py
+++ b/xray/core/variable.py
@@ -413,7 +413,7 @@ def chunks(self):
     _array_counter = itertools.count()

-    def chunk(self, chunks=None, name='', lock=False):
+    def chunk(self, chunks=None, name=None, lock=False):
         """Coerce this array's data into a dask arrays with the given
         chunks.

         If this variable is a non-dask array, it will be converted to dask
@@ -450,17 +450,12 @@ def chunk(self, chunks=None, name=None, lock=False):
             chunks = self.chunks or self.shape

         data = self._data
-        if isinstance(data, dask_array_type):
+        if isinstance(data, da.Array):
             data = data.rechunk(chunks)
         else:
-            if name:
-                name += '_'
-            name = 'xray_%s%s' % (name, next(self._array_counter))
-
             if utils.is_dict_like(chunks):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))
-
             data = da.from_array(data, chunks, name=name, lock=lock)

         return type(self)(self.dims, data, self._attrs, self._encoding,
diff --git a/xray/test/test_backends.py b/xray/test/test_backends.py
index 6f097813c61..aa874d24a1d 100644
--- a/xray/test/test_backends.py
+++ b/xray/test/test_backends.py
@@ -871,6 +871,19 @@ def test_dask_roundtrip(self):
             with open_dataset(tmp2) as on_disk:
                 self.assertDatasetIdentical(data, on_disk)

+    def test_deterministic_names(self):
+        with create_tmp_file() as tmp:
+            data = create_test_data()
+            data.to_netcdf(tmp)
+            with open_mfdataset(tmp) as ds:
+                original_names = dict((k, v.data.name) for k, v in ds.items())
+            with open_mfdataset(tmp) as ds:
+                repeat_names = dict((k, v.data.name) for k, v in ds.items())
+            for var_name, dask_name in original_names.items():
+                self.assertIn(var_name, dask_name)
+                self.assertIn(tmp, dask_name)
+            self.assertEqual(original_names, repeat_names)
+

 @requires_scipy_or_netCDF4
 @requires_pydap
diff --git a/xray/test/test_dask.py b/xray/test/test_dask.py
index 80ad5e54bfa..e5d43be46cb 100644
--- a/xray/test/test_dask.py
+++ b/xray/test/test_dask.py
@@ -178,10 +178,14 @@ def setUp(self):
         self.eager_array = DataArray(self.values, dims=('x', 'y'), name='foo')
         self.lazy_array = DataArray(self.data, dims=('x', 'y'), name='foo')

-    def test_chunk(self):
+    def test_rechunk(self):
         chunked = self.eager_array.chunk({'x': 2}).chunk({'y': 2})
         self.assertEqual(chunked.chunks, ((2,) * 2, (2,) * 3))

+    def test_new_chunk(self):
+        chunked = self.eager_array.chunk()
+        self.assertTrue(chunked.data.name.startswith('xray-foo-'))
+
     def test_lazy_dataset(self):
         lazy_ds = Dataset({'foo': (('x', 'y'), self.data)})
         self.assertIsInstance(lazy_ds.foo.variable.data, da.Array)
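The property these tests pin down can also be checked interactively. A
hypothetical session (the file is invented; ``foo`` stands in for any data
variable):

    In [1]: import xray

    In [2]: ds1 = xray.open_mfdataset('example.nc')

    In [3]: ds2 = xray.open_mfdataset('example.nc')

    In [4]: ds1.foo.data.name == ds2.foo.data.name  # same file, same name
    Out[4]: True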
From 2ff6ccee78f95dfe8cf74c7820a1a8ea1a84ee5f Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sat, 5 Sep 2015 12:50:11 -0700
Subject: [PATCH 2/2] Document deterministic names

---
 doc/dask.rst          | 20 ++++++++++----------
 doc/whats-new.rst     |  5 +++++
 xray/backends/api.py  |  7 ++++++-
 xray/core/dataset.py  |  6 +++++-
 xray/core/pycompat.py |  1 +
 5 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/doc/dask.rst b/doc/dask.rst
index 6f78f020b1c..4f650c024e1 100644
--- a/doc/dask.rst
+++ b/doc/dask.rst
@@ -11,9 +11,9 @@ benefits of using dask are sufficiently strong that we expect that dask may
 become a requirement for a future version of xray.

 For a full example of how to use xray's dask integration, read the
-`blog post introducing xray + dask`_.
+`blog post introducing xray and dask`_.

-.. _blog post introducing xray + dask: http://continuum.io/blog/xray-dask
+.. _blog post introducing xray and dask: http://continuum.io/blog/xray-dask

 What is a dask array?
 ---------------------
@@ -143,10 +143,10 @@ Explicit conversion by wrapping a DataArray with ``np.asarray`` also works:
        [ 1.337e+00, -1.531e+00, ...,  8.726e-01, -1.538e+00],
        ...

-With the current versions of xray and dask, there is no automatic conversion
-of eager numpy arrays to dask arrays, nor automatic alignment of chunks when
-performing operations between dask arrays with different chunk sizes. You will
-need to explicitly chunk each array to ensure compatibility. With xray, both
+With the current version of dask, there is no automatic alignment of chunks when
+performing operations between dask arrays with different chunk sizes. If your
+computation involves multiple dask arrays with different chunks, you may need to
+explicitly rechunk each array to ensure compatibility. With xray, both
 converting data to a dask arrays and converting the chunk sizes of dask arrays
 is done with the :py:meth:`~xray.Dataset.chunk` method:

@@ -166,16 +166,16 @@ You can view the size of existing chunks on an array by viewing the
     rechunked.chunks

-If there are not consistent chunksizes between all the ararys in a dataset
+If there are not consistent chunksizes between all the arrays in a dataset
 along a particular dimension, an exception is raised when you try to access
 ``.chunks``.

 .. note::

     In the future, we would like to enable automatic alignment of dask
-    chunksizes and automatic conversion of numpy arrays to dask (but not the
-    other way around). We might also require that all arrays in a dataset
-    share the same chunking alignment. None of these are currently done.
+    chunksizes. We might also require that all arrays in a dataset
+    share the same chunking alignment. Neither of these is currently
+    done.

 NumPy ufuncs like ``np.sin`` currently only work on eagerly evaluated arrays
 (this will change with the next major NumPy release). We have provided
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 2e1b787e3d7..e9e85e8e329 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -12,6 +12,8 @@ What's New
 v0.6.1
 ------

+The minimum required version of dask for use with xray is now version 0.6.
+
 API Changes
 ~~~~~~~~~~~

@@ -28,6 +30,9 @@ Enhancements
   attributes to Dataset and DataArray (:issue:`553`).
 - More informative error message with :py:meth:`~xray.Dataset.from_dataframe`
   if the frame has duplicate columns.
+- xray now uses deterministic names for dask arrays it creates or opens from
+  disk. This allows xray users to take advantage of dask's nascent support for
+  caching intermediate computation results. See :issue:`555` for an example.

 Bug fixes
 ~~~~~~~~~
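As the revised docs above explain, :py:meth:`~xray.Dataset.chunk` both
converts numpy-backed variables to dask and rechunks existing dask arrays,
and after this patch it assigns the deterministic names. A minimal sketch
(the array contents are arbitrary):

    import numpy as np
    import xray

    ds = xray.Dataset({'foo': (('x', 'y'), np.zeros((4, 6)))})
    chunked = ds.chunk({'x': 2})   # numpy -> dask, 2-element chunks along x
    print(chunked.chunks)          # chunk sizes for each dimension
    print(chunked.foo.data.name)   # 'xray-foo-<token>', stable across calls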
diff --git a/xray/backends/api.py b/xray/backends/api.py
index 9b405cb6da1..1c5f67dce1d 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -142,7 +142,12 @@ def maybe_decode_store(store, lock=False):
             drop_variables=drop_variables)

         if chunks is not None:
-            from dask.base import tokenize
+            try:
+                from dask.base import tokenize
+            except ImportError:
+                import dask  # raise the usual error if dask is entirely missing
+                raise ImportError('xray requires dask version 0.6 or newer')
+
             if isinstance(filename_or_obj, basestring):
                 file_arg = os.path.getmtime(filename_or_obj)
             else:
diff --git a/xray/core/dataset.py b/xray/core/dataset.py
index cd0ec1050f1..eb199edc55b 100644
--- a/xray/core/dataset.py
+++ b/xray/core/dataset.py
@@ -908,7 +908,11 @@ def chunk(self, chunks=None, name_prefix='xray-', token=None, lock=False):
         -------
         chunked : xray.Dataset
         """
-        from dask.base import tokenize
+        try:
+            from dask.base import tokenize
+        except ImportError:
+            import dask  # raise the usual error if dask is entirely missing
+            raise ImportError('xray requires dask version 0.6 or newer')

         if isinstance(chunks, Number):
             chunks = dict.fromkeys(self.dims, chunks)
diff --git a/xray/core/pycompat.py b/xray/core/pycompat.py
index e4dc4d0352b..aad2ed468e5 100644
--- a/xray/core/pycompat.py
+++ b/xray/core/pycompat.py
@@ -1,5 +1,6 @@
 import numpy as np
 import sys
+from distutils.version import LooseVersion

 PY3 = sys.version_info[0] >= 3
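The ``LooseVersion`` import added to ``pycompat.py`` enables comparing dask
version strings. A sketch of the kind of guard it supports (this exact check
is illustrative, not part of the patch itself):

    from distutils.version import LooseVersion
    import dask

    if LooseVersion(dask.__version__) < LooseVersion('0.6'):
        raise ImportError('xray requires dask version 0.6 or newer')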