Skip to content

Commit

Permalink
Add options for zarr array definition (#48)
Browse files Browse the repository at this point in the history
* Pass options for zarr arrays #46

* Adding test for reserved options

* Rename option validation function

Co-authored-by: Eric Czech <eric@related.vc>
  • Loading branch information
eric-czech and rs-gh-sa committed Sep 21, 2020
1 parent 0eb27a1 commit 8917e20
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 16 deletions.
2 changes: 2 additions & 0 deletions rechunker/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def rechunking_plan(
Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
target_chunks : Tuple
Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
itemsize: int
Number of bytes used to represent a single array element
max_mem : Int
Maximum permissible chunk memory size, measured in units of itemsize
consolidate_reads: bool, optional
Expand Down
69 changes: 62 additions & 7 deletions rechunker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ def _get_dims_from_zarr_array(z_array):
return z_array.attrs["_ARRAY_DIMENSIONS"]


def _zarr_empty(shape, store_or_group, chunks, dtype, name=None):
def _zarr_empty(shape, store_or_group, chunks, dtype, name=None, **kwargs):
# wrapper that maybe creates the array within a group
if name is not None:
assert isinstance(store_or_group, zarr.hierarchy.Group)
return store_or_group.empty(name, shape=shape, chunks=chunks, dtype=dtype)
return store_or_group.empty(
name, shape=shape, chunks=chunks, dtype=dtype, **kwargs
)
else:
return zarr.empty(shape, chunks=chunks, dtype=dtype, store=store_or_group)
return zarr.empty(
shape, chunks=chunks, dtype=dtype, store=store_or_group, **kwargs
)


def _get_executor(name: str) -> Executor:
Expand Down Expand Up @@ -178,7 +182,9 @@ def rechunk(
target_chunks,
max_mem,
target_store,
target_options=None,
temp_store=None,
temp_options=None,
executor: Union[str, Executor] = "dask",
) -> Rechunked:
"""
Expand Down Expand Up @@ -212,9 +218,17 @@ def rechunk(
target_store : str, MutableMapping, or zarr.Store object
The location in which to store the final, rechunked result.
Will be passed directly to :py:meth:`zarr.creation.create`
target_options: Dict, optional
Additional keyword arguments used to create target arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``].
temp_store : str, MutableMapping, or zarr.Store object, optional
Location of temporary store for intermediate data. Can be deleted
once rechunking is complete.
temp_options: Dict, optional
Additional keyword arguments used to create intermediate arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``].
executor: str or rechunker.types.Executor
Implementation of the execution engine for copying between zarr arrays.
Supplying a custom Executor is currently even more experimental than the
Expand All @@ -228,14 +242,26 @@ def rechunk(
if isinstance(executor, str):
executor = _get_executor(executor)
copy_spec, intermediate, target = _setup_rechunk(
source, target_chunks, max_mem, target_store, temp_store
source=source,
target_chunks=target_chunks,
max_mem=max_mem,
target_store=target_store,
target_options=target_options,
temp_store=temp_store,
temp_options=temp_options,
)
plan = executor.prepare_plan(copy_spec)
return Rechunked(executor, plan, source, intermediate, target)


def _setup_rechunk(
source, target_chunks, max_mem, target_store, temp_store=None,
source,
target_chunks,
max_mem,
target_store,
target_options=None,
temp_store=None,
temp_options=None,
):
if isinstance(source, zarr.hierarchy.Group):
if not isinstance(target_chunks, dict):
Expand All @@ -257,7 +283,9 @@ def _setup_rechunk(
array_target_chunks,
max_mem,
target_group,
target_options=target_options,
temp_store_or_group=temp_group,
temp_options=temp_options,
name=array_name,
)
copy_specs.append(copy_spec)
Expand All @@ -271,7 +299,9 @@ def _setup_rechunk(
target_chunks,
max_mem,
target_store,
target_options=target_options,
temp_store_or_group=temp_store,
temp_options=temp_options,
)
intermediate = copy_spec.intermediate.array
target = copy_spec.write.array
Expand All @@ -281,14 +311,29 @@ def _setup_rechunk(
raise ValueError("Source must be a Zarr Array or Group, or a Dask Array.")


def _validate_options(options):
if not options:
return
for k in ["shape", "chunks", "dtype", "store", "name"]:
if k in options:
raise ValueError(
f"Optional array arguments must not include {k} (provided {k}={options[k]}). "
"Values for this property are managed internally."
)


def _setup_array_rechunk(
source_array,
target_chunks,
max_mem,
target_store_or_group,
target_options=None,
temp_store_or_group=None,
temp_options=None,
name=None,
) -> CopySpec:
_validate_options(target_options)
_validate_options(temp_options)
shape = source_array.shape
source_chunks = (
source_array.chunksize
Expand Down Expand Up @@ -333,7 +378,12 @@ def _setup_array_rechunk(
write_chunks = tuple(int(x) for x in write_chunks)

target_array = _zarr_empty(
shape, target_store_or_group, target_chunks, dtype, name=name
shape,
target_store_or_group,
target_chunks,
dtype,
name=name,
**(target_options or {}),
)
try:
target_array.attrs.update(source_array.attrs)
Expand All @@ -346,7 +396,12 @@ def _setup_array_rechunk(
# do intermediate store
assert temp_store_or_group is not None
int_array = _zarr_empty(
shape, temp_store_or_group, int_chunks, dtype, name=name
shape,
temp_store_or_group,
int_chunks,
dtype,
name=name,
**(temp_options or {}),
)

read_proxy = ArrayProxy(source_array, read_chunks)
Expand Down
77 changes: 68 additions & 9 deletions tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import importlib
import pytest

from pathlib import Path
import zarr
import dask.array as dsa
import dask
Expand Down Expand Up @@ -186,25 +187,30 @@ def test_rechunk_group(tmp_path, executor):


@pytest.fixture(params=["Array", "Group"])
def rechunked(tmp_path, request):
def rechunked_fn(tmp_path, request):
if request.param == "Group":
store_source = str(tmp_path / "source.zarr")
group = zarr.group(store_source)
group.attrs["foo"] = "bar"
# 800 byte chunks
a = group.ones("a", shape=(5, 10, 20), chunks=(1, 10, 20), dtype="f4")
a.attrs["foo"] = "bar"
b = group.ones("b", shape=(20,), chunks=(10,), dtype="f4")
b = group.ones("b", shape=(8000,), chunks=(100,), dtype="f4")
b.attrs["foo"] = "bar"

target_store = str(tmp_path / "target.zarr")
temp_store = str(tmp_path / "temp.zarr")

max_mem = 1600 # should force a two-step plan for a
target_chunks = {"a": (5, 10, 4), "b": (20,)}
max_mem = 16000 # should force a two-step plan for b
target_chunks = {"a": (5, 10, 4), "b": (4000,)}

rechunked = api.rechunk(
group, target_chunks, max_mem, target_store, temp_store=temp_store
rechunked_fn = partial(
api.rechunk,
group,
target_chunks,
max_mem,
target_store,
temp_store=temp_store,
)
else:
shape = (8000, 8000)
Expand All @@ -227,10 +233,20 @@ def rechunked(tmp_path, request):
target_store = str(tmp_path / "target.zarr")
temp_store = str(tmp_path / "temp.zarr")

rechunked = api.rechunk(
source_array, target_chunks, max_mem, target_store, temp_store=temp_store
rechunked_fn = partial(
api.rechunk,
source_array,
target_chunks,
max_mem,
target_store,
temp_store=temp_store,
)
return rechunked
return rechunked_fn


@pytest.fixture()
def rechunked(rechunked_fn):
return rechunked_fn()


def test_repr(rechunked):
Expand All @@ -241,6 +257,49 @@ def test_repr(rechunked):
assert all(thing in repr_str for thing in ["Source", "Intermediate", "Target"])


def test_rechunk_option_overwrite(rechunked_fn):
rechunked_fn().execute()
# TODO: make this match more reliable based on outcome of
# https://github.com/zarr-developers/zarr-python/issues/605
with pytest.raises(ValueError, match=r"path .* contains an array"):
rechunked_fn().execute()
rechunked = rechunked_fn(
temp_options=dict(overwrite=True), target_options=dict(overwrite=True)
)
rechunked.execute()


def test_rechunk_option_compression(rechunked_fn):
def rechunk(compressor):
rechunked = rechunked_fn(
temp_options=dict(overwrite=True, compressor=compressor),
target_options=dict(overwrite=True, compressor=compressor),
)
rechunked.execute()
return sum(
file.stat().st_size
for file in Path(rechunked._target.store.path).rglob("*")
)

size_uncompressed = rechunk(None)
size_compressed = rechunk(
zarr.Blosc(cname="zstd", clevel=9, shuffle=zarr.Blosc.SHUFFLE)
)
assert size_compressed < size_uncompressed


def test_rechunk_reserved_option(rechunked_fn):
for o in ["shape", "chunks", "dtype", "store", "name"]:
with pytest.raises(
ValueError, match=f"Optional array arguments must not include {o}"
):
rechunked_fn(temp_options={o: True})
with pytest.raises(
ValueError, match=f"Optional array arguments must not include {o}"
):
rechunked_fn(target_options={o: True})


def test_repr_html(rechunked):
rechunked._repr_html_() # no exceptions

Expand Down

0 comments on commit 8917e20

Please sign in to comment.