How to force intake.open_netcdf(uri) to read all bytes up front with storage options #88

Closed · scottyhq opened this issue Oct 29, 2020 · 8 comments

@scottyhq (Collaborator)

While it's possible to read HDF files as file-like objects, it's not always a great idea because random access ends up being extremely slow (orders of magnitude slower in some cases). It's often better to just pull all the bytes of the file into memory or a temporary file. I'm trying to figure out an easy way to do this with intake.open_netcdf() via storage_options, but failing. There are some nice suggestions for HTTP in #56... what about S3FS? This is also related to #86, but here I think it'd be useful to ignore dask for a minute and just deal with a single file.

The goal is to do the following efficiently (what goes into storage_options?). @martindurant @weiji14, perhaps you know how to accomplish this?

da = intake.open_netcdf('s3://data.h5', storage_options=?).to_dask()
print(da.h_li.mean())

Here are some timing details on different access patterns

import s3fs
import xarray as xr
import intake
import io

# 64 MB file with a complicated group structure and small internal chunking (bad for cloud storage)
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5' 

Pull all the bytes up front (1 second):

%%time
s3 = s3fs.S3FileSystem(anon=True)
with s3.open(url) as f:
    memoryFile = io.BytesIO(f.read())
    da = xr.open_dataset(memoryFile, group='gt1l/land_ice_segments', engine='h5netcdf')
    print(da.h_li.mean())
<xarray.DataArray 'h_li' ()>
array(899.6408, dtype=float32)
CPU times: user 362 ms, sys: 278 ms, total: 640 ms
Wall time: 1.19 s

Lazy initial read (16 seconds!):

%%time
s3 = s3fs.S3FileSystem(anon=True)
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5' 
with s3.open(url) as f:
    da = xr.open_dataset(f, group='gt1l/land_ice_segments', engine='h5netcdf')
    print(da.h_li.mean())
<xarray.DataArray 'h_li' ()>
array(899.6408, dtype=float32)
CPU times: user 3.51 s, sys: 4.06 s, total: 7.56 s
Wall time: 16.2 s
@weiji14 (Collaborator) commented Oct 30, 2020

Hi @scottyhq! Not sure if this is what you want, but here's some code below to get you started. It uses simplecache, so the HDF5 file will be 'downloaded' and persisted on the filesystem once (which might be slow), but subsequent access should be fast and won't require any internet connection. This is probably best if you'll need to loop through all six laser-beam groups.

import intake
import intake_xarray
import xarray as xr

# Setup parameters
urlpath: str = "simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5"
storage_options: dict = {
    "simplecache": dict(cache_storage="/tmp/ATL06", same_names=True)
}
xarray_kwargs: dict = dict(group="gt1l/land_ice_segments", engine="h5netcdf")

# Read data using intake
source: intake_xarray.netcdf.NetCDFSource = intake.open_netcdf(
    urlpath=urlpath, storage_options=storage_options, xarray_kwargs=xarray_kwargs
)
dataarray: xr.Dataset = source.to_dask()
print(dataarray.h_li.mean())

produces:

<xarray.DataArray 'h_li' ()>
array(899.6408, dtype=float32)

Also, I didn't realize that ITS Live has an ICESat-2 s3 bucket. Is this documented somewhere, or still an internal thing? Just curious, because this opens up a whole lot of possibilities for IcePyx!

@martindurant (Member)

There is a caching type that does this, but it's not exposed: fsspec.caching.AllBytes.

With fsspec/filesystem_spec#462, it should be possible to give default_cache_type="all" in storage_options and get the behaviour you are after. Please test!
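
For reference, outside of intake that would look something like this (an untested sketch, assuming the PR above is in and that default_cache_type is honoured for each opened file):

import s3fs
import xarray as xr

url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'

# every file opened on this filesystem uses the AllBytes cache,
# so the whole object is fetched into memory on the first read
s3 = s3fs.S3FileSystem(anon=True, default_cache_type='all')
with s3.open(url) as f:
    da = xr.open_dataset(f, group='gt1l/land_ice_segments', engine='h5netcdf')
    print(da.h_li.mean())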

@scottyhq (Collaborator, Author)

Thanks @weiji14, that snippet is certainly helpful. I don't think the ITS Live ICESat-2 data (just ATL06) is documented, so I wouldn't depend on it. But I do know NSIDC is in the process of moving to hosting on S3 this year, so it's good to explore various ways to access the data. @martindurant's PR allows keeping everything in RAM without writing to disk, so it will be interesting to test. I imagine there will be use cases for both approaches!

@scottyhq (Collaborator, Author) commented Nov 20, 2020

I'm not sure this is quite working. I expect the following to work:

import intake

uri = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
da = intake.open_netcdf(uri, 
                        xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
                        storage_options=dict(anon=True, default_cache_type='all'),
                       ).to_dask()
print(da.h_li.mean())

But this leads to the following traceback:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-19-f7f55e6ac5c2> in <module>
      2 da = intake.open_netcdf(uri, 
      3                         xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
----> 4                         storage_options=dict(anon=True, default_cache_type='all'),
      5                        ).to_dask()
      6 print(da.h_li.mean())

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
     67     def to_dask(self):
     68         """Return xarray object where variables are dask arrays"""
---> 69         return self.read_chunked()
     70 
     71     def close(self):

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
     42     def read_chunked(self):
     43         """Return xarray object (which will have chunks)"""
---> 44         self._load_metadata()
     45         return self._ds
     46 

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    124         """load metadata only if needed"""
    125         if self._schema is None:
--> 126             self._schema = self._get_schema()
    127             self.datashape = self._schema.datashape
    128             self.dtype = self._schema.dtype

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
     16 
     17         if self._ds is None:
---> 18             self._open_dataset()
     19 
     20             metadata = {

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self)
     77         else:
     78             _open_dataset = xr.open_dataset
---> 79         url = fsspec.open_local(url, **self.storage_options)
     80 
     81         self._ds = _open_dataset(url, chunks=self.chunks, **kwargs)

~/miniconda3/envs/intake-stac-gui/lib/python3.7/site-packages/fsspec/core.py in open_local(url, mode, **storage_options)
    459     if not getattr(of[0].fs, "local_file", False):
    460         raise ValueError(
--> 461             "open_local can only be used on a filesystem which"
    462             " has attribute local_file=True"
    463         )

ValueError: open_local can only be used on a filesystem which has attribute local_file=True

@scottyhq (Collaborator, Author) commented Nov 20, 2020

Also, changing to a simplecache:: URI seems to bypass the anon=True keyword, such that we get "ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied" even for this public s3 bucket.

import intake

uri = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
da = intake.open_netcdf(uri, 
                        xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
                        storage_options=dict(anon=True, default_cache_type='all',
                                             simplecache=dict(cache_storage="/tmp/atl06", same_names=True),
                                            )
                       ).to_dask()
print(da.h_li.mean())

But I'd prefer to avoid writing to disk if possible and just stream all the bytes into memory, as described in the first comment of this issue.
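
Something along these lines might do that per file (untested sketch; it also depends on the fsspec PR above exposing the "all" cache type):

import s3fs
import xarray as xr

url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'

# per-file variant: cache_type='all' requests fsspec's AllBytes cache,
# so the full object is read into memory on first access and nothing is written to disk
s3 = s3fs.S3FileSystem(anon=True)
with s3.open(url, cache_type='all') as f:
    da = xr.open_dataset(f, group='gt1l/land_ice_segments', engine='h5netcdf')
    print(da.h_li.mean())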

@martindurant (Member)

seems to bypass the anon=True keyword

Right, you need "s3": {"anon": True} in storage_options, otherwise fsspec doesn't know whether the argument is meant for s3 or simplecache.
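
At the fsspec level, a chained open routes kwargs by protocol name, something like this (a sketch of the same idea outside intake):

import fsspec

url = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'

# kwargs are nested dicts keyed by protocol: the s3 layer gets anon=True,
# the simplecache layer gets its cache settings
of = fsspec.open(
    url,
    mode='rb',
    s3={'anon': True},
    simplecache={'cache_storage': '/tmp/atl06', 'same_names': True},
)
with of as f:
    print(f.read(8))  # first 8 bytes of the cached local copy (the HDF5 magic number)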

Didn't #82 allow the use of file-likes for netcdf? Or does that logic need to be extended?

@scottyhq (Collaborator, Author)

Thanks for the clarification @martindurant, I can confirm the following works. My remaining question: if cache_storage is not specified, is the data held in memory rather than written to disk?

import intake
uri = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
da = intake.open_netcdf(uri, 
                        xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
                        storage_options=dict(s3={'anon': True}, default_cache_type='all',
                                             #simplecache=dict(cache_storage="/tmp/atl06", same_names=True),
                                            )
                       ).to_dask()
print(da.h_li.mean())

Didn't #82 allow the use of file-likes for netcdf? Or does that logic need to be extended?

That PR only dealt with raster.py, didn't touch netcdf.py.

@martindurant (Member)

if cache_storage is not specified, is the data held in memory rather than writing to disk?

It is written as a file to your temporary file store location, i.e., whatever tempfile.mkdtemp returns. This is usually on disk, but it might be held in memory; that is down to the OS configuration.
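
If you really want to keep it off disk, one workaround is to point cache_storage at a RAM-backed path (assuming Linux, where /dev/shm is typically a tmpfs mount):

import intake

uri = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'

# the cached copy lands under /dev/shm, so it lives in RAM rather than on disk
da = intake.open_netcdf(
    uri,
    xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
    storage_options=dict(
        s3={'anon': True},
        simplecache=dict(cache_storage='/dev/shm/atl06', same_names=True),
    ),
).to_dask()
print(da.h_li.mean())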
