Skip to content

Commit

Permalink
to_hdf5 method
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed May 8, 2015
1 parent 1435aab commit 58c6b37
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
58 changes: 29 additions & 29 deletions dask/array/core.py
Expand Up @@ -652,6 +652,34 @@ def __array__(self, dtype=None, **kwargs):
def store(self, target, **kwargs):
return store([self], [target], **kwargs)

def to_hdf5(self, filename, datapath, **kwargs):
""" Store array in HDF5 file
>>> x.to_hdf5('myfile.hdf5', '/x') # doctest: +SKIP
Optionally provide arguments as though to ``h5py.File.create_dataset``
>>> x.to_hdf5('myfile.hdf5', '/x', compression='lzf', shuffle=True) # doctest: +SKIP
See also:
da.store
h5py.File.create_dataset
"""
import h5py
with h5py.File(filename) as f:
if 'chunks' not in kwargs:
kwargs['chunks'] = tuple([c[0] for c in self.chunks])
d = f.require_dataset(datapath, shape=self.shape, dtype=self.dtype, **kwargs)

slices = slices_from_chunks(self.chunks)

name = next(names)
dsk = dict(((name,) + t[1:], (write_hdf5_chunk, filename, datapath, slc, t))
for t, slc in zip(core.flatten(self._keys()), slices))

myget = kwargs.get('get', get)
myget(merge(dsk, self.dask), list(dsk.keys()))

@wraps(compute)
def compute(self, **kwargs):
result, = compute(self, **kwargs)
Expand Down Expand Up @@ -1626,36 +1654,8 @@ def unique(x):
return np.unique(np.concatenate(parts))


def to_hdf5(x, fn, datapath, **kwargs):
import h5py
f = h5py.File(fn)
if 'chunks' not in kwargs:
kwargs['chunks'] = tuple([c[0] for c in x.chunks])
d = f.require_dataset(x.shape, x.dtype, **kwargs)

f.close()

locs = [[0] + list(accumulate(add, bl)) for bl in arr.chunks]

name = next(names)
dsk = dict(((name,

def write_hdf5_chunk(fn, datapath, index, data):
import h5py
with h5py.File(fn) as f:
d = f[datapath]
d[index] = data


def insert_to_ooc(out, arr):

locs = [[0] + list(accumulate(add, bl)) for bl in arr.chunks]

def store(x, *args):
with lock:
ind = tuple([slice(loc[i], loc[i+1]) for i, loc in zip(args, locs)])
out[ind] = np.asanyarray(x)
return None

name = 'store-%s' % arr.name
return dict(((name,) + t[1:], (store, t) + t[1:])
for t in core.flatten(arr._keys()))
2 changes: 1 addition & 1 deletion dask/array/tests/test_array_core.py
Expand Up @@ -8,7 +8,7 @@
import dask
import dask.array as da
from dask.array.core import *
from dask.utils import raises, ignoring
from dask.utils import raises, ignoring, tmpfile


inc = lambda x: x + 1
Expand Down

0 comments on commit 58c6b37

Please sign in to comment.