
open_mfdataset: support for multiple zarr datasets #3668

Closed
dmedv opened this issue Jan 7, 2020 · 14 comments · Fixed by #4187
Labels: enhancement, topic-zarr (related to zarr storage library)

Comments

dmedv commented Jan 7, 2020

I am running calculations on a remote Dask cluster. Some of the data is only available on the workers, not on the client. It is already possible to have an xarray dataset that "points" to a remote NetCDF data collection by using the parallel option with xarray.open_mfdataset() like this:

from dask.distributed import Client
import xarray as xr

client = Client('<dask_scheduler_ip>:<port>')
ds = xr.open_mfdataset(remote_nc_file_paths, combine='by_coords', parallel=True)

It will then use dask.delayed, and a simple calculation such as the following mean will be distributed among the workers, with the result returned to the client:

ds['Temp'].mean().compute()

Unfortunately, I cannot do the same thing with zarr, because open_mfdataset() does not support it and open_zarr() has no option to utilize dask.delayed. Would it be possible to add dask.delayed support to the zarr backend? Or perhaps I am missing something and there is a better way to work with zarr data on a remote Dask cluster?

Output of xr.show_versions():

INSTALLED VERSIONS
------------------
commit: None
python: 3.6.7 |Anaconda custom (64-bit)| (default, Oct 23 2018, 19:16:44) 
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-862.2.3.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: None
LOCALE: None.None
libhdf5: 1.10.4
libnetcdf: 4.6.3

xarray: 0.14.1
pandas: 0.25.3
numpy: 1.17.3
scipy: 1.4.1
netCDF4: 1.5.3
pydap: None
h5netcdf: None
h5py: 2.8.0
Nio: None
zarr: 2.3.2
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.9.1
distributed: 2.9.1
matplotlib: 3.1.2
cartopy: None
seaborn: 0.9.0
numbagg: None
setuptools: 40.4.3
pip: 18.1
conda: 4.8.0
pytest: 3.8.2
IPython: 7.0.1
sphinx: 1.8.1
dcherian commented Jan 8, 2020

You can use the pseudocode here: https://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets, change open_dataset to open_zarr, and then things should work (if I understand things correctly).
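
For concreteness, a minimal sketch of that substitution (store paths, variable names, and the final combine call are illustrative; the delayed tasks are assumed to run where the stores are readable):

import dask
import xarray as xr

# Hypothetical zarr store paths, visible to the Dask workers only.
zarr_paths = ["/data/llc4320/SST", "/data/llc4320/SSS"]

# Same pattern as the open_mfdataset pseudocode from the docs,
# with open_dataset swapped for open_zarr.
open_tasks = [dask.delayed(xr.open_zarr)(p) for p in zarr_paths]
datasets = dask.compute(*open_tasks)   # tuple of xarray.Dataset objects
combined = xr.merge(datasets)          # or concat/combine_by_coords, as appropriate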

dcherian changed the title from "Using dask.delayed to work with zarr data on a remote Dask cluster" to "open_mfdataset: support for multiple zarr datasets" on Jan 8, 2020
rabernat commented Jan 8, 2020

Hi @dmedv -- thanks a lot for raising this issue here!

One clarification question: is there just a single zarr store you are trying to read? Or are you trying to combine multiple stores, like open_mfdataset does with multiple netcdf files?

Some of the data is only available on the workers, not on the client.

Can you provide more detail about how the zarr data is distributed across the different workers and the client?

dmedv commented Jan 8, 2020

@rabernat
Each Dask worker is running on its own machine. The data that I am trying to work with is distributed among the workers, but all of it is accessible from any individual worker via cross-mounted NFS shares, so it basically works like shared storage. None of that data is available on the client.

For now, I'm trying to open just a single zarr store. I only mentioned open_mfdataset as an example, because it has the parallel option, which open_dataset and open_zarr lack. This is really not about combining multiple datasets, but about working with data on a remote Dask cluster. Sorry if I haven't made that absolutely clear from the start.

@dcherian
You mean this code?

import dask
import xarray as xr


def modify(ds):
    # modify ds here
    return ds


# this is basically what open_mfdataset does
open_kwargs = dict(decode_cf=True, decode_times=False)
open_tasks = [dask.delayed(xr.open_dataset)(f, **open_kwargs) for f in file_names]
tasks = [dask.delayed(modify)(task) for task in open_tasks]
datasets = dask.compute(tasks)[0]  # get a list of xarray.Datasets
combined = xr.combine_nested(datasets)  # or some combination of concat, merge

For a single data source, I think it can be condensed to this:

open_kwargs = dict(
    decode_cf=True, 
    decode_times=False
)
ds = dask.compute(dask.delayed(xr.open_dataset)(file_name, **open_kwargs))[0]

But it doesn't work quite as I expected, either with zarr or with NetCDF. First, let me explain what I get with open_dataset and a NetCDF file. The code above runs, but when I try to do calculations on the resulting dataset, for example

ds['Temp'].mean().compute()

I get

FileNotFoundError: [Errno 2] No such file or directory

on the client. It only runs properly if I wrap it in dask.delayed again:

dask.compute(dask.delayed(ds['Temp'].mean)())

So, this approach is not fully equivalent to what open_mfdataset does, and unfortunately that doesn't work for me, because I would like to be able to use the xarray dataset transparently, without having to program Dask explicitly.

If I add chunks={} to open_kwargs, similar to this line in the open_mfdataset implementation (https://github.com/pydata/xarray/blob/v0.14.1/xarray/backends/api.py#L885), then it starts behaving exactly like open_mfdataset and I can use the dataset transparently. I don't quite understand what's going on there, but so far so good.
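
In other words, this variant behaves like open_mfdataset (a sketch; the file name is illustrative):

open_kwargs = dict(decode_cf=True, decode_times=False, chunks={})
ds = dask.compute(dask.delayed(xr.open_dataset)(remote_nc_file, **open_kwargs))[0]

# chunks={} appears to be what makes the returned dataset dask-backed,
# so this now runs on the workers without an explicit dask.delayed wrapper.
ds['Temp'].mean().compute()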

Now, back to zarr:

ds = dask.compute(dask.delayed(xr.open_zarr)(zarr_dataset_path, **open_kwargs))[0]

doesn't run at all, regardless of the chunks setting, giving me

ValueError: group not found at path ''

so I don't even get a dataset object. It seems that something is quite different in the zarr backend implementation. I haven't had a chance to look at the code carefully yet, but I will do so in the next few days.

Sorry for this long-winded explanation, I hope it clarifies what I'm trying to achieve here.

dmedv commented Jan 9, 2020

Here is the stack trace (somewhat abbreviated). It looks like a deserialization problem. As far as I can see from the Dask status dashboard and the worker logs, open_zarr finishes normally on the worker. Just in case, I ran client.get_versions(check=True), and it didn't show any library mismatches.

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95\x92\x13\x01\x00\x00\x00\x00\x00\x8c\x13xarray.core.dataset\x94\x8c\x07Dataset\x94\x93\x94)\x81\x94 ...

...

KeyErrorTraceback (most recent call last)
~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __init__(self, store, path, read_only, chunk_store, cache_attrs, synchronizer)
    109             mkey = self._key_prefix + group_meta_key
--> 110             meta_bytes = store[mkey]
    111         except KeyError:

~/miniconda3/lib/python3.6/site-packages/zarr/storage.py in __getitem__(self, key)
    726         else:
--> 727             raise KeyError(key)
    728 

KeyError: '.zgroup'

During handling of the above exception, another exception occurred:

ValueErrorTraceback (most recent call last)
<ipython-input-60-5c7db35096c7> in <module>
      6     chunks={}
      7 )
----> 8 ds = dask.compute(dask.delayed(_xr.open_zarr)('/sciserver/filedb02-01/ocean/LLC4320/SST',**open_kwargs))[0]

...

~/miniconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads(x)
     57 def loads(x):
     58     try:
---> 59         return pickle.loads(x)
     60     except Exception:
     61         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __setstate__(self, state)
    269 
    270     def __setstate__(self, state):
--> 271         self.__init__(*state)
    272 
    273     def _item_path(self, item):

~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __init__(self, store, path, read_only, chunk_store, cache_attrs, synchronizer)
    110             meta_bytes = store[mkey]
    111         except KeyError:
--> 112             err_group_not_found(path)
    113         else:
    114             meta = decode_group_metadata(meta_bytes)

~/miniconda3/lib/python3.6/site-packages/zarr/errors.py in err_group_not_found(path)
     27 
     28 def err_group_not_found(path):
---> 29     raise ValueError('group not found at path %r' % path)
     30 
     31 

ValueError: group not found at path ''

dmedv commented Jan 9, 2020

I tried to do serialization/deserialization by hand:

  • logged in to one of the Dask workers, loaded the zarr data locally using open_zarr, and pickled the resulting dataset:
ds = xr.open_zarr("/sciserver/filedb02-01/ocean/LLC4320/SST")
pickle.dump(ds, open("/home/dask/zarr.p", "wb"))
  • copied the pickle file to the client and tried to unpickle it:
ds = pickle.load(open("zarr.p", "rb"))

It failed with the same error:

UnpicklingErrorTraceback (most recent call last)
<ipython-input-77-4809dc01c404> in <module>
----> 1 a = pickle.loads(s)

UnpicklingError: pickle data was truncated

import pickle, xarray
pickle.load(open("zarr.p", "rb"))
zarr = pickle.load(open("zarr.p", "rb"))

KeyErrorTraceback (most recent call last)
~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __init__(self, store, path, read_only, chunk_store, cache_attrs, synchronizer)
    109             mkey = self._key_prefix + group_meta_key
--> 110             meta_bytes = store[mkey]
    111         except KeyError:

~/miniconda3/lib/python3.6/site-packages/zarr/storage.py in __getitem__(self, key)
    726         else:
--> 727             raise KeyError(key)
    728 

KeyError: '.zgroup'

During handling of the above exception, another exception occurred:

ValueErrorTraceback (most recent call last)
<ipython-input-83-cd9f4ae936eb> in <module>
----> 1 zarr = pickle.load(open("zarr.p", "rb"))

~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __setstate__(self, state)
    269 
    270     def __setstate__(self, state):
--> 271         self.__init__(*state)
    272 
    273     def _item_path(self, item):

~/miniconda3/lib/python3.6/site-packages/zarr/hierarchy.py in __init__(self, store, path, read_only, chunk_store, cache_attrs, synchronizer)
    110             meta_bytes = store[mkey]
    111         except KeyError:
--> 112             err_group_not_found(path)
    113         else:
    114             meta = decode_group_metadata(meta_bytes)

~/miniconda3/lib/python3.6/site-packages/zarr/errors.py in err_group_not_found(path)
     27 
     28 def err_group_not_found(path):
---> 29     raise ValueError('group not found at path %r' % path)
     30 
     31 

ValueError: group not found at path ''

I then tried the same thing with a NetCDF dataset, and it worked fine. Also, the pickle file for NetCDF was much smaller. So I guess that in the case of a zarr dataset there is some initialization code that tries to open the zarr files when the dataset object gets deserialized on the client, and of course it cannot, because there is no data on the client. That explains a lot... although I'm still not sure whether xarray was ever intended to be used this way. Maybe I'm trying to do something completely wrong here?

rabernat commented Jan 9, 2020

Thanks for these detailed reports!

The scenario you are describing--trying to open a file that is not accessible at all from the client--is certainly not something we ever considered when designing this. It is a miracle to me that it does work with netCDF.

I think you are on the right track with the serialization diagnostics. I believe that @jhamman has the best understanding of this topic; he implemented the parallel mode in open_mfdataset. Perhaps he can give some suggestions.

In the meantime, it seems worth asking the obvious question...how hard would it be to mount the NFS volume on the client? That would avoid having to go down this route.

dmedv commented Jan 9, 2020

@rabernat
Fair enough. In our case it would be possible to mount the NFS shares on the client, and if all else fails I will do exactly that. However, from an architectural perspective, that would make the whole system a bit more tightly coupled than I would like, and it's easy to imagine other use cases where mounting the data on the client would not be possible. Also, the ability to work with remote data using just xarray and dask, the way it already works with NetCDF, looks pretty neat, even if unintentional, and I am inclined to pursue that route at least a bit further.

jhamman commented Jan 10, 2020

The scenario you are describing--trying to open a file that is not accessible at all from the client--is certainly not something we ever considered when designing this. It is a miracle to me that it does work with netCDF.

True. I think it's fair to say that the behavior you are enjoying (accessing data that the client cannot see) is the exception, not the rule. I expect there are many places in our backends that will not support this functionality at present.

The motivation for implementing the parallel feature was simply to shard the file I/O time when opening large collections (>10k) of netCDF files.

Ironically, this dask issue also popped up and has some significant overlap here: dask/dask#5769

In both of these cases, the desire is for the worker to open the file (or zarr dataset), construct the underlying dask arrays, and return the meta object. This requires the object to be fully picklable and any references to be maintained. It is possible, as indicated by your traceback, that the zarr backend is trying to reference the .zgroup file and it's not there. The logical place to start would be to look into why we can't pickle xarray datasets that come from zarr stores.

jhamman commented Jan 10, 2020

Also, @dmedv, can you add the output of xr.show_versions() to your original post?

dmedv commented Jan 12, 2020

I did another experiment: I copied the metadata to the client (the .zgroup, .zarray, and .zattrs files only), preserving the directory structure. That worked, i.e. I could run calculations on the remote data by wrapping them in dask.delayed. I guess if the metadata could be cached in the object, that would solve my problem.
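
For reference, a rough sketch of how that metadata-only copy could be scripted (run on a worker that can see the store, then copy the archive to the client and unpack it at the same path; paths and file names are illustrative):

import os
import tarfile

STORE = "/sciserver/filedb02-01/ocean/LLC4320/SST"
META_FILES = (".zgroup", ".zarray", ".zattrs")

# Collect only the zarr metadata files, preserving the directory layout.
with tarfile.open("/tmp/zarr_metadata.tar", "w") as tar:
    for root, _dirs, files in os.walk(STORE):
        for name in files:
            if name in META_FILES:
                path = os.path.join(root, name)
                tar.add(path, arcname=path.lstrip("/"))

# On the client, unpack from the filesystem root so the store path matches,
# for example: tar -C / -xf zarr_metadata.tar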

dmedv commented Jan 12, 2020

The zarr documentation is not entirely clear on whether metadata gets pickled or not with zarr.storage.DirectoryStore: https://zarr.readthedocs.io/en/stable/tutorial.html#pickle-support

but the code shows that the metadata is read from a file in __init__, and I guess xarray simply relies on zarr's own serialization, so there is no easy way to bypass it.

See
https://github.com/zarr-developers/zarr-python/blob/v2.4.0/zarr/hierarchy.py#L113
and
https://github.com/zarr-developers/zarr-python/blob/v2.4.0/zarr/storage.py#L785-L791

I think at this point I will just give up and mount the necessary directories on the client, but at least I have a much better understanding of the issue now.

Feel free to close this if you think there's nothing else that can or should be done about it in the xarray code.

jhamman commented Jan 13, 2020

@dmedv and @rabernat - after thinking about this a bit more and reviewing the links in the last post, I'm pretty sure we're bumping into a bug in zarr's DirectoryStore pickle support. It would be nice to confirm this with some zarr-only tests, but I don't see why the store needs to reference the .zgroup files when the object is unpickled.

dmedv commented Jan 13, 2020

@jhamman I already confirmed it with a zarr-only test, pickling and unpickling a zarr group object. I get the same error as with an xarray dataset: ValueError: group not found at path ''
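
A sketch of that test, assuming the pickle file is produced where the store is readable and loaded where it is not (paths and file names are illustrative):

import pickle
import zarr

# On a worker that can see the store:
group = zarr.open_group("/sciserver/filedb02-01/ocean/LLC4320/SST", mode="r")
with open("group.p", "wb") as f:
    pickle.dump(group, f)

# On the client, which cannot see the store, Group.__setstate__ calls
# __init__, which re-reads '.zgroup' from the DirectoryStore and raises
# ValueError: group not found at path ''
with open("group.p", "rb") as f:
    group = pickle.load(f)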

I'm not sure we can call it a bug, though. According to the storage specification (https://zarr.readthedocs.io/en/stable/spec/v2.html#storage), for a group to exist a .zgroup key must exist under the corresponding logical path, so in the case of DirectoryStore it is natural to check whether a .zgroup file exists at group-object creation time.

rabernat commented Jan 13, 2020

It would be wonderful if we could translate this complex xarray issue into a minimally simple zarr issue. Then the zarr devs can decide whether this use case is compatible with the zarr spec or not.
