Writing to regions with unaligned chunks can lose data #8371

Closed
max-sixty opened this issue Oct 25, 2023 · 20 comments · Fixed by #8459
Labels
bug · topic-zarr (Related to zarr storage library)

Comments

@max-sixty
Collaborator

What happened?

Writing with region using chunks that aren't aligned with the target store's chunks can lose data.

I've recreated an example below. While it's unlikely that folks are deliberately passing different values to .chunk for the template vs. the regions, I had an "auto" chunk, which can end up picking different chunk sizes for each.
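
For illustration, a minimal sketch of how that can happen (variable names here are arbitrary, ds is the dataset from the MVCE below, and the exact sizes depend on dask's heuristics):

template = ds.chunk("auto")                # chunking chosen from the full dataset
one_region = ds.isel(a=[0]).chunk("auto")  # chunking chosen from a single slice
print(template.chunksizes)
print(one_region.chunksizes)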

(FWIW, this was fairly painful: I lost a lot of time by not noticing this, and then by not really considering that this could happen while I was trying to debug. I think we should really strive to ensure that we don't lose data / incorrectly report that we've successfully written data...)

What did you expect to happen?

If there's a risk of data loss, raise an error...

Minimal Complete Verifiable Example

import numpy as np
import xarray as xr

ds = xr.DataArray(np.arange(120).reshape(4, 3, -1), dims=list("abc")).rename('var1').to_dataset().chunk(2)

ds

# <xarray.Dataset>
# Dimensions:  (a: 4, b: 3, c: 10)
# Dimensions without coordinates: a, b, c
# Data variables:
#     var1     (a, b, c) int64 dask.array<chunksize=(2, 2, 2), meta=np.ndarray>

def write(ds):
    # Template store written with one chunking, regions written with another
    ds.chunk(5).to_zarr('foo.zarr', compute=False, mode='w')
    for r in range(ds.sizes['a']):
        ds.chunk(3).isel(a=[r]).to_zarr('foo.zarr', region=dict(a=slice(r, r+1)))


def read(ds):
    result = xr.open_zarr('foo.zarr')
    assert result.compute().identical(ds)
    print(result.chunksizes, ds.chunksizes)

write(ds); read(ds)

# AssertionError

xr.open_zarr('foo.zarr').compute()['var1']

<xarray.DataArray 'var1' (a: 4, b: 3, c: 10)>
array([[[  0,   0,   0,   3,   4,   5,   0,   0,   0,   9],
        [  0,   0,   0,  13,  14,  15,   0,   0,   0,  19],
        [  0,   0,   0,  23,  24,  25,   0,   0,   0,  29]],

       [[ 30,  31,  32,   0,   0,  35,  36,  37,  38,   0],
        [ 40,  41,  42,   0,   0,  45,  46,  47,  48,   0],
        [ 50,  51,  52,   0,   0,  55,  56,  57,  58,   0]],

       [[ 60,  61,  62,   0,   0,  65,   0,   0,   0,  69],
        [ 70,  71,  72,   0,   0,  75,   0,   0,   0,  79],
        [ 80,  81,  82,   0,   0,  85,   0,   0,   0,  89]],

       [[  0,   0,   0,  93,  94,  95,  96,  97,  98,   0],
        [  0,   0,   0, 103, 104, 105, 106, 107, 108,   0],
        [  0,   0,   0, 113, 114, 115, 116, 117, 118,   0]]])
Dimensions without coordinates: a, b, c

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

No response

Anything else we need to know?

No response

Environment

INSTALLED VERSIONS

commit: ccc8f99
python: 3.9.18 (main, Aug 24 2023, 21:19:58)
[Clang 14.0.3 (clang-1403.0.22.14.1)]
python-bits: 64
OS: Darwin
OS-release: 22.6.0
machine: arm64
processor: arm
byteorder: little
LC_ALL: en_US.UTF-8
LANG: None
LOCALE: ('en_US', 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 2023.10.2.dev10+gccc8f998
pandas: 2.1.1
numpy: 1.25.2
scipy: 1.11.1
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.16.0
cftime: None
nc_time_axis: None
PseudoNetCDF: None
iris: None
bottleneck: None
dask: 2023.4.0
distributed: 2023.7.1
matplotlib: 3.5.1
cartopy: None
seaborn: None
numbagg: 0.2.3.dev30+gd26e29e
fsspec: 2021.11.1
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: 0.9.19
setuptools: 68.1.2
pip: 23.2.1
conda: None
pytest: 7.4.0
mypy: 1.6.0
IPython: 8.15.0
sphinx: 4.3.2

@max-sixty added the bug, needs triage (Issue that has not been reviewed by xarray team member), and topic-zarr (Related to zarr storage library) labels, then removed the needs triage label, on Oct 25, 2023
@jhamman
Member

jhamman commented Oct 25, 2023

@max-sixty - thanks for the clear bug description. I wonder if we are opening the chunks with the wrong mode. We should be appending to existing chunks, but it almost seems like we are overwriting existing data (I could have the missing data pattern wrong though).

@max-sixty
Collaborator Author

max-sixty commented Oct 25, 2023

If we're appending to existing chunks, is that safe to run in parallel? (even though that's not the issue here)

Should we raise an error if a region doesn't fully encompass a chunk that it's writing to?

@max-sixty
Collaborator Author

IIUC, mode='r+' — the default mode when region is set:

  • doesn't allow changing metadata
  • but does allow writing partial chunks
  • and any data in that chunk that isn't covered by the write will be set to the fill value
  • and so it's inherently unsafe — not just when running concurrently, but also serially?
  • Is there any reason to allow writing a partial chunk in mode='r+'?

If we instead append to an existing chunk, as @jhamman suggested, then this would be safe serially but not concurrently? i.e. does mode='a' retain the parts of the chunk that aren't in the source data in the target? (e.g. by copying them from the target before writing a combined chunk?)

@shoyer
Member

shoyer commented Oct 26, 2023

Writing a partial chunk in either mode='r+' or mode='a' will rewrite the entire chunk, by first downloading the existing chunk and then overwriting everything. It's safe in serial code, but not if doing concurrent writes.

Given that the purpose of mode='r+' is to have a safe way to write in parallel, I would support making it an error to write a partial chunk in mode='r+'. If users want to write partial chunks, they can use mode='a', which is already non-threadsafe.
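
For concreteness, a minimal sketch of that read-modify-write behaviour using zarr-python directly (the store path, shape, and chunking here are arbitrary):

import numpy as np
import zarr

# A 10-element array stored as two 5-element chunks
z = zarr.open("partial.zarr", mode="w", shape=(10,), chunks=(5,), dtype="i8")
z[:] = 0

# Writing only part of the first chunk: zarr reads the existing chunk from
# the store, updates the requested slice, and writes the whole chunk back.
z[0:3] = np.arange(3)
print(z[:])  # [0 1 2 0 0 0 0 0 0 0]

# Serially this round-trips fine; two writers touching the same chunk
# concurrently can race on that read-modify-write without a synchronizer.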

@max-sixty
Collaborator Author

Writing a partial chunk in either mode='r+' or mode='a' will rewrite the entire chunk, by first downloading the existing chunk and then overwriting everything. It's safe in serial code, but not if doing concurrent writes.

OK great!

Though it seems that writing with region doesn't work that way — the MVCE above is writing serially and is overwriting data. Or is there another explanation?

Given that the purpose of mode='r+' is to have a safe way to write in parallel, I would support making it an error to write a partial chunk in mode='r+'. If users want to write partial chunks, they can use mode='a', which is already non-threadsafe.

Yes, that makes sense to me — i.e. if running concurrently, writing with r+ is required, otherwise a is fine; not much use in discriminating between metadata and data for that purpose...

@shoyer
Member

shoyer commented Oct 26, 2023

Though it seems that writing with region doesn't work that way — the MVCE above is writing serially and is overwriting data. Or is there another explanation?

Oh indeed, there is something else buggy going on here, too! I'm not sure it's related to region, though.

@jhamman
Member

jhamman commented Oct 26, 2023

I wonder if we can reproduce this using the zarr-python api directly? I suspect not but it would be good to check.

@jhamman
Member

jhamman commented Oct 31, 2023

FWIW, I was able to reproduce the behavior @max-sixty demonstrated with the latest Xarray/Zarr versions. The equivalent pattern is not reproducible using Zarr-Python.

@dcherian
Contributor

dcherian commented Nov 15, 2023

the MVCE above is writing serially and is overwriting data.

I believe this is using dask's threaded scheduler to write in parallel. The test passes in serial (run by specifying dask.config.set(scheduler="sync")), and the result looks like racing writes with the "threads" and "processes" schedulers.
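
For reference, a sketch of forcing the MVCE through the synchronous scheduler, where write and read are the functions from the MVCE above:

import dask

with dask.config.set(scheduler="sync"):
    write(ds)  # the write() from the MVCE above
read(ds)       # passes: no two threads write to the same zarr chunk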

@jhamman did you run with dask, or in serial with only Zarr-Python?

@max-sixty
Collaborator Author

max-sixty commented Nov 15, 2023

Great find @dcherian !

If that's the case, we could:

  • Check for overlapping unaligned writes in a single .to_zarr call, raising an error if they exist (a rough sketch of such a check follows this list)
  • Restrict unaligned writes to mode="a", so that writing to mutually exclusive parts of a dataset concurrently can't trigger this
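
A rough sketch of the first option (a hypothetical helper, not existing xarray API, mirroring the alignment rule quoted later in the thread):

def check_chunk_alignment(zarr_chunks, dask_chunks):
    # Hypothetical helper: along each dimension, every dask chunk except the
    # last must be a multiple of the zarr chunk size, otherwise two dask
    # tasks can end up writing into the same zarr chunk.
    for zchunk, dchunks in zip(zarr_chunks, dask_chunks):
        if any(dchunk % zchunk for dchunk in dchunks[:-1]):
            raise ValueError(
                f"dask chunks {dchunks} are not aligned with zarr chunk size "
                f"{zchunk}; writing in parallel could corrupt data"
            )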

@dcherian
Contributor

Great! PR?

@max-sixty
Collaborator Author

Great! PR?

Yup — easier to suggest the thing than to write the code! I think realistically I'm less likely to get to this soon relative to working the numbagg stuff through...

@jhamman
Member

jhamman commented Nov 16, 2023

@jhamman did you run with dask, or in serial with only Zarr-Python?

I ran with the threaded scheduler, but without trying this again I'm skeptical this is the problem. The region writes are done one at a time in a for-loop. This should work!

@max-sixty
Collaborator Author

I realize I was claiming it was serial, but I think @dcherian might be right, since if we only chunk on a, then it works fine:

diff --git a/xarray/core/x.py b/xarray/core/x.py
index 749f228a..b2053f5f 100644
--- a/xarray/core/x.py
+++ b/xarray/core/x.py
@@ -13,9 +13,9 @@
 
 
 def write(ds):
-    ds.chunk(5).to_zarr("foo.zarr", compute=False, mode="w")
+    ds.chunk(a=5).to_zarr("foo.zarr", compute=False, mode="w")
     for r in range(ds.sizes["a"]):
-        ds.chunk(3).isel(a=[r]).to_zarr("foo.zarr", region=dict(a=slice(r, r + 1)))
+        ds.chunk(a=3).isel(a=[r]).to_zarr("foo.zarr", region=dict(a=slice(r, r + 1)))
 
 
 def read(ds):

...so possibly the dask scheduler is writing multiple chunks along the other dims.

(though I still think this is not great, that a .to_zarr call can lose data...)

@jhamman
Member

jhamman commented Nov 16, 2023

You may want to try throwing zarr's ThreadSynchronizer in the mix to see if that resolves things here. I believe you can pass this to to_zarr(..., synchronizer=ThreadSynchronizer())
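
Something like the following, assuming the write() pattern from the MVCE (a sketch; ThreadSynchronizer only guards threads within a single process):

from zarr import ThreadSynchronizer

def write_with_sync(ds):
    # Same as the MVCE's write(), but serializing concurrent writes to a chunk
    ds.chunk(5).to_zarr("foo.zarr", compute=False, mode="w")
    for r in range(ds.sizes["a"]):
        ds.chunk(3).isel(a=[r]).to_zarr(
            "foo.zarr",
            region=dict(a=slice(r, r + 1)),
            synchronizer=ThreadSynchronizer(),
        )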

@max-sixty
Collaborator Author

You may want to try throwing zarr's ThreadSynchronizer in the mix to see if that resolves things here. I believe you can pass this to to_zarr(..., synchronizer=ThreadSynchronizer())

This also makes it work!


Possibly we should add the check to safe_chunks — i.e. read the layout from the zarr store, and raise unless safe_chunks is False (by default it's True)...

@rabernat
Contributor

We already have a lot of logic in place to verify that we don't do these sort of unaligned writes, e.g.

# the hard case
# DESIGN CHOICE: do not allow multiple dask chunks on a single zarr chunk
# this avoids the need to get involved in zarr synchronization / locking
# From zarr docs:
#  "If each worker in a parallel computation is writing to a separate
#   region of the array, and if region boundaries are perfectly aligned
#   with chunk boundaries, then no synchronization is required."
# TODO: incorporate synchronizer to allow writes from multiple dask
# threads
if var_chunks and enc_chunks_tuple:
    for zchunk, dchunks in zip(enc_chunks_tuple, var_chunks):
        for dchunk in dchunks[:-1]:
            if dchunk % zchunk:
                base_error = (
                    f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
                    f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
                    f"Writing this array in parallel with dask could lead to corrupted data."
                )
                if safe_chunks:
                    raise NotImplementedError(
                        base_error
                        + " Consider either rechunking using `chunk()`, deleting "
                        "or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
                    )

Is the issue here that writing with regions or appending bypasses these checks?

@max-sixty
Collaborator Author

OK you got me, thanks for getting this close enough — #8459 fixes it. I'm not sure it's the best conceptual design, but it does fix it...

@dcherian
Contributor

dcherian commented Nov 16, 2023

I'm not sure what the action item is. The MVCE succeeds in serial, or if the appropriate synchronizer is used. Erroring would be backwards-incompatible, wouldn't it?

@max-sixty
Collaborator Author

I'm not sure what the action item is. The MVCE succeeds in serial, or if the appropriate synchronizer is used. Erroring would be backwards-incompatible, wouldn't it?

But is the existing behavior intentional? The most common case for using region is distributed workers, where it's much less common to use a synchronizer (and a synchronizer would be really non-performant on many distributed filesystems). At the moment, the existing behavior will lose data.

To get the old behavior, someone can pass safe_chunks=False. Or is the claim that we should ignore safe_chunks for existing arrays?
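
For reference, a sketch of opting out explicitly for a single region write (data integrity is then the caller's responsibility):

# As in the MVCE's write(), but explicitly accepting unaligned chunks
ds.chunk(3).isel(a=[0]).to_zarr(
    "foo.zarr", region=dict(a=slice(0, 1)), safe_chunks=False
)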
