diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 75bc953..ee08977 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -13,18 +13,18 @@ jobs: strategy: fail-fast: false matrix: - CONDA_ENV: [36, 37, 38, 37-upstream] + CONDA_ENV: [py36, py37, py38, py39, upstream] steps: - name: Checkout uses: actions/checkout@v2 - name: Setup Miniconda - uses: conda-incubator/setup-miniconda@v1 + uses: conda-incubator/setup-miniconda@v2 with: auto-update-conda: true auto-activate-base: false activate-environment: test_env - environment-file: ci/environment-py${{ matrix.CONDA_ENV }}.yml + environment-file: ci/environment-${{ matrix.CONDA_ENV }}.yml - name: Development Install Intake-Xarray shell: bash -l {0} @@ -35,4 +35,4 @@ jobs: - name: Run Tests shell: bash -l {0} run: | - pytest --verbose + pytest --verbose --ignore=intake_xarray/tests/test_network.py diff --git a/ci/environment-py36.yml b/ci/environment-py36.yml index 2522677..58ae5ab 100644 --- a/ci/environment-py36.yml +++ b/ci/environment-py36.yml @@ -4,14 +4,19 @@ channels: dependencies: - python=3.6 - aiohttp + - boto3 + - flask + - h5netcdf - intake - netcdf4 - pip - pydap - pytest - rasterio + - s3fs - scikit-image - xarray - zarr - pip: - rangehttpserver + - moto[s3] diff --git a/ci/environment-py37.yml b/ci/environment-py37.yml index 427f85f..72a3ddf 100644 --- a/ci/environment-py37.yml +++ b/ci/environment-py37.yml @@ -4,14 +4,19 @@ channels: dependencies: - python=3.7 - aiohttp + - boto3 + - flask + - h5netcdf - intake - netcdf4 - pip - pydap - pytest - rasterio + - s3fs - scikit-image - xarray - zarr - pip: - rangehttpserver + - moto[s3] diff --git a/ci/environment-py38.yml b/ci/environment-py38.yml index ef797e4..a0fe59b 100644 --- a/ci/environment-py38.yml +++ b/ci/environment-py38.yml @@ -4,14 +4,19 @@ channels: dependencies: - python=3.8 - aiohttp + - boto3 + - flask + - h5netcdf - intake - netcdf4 - pip - pydap - pytest - rasterio + - s3fs - scikit-image - xarray - zarr - pip: - rangehttpserver + - moto[s3] diff --git a/ci/environment-py39.yml b/ci/environment-py39.yml new file mode 100644 index 0000000..a43536a --- /dev/null +++ b/ci/environment-py39.yml @@ -0,0 +1,22 @@ +name: test_env +channels: + - conda-forge +dependencies: + - python=3.9 + - aiohttp + - boto3 + - flask + - h5netcdf + - intake + - netcdf4 + - pip + - pydap + - pytest + - rasterio + - s3fs + - scikit-image + - xarray + - zarr + - pip: + - rangehttpserver + - moto[s3] diff --git a/ci/environment-py37-upstream.yml b/ci/environment-upstream.yml similarity index 70% rename from ci/environment-py37-upstream.yml rename to ci/environment-upstream.yml index bd3f8be..b8177fc 100644 --- a/ci/environment-py37-upstream.yml +++ b/ci/environment-upstream.yml @@ -2,17 +2,22 @@ name: test_env channels: - conda-forge dependencies: - - python=3.7 + - python - aiohttp + - boto3 + - flask + - h5netcdf - netcdf4 - pip - pydap - pytest - rasterio + - s3fs - scikit-image - - xarray - zarr - pip: - git+https://github.com/intake/filesystem_spec.git - git+https://github.com/intake/intake.git + - git+https://github.com/pydata/xarray.git - rangehttpserver + - moto[s3] diff --git a/intake_xarray/netcdf.py b/intake_xarray/netcdf.py index 651bf5b..76d4e0b 100644 --- a/intake_xarray/netcdf.py +++ b/intake_xarray/netcdf.py @@ -37,6 +37,8 @@ class NetCDFSource(DataSourceMixin, PatternMixin): Whether to treat the path as a pattern (ie. ``data_{field}.nc``) and create new coodinates in the output corresponding to pattern fields. If str, is treated as pattern to match on. Default is True. + xarray_kwargs: dict + Additional xarray kwargs for xr.open_dataset() or xr.open_mfdataset(). storage_options: dict If using a remote fs (whether caching locally or not), these are the kwargs to pass to that FS. @@ -54,6 +56,10 @@ def __init__(self, urlpath, chunks=None, combine=None, concat_dim=None, self.storage_options = storage_options or {} self.xarray_kwargs = xarray_kwargs or {} self._ds = None + if isinstance(self.urlpath, list): + self._can_be_local = fsspec.utils.can_be_local(self.urlpath[0]) + else: + self._can_be_local = fsspec.utils.can_be_local(self.urlpath) super(NetCDFSource, self).__init__(metadata=metadata, **kwargs) def _open_dataset(self): @@ -76,7 +82,12 @@ def _open_dataset(self): kwargs.update(concat_dim=self.concat_dim) else: _open_dataset = xr.open_dataset - url = fsspec.open_local(url, **self.storage_options) + + if self._can_be_local: + url = fsspec.open_local(self.urlpath, **self.storage_options) + else: + # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918 + url = fsspec.open(self.urlpath, **self.storage_options).open() self._ds = _open_dataset(url, chunks=self.chunks, **kwargs) diff --git a/intake_xarray/raster.py b/intake_xarray/raster.py index 5b7292c..7ad33c9 100644 --- a/intake_xarray/raster.py +++ b/intake_xarray/raster.py @@ -30,10 +30,10 @@ class RasterIOSource(DataSourceMixin, PatternMixin): - ``s3://data/landsat8_band{band}.tif`` - ``s3://data/{location}/landsat8_band{band}.tif`` - ``{{ CATALOG_DIR }}data/landsat8_{start_date:%Y%m%d}_band{band}.tif`` - chunks: int or dict + chunks: None or int or dict, optional Chunks is used to load the new dataset into dask arrays. ``chunks={}`` loads the dataset with dask using a single - chunk for all arrays. + chunk for all arrays. default `None` loads numpy arrays. path_as_pattern: bool or str, optional Whether to treat the path as a pattern (ie. ``data_{field}.tif``) and create new coodinates in the output corresponding to pattern @@ -41,7 +41,7 @@ class RasterIOSource(DataSourceMixin, PatternMixin): """ name = 'rasterio' - def __init__(self, urlpath, chunks, concat_dim='concat_dim', + def __init__(self, urlpath, chunks=None, concat_dim='concat_dim', xarray_kwargs=None, metadata=None, path_as_pattern=True, storage_options=None, **kwargs): self.path_as_pattern = path_as_pattern @@ -81,7 +81,9 @@ def _open_dataset(self): if self._can_be_local: files = fsspec.open_local(self.urlpath, **self.storage_options) else: + # pass URLs to delegate remote opening to rasterio library files = self.urlpath + #files = fsspec.open(self.urlpath, **self.storage_options).open() if isinstance(files, list): self._ds = self._open_files(files) else: @@ -115,11 +117,17 @@ def _get_schema(self): metadata[k] = v except TypeError: pass + + if hasattr(self._ds.data, 'npartitions'): + npart = self._ds.data.npartitions + else: + npart = None + self._schema = Schema( datashape=None, dtype=str(self._ds.dtype), shape=self._ds.shape, - npartitions=self._ds.data.npartitions, + npartitions=npart, extra_metadata=metadata) return self._schema diff --git a/intake_xarray/tests/test_network.py b/intake_xarray/tests/test_network.py new file mode 100644 index 0000000..4526b39 --- /dev/null +++ b/intake_xarray/tests/test_network.py @@ -0,0 +1,66 @@ +# Tests that read public data over the internet +import intake +import pytest +import xarray as xr +import s3fs +import gcsfs + +# RasterIOSource +def test_open_rasterio_http(): + prefix = 'https://landsat-pds.s3.us-west-2.amazonaws.com/L8/139/045' + image = 'LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF' + url = f'{prefix}/{image}' + source = intake.open_rasterio(url, + chunks=dict(band=1)) + ds = source.to_dask() + assert isinstance(ds, xr.core.dataarray.DataArray) + + +def test_open_rasterio_s3(): + bucket = 's3://landsat-pds' + key = 'L8/139/045/LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF' + url = f'{bucket}/{key}' + source = intake.open_rasterio(url, + chunks=dict(band=1), + storage_options = dict(anon=True)) + ds = source.to_dask() + assert isinstance(ds, xr.core.dataarray.DataArray) + + +# NETCDFSource +def test_open_netcdf_gs(): + bucket = 'gs://ldeo-glaciology' + key = 'bedmachine/BedMachineAntarctica_2019-11-05_v01.nc' + url = f'{bucket}/{key}' + source = intake.open_netcdf(url, + chunks=3000, + xarray_kwargs=dict(engine='h5netcdf'), + ) + ds = source.to_dask() + assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore) + assert isinstance(ds, xr.core.dataarray.Dataset) + +def test_open_netcdf_s3(): + bucket = 's3://its-live-data.jpl.nasa.gov' + key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5' + url = f'{bucket}/{key}' + source = intake.open_netcdf(url, + xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'), + storage_options=dict(anon=True), + ) + ds = source.to_dask() + assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore) + assert isinstance(ds, xr.core.dataarray.Dataset) + + +def test_open_netcdf_s3_simplecache(): + bucket = 's3://its-live-data.jpl.nasa.gov' + key = 'icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5' + url = f'simplecache::{bucket}/{key}' + source = intake.open_netcdf(url, + xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'), + storage_options=dict(s3={'anon': True}), + ) + ds = source.to_dask() + assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore) + assert isinstance(ds, xr.core.dataarray.Dataset) diff --git a/intake_xarray/tests/test_remote.py b/intake_xarray/tests/test_remote.py index a337535..8fa8a42 100644 --- a/intake_xarray/tests/test_remote.py +++ b/intake_xarray/tests/test_remote.py @@ -1,3 +1,5 @@ +# Tests for intake-server, local HTTP file server, local "S3" object server +import aiohttp import intake import os import pytest @@ -6,8 +8,10 @@ import time import xarray as xr import fsspec +import dask +import numpy +import s3fs -PORT = 8425 # for intake-server tests here = os.path.abspath(os.path.dirname(__file__)) cat_file = os.path.join(here, 'data', 'catalog.yaml') DIRECTORY = os.path.join(here, 'data') @@ -37,23 +41,202 @@ def data_server(): P.communicate() -def test_list(data_server): +def test_http_server_files(data_server): + test_files = ['RGB.byte.tif', 'example_1.nc', 'example_2.nc', 'little_green.tif', 'little_red.tif'] h = fsspec.filesystem("http") out = h.glob(data_server + '/') assert len(out) > 0 - assert data_server+'/RGB.byte.tif' in out + assert set([data_server+'/'+x for x in test_files]).issubset(set(out)) +# REMOTE GEOTIFF +def test_http_open_rasterio(data_server): + url = f'{data_server}/RGB.byte.tif' + source = intake.open_rasterio(url) + da = source.to_dask() + assert isinstance(da, xr.core.dataarray.DataArray) + + +def test_http_read_rasterio(data_server): + url = f'{data_server}/RGB.byte.tif' + source = intake.open_rasterio(url) + da = source.read() + assert da.attrs['crs'] == '+init=epsg:32618' + assert da.attrs['AREA_OR_POINT'] == 'Area' + assert da.dtype == 'uint8' + assert da.isel(band=2,x=300,y=500).values == 129 -def test_open_rasterio(data_server): + +def test_http_open_rasterio_dask(data_server): url = f'{data_server}/RGB.byte.tif' source = intake.open_rasterio(url, chunks={}) da = source.to_dask() assert isinstance(da, xr.core.dataarray.DataArray) + assert isinstance(da.data, dask.array.core.Array) + + +def test_http_open_rasterio_auth(data_server): + url = f'{data_server}/RGB.byte.tif' + auth = dict(client_kwargs={'auth': aiohttp.BasicAuth('USER', 'PASS')}) + # NOTE: if url startswith 'https' use 'https' instead of 'http' for storage_options + source = intake.open_rasterio(url, + storage_options=dict(http=auth)) + source_auth = source.storage_options['http'].get('client_kwargs').get('auth') + assert isinstance(source_auth, aiohttp.BasicAuth) + + +def test_http_read_rasterio_simplecache(data_server): + url = f'simplecache::{data_server}/RGB.byte.tif' + source = intake.open_rasterio(url, chunks={}) + da = source.to_dask() + assert isinstance(da, xr.core.dataarray.DataArray) + + +def test_http_read_rasterio_pattern(data_server): + url = [data_server+'/'+x for x in ('little_red.tif', 'little_green.tif')] + source = intake.open_rasterio(url, + path_as_pattern='{}/little_{color}.tif', + concat_dim='color', + chunks={}) + da = source.to_dask() + assert isinstance(da, xr.core.dataarray.DataArray) + assert set(da.color.data) == set(['red', 'green']) + assert da.shape == (2, 3, 64, 64) + + +# REMOTE NETCDF / HDF +def test_http_open_netcdf(data_server): + url = f'{data_server}/example_1.nc' + source = intake.open_netcdf(url) + ds = source.to_dask() + assert isinstance(ds, xr.core.dataset.Dataset) + assert isinstance(ds.temp.data, numpy.ndarray) + + +def test_http_read_netcdf(data_server): + url = f'{data_server}/example_1.nc' + source = intake.open_netcdf(url) + ds = source.read() + assert ds['rh'].isel(lat=0,lon=0,time=0).values.dtype == 'float32' + assert ds['rh'].isel(lat=0,lon=0,time=0).values == 0.5 + + +def test_http_read_netcdf_dask(data_server): + url = f'{data_server}/next_example_1.nc' + source = intake.open_netcdf(url, chunks={}, + xarray_kwargs=dict(engine='h5netcdf')) + ds = source.to_dask() + assert isinstance(ds._file_obj, xr.backends.h5netcdf_.H5NetCDFStore) + assert isinstance(ds, xr.core.dataset.Dataset) + assert isinstance(ds.temp.data, dask.array.core.Array) + + +def test_http_read_netcdf_simplecache(data_server): + url = f'simplecache::{data_server}/example_1.nc' + source = intake.open_netcdf(url, chunks={}) + ds = source.to_dask() + assert isinstance(ds, xr.core.dataset.Dataset) + assert isinstance(ds.temp.data, dask.array.core.Array) + + +# S3 +#based on: https://github.com/dask/s3fs/blob/master/s3fs/tests/test_s3fs.py +test_bucket_name = "test" +PORT_S3 = 8001 +endpoint_uri = "http://localhost:%s" % PORT_S3 +test_files = ['RGB.byte.tif', 'example_1.nc'] + +@pytest.fixture() +def s3_base(): + # writable local S3 system + import shlex + import subprocess + + proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % PORT_S3)) + + timeout = 5 + while timeout > 0: + try: + r = requests.get(endpoint_uri) + if r.ok: + break + except: + pass + timeout -= 0.1 + time.sleep(0.1) + yield + proc.terminate() + proc.wait() + + +@pytest.fixture(scope='function') +def aws_credentials(): + """Mocked AWS Credentials for moto.""" + os.environ['AWS_ACCESS_KEY_ID'] = 'testing' + os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing' + os.environ['AWS_SECURITY_TOKEN'] = 'testing' + os.environ['AWS_SESSION_TOKEN'] = 'testing' + + +@pytest.fixture() +def s3(s3_base, aws_credentials): + ''' anonymous access local s3 bucket for testing ''' + from botocore.session import Session + session = Session() + client = session.create_client("s3", endpoint_url=endpoint_uri) + client.create_bucket(Bucket=test_bucket_name, ACL="public-read") + + for file_name in [os.path.join(DIRECTORY,x) for x in test_files]: + with open(file_name, 'rb') as f: + data = f.read() + key = os.path.basename(file_name) + client.put_object(Bucket=test_bucket_name, Key=key, Body=data) + + # Make sure cache not being used + s3fs.S3FileSystem.clear_instance_cache() + s3 = s3fs.S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) + s3.invalidate_cache() + yield + + +def test_s3_list_files(s3): + s3 = s3fs.S3FileSystem(anon=True, client_kwargs={"endpoint_url": endpoint_uri}) + files = s3.ls(test_bucket_name) + assert len(files) > 0 + assert set([test_bucket_name+'/'+x for x in test_files]).issubset(set(files)) + + +def test_s3_read_rasterio(s3): + # Lots of GDAL Environment variables needed for this to work ! + # https://gdal.org/user/virtual_file_systems.html#vsis3-aws-s3-files + os.environ['AWS_NO_SIGN_REQUEST']='YES' + os.environ['AWS_S3_ENDPOINT'] = endpoint_uri.lstrip('http://') + os.environ['AWS_VIRTUAL_HOSTING']= 'FALSE' + os.environ['AWS_HTTPS']= 'NO' + os.environ['GDAL_DISABLE_READDIR_ON_OPEN']='EMPTY_DIR' + os.environ['CPL_CURL_VERBOSE']='YES' + url = f's3://{test_bucket_name}/RGB.byte.tif' + source = intake.open_rasterio(url) + da = source.read() + assert da.attrs['crs'] == '+init=epsg:32618' + assert da.attrs['AREA_OR_POINT'] == 'Area' + assert da.dtype == 'uint8' + assert da.isel(band=2,x=300,y=500).values == 129 + + +def test_s3_read_netcdf(s3): + url = f's3://{test_bucket_name}/example_1.nc' + s3options = dict(client_kwargs={"endpoint_url": endpoint_uri}) + source = intake.open_netcdf(url, + storage_options=s3options) + ds = source.read() + assert ds['rh'].isel(lat=0,lon=0,time=0).values.dtype == 'float32' + assert ds['rh'].isel(lat=0,lon=0,time=0).values == 0.5 # Remote catalogs with intake-server @pytest.fixture(scope='module') def intake_server(): + PORT = 8002 command = ['intake-server', '-p', str(PORT), cat_file] try: P = subprocess.Popen(command) @@ -72,7 +255,7 @@ def intake_server(): P.communicate() -def test_remote_netcdf(intake_server): +def test_intake_server_netcdf(intake_server): cat_local = intake.open_catalog(cat_file) cat = intake.open_catalog(intake_server) assert 'xarray_source' in cat @@ -88,7 +271,7 @@ def test_remote_netcdf(intake_server): cat_local.xarray_source.read()).all() -def test_remote_tiff(intake_server): +def test_intake_server_tiff(intake_server): pytest.importorskip('rasterio') cat_local = intake.open_catalog(cat_file) cat = intake.open_catalog(intake_server)