Use deterministic names for dask arrays from open_dataset
This will allow xray users to take advantage of dask's nascent support for
caching intermediate results (dask/dask#502).

For example:

	In [1]: import xray

	In [2]: from dask.diagnostics.cache import Cache

	In [3]: c = Cache(5e7)

	In [4]: c.register()

	In [5]: ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/2014-*.nc', engine='scipy')

	In [6]: %time ds.sum().load()
	CPU times: user 2.72 s, sys: 2.7 s, total: 5.41 s
	Wall time: 3.85 s
	Out[6]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 5.338e+10

	In [7]: %time ds.mean().load()
	CPU times: user 5.31 s, sys: 1.86 s, total: 7.17 s
	Wall time: 1.81 s
	Out[7]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 279.0

	In [8]: %time ds.mean().load()
	CPU times: user 7.73 ms, sys: 2.73 ms, total: 10.5 ms
	Wall time: 8.45 ms
	Out[8]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 279.0
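
The caching above works because dask.base.tokenize is deterministic: equal
inputs always hash to the same token, so repeating the same open_dataset call
rebuilds identical dask graph keys. A minimal sketch, with illustrative inputs:

	>>> from dask.base import tokenize
	>>> tokenize('2014-01.nc', None, True) == tokenize('2014-01.nc', None, True)
	True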
shoyer committed Aug 31, 2015
1 parent 8847ede commit a7b03ab
Showing 5 changed files with 53 additions and 16 deletions.
29 changes: 23 additions & 6 deletions xray/backends/api.py
@@ -1,5 +1,6 @@
 import sys
 import gzip
+import os.path
 import threading
 from glob import glob
 from io import BytesIO
@@ -114,10 +115,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         used when reading data from netCDF files with the netcdf4 and h5netcdf
         engines to avoid issues with concurrent access when using dask's
         multithreaded backend.
-    drop_variables: string or iterable, optional
+    drop_variables: string or iterable, optional
         A variable or list of variables to exclude from being parsed from the
         dataset. This may be useful to drop variables with problems or
-        inconsistent values.
+        inconsistent values.
 
     Returns
     -------
@@ -139,9 +140,25 @@ def maybe_decode_store(store, lock=False):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords,
             drop_variables=drop_variables)
 
         if chunks is not None:
-            ds = ds.chunk(chunks, lock=lock)
-        return ds
+            from dask.base import tokenize
+            if isinstance(filename_or_obj, basestring):
+                file_arg = os.path.getmtime(filename_or_obj)
+            else:
+                file_arg = filename_or_obj
+            token = tokenize(file_arg, group, decode_cf, mask_and_scale,
+                             decode_times, concat_characters,
+                             decode_coords, engine, chunks, lock,
+                             drop_variables)
+            name_prefix = '%s:%s/' % (filename_or_obj, group or '')
+            ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token,
+                           lock=lock)
+            ds2._file_obj = ds._file_obj
+        else:
+            ds2 = ds
+
+        return ds2
 
     if isinstance(filename_or_obj, backends.AbstractDataStore):
         store = filename_or_obj
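
Note: the branch added above fingerprints the file by its modification time
(when given a path) together with every decoding option, so reopening the same
unchanged file with the same arguments reproduces the same token. A minimal
sketch of the idea, with an illustrative filename and option values:

	import os.path
	from dask.base import tokenize

	filename = 'era-2014.nc'               # illustrative
	file_arg = os.path.getmtime(filename)  # changes whenever the file is rewritten
	token = tokenize(file_arg, None, True, {'time': 100})  # group, decode_cf, chunks
	name_prefix = '%s:%s/' % (filename, '')  # -> 'era-2014.nc:/'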
@@ -252,11 +269,11 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
     if not paths:
         raise IOError('no files to open')
 
-    datasets = [open_dataset(p, engine=engine, **kwargs) for p in paths]
     if lock is None:
         lock = _default_lock(paths[0], engine)
+    datasets = [open_dataset(p, engine=engine, chunks=chunks or {}, lock=lock,
+                             **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
 
     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
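
Note: passing chunks or {} turns a None from the caller into an empty dict, so
open_dataset always takes its chunking branch and returns dask-backed
variables with deterministic names before concatenation:

	>>> chunks = None
	>>> chunks or {}
	{}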
12 changes: 10 additions & 2 deletions xray/core/dataset.py
@@ -880,7 +880,7 @@ def chunks(self):
         chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))
 
-    def chunk(self, chunks=None, lock=False):
+    def chunk(self, chunks=None, name_prefix='xray-', token=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.
@@ -896,6 +896,10 @@ def chunk(self, chunks=None, lock=False):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        name_prefix : str, optional
+            Prefix for the name of any new dask arrays.
+        token : str, optional
+            Token uniquely identifying this dataset.
         lock : optional
             Passed on to :py:func:`dask.array.from_array`, if the array is not
             already as dask array.
@@ -904,6 +908,8 @@
         -------
         chunked : xray.Dataset
         """
+        from dask.base import tokenize
+
         if isinstance(chunks, Number):
             chunks = dict.fromkeys(self.dims, chunks)

@@ -923,7 +929,9 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name, lock=lock)
+                token2 = tokenize(name, token if token else var._data)
+                name2 = '%s%s-%s' % (name_prefix, name, token2)
+                return var.chunk(chunks, name=name2, lock=lock)
             else:
                 return var
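
Putting the pieces together: each variable's dask name is the file-derived
prefix, then the variable name, then a hash. A sketch with illustrative
values (absent a per-file token, the variable's own data is hashed instead):

	from dask.base import tokenize

	name_prefix = 'era-2014.nc:/'  # illustrative, built by open_dataset
	token = 'abc123'               # illustrative per-file token
	name = 't2m'
	token2 = tokenize(name, token)
	name2 = '%s%s-%s' % (name_prefix, name, token2)
	# -> 'era-2014.nc:/t2m-' followed by a hex digest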

9 changes: 2 additions & 7 deletions xray/core/variable.py
@@ -413,7 +413,7 @@ def chunks(self):

     _array_counter = itertools.count()
 
-    def chunk(self, chunks=None, name='', lock=False):
+    def chunk(self, chunks=None, name=None, lock=False):
         """Coerce this array's data into a dask arrays with the given chunks.
 
         If this variable is a non-dask array, it will be converted to dask
@@ -450,17 +450,12 @@ def chunk(self, chunks=None, name='', lock=False):
             chunks = self.chunks or self.shape
 
         data = self._data
-        if isinstance(data, dask_array_type):
+        if isinstance(data, da.Array):
             data = data.rechunk(chunks)
         else:
-            if name:
-                name += '_'
-            name = 'xray_%s%s' % (name, next(self._array_counter))
-
             if utils.is_dict_like(chunks):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))
-
             data = da.from_array(data, chunks, name=name, lock=lock)
 
         return type(self)(self.dims, data, self._attrs, self._encoding,
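
For context: the name handed to dask.array.from_array becomes the key prefix
in the task graph, which is why replacing the old process-local counter with a
caller-supplied name makes graphs reproducible across sessions. A small
sketch (the name string is illustrative):

	import numpy as np
	import dask.array as da

	x = np.arange(10)
	a = da.from_array(x, chunks=5, name='era-2014.nc:/t2m-abc123')
	# graph keys are ('era-2014.nc:/t2m-abc123', 0) and ('era-2014.nc:/t2m-abc123', 1),
	# the same every time the array is created with this name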
13 changes: 13 additions & 0 deletions xray/test/test_backends.py
@@ -871,6 +871,19 @@ def test_dask_roundtrip(self):
         with open_dataset(tmp2) as on_disk:
             self.assertDatasetIdentical(data, on_disk)
 
+    def test_deterministic_names(self):
+        with create_tmp_file() as tmp:
+            data = create_test_data()
+            data.to_netcdf(tmp)
+            with open_mfdataset(tmp) as ds:
+                original_names = dict((k, v.data.name) for k, v in ds.items())
+            with open_mfdataset(tmp) as ds:
+                repeat_names = dict((k, v.data.name) for k, v in ds.items())
+            for var_name, dask_name in original_names.items():
+                self.assertIn(var_name, dask_name)
+                self.assertIn(tmp, dask_name)
+            self.assertEqual(original_names, repeat_names)
+
 
 @requires_scipy_or_netCDF4
 @requires_pydap
6 changes: 5 additions & 1 deletion xray/test/test_dask.py
@@ -178,10 +178,14 @@ def setUp(self):
         self.eager_array = DataArray(self.values, dims=('x', 'y'), name='foo')
         self.lazy_array = DataArray(self.data, dims=('x', 'y'), name='foo')
 
-    def test_chunk(self):
+    def test_rechunk(self):
         chunked = self.eager_array.chunk({'x': 2}).chunk({'y': 2})
         self.assertEqual(chunked.chunks, ((2,) * 2, (2,) * 3))
 
+    def test_new_chunk(self):
+        chunked = self.eager_array.chunk()
+        self.assertTrue(chunked.data.name.startswith('xray-foo-'))
+
     def test_lazy_dataset(self):
         lazy_ds = Dataset({'foo': (('x', 'y'), self.data)})
         self.assertIsInstance(lazy_ds.foo.variable.data, da.Array)
