Skip to content

Commit

Permalink
Check for aligned chunks when writing to existing variables (#8459)
Browse files Browse the repository at this point in the history
* Check for aligned chunks when writing to existing variables

* Update doc/whats-new.rst

Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add regression test for #8459

* Update whats-new

* Address Ryan's comment

* Update region typing

* Update test

---------

Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Deepak Cherian <deepak@cherian.net>
  • Loading branch information
4 people committed Mar 29, 2024
1 parent 852b7e6 commit ffb30a8
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 13 deletions.
5 changes: 4 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ v2024.03.0 (unreleased)

New Features
~~~~~~~~~~~~

- Partial writes to existing chunks with ``region`` or ``append_dim`` will now raise an error
(unless ``safe_chunks=False``); previously an error would only be raised on
new variables. (:pull:`8459`, :issue:`8371`, :issue:`8882`)
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Grouped and resampling quantile calculations now use the vectorized algorithm in ``flox>=0.9.4`` if present.
By `Deepak Cherian <https://github.com/dcherian>`_.
- Do not broadcast in arithmetic operations when global option ``arithmetic_broadcast=False``
Expand Down
16 changes: 12 additions & 4 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
f"Writing this array in parallel with dask could lead to corrupted data."
)
if safe_chunks:
raise NotImplementedError(
raise ValueError(
base_error
+ " Consider either rechunking using `chunk()`, deleting "
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
Expand Down Expand Up @@ -707,6 +707,17 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
if v.encoding == {"_FillValue": None} and fill_value is None:
v.encoding = {}

# We need to do this for both new and existing variables to ensure we're not
# writing to a partial chunk, even though we don't use the `encoding` value
# when writing to an existing variable. See
# https://github.com/pydata/xarray/issues/8371 for details.
encoding = extract_zarr_variable_encoding(
v,
raise_on_invalid=check,
name=vn,
safe_chunks=self._safe_chunks,
)

if name in existing_keys:
# existing variable
# TODO: if mode="a", consider overriding the existing variable
Expand Down Expand Up @@ -737,9 +748,6 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
zarr_array = self.zarr_group[name]
else:
# new variable
encoding = extract_zarr_variable_encoding(
v, raise_on_invalid=check, name=vn, safe_chunks=self._safe_chunks
)
encoded_attrs = {}
# the magic for storing the hidden dimension data
encoded_attrs[DIMENSION_KEY] = dims
Expand Down
12 changes: 9 additions & 3 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4120,7 +4120,7 @@ def to_zarr(
compute: Literal[True] = True,
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand All @@ -4140,7 +4140,7 @@ def to_zarr(
compute: Literal[False],
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand All @@ -4158,7 +4158,7 @@ def to_zarr(
compute: bool = True,
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand Down Expand Up @@ -4237,6 +4237,12 @@ def to_zarr(
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.
Users are expected to ensure that the specified region aligns with
Zarr chunk boundaries, and that dask chunks are also aligned.
Xarray makes limited checks that these multiple chunk boundaries line up.
It is possible to write incomplete chunks and corrupt the data with this
option if you are not careful.
safe_chunks : bool, default: True
If True, only allow writes when there is a many-to-one relationship
between Zarr chunks (specified in encoding) and Dask chunks.
Expand Down
6 changes: 6 additions & 0 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,12 @@ def to_zarr(
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.
Users are expected to ensure that the specified region aligns with
Zarr chunk boundaries, and that dask chunks are also aligned.
Xarray makes limited checks that these multiple chunk boundaries line up.
It is possible to write incomplete chunks and corrupt the data with this
option if you are not careful.
safe_chunks : bool, default: True
If True, only allow writes when there is a many-to-one relationship
between Zarr chunks (specified in encoding) and Dask chunks.
Expand Down
85 changes: 80 additions & 5 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2304,7 +2304,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# should fail if encoding["chunks"] clashes with dask_chunks
badenc = ds.chunk({"x": 4})
badenc.var1.encoding["chunks"] = (6,)
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
with pytest.raises(ValueError, match=r"named 'var1' would overlap"):
with self.roundtrip(badenc) as actual:
pass

Expand Down Expand Up @@ -2342,9 +2342,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# but intermediate unaligned chunks are bad
badenc = ds.chunk({"x": (3, 5, 3, 1)})
badenc.var1.encoding["chunks"] = (3,)
with pytest.raises(
NotImplementedError, match=r"would overlap multiple dask chunks"
):
with pytest.raises(ValueError, match=r"would overlap multiple dask chunks"):
with self.roundtrip(badenc) as actual:
pass

Expand All @@ -2358,7 +2356,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# TODO: remove this failure once synchronized overlapping writes are
# supported by xarray
ds_chunk4["var1"].encoding.update({"chunks": 5})
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
with pytest.raises(ValueError, match=r"named 'var1' would overlap"):
with self.roundtrip(ds_chunk4) as actual:
pass
# override option
Expand Down Expand Up @@ -5753,3 +5751,80 @@ def test_zarr_region(tmp_path):

# Write without region
ds_transposed.to_zarr(tmp_path / "test.zarr", mode="r+")


@requires_zarr
@requires_dask
def test_zarr_region_chunk_partial(tmp_path):
    """
    Check that writing to partial chunks with `region` fails when
    `safe_chunks` is left at its default of ``True``.
    """
    store = tmp_path / "foo.zarr"
    ds = (
        xr.DataArray(np.arange(120).reshape(4, 3, -1), dims=list("abc"))
        .rename("var1")
        .to_dataset()
    )

    # Create the store from the dataset chunked by 5; ``compute=False`` defers
    # writing the array values.
    ds.chunk(5).to_zarr(store, compute=False, mode="w")

    # Assert on every region individually. With the loop nested inside a
    # single ``pytest.raises`` only the first iteration would ever execute,
    # because the first error leaves the context manager.
    # NOTE(review): each write is expected to fail because the dask chunks of
    # size 3 presumably do not line up with the chunks already in the store.
    for r in range(ds.sizes["a"]):
        with pytest.raises(ValueError):
            ds.chunk(3).isel(a=[r]).to_zarr(store, region=dict(a=slice(r, r + 1)))


@requires_zarr
@requires_dask
def test_zarr_append_chunk_partial(tmp_path):
    """Appending along ``time`` with conflicting chunking must raise ``ValueError``."""
    store_path = tmp_path / "foo.zarr"
    values = np.ones((10, 10))

    def build(day):
        # One time step of the 10x10 field, stamped with ``day`` at ns precision.
        stamps = np.array([np.datetime64(day).astype("datetime64[ns]")])
        return xr.DataArray(
            values.reshape((-1, 10, 10)),
            dims=["time", "x", "y"],
            coords={"time": stamps},
            name="foo",
        )

    # Seed the store with explicit zarr chunks.
    initial = build("2020-01-01")
    initial.to_zarr(store_path, mode="w", encoding={"foo": {"chunks": (5, 5, 1)}})

    appended = build("2021-01-01")

    # Passing ``encoding`` for a variable that already exists is rejected.
    with pytest.raises(ValueError, match="encoding was provided"):
        appended.to_zarr(
            store_path,
            append_dim="time",
            mode="a",
            encoding={"foo": {"chunks": (1, 1, 1)}},
        )

    # chunking with dask sidesteps the encoding check, so we need a different check
    rechunked = appended.chunk({"x": 1, "y": 1, "time": 1})
    with pytest.raises(ValueError, match="Specified zarr chunks"):
        rechunked.to_zarr(store_path, append_dim="time", mode="a")


@requires_zarr
@requires_dask
def test_zarr_region_chunk_partial_offset(tmp_path):
    # https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545
    target = tmp_path / "foo.zarr"
    values = np.ones((30,))
    unchunked = xr.DataArray(values, dims=["x"], coords={"x": range(30)}, name="foo")
    arr = unchunked.chunk(x=10)
    arr.to_zarr(target, compute=False)

    # Write the first ten elements as a single dask chunk.
    first_ten = arr.isel(x=slice(10)).chunk(x=(10,))
    first_ten.to_zarr(target, region="auto")

    # Write elements 5..24 as two dask chunks of ten, with the safety check
    # explicitly disabled.
    offset = arr.isel(x=slice(5, 25)).chunk(x=(10, 10))
    offset.to_zarr(target, safe_chunks=False, region="auto")

    # Known gap: the same offset write *without* ``safe_chunks=False`` is
    # unsafe and should raise a ValueError, but currently does not, so it is
    # not asserted here.

0 comments on commit ffb30a8

Please sign in to comment.