Use deterministic names for dask arrays from open_dataset #555

Merged · 2 commits · Sep 14, 2015
20 changes: 10 additions & 10 deletions doc/dask.rst
@@ -11,9 +11,9 @@ benefits of using dask are sufficiently strong that we expect that dask may
 become a requirement for a future version of xray.

 For a full example of how to use xray's dask integration, read the
-`blog post introducing xray + dask`_.
+`blog post introducing xray and dask`_.

-.. _blog post introducing xray + dask: http://continuum.io/blog/xray-dask
+.. _blog post introducing xray and dask: http://continuum.io/blog/xray-dask

 What is a dask array?
 ---------------------
@@ -143,10 +143,10 @@ Explicit conversion by wrapping a DataArray with ``np.asarray`` also works:
     [ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00],
     ...

-With the current versions of xray and dask, there is no automatic conversion
-of eager numpy arrays to dask arrays, nor automatic alignment of chunks when
-performing operations between dask arrays with different chunk sizes. You will
-need to explicitly chunk each array to ensure compatibility. With xray, both
+With the current version of dask, there is no automatic alignment of chunks when
+performing operations between dask arrays with different chunk sizes. If your
+computation involves multiple dask arrays with different chunks, you may need to
+explicitly rechunk each array to ensure compatibility. With xray, both
 converting data to a dask arrays and converting the chunk sizes of dask arrays
 is done with the :py:meth:`~xray.Dataset.chunk` method:

@@ -166,16 +166,16 @@ You can view the size of existing chunks on an array by viewing the

     rechunked.chunks

-If there are not consistent chunksizes between all the ararys in a dataset
+If there are not consistent chunksizes between all the arrays in a dataset
 along a particular dimension, an exception is raised when you try to access
 ``.chunks``.

 .. note::

     In the future, we would like to enable automatic alignment of dask
-    chunksizes and automatic conversion of numpy arrays to dask (but not the
-    other way around). We might also require that all arrays in a dataset
-    share the same chunking alignment. None of these are currently done.
+    chunksizes (but not the other way around). We might also require that all
+    arrays in a dataset share the same chunking alignment. Neither of these
+    are currently done.

 NumPy ufuncs like ``np.sin`` currently only work on eagerly evaluated arrays
 (this will change with the next major NumPy release). We have provided
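The revised prose above tells users to rechunk explicitly via ``Dataset.chunk``. A minimal sketch of that workflow, assuming a made-up dataset and chunk sizes (``xray`` is the package name of this era; it was later renamed ``xarray``):

    import numpy as np
    import xray

    # A made-up two-dimensional dataset, purely for illustration.
    ds = xray.Dataset({'temperature': (('x', 'y'), np.random.randn(100, 80))})

    # Convert the eager numpy array into a dask array with explicit chunks.
    chunked = ds.chunk({'x': 25, 'y': 40})

    # Rechunk so arrays combined in one computation share compatible chunks;
    # accessing .chunks raises if chunksizes are inconsistent across a dataset.
    rechunked = chunked.chunk({'x': 50})
    print(rechunked.chunks)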
5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -12,6 +12,8 @@ What's New
 v0.6.1
 ------

+The minimum required version of dask for use with xray is now version 0.6.
+
 API Changes
 ~~~~~~~~~~~

@@ -28,6 +30,9 @@ Enhancements
   attributes to Dataset and DataArray (:issue:`553`).
 - More informative error message with :py:meth:`~xray.Dataset.from_dataframe`
   if the frame has duplicate columns.
+- xray now uses deterministic names for dask arrays it creates or opens from
+  disk. This allows xray users to take advantage of dask's nascent support for
+  caching intermediate computation results. See :issue:`555` for an example.

 Bug fixes
 ~~~~~~~~~
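A hedged sketch of the caching workflow this enhancement enables, assuming a hypothetical file ``data.nc`` containing a variable ``foo`` (dask's opportunistic cache also requires the optional cachey package):

    import xray
    from dask.cache import Cache

    Cache(1e9).register()  # cache up to ~1 GB of intermediate results

    # Opening the same unmodified file twice now yields identical dask names,
    # so results computed through ds1 can be reused when computing through ds2.
    ds1 = xray.open_dataset('data.nc', chunks={'time': 10})
    ds2 = xray.open_dataset('data.nc', chunks={'time': 10})
    assert ds1['foo'].data.name == ds2['foo'].data.name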
34 changes: 28 additions & 6 deletions xray/backends/api.py
@@ -1,5 +1,6 @@
 import sys
 import gzip
+import os.path
 import threading
 from glob import glob
 from io import BytesIO
@@ -114,10 +115,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         used when reading data from netCDF files with the netcdf4 and h5netcdf
         engines to avoid issues with concurrent access when using dask's
         multithreaded backend.
-    drop_variables: string or iterable, optional
+    drop_variables: string or iterable, optional
         A variable or list of variables to exclude from being parsed from the
         dataset. This may be useful to drop variables with problems or
-        inconsistent values.
+        inconsistent values.

     Returns
     -------
@@ -139,9 +140,30 @@ def maybe_decode_store(store, lock=False):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords,
             drop_variables=drop_variables)
+
         if chunks is not None:
-            ds = ds.chunk(chunks, lock=lock)
-        return ds
+            try:
+                from dask.base import tokenize
+            except ImportError:
+                import dask  # raise the usual error if dask is entirely missing
+                raise ImportError('xray requires dask version 0.6 or newer')
+
+            if isinstance(filename_or_obj, basestring):
+                file_arg = os.path.getmtime(filename_or_obj)
+            else:
+                file_arg = filename_or_obj
+            token = tokenize(file_arg, group, decode_cf, mask_and_scale,
+                             decode_times, concat_characters,
+                             decode_coords, engine, chunks, lock,
+                             drop_variables)
+            name_prefix = '%s:%s/' % (filename_or_obj, group or '')
+            ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token,
+                           lock=lock)
+            ds2._file_obj = ds._file_obj
+        else:
+            ds2 = ds
+
+        return ds2

     if isinstance(filename_or_obj, backends.AbstractDataStore):
         store = filename_or_obj
@@ -252,11 +274,11 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
     if not paths:
         raise IOError('no files to open')

-    datasets = [open_dataset(p, engine=engine, **kwargs) for p in paths]
     if lock is None:
         lock = _default_lock(paths[0], engine)
+    datasets = [open_dataset(p, engine=engine, chunks=chunks or {}, lock=lock,
+                             **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]

     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
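The key ingredient above is ``dask.base.tokenize``, which hashes its arguments deterministically: the same file (identified here by its modification time) opened with the same options always yields the same token. A small sketch with invented values:

    from dask.base import tokenize

    mtime = 1442217600.0  # stand-in for os.path.getmtime('data.nc')

    # Identical inputs always hash to the identical token.
    assert tokenize(mtime, None, True, True) == tokenize(mtime, None, True, True)

    # Rewriting the file bumps its mtime, changing the token and hence the
    # dask array names, which invalidates any previously cached results.
    assert tokenize(mtime + 60.0, None, True, True) != tokenize(mtime, None, True, True)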
16 changes: 14 additions & 2 deletions xray/core/dataset.py
@@ -880,7 +880,7 @@ def chunks(self):
             chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))

-    def chunk(self, chunks=None, lock=False):
+    def chunk(self, chunks=None, name_prefix='xray-', token=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.

@@ -896,6 +896,10 @@ def chunk(self, chunks=None, lock=False):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        name_prefix : str, optional
+            Prefix for the name of any new dask arrays.
+        token : str, optional
+            Token uniquely identifying this dataset.
         lock : optional
             Passed on to :py:func:`dask.array.from_array`, if the array is not
             already as dask array.
@@ -904,6 +908,12 @@
         -------
         chunked : xray.Dataset
         """
+        try:
+            from dask.base import tokenize
+        except ImportError:
+            import dask  # raise the usual error if dask is entirely missing
+            raise ImportError('xray requires dask version 0.6 or newer')
+
         if isinstance(chunks, Number):
             chunks = dict.fromkeys(self.dims, chunks)

@@ -923,7 +933,9 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name, lock=lock)
+                token2 = tokenize(name, token if token else var._data)
+                name2 = '%s%s-%s' % (name_prefix, name, token2)
+                return var.chunk(chunks, name=name2, lock=lock)
             else:
                 return var

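Putting ``maybe_chunk`` together with the ``name_prefix`` and ``token`` passed down from ``open_dataset``, each variable ends up named ``<name_prefix><variable>-<hash>``. A sketch with invented values:

    from dask.base import tokenize

    name_prefix = 'data.nc:/'  # '%s:%s/' % (filename_or_obj, group or '')
    name = 'temperature'       # variable name within the dataset
    token = 'abc123'           # file token computed in open_dataset (invented)

    token2 = tokenize(name, token)
    name2 = '%s%s-%s' % (name_prefix, name, token2)
    print(name2)  # something like 'data.nc:/temperature-8c04d8e96f...'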
1 change: 1 addition & 0 deletions xray/core/pycompat.py
@@ -1,5 +1,6 @@
 import numpy as np
 import sys
+from distutils.version import LooseVersion

 PY3 = sys.version_info[0] >= 3

9 changes: 2 additions & 7 deletions xray/core/variable.py
@@ -413,7 +413,7 @@ def chunks(self):

     _array_counter = itertools.count()

-    def chunk(self, chunks=None, name='', lock=False):
+    def chunk(self, chunks=None, name=None, lock=False):
         """Coerce this array's data into a dask arrays with the given chunks.

         If this variable is a non-dask array, it will be converted to dask
@@ -450,17 +450,12 @@ def chunk(self, chunks=None, name='', lock=False):
             chunks = self.chunks or self.shape

         data = self._data
-        if isinstance(data, dask_array_type):
+        if isinstance(data, da.Array):
             data = data.rechunk(chunks)
         else:
-            if name:
-                name += '_'
-            name = 'xray_%s%s' % (name, next(self._array_counter))
-
             if utils.is_dict_like(chunks):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))
-
             data = da.from_array(data, chunks, name=name, lock=lock)

         return type(self)(self.dims, data, self._attrs, self._encoding,
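With the counter-based fallback gone, ``Variable.chunk`` forwards ``name`` straight to ``dask.array.from_array``; if ``name`` is None, dask chooses its own default. A minimal sketch of the call this code path now makes (the name string is invented):

    import numpy as np
    import dask.array as da

    values = np.arange(12).reshape(3, 4)

    # The deterministic name computed upstream is passed through unchanged.
    arr = da.from_array(values, chunks=(2, 2), name='xray-foo-abc123')
    assert arr.name == 'xray-foo-abc123'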
13 changes: 13 additions & 0 deletions xray/test/test_backends.py
@@ -871,6 +871,19 @@ def test_dask_roundtrip(self):
             with open_dataset(tmp2) as on_disk:
                 self.assertDatasetIdentical(data, on_disk)

+    def test_deterministic_names(self):
+        with create_tmp_file() as tmp:
+            data = create_test_data()
+            data.to_netcdf(tmp)
+            with open_mfdataset(tmp) as ds:
+                original_names = dict((k, v.data.name) for k, v in ds.items())
+            with open_mfdataset(tmp) as ds:
+                repeat_names = dict((k, v.data.name) for k, v in ds.items())
+            for var_name, dask_name in original_names.items():
+                self.assertIn(var_name, dask_name)
+                self.assertIn(tmp, dask_name)
+            self.assertEqual(original_names, repeat_names)
+

 @requires_scipy_or_netCDF4
 @requires_pydap
6 changes: 5 additions & 1 deletion xray/test/test_dask.py
@@ -178,10 +178,14 @@ def setUp(self):
         self.eager_array = DataArray(self.values, dims=('x', 'y'), name='foo')
         self.lazy_array = DataArray(self.data, dims=('x', 'y'), name='foo')

-    def test_chunk(self):
+    def test_rechunk(self):
         chunked = self.eager_array.chunk({'x': 2}).chunk({'y': 2})
         self.assertEqual(chunked.chunks, ((2,) * 2, (2,) * 3))

+    def test_new_chunk(self):
+        chunked = self.eager_array.chunk()
+        self.assertTrue(chunked.data.name.startswith('xray-foo-'))
+
     def test_lazy_dataset(self):
         lazy_ds = Dataset({'foo': (('x', 'y'), self.data)})
         self.assertIsInstance(lazy_ds.foo.variable.data, da.Array)