Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow simplecache #14

Merged
merged 18 commits into from
Feb 16, 2021
1 change: 1 addition & 0 deletions ci/environment-upstream-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies:
- xarray
- pydap
- aiohttp
- h5netcdf
- pip:
- git+https://github.com/intake/intake.git
- git+https://github.com/Unidata/siphon.git
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ dependencies:
- xarray
- pydap
- aiohttp
- h5netcdf
37 changes: 29 additions & 8 deletions intake_thredds/cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,28 @@
class ThreddsCatalog(Catalog):
name = 'thredds_cat'

def __init__(self, url, **kwargs):
def __init__(self, url, driver='opendap', **kwargs):
self.url = url
self.driver = driver
super().__init__(**kwargs)

def _load(self):
from siphon.catalog import TDSCatalog

self.cat = TDSCatalog(self.url)
if 'simplecache::' in self.url:
if self.driver == 'netcdf':
self.cache = True
self.url_no_simplecache = self.url.replace('simplecache::', '')
self.metadata.update({'cache': 'simplecache::'})
else:
raise ValueError(
f'simplecache requires driver="netcdf", found driver="{self.driver}".'
)
else:
self.cache = False
self.url_no_simplecache = self.url

self.cat = TDSCatalog(self.url_no_simplecache)
self.name = self.cat.catalog_name
self.metadata.update(self.cat.metadata)

Expand All @@ -26,24 +40,31 @@ def _load(self):
{'url': r.href},
[],
[],
{},
self.metadata,
None,
catalog=self,
)
for r in self.cat.catalog_refs.values()
}

# data entries (each dataset must expose an access URL for the selected driver)
def access_urls(ds, self):
    """Return the access URL of ``ds`` that matches the catalog's driver.

    Parameters
    ----------
    ds : object
        Dataset entry exposing an ``access_urls`` mapping keyed by service
        name (``'OPENDAP'``, ``'HTTPServer'``).
    self : object
        Catalog providing the ``driver`` string and the ``metadata`` dict.

    Returns
    -------
    str
        The URL for the selected service, prefixed with
        ``metadata['cache']`` when that key is present.

    Raises
    ------
    ValueError
        If ``self.driver`` is neither ``'opendap'`` nor ``'netcdf'``.
    KeyError
        If ``ds.access_urls`` has no entry for the selected service.
    """
    if self.driver == 'opendap':
        service = 'OPENDAP'
    elif self.driver == 'netcdf':
        service = 'HTTPServer'
    else:
        # Without this branch an unknown driver would surface as an
        # UnboundLocalError on the lookup below.
        raise ValueError(
            f'driver must be "opendap" or "netcdf", found driver="{self.driver}".'
        )
    url = ds.access_urls[service]
    # Prepend the caching prefix recorded in the catalog metadata, if any.
    if 'cache' in self.metadata:
        url = f'{self.metadata["cache"]}{url}'
    return url

self._entries.update(
{
ds.name: LocalCatalogEntry(
ds.name,
'THREDDS data',
# 'netcdf',
'opendap',
self.driver,
True,
# {'urlpath': ds.access_urls['HTTPServer'], 'chunks': None},
{'urlpath': ds.access_urls['OPENDAP'], 'chunks': None},
{'urlpath': access_urls(ds, self), 'chunks': None},
[],
[],
{},
Expand Down
11 changes: 8 additions & 3 deletions intake_thredds/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class THREDDSMergedSource(DataSourceMixin):
name = 'thredds_merged'
partition_access = True

def __init__(self, url, path, progressbar=True, metadata=None):
def __init__(self, url, path, driver='opendap', progressbar=True, metadata=None):
"""

Parameters
Expand All @@ -26,6 +26,8 @@ def __init__(self, url, path, progressbar=True, metadata=None):
path : list of str
Subcats to follow; include glob characters (*, ?) in here for
matching
driver : str
Select driver to access data. Choose from 'netcdf' and 'opendap'.
progressbar : bool
If True, will print a progress bar. Requires `tqdm <https://github.com/tqdm/tqdm>`__
to be installed.
Expand All @@ -34,7 +36,10 @@ def __init__(self, url, path, progressbar=True, metadata=None):
"""
super(THREDDSMergedSource, self).__init__(metadata=metadata)
self.urlpath = url
if 'simplecache' in url:
self.metadata.update({'cache': 'simplecache::'})
self.path = path
self.driver = driver
self._ds = None
self.progressbar = progressbar
if self.progressbar and tqdm is None:
Expand All @@ -44,11 +49,11 @@ def _open_dataset(self):
import xarray as xr

if self._ds is None:
cat = ThreddsCatalog(self.urlpath)
cat = ThreddsCatalog(self.urlpath, driver=self.driver)
for i in range(len(self.path)):
part = self.path[i]
if '*' not in part and '?' not in part:
cat = cat[part]()
cat = cat[part](driver=self.driver)
else:
break
path = self.path[i:]
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ intake-xarray>=0.3
intake>=0.6.0
pydap
siphon
h5netcdf
51 changes: 43 additions & 8 deletions tests/test_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

@pytest.fixture(scope='module')
def thredds_cat_url():
    """URL of the THREDDS catalog (NOAA ERSST datasets) shared by the tests in this module."""
    catalog_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/noaa.ersst/catalog.xml'
    return catalog_url


def test_init_catalog(thredds_cat_url):
def test_ThreddsCatalog_init_catalog(thredds_cat_url):
"""Test initialization of ThreddsCatalog."""
cat = intake.open_thredds_cat(thredds_cat_url)
assert isinstance(cat, intake.catalog.Catalog)
assert cat.metadata == cat.cat.metadata
Expand All @@ -21,18 +23,51 @@ def test_init_catalog(thredds_cat_url):
assert 'random_attribute' in cat.metadata


def test_entry(thredds_cat_url):
cat = intake.open_thredds_cat(thredds_cat_url)
entry = cat['err.mnmean.v3.nc']
assert isinstance(entry, intake_xarray.opendap.OpenDapSource)
@pytest.mark.parametrize('driver', ['netcdf', 'opendap'])
def test_ThreddsCatalog(thredds_cat_url, driver):
"""Test entry.to_dask() is xr.Dataset and allows opendap and netcdf as source."""
cat = intake.open_thredds_cat(thredds_cat_url, driver=driver)
entry = cat['sst.mon.19712000.ltm.v3.nc']
if driver == 'opendap':
assert isinstance(entry, intake_xarray.opendap.OpenDapSource)
elif driver == 'netcdf':
assert isinstance(entry, intake_xarray.netcdf.NetCDFSource)
d = entry.describe()
assert d['name'] == 'err.mnmean.v3.nc'
assert d['name'] == 'sst.mon.19712000.ltm.v3.nc'
assert d['container'] == 'xarray'
assert d['plugin'] == ['opendap']
assert d['plugin'] == [driver]
if driver == 'opendap':
loc = 'dodsC'
elif driver == 'netcdf':
loc = 'fileServer'
assert (
d['args']['urlpath']
== 'https://psl.noaa.gov/thredds/dodsC/Datasets/noaa.ersst/err.mnmean.v3.nc'
== f'https://psl.noaa.gov/thredds/{loc}/Datasets/noaa.ersst/sst.mon.19712000.ltm.v3.nc'
)
ds = entry(chunks={}).to_dask()
assert isinstance(ds, xr.Dataset)


def test_ThreddsCatalog_simplecache_netcdf(thredds_cat_url):
    """Test that ThreddsCatalog allows simplecache:: in url if netcdf as source.

    The fsspec ``simplecache`` configuration is overridden for the duration
    of the test and restored afterwards; the cached file is removed even if
    an assertion fails.
    """
    import os

    import fsspec

    cache_storage = 'my_caching_folder'
    cached_file = os.path.join(cache_storage, 'sst.mon.19712000.ltm.v3.nc')

    # Remember the previous setting so the global config does not leak
    # into other tests.
    unset = object()
    previous_conf = fsspec.config.conf.get('simplecache', unset)
    fsspec.config.conf['simplecache'] = {'cache_storage': cache_storage, 'same_names': True}
    try:
        cat = intake.open_thredds_cat(f'simplecache::{thredds_cat_url}', driver='netcdf')
        entry = cat['sst.mon.19712000.ltm.v3.nc']
        ds = entry(chunks={}).to_dask()
        assert isinstance(ds, xr.Dataset)
        # test files present
        assert os.path.exists(cached_file)
    finally:
        if os.path.exists(cached_file):
            os.remove(cached_file)
        if previous_conf is unset:
            fsspec.config.conf.pop('simplecache', None)
        else:
            fsspec.config.conf['simplecache'] = previous_conf
    assert not os.path.exists(cached_file)


def test_ThreddsCatalog_simplecache_fails_opendap(thredds_cat_url):
    """Test that ThreddsCatalog rejects simplecache:: in url with opendap as source."""
    with pytest.raises(ValueError) as e:
        intake.open_thredds_cat(f'simplecache::{thredds_cat_url}', driver='opendap')
    # Checked after the ``with`` block: a statement placed after the raising
    # call inside the block would never execute.
    assert 'simplecache requires driver="netcdf"' in str(e.value)
99 changes: 84 additions & 15 deletions tests/test_source.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,98 @@
import intake
import pytest
import xarray as xr


@pytest.fixture(scope='module')
def thredds_cat_url():
return 'http://dap.nci.org.au/thredds/catalog.xml'


def test_thredds_merge(thredds_cat_url):
def THREDDSMergedSource_cat():
"""THREDDSMergedSource looping through folders."""
thredds_cat_url = 'https://psl.noaa.gov/thredds/catalog.xml'
paths = [
'eMAST TERN',
'eMAST TERN - files',
'ASCAT',
'ASCAT_v1-0_soil-moisture_daily_0-05deg_2007-2011',
'00000000',
'*12.nc', # to speed up only takes all december files
'Datasets',
'ncep.reanalysis.dailyavgs',
'surface',
'air*sig995*194*.nc', # todo: convert . to * ?
]
cat = intake.open_thredds_merged(thredds_cat_url, paths)
assert cat.urlpath == thredds_cat_url
assert cat.path == paths
return cat


@pytest.fixture(scope='module')
def THREDDSMergedSource_cat_short_url():
    """Catalog URL that points directly at the ``surface`` folder of ncep.reanalysis.dailyavgs."""
    surface_catalog_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/ncep.reanalysis.dailyavgs/surface/catalog.xml'
    return surface_catalog_url


@pytest.fixture(scope='module')
def THREDDSMergedSource_cat_short_path():
    """Single-element path list holding the glob pattern for the files to merge."""
    file_pattern = 'air.sig995*194*.nc'  # todo: convert . to * ?
    return [file_pattern]


@pytest.fixture(scope='module')
def THREDDSMergedSource_cat_short(
    THREDDSMergedSource_cat_short_url, THREDDSMergedSource_cat_short_path
):
    """THREDDSMergedSource opened on the short URL, so no folder looping is needed (faster)."""
    url = THREDDSMergedSource_cat_short_url
    path = THREDDSMergedSource_cat_short_path
    source = intake.open_thredds_merged(url, path)
    # The source must keep the arguments it was opened with.
    assert source.urlpath == url
    assert source.path == path
    return source


@pytest.fixture(scope='module')
def THREDDSMergedSource_cat_short_simplecache(
    THREDDSMergedSource_cat_short_url, THREDDSMergedSource_cat_short_path
):
    """THREDDSMergedSource on the short URL with the simplecache:: prefix and the netcdf driver."""
    cached_url = f'simplecache::{THREDDSMergedSource_cat_short_url}'
    source = intake.open_thredds_merged(
        cached_url, THREDDSMergedSource_cat_short_path, driver='netcdf'
    )
    return source


def test_THREDDSMergedSource(THREDDSMergedSource_cat):
cat = THREDDSMergedSource_cat
ds = cat.to_dask()
assert dict(ds.dims) == {'lat': 681, 'lon': 841, 'time': 155}
assert dict(ds.dims) == {'lat': 73, 'lon': 144, 'time': 731}
d = cat.discover()
assert set(d['metadata']['coords']) == set(('lat', 'lon', 'time'))
assert set(d['metadata']['data_vars'].keys()) == set(
['crs', 'lwe_thickness_of_soil_moisture_content']
)
assert set(d['metadata']['data_vars'].keys()) == set(['air'])


def test_THREDDSMergedSource_long_short(THREDDSMergedSource_cat, THREDDSMergedSource_cat_short):
    """Test that the long-path source and the short-URL source yield equal datasets."""
    sources = (THREDDSMergedSource_cat, THREDDSMergedSource_cat_short)
    datasets = [source.to_dask() for source in sources]
    # TODO: to limit downloads, only compare dims, coords, size
    xr.testing.assert_equal(*datasets)


def test_THREDDSMergedSource_simplecache_netcdf(THREDDSMergedSource_cat_short_simplecache):
    """Test that THREDDSMergedSource allows simplecache:: in url if netcdf as source.

    The fsspec ``simplecache`` configuration is overridden for the duration
    of the test and restored afterwards; cached files are removed even if an
    assertion fails.
    """
    import os

    import fsspec

    cache_storage = 'my_caching_folder'
    cached_files = [
        os.path.join(cache_storage, f) for f in ('air.sig995.1948.nc', 'air.sig995.1949.nc')
    ]

    # Remember the previous setting so the global config does not leak
    # into other tests.
    unset = object()
    previous_conf = fsspec.config.conf.get('simplecache', unset)
    fsspec.config.conf['simplecache'] = {'cache_storage': cache_storage, 'same_names': True}
    try:
        cat = THREDDSMergedSource_cat_short_simplecache
        ds = cat.to_dask()
        assert isinstance(ds, xr.Dataset)
        # test files present
        for cached_file in cached_files:
            assert os.path.exists(cached_file)
    finally:
        for cached_file in cached_files:
            if os.path.exists(cached_file):
                os.remove(cached_file)
        if previous_conf is unset:
            fsspec.config.conf.pop('simplecache', None)
        else:
            fsspec.config.conf['simplecache'] = previous_conf
    for cached_file in cached_files:
        assert not os.path.exists(cached_file)


def test_THREDDSMergedSource_simplecache_fails_opendap(
    THREDDSMergedSource_cat_short_url, THREDDSMergedSource_cat_short_path
):
    """Test that THREDDSMergedSource rejects simplecache:: in url with opendap as source."""
    with pytest.raises(ValueError) as e:
        # Exercise the merged source itself rather than open_thredds_cat.
        # NOTE(review): the error is expected when the dataset is opened and
        # the underlying ThreddsCatalog is built -- confirm to_dask() triggers it.
        source = intake.open_thredds_merged(
            f'simplecache::{THREDDSMergedSource_cat_short_url}',
            THREDDSMergedSource_cat_short_path,
            driver='opendap',
        )
        source.to_dask()
    # Checked after the ``with`` block so the assertion actually runs.
    assert 'simplecache requires driver="netcdf"' in str(e.value)
aaronspring marked this conversation as resolved.
Show resolved Hide resolved