Add logic for remote or local files in NetCDFSource #93

Merged: 7 commits, Nov 24, 2020. Changes shown are from 3 commits.
.github/workflows/main.yaml (3 additions, 3 deletions)

@@ -13,13 +13,13 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        CONDA_ENV: [36, 37, 38, 37-upstream]
+        CONDA_ENV: [36, 37, 38, 39, upstream]
     steps:
       - name: Checkout
         uses: actions/checkout@v2

       - name: Setup Miniconda
-        uses: conda-incubator/setup-miniconda@v1
+        uses: conda-incubator/setup-miniconda@v2
         with:
           auto-update-conda: true
           auto-activate-base: false
@@ -35,4 +35,4 @@ jobs:
       - name: Run Tests
         shell: bash -l {0}
         run: |
-          pytest --verbose
+          pytest --verbose --ignore=intake_xarray/tests/test_network.py
ci/environment-network.yml (new file, 19 additions)

name: test_env
channels:
  - conda-forge
dependencies:
  - python
  - aiohttp
  - fsspec
  - gcsfs
  - h5netcdf
  - intake
  - netcdf4
  - pip
  - pydap
  - pytest
  - rasterio
  - s3fs
  - scikit-image
  - xarray
  - zarr
ci/environment-py36.yml (1 addition)

@@ -4,6 +4,7 @@ channels:
 dependencies:
   - python=3.6
   - aiohttp
+  - h5netcdf
   - intake
   - netcdf4
   - pip
ci/environment-py37.yml (1 addition)

@@ -4,6 +4,7 @@ channels:
 dependencies:
   - python=3.7
   - aiohttp
+  - h5netcdf
   - intake
   - netcdf4
   - pip
ci/environment-py38.yml (1 addition)

@@ -4,6 +4,7 @@ channels:
 dependencies:
   - python=3.8
   - aiohttp
+  - h5netcdf
   - intake
   - netcdf4
   - pip
ci/environment-py39.yml (new file, 18 additions)

name: test_env
channels:
  - conda-forge
dependencies:
  - python=3.9
  - aiohttp
  - h5netcdf
  - intake
  - netcdf4
  - pip
  - pydap
  - pytest
  - rasterio
  - scikit-image
  - xarray
  - zarr
  - pip:
    - rangehttpserver
ci environment file for the upstream build (filename not captured in this view)

@@ -2,17 +2,18 @@ name: test_env
 channels:
   - conda-forge
 dependencies:
-  - python=3.7
+  - python
   - aiohttp
+  - h5netcdf
   - netcdf4
   - pip
   - pydap
   - pytest
   - rasterio
   - scikit-image
   - xarray
   - zarr
   - pip:
     - git+https://github.com/intake/filesystem_spec.git
     - git+https://github.com/intake/intake.git
     - git+https://github.com/pydata/xarray.git
+    - rangehttpserver
intake_xarray/netcdf.py (13 additions, 1 deletion)

@@ -37,6 +37,8 @@ class NetCDFSource(DataSourceMixin, PatternMixin):
         Whether to treat the path as a pattern (ie. ``data_{field}.nc``)
         and create new coordinates in the output corresponding to pattern
         fields. If str, is treated as pattern to match on. Default is True.
+    xarray_kwargs: dict
+        Additional xarray kwargs for xr.open_dataset() or xr.open_mfdataset().
     storage_options: dict
         If using a remote fs (whether caching locally or not), these are
         the kwargs to pass to that FS.
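
As an aside, a minimal sketch of how this parameter flows through (the file name is hypothetical; the kwargs are standard xr.open_dataset options):

    import intake

    # xarray_kwargs is forwarded verbatim to xr.open_dataset
    # (or xr.open_mfdataset when the urlpath is a glob or list).
    source = intake.open_netcdf(
        "measurements.nc",  # hypothetical local file
        xarray_kwargs=dict(engine="h5netcdf", decode_times=False),
    )
    ds = source.to_dask()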
@@ -54,6 +56,10 @@ def __init__(self, urlpath, chunks=None, combine=None, concat_dim=None,
         self.storage_options = storage_options or {}
         self.xarray_kwargs = xarray_kwargs or {}
         self._ds = None
+        if isinstance(self.urlpath, list):
+            self._can_be_local = fsspec.utils.can_be_local(self.urlpath[0])
+        else:
+            self._can_be_local = fsspec.utils.can_be_local(self.urlpath)
         super(NetCDFSource, self).__init__(metadata=metadata, **kwargs)

     def _open_dataset(self):
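
For reference, a quick sketch of the distinction this flag captures (the URLs below are hypothetical); fsspec.utils.can_be_local reports whether a URL can be materialized as a local path:

    import fsspec.utils

    fsspec.utils.can_be_local("data/local_file.nc")           # True: already local
    fsspec.utils.can_be_local("simplecache::s3://b/key.nc")   # True: cached to disk first
    fsspec.utils.can_be_local("https://example.com/file.nc")  # False: stays remote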
@@ -76,8 +82,14 @@ def _open_dataset(self):
                 kwargs.update(concat_dim=self.concat_dim)
         else:
             _open_dataset = xr.open_dataset
-            url = fsspec.open_local(url, **self.storage_options)

+        if self._can_be_local:
+            url = fsspec.open_local(self.urlpath, **self.storage_options)
+        else:
+            # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918
+            url = fsspec.open(self.urlpath, **self.storage_options).open()

Reviewer comment:

I guess this is the key line. This is what I do to make http paths "openable" in my xarray / fsspec code. But @martindurant has suggested it's not a good practice to drag around open file objects without explicitly closing them.

I'm very curious to hear Martin's view of whether this is kosher.

Collaborator (author) replied:

Yeah, it doesn't feel right, but it does make the tests pass! It seems like we could end up with lots of unclosed files. The only other idea that comes to mind is adding code to xarray so that, if an OpenFile instance is passed, a context manager gets used... But I'm definitely out of my depth here.

Member replied:

You will keep hold of the buffer until the file instance is cleaned up, but that's OK; the random-access version of the HTTP file-like does not keep a socket open and doesn't need to be explicitly closed. The overall filesystem instance does hold a session open, but that should clean up on interpreter shutdown without problems.

The only time I could see this being a problem is where the path is something like an SSH tunnel (sftp), but the garbage collection of a file-like is always supposed to work the same as calling close() or exiting a context.

Short story: I think it's fine.
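
A rough illustration of the behaviour described above (the URL is hypothetical): the file-like returned by fsspec.open(...).open() over HTTP issues independent range requests per read and holds no persistent socket, so leaving cleanup to garbage collection behaves like an explicit close():

    import fsspec

    f = fsspec.open("https://example.com/data.nc").open()  # hypothetical URL
    magic = f.read(8)  # each read is served by its own HTTP range request
    # No explicit f.close() needed: garbage collection of `f` is equivalent.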

Review comment:

This has introduced a bug when using a list of URLs.

Whilst I understand the motivation for moving towards file-like objects, I wonder if the NetCDFSource should defer to the backend drivers provided by xarray to resolve local/remote/OPeNDAP URLs. Since the recent changes to fsspec for async, it seems that something is broken for OPeNDAP endpoints (see fsspec/filesystem_spec#525).

Regardless, perhaps the code should use fsspec.open_files instead to allow for a list of URLs:

        if "*" in url or isinstance(url, list):
            _open_dataset = xr.open_mfdataset
            _is_list = True
            if self.pattern:
                kwargs.update(preprocess=self._add_path_to_ds)
            if self.combine is not None:
                if 'combine' in kwargs:
                    raise Exception("Setting 'combine' argument twice  in the catalog is invalid")
                kwargs.update(combine=self.combine)
            if self.concat_dim is not None:
                if 'concat_dim' in kwargs:
                    raise Exception("Setting 'concat_dim' argument twice  in the catalog is invalid")
                kwargs.update(concat_dim=self.concat_dim)
        else:
            _is_list = False
            _open_dataset = xr.open_dataset

        if self._can_be_local:
            url = fsspec.open_local(self.urlpath, **self.storage_options)
        else:
            # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918            
            url = [f.open() for f in fsspec.open_files(self.urlpath, **self.storage_options)]
            if not _is_list:
                url = url[0]
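
For what it's worth, a small sketch of what the fsspec.open_files call in this suggestion does (URLs hypothetical): it expands a list (or glob) into one OpenFile per match, so list and single-URL inputs can share a code path:

    import fsspec

    urls = ["https://example.com/a.nc", "https://example.com/b.nc"]
    handles = [of.open() for of in fsspec.open_files(urls)]  # one file-like per URL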

Member replied:

xarray is not using fsspec for OPeNDAP endpoints, unless I am very confused.

Totally agree that we should support a list, regardless.


(the netcdf.py diff continues)

         print(kwargs)
         self._ds = _open_dataset(url, chunks=self.chunks, **kwargs)

     def _add_path_to_ds(self, ds):
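
Taken together, a sketch of what the new branch enables (the server URL is hypothetical): a plain http:// netCDF URL is no longer forced through fsspec.open_local, which previously raised for remote paths that are not cached locally:

    import intake

    # The driver now wraps the remote URL in fsspec.open(...).open()
    # and hands the resulting file-like object to xarray.
    source = intake.open_netcdf("https://example.com/example_1.nc",
                                xarray_kwargs=dict(engine="h5netcdf"))
    ds = source.to_dask()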
intake_xarray/raster.py (11 additions, 4 deletions)

@@ -30,18 +30,18 @@ class RasterIOSource(DataSourceMixin, PatternMixin):
         - ``s3://data/landsat8_band{band}.tif``
         - ``s3://data/{location}/landsat8_band{band}.tif``
         - ``{{ CATALOG_DIR }}data/landsat8_{start_date:%Y%m%d}_band{band}.tif``
-    chunks: int or dict
+    chunks: None or int or dict, optional
         Chunks is used to load the new dataset into dask
         arrays. ``chunks={}`` loads the dataset with dask using a single
-        chunk for all arrays.
+        chunk for all arrays. Default ``None`` loads numpy arrays.
     path_as_pattern: bool or str, optional
         Whether to treat the path as a pattern (ie. ``data_{field}.tif``)
         and create new coordinates in the output corresponding to pattern
         fields. If str, is treated as pattern to match on. Default is True.
     """
     name = 'rasterio'

-    def __init__(self, urlpath, chunks, concat_dim='concat_dim',
+    def __init__(self, urlpath, chunks=None, concat_dim='concat_dim',
                  xarray_kwargs=None, metadata=None, path_as_pattern=True,
                  storage_options=None, **kwargs):
         self.path_as_pattern = path_as_pattern
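
A short sketch of the behaviour the new default implies (the path is hypothetical): chunks=None reads eagerly into numpy, while chunks={} defers loading to dask:

    import intake

    eager = intake.open_rasterio("scene.tif").read()               # numpy-backed
    lazy = intake.open_rasterio("scene.tif", chunks={}).to_dask()  # dask-backed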
@@ -82,6 +82,7 @@ def _open_dataset(self):
             files = fsspec.open_local(self.urlpath, **self.storage_options)
         else:
             files = self.urlpath
+            #files = fsspec.open(self.urlpath, **self.storage_options).open()
         if isinstance(files, list):
             self._ds = self._open_files(files)
         else:
@@ -115,11 +116,17 @@ def _get_schema(self):
                     metadata[k] = v
                 except TypeError:
                     pass

+        if hasattr(self._ds.data, 'npartitions'):
+            npart = self._ds.data.npartitions
+        else:
+            npart = None

         self._schema = Schema(
             datashape=None,
             dtype=str(self._ds.dtype),
             shape=self._ds.shape,
-            npartitions=self._ds.data.npartitions,
+            npartitions=npart,
             extra_metadata=metadata)

         return self._schema
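
The hasattr guard matters because only dask-backed arrays expose npartitions; plain numpy arrays (the new chunks=None default) do not:

    import dask.array as da
    import numpy as np

    da.ones((4, 4), chunks=(2, 2)).npartitions   # 4: dask arrays are partitioned
    hasattr(np.ones((4, 4)), "npartitions")      # False: numpy has no partitions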
intake_xarray/tests/test_network.py (new file, 66 additions)

# Tests that read public data over the internet
import intake
import pytest
import xarray as xr
import s3fs
import gcsfs


# RasterIOSource
def test_open_rasterio_http():
    prefix = 'https://landsat-pds.s3.us-west-2.amazonaws.com/L8/139/045'
    image = 'LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF'
    url = f'{prefix}/{image}'
    source = intake.open_rasterio(url, chunks=dict(band=1))
    ds = source.to_dask()
    assert isinstance(ds, xr.core.dataarray.DataArray)


def test_open_rasterio_s3():
    bucket = 's3://landsat-pds'
    key = 'L8/139/045/LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF'
    url = f'{bucket}/{key}'
    source = intake.open_rasterio(url, chunks=dict(band=1),
                                  storage_options=dict(anon=True))
    ds = source.to_dask()
    assert isinstance(ds, xr.core.dataarray.DataArray)


# NetCDFSource
def test_open_netcdf_gs():
    bucket = 'gs://ldeo-glaciology'
    key = 'bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
    url = f'{bucket}/{key}'
    source = intake.open_netcdf(url, chunks=3000,
                                xarray_kwargs=dict(engine='h5netcdf'))
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.core.dataset.Dataset)


def test_open_netcdf_s3():
    bucket = 's3://its-live-data.jpl.nasa.gov'
    key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
    url = f'{bucket}/{key}'
    source = intake.open_netcdf(url,
                                xarray_kwargs=dict(group='gt1l/land_ice_segments',
                                                   engine='h5netcdf'),
                                storage_options=dict(anon=True))
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.core.dataset.Dataset)


def test_open_netcdf_s3_simplecache():
    bucket = 's3://its-live-data.jpl.nasa.gov'
    key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
    url = f'simplecache::{bucket}/{key}'
    source = intake.open_netcdf(url,
                                xarray_kwargs=dict(group='gt1l/land_ice_segments',
                                                   engine='h5netcdf'),
                                storage_options=dict(s3={'anon': True}))
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.core.dataset.Dataset)
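
The simplecache:: test relies on fsspec URL chaining, where storage_options are keyed by protocol so each layer gets its own options. A minimal sketch (bucket and key hypothetical):

    import fsspec

    # "simplecache::" downloads the object to a local cache on first access;
    # the inner s3 layer receives {'anon': True}.
    with fsspec.open("simplecache::s3://some-bucket/some-file.nc",
                     s3={"anon": True}) as f:
        header = f.read(8)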
intake_xarray/tests/test_remote.py (90 additions, 3 deletions)

@@ -1,3 +1,4 @@
+import aiohttp
 import intake
 import os
 import pytest
@@ -6,6 +7,8 @@
 import time
 import xarray as xr
 import fsspec
+import dask
+import numpy

 PORT = 8425  # for intake-server tests
 here = os.path.abspath(os.path.dirname(__file__))
@@ -37,18 +40,102 @@ def data_server():
     P.communicate()


-def test_list(data_server):
+def test_list_server_files(data_server):
+    test_files = ['RGB.byte.tif', 'example_1.nc', 'example_2.nc', 'little_green.tif', 'little_red.tif']
     h = fsspec.filesystem("http")
     out = h.glob(data_server + '/')
     assert len(out) > 0
-    assert data_server+'/RGB.byte.tif' in out
+    assert set([data_server+'/'+x for x in test_files]).issubset(set(out))

# REMOTE GEOTIFF
def test_open_rasterio(data_server):
    url = f'{data_server}/RGB.byte.tif'
    source = intake.open_rasterio(url)
    da = source.read()
    assert isinstance(da, xr.core.dataarray.DataArray)
    assert isinstance(da.data, numpy.ndarray)


def test_open_rasterio_dask(data_server):
    url = f'{data_server}/RGB.byte.tif'
    source = intake.open_rasterio(url, chunks={})
    da = source.to_dask()
    assert isinstance(da, xr.core.dataarray.DataArray)
    assert isinstance(da.data, dask.array.core.Array)


def test_read_rasterio(data_server):
    url = f'{data_server}/RGB.byte.tif'
    source = intake.open_rasterio(url, chunks={})
    da = source.read()
    assert da.attrs['crs'] == '+init=epsg:32618'
    assert da.attrs['AREA_OR_POINT'] == 'Area'
    assert da.dtype == 'uint8'
    assert da.isel(band=2, x=300, y=500).values == 129


def test_open_rasterio_auth(data_server):
    url = f'{data_server}/RGB.byte.tif'
    auth = dict(client_kwargs={'auth': aiohttp.BasicAuth('USER', 'PASS')})
    # NOTE: if url startswith 'https' use 'https' instead of 'http' for storage_options
    source = intake.open_rasterio(url,
                                  storage_options=dict(http=auth))
    source_auth = source.storage_options['http'].get('client_kwargs').get('auth')
    assert isinstance(source_auth, aiohttp.BasicAuth)


def test_open_rasterio_simplecache(data_server):
    url = f'simplecache::{data_server}/RGB.byte.tif'
    source = intake.open_rasterio(url, chunks={})
    da = source.to_dask()
    assert isinstance(da, xr.core.dataarray.DataArray)


def test_open_rasterio_pattern(data_server):
    url = [data_server+'/'+x for x in ('little_red.tif', 'little_green.tif')]
    source = intake.open_rasterio(url,
                                  path_as_pattern='{}/little_{color}.tif',
                                  concat_dim='color',
                                  chunks={})
    da = source.to_dask()
    assert isinstance(da, xr.core.dataarray.DataArray)
    assert set(da.color.data) == set(['red', 'green'])
    assert da.shape == (2, 3, 64, 64)


# REMOTE NETCDF / HDF
def test_open_netcdf(data_server):
    url = f'{data_server}/example_1.nc'
    source = intake.open_netcdf(url)
    ds = source.to_dask()
    assert isinstance(ds, xr.core.dataset.Dataset)
    assert isinstance(ds.temp.data, numpy.ndarray)


def test_read_netcdf(data_server):
    url = f'{data_server}/example_1.nc'
    source = intake.open_netcdf(url)
    ds = source.read()
    assert ds['rh'].isel(lat=0, lon=0, time=0).values.dtype == 'float32'
    assert ds['rh'].isel(lat=0, lon=0, time=0).values == 0.5


def test_open_netcdf_dask(data_server):
    url = f'{data_server}/next_example_1.nc'
    source = intake.open_netcdf(url, chunks={},
                                xarray_kwargs=dict(engine='h5netcdf'))
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.core.dataset.Dataset)
    assert isinstance(ds.temp.data, dask.array.core.Array)


def test_open_netcdf_simplecache(data_server):
    url = f'simplecache::{data_server}/example_1.nc'
    source = intake.open_netcdf(url, chunks={})
    ds = source.to_dask()
    assert isinstance(ds, xr.core.dataset.Dataset)
    assert isinstance(ds.temp.data, dask.array.core.Array)


# Remote catalogs with intake-server