Merge pull request #93 from scottyhq/http-netcdf
Add logic for remote or local files in NetCDFSource
martindurant committed Nov 24, 2020
2 parents ddff413 + dc2ec1f commit 2f4bfb3
Showing 10 changed files with 326 additions and 16 deletions.
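
At a glance: NetCDFSource now checks whether its urlpath can be materialized locally and only then routes it through fsspec.open_local; purely remote URLs are opened as fsspec file objects and handed straight to xarray. A minimal usage sketch of what this enables (the bucket, key, and engine are illustrative, not taken from this diff):

import intake

# Read a NetCDF/HDF5 file directly from object storage, no local copy.
# 's3://example-bucket/example.nc' is a hypothetical public object.
source = intake.open_netcdf(
    's3://example-bucket/example.nc',
    xarray_kwargs=dict(engine='h5netcdf'),  # engine that accepts file-like objects
    storage_options=dict(anon=True),        # passed to the s3 filesystem
)
ds = source.to_dask()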
6 changes: 3 additions & 3 deletions .github/workflows/main.yaml
@@ -13,7 +13,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-       CONDA_ENV: [36, 37, 38, 37-upstream]
+       CONDA_ENV: [py36, py37, py38, py39, upstream]
    steps:
    - name: Checkout
      uses: actions/checkout@v2
@@ -24,7 +24,7 @@ jobs:
        auto-update-conda: true
        auto-activate-base: false
        activate-environment: test_env
-       environment-file: ci/environment-py${{ matrix.CONDA_ENV }}.yml
+       environment-file: ci/environment-${{ matrix.CONDA_ENV }}.yml

    - name: Development Install Intake-Xarray
      shell: bash -l {0}
@@ -35,4 +35,4 @@ jobs:
    - name: Run Tests
      shell: bash -l {0}
      run: |
-       pytest --verbose
+       pytest --verbose --ignore=intake_xarray/tests/test_network.py
5 changes: 5 additions & 0 deletions ci/environment-py36.yml
@@ -4,14 +4,19 @@ channels:
dependencies:
- python=3.6
- aiohttp
- boto3
- flask
- h5netcdf
- intake
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- scikit-image
- xarray
- zarr
- pip:
  - rangehttpserver
  - moto[s3]
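
A note on the new test dependencies repeated in each environment in this commit: boto3 and moto[s3] let the suite stand up a mocked S3 endpoint, flask backs moto's standalone server mode, s3fs provides the s3:// protocol for fsspec, and rangehttpserver serves local files over HTTP with Range-request support so partial remote reads can be exercised without the network.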
5 changes: 5 additions & 0 deletions ci/environment-py37.yml
@@ -4,14 +4,19 @@ channels:
dependencies:
- python=3.7
- aiohttp
- boto3
- flask
- h5netcdf
- intake
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- scikit-image
- xarray
- zarr
- pip:
  - rangehttpserver
  - moto[s3]
5 changes: 5 additions & 0 deletions ci/environment-py38.yml
@@ -4,14 +4,19 @@ channels:
dependencies:
- python=3.8
- aiohttp
- boto3
- flask
- h5netcdf
- intake
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- scikit-image
- xarray
- zarr
- pip:
  - rangehttpserver
  - moto[s3]
22 changes: 22 additions & 0 deletions ci/environment-py39.yml
@@ -0,0 +1,22 @@
name: test_env
channels:
- conda-forge
dependencies:
- python=3.9
- aiohttp
- boto3
- flask
- h5netcdf
- intake
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- scikit-image
- xarray
- zarr
- pip:
  - rangehttpserver
  - moto[s3]
ci/environment-upstream.yml
@@ -2,17 +2,22 @@ name: test_env
channels:
- conda-forge
dependencies:
- - python=3.7
+ - python
- aiohttp
- boto3
- flask
- h5netcdf
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- scikit-image
- xarray
- zarr
- pip:
  - git+https://github.com/intake/filesystem_spec.git
  - git+https://github.com/intake/intake.git
  - git+https://github.com/pydata/xarray.git
  - rangehttpserver
  - moto[s3]
13 changes: 12 additions & 1 deletion intake_xarray/netcdf.py
@@ -37,6 +37,8 @@ class NetCDFSource(DataSourceMixin, PatternMixin):
        Whether to treat the path as a pattern (i.e. ``data_{field}.nc``)
        and create new coordinates in the output corresponding to pattern
        fields. If str, is treated as pattern to match on. Default is True.
+   xarray_kwargs: dict
+       Additional xarray kwargs for xr.open_dataset() or xr.open_mfdataset().
    storage_options: dict
        If using a remote fs (whether caching locally or not), these are
        the kwargs to pass to that FS.
@@ -54,6 +56,10 @@ def __init__(self, urlpath, chunks=None, combine=None, concat_dim=None,
        self.storage_options = storage_options or {}
        self.xarray_kwargs = xarray_kwargs or {}
        self._ds = None
+       if isinstance(self.urlpath, list):
+           self._can_be_local = fsspec.utils.can_be_local(self.urlpath[0])
+       else:
+           self._can_be_local = fsspec.utils.can_be_local(self.urlpath)
        super(NetCDFSource, self).__init__(metadata=metadata, **kwargs)

    def _open_dataset(self):
@@ -76,7 +82,12 @@ def _open_dataset(self):
                kwargs.update(concat_dim=self.concat_dim)
        else:
            _open_dataset = xr.open_dataset
-           url = fsspec.open_local(url, **self.storage_options)
+
+       if self._can_be_local:
+           url = fsspec.open_local(self.urlpath, **self.storage_options)
+       else:
+           # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918
+           url = fsspec.open(self.urlpath, **self.storage_options).open()

        self._ds = _open_dataset(url, chunks=self.chunks, **kwargs)

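The branch added above hinges on fsspec.utils.can_be_local, which reports whether a URL's protocol (including caching prefixes such as simplecache::) can yield a real local path. A standalone sketch of the two code paths, with hypothetical bucket names:

import fsspec

fsspec.utils.can_be_local('data/local.nc')                  # True: already local
fsspec.utils.can_be_local('simplecache::s3://bucket/x.nc')  # True: cached copy is local
fsspec.utils.can_be_local('s3://bucket/x.nc')               # False: stays remote

# Local-capable URLs are materialized to real filenames...
path = fsspec.open_local('simplecache::s3://bucket/x.nc', s3={'anon': True})

# ...while remote ones become file-like objects that engines such as
# h5netcdf can read directly (see the linked filesystem_spec comment):
f = fsspec.open('s3://bucket/x.nc', anon=True).open()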
16 changes: 12 additions & 4 deletions intake_xarray/raster.py
@@ -30,18 +30,18 @@ class RasterIOSource(DataSourceMixin, PatternMixin):
        - ``s3://data/landsat8_band{band}.tif``
        - ``s3://data/{location}/landsat8_band{band}.tif``
        - ``{{ CATALOG_DIR }}data/landsat8_{start_date:%Y%m%d}_band{band}.tif``
-   chunks: int or dict
+   chunks: None or int or dict, optional
        Chunks is used to load the new dataset into dask
        arrays. ``chunks={}`` loads the dataset with dask using a single
-       chunk for all arrays.
+       chunk for all arrays. Default ``None`` loads numpy arrays.
    path_as_pattern: bool or str, optional
        Whether to treat the path as a pattern (i.e. ``data_{field}.tif``)
        and create new coordinates in the output corresponding to pattern
        fields. If str, is treated as pattern to match on. Default is True.
    """
    name = 'rasterio'

-   def __init__(self, urlpath, chunks, concat_dim='concat_dim',
+   def __init__(self, urlpath, chunks=None, concat_dim='concat_dim',
                 xarray_kwargs=None, metadata=None, path_as_pattern=True,
                 storage_options=None, **kwargs):
        self.path_as_pattern = path_as_pattern
@@ -81,7 +81,9 @@ def _open_dataset(self):
        if self._can_be_local:
            files = fsspec.open_local(self.urlpath, **self.storage_options)
        else:
+           # pass URLs through so rasterio itself handles remote opening
            files = self.urlpath
+           # files = fsspec.open(self.urlpath, **self.storage_options).open()
        if isinstance(files, list):
            self._ds = self._open_files(files)
        else:
@@ -115,11 +117,17 @@ def _get_schema(self):
                metadata[k] = v
            except TypeError:
                pass

+       if hasattr(self._ds.data, 'npartitions'):
+           npart = self._ds.data.npartitions
+       else:
+           npart = None

        self._schema = Schema(
            datashape=None,
            dtype=str(self._ds.dtype),
            shape=self._ds.shape,
-           npartitions=self._ds.data.npartitions,
+           npartitions=npart,
            extra_metadata=metadata)

        return self._schema
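
The npartitions guard is needed because chunks=None now produces an eager numpy-backed array, and only dask arrays carry a partition count. A quick illustration (not code from this repo):

import numpy as np
import dask.array as da

lazy = da.ones((4, 4), chunks=2)   # dask array split into 2x2 blocks
eager = np.ones((4, 4))            # plain numpy array

print(hasattr(lazy, 'npartitions'), lazy.npartitions)  # True 4
print(hasattr(eager, 'npartitions'))                   # False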
66 changes: 66 additions & 0 deletions intake_xarray/tests/test_network.py
@@ -0,0 +1,66 @@
# Tests that read public data over the internet
import intake
import pytest
import xarray as xr
import s3fs   # imported so the s3:// protocol is available to fsspec
import gcsfs  # imported so the gs:// protocol is available to fsspec


# RasterIOSource
def test_open_rasterio_http():
    prefix = 'https://landsat-pds.s3.us-west-2.amazonaws.com/L8/139/045'
    image = 'LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF'
    url = f'{prefix}/{image}'
    source = intake.open_rasterio(url,
                                  chunks=dict(band=1))
    ds = source.to_dask()
    assert isinstance(ds, xr.DataArray)


def test_open_rasterio_s3():
    bucket = 's3://landsat-pds'
    key = 'L8/139/045/LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF'
    url = f'{bucket}/{key}'
    source = intake.open_rasterio(url,
                                  chunks=dict(band=1),
                                  storage_options=dict(anon=True))
    ds = source.to_dask()
    assert isinstance(ds, xr.DataArray)


# NetCDFSource
def test_open_netcdf_gs():
    bucket = 'gs://ldeo-glaciology'
    key = 'bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
    url = f'{bucket}/{key}'
    source = intake.open_netcdf(url,
                                chunks=3000,
                                xarray_kwargs=dict(engine='h5netcdf'),
                                )
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.Dataset)


def test_open_netcdf_s3():
    bucket = 's3://its-live-data.jpl.nasa.gov'
    key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
    url = f'{bucket}/{key}'
    source = intake.open_netcdf(url,
                                xarray_kwargs=dict(group='gt1l/land_ice_segments',
                                                   engine='h5netcdf'),
                                storage_options=dict(anon=True),
                                )
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.Dataset)


def test_open_netcdf_s3_simplecache():
    bucket = 's3://its-live-data.jpl.nasa.gov'
    key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
    url = f'simplecache::{bucket}/{key}'
    source = intake.open_netcdf(url,
                                xarray_kwargs=dict(group='gt1l/land_ice_segments',
                                                   engine='h5netcdf'),
                                storage_options=dict(s3={'anon': True}),
                                )
    ds = source.to_dask()
    assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore)
    assert isinstance(ds, xr.Dataset)
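
These tests read live public buckets, which is why the workflow above now passes --ignore=intake_xarray/tests/test_network.py to the CI run; they can still be run explicitly with pytest intake_xarray/tests/test_network.py when network access is available. The simplecache test also exercises fsspec URL chaining, where options for the inner filesystem are namespaced by protocol. A minimal standalone sketch (the URL is hypothetical):

import fsspec

# 'simplecache::' chains a local caching layer in front of s3;
# kwargs for the inner s3 filesystem go under the 's3' key.
of = fsspec.open('simplecache::s3://example-bucket/example.h5', s3={'anon': True})
with of as f:
    header = f.read(8)  # bytes are served from the locally cached copy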
