Commit

Merge pull request #78 from tethys-ts/dev
minor performance improvements and upgraded hdf5tools
mullenkamp committed Nov 5, 2022
2 parents 50dd6d5 + d9ba7e3 commit a1c1ec7
Showing 8 changed files with 229 additions and 257 deletions.
4 changes: 2 additions & 2 deletions conda/meta.yaml
@@ -1,5 +1,5 @@
{% set name = "tethysts" %}
-{% set version = "4.5.4" %}
+{% set version = "4.5.6" %}
# {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %}

# sha256 is the preferred checksum -- you can get it for a file with:
@@ -44,7 +44,7 @@ requirements:
- requests
- shapely
- tethys-data-models >=0.4.11
-  - hdf5tools >=0.0.7
+  - hdf5tools >=0.1.4
- s3tethys >=0.0.4

test:
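(The recipe comment above, "you can get it for a file with:", is cut off by the collapsed diff. As a hedged aside that is not part of this commit, one way to compute a sha256 checksum in Python is sketched below; the archive name in the example is hypothetical.)

import hashlib

def sha256_of_file(path):
    # Stream the file in 64 KiB blocks so large archives don't have to fit in memory.
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(65536), b''):
            h.update(block)
    return h.hexdigest()

# Example (hypothetical file name): sha256_of_file('tethysts-4.5.6.tar.gz')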
4 changes: 2 additions & 2 deletions setup.py
@@ -10,7 +10,7 @@
name = 'tethysts'
main_package = 'tethysts'
datasets = 'datasets/time_series'
-version = '4.5.4'
+version = '4.5.6'
descrip = 'tethys time series S3 extraction'

# The below code is for readthedocs. To have sphinx/readthedocs interact with
@@ -19,7 +19,7 @@
if os.environ.get('READTHEDOCS', False) == 'True':
INSTALL_REQUIRES = []
else:
-INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.0.7', 's3tethys>=0.0.4']
+INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.4', 's3tethys>=0.0.4']

# Get the long description from the README file
with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
31 changes: 0 additions & 31 deletions tethysts/datasets/__init__.py

This file was deleted.

43 changes: 22 additions & 21 deletions tethysts/main.py
@@ -14,7 +14,8 @@
import copy
# from multiprocessing.pool import ThreadPool
import concurrent.futures
-from tethysts.utils import read_json_zstd, spatial_query, get_nearest_from_extent, public_remote_key, chunk_filters, download_results, make_run_date_key, update_nested, results_concat
+from tethysts import utils
+# import utils
from s3tethys import get_object_s3, s3_client, decompress_stream_to_object
from typing import List, Union
import tethys_data_models as tdm
@@ -98,10 +99,10 @@ def __init__(self, remotes: List[tdm.base.Remote] = None, cache: Union[pathlib.P
_ = self.get_datasets(remotes)

elif remotes is None:
-resp = requests.get(public_remote_key)
+resp = requests.get(utils.public_remote_key)
resp.raise_for_status()

-remotes = read_json_zstd(resp.content)
+remotes = utils.read_json_zstd(resp.content)
_ = self.get_datasets(remotes)

elif remotes != 'pass':
@@ -251,7 +252,7 @@ def get_stations(self,
stn_obj = get_object_s3(**remote)
stn_list = orjson.loads(decompress_stream_to_object(stn_obj, 'zstd').read())
stn_dict = {s['station_id']: s for s in stn_list if isinstance(s, dict)}
-update_nested(self._stations, dataset_id, vd, stn_dict)
+utils.update_nested(self._stations, dataset_id, vd, stn_dict)
except:
print('No stations.json.zst file in S3 bucket')
return None
@@ -266,7 +267,7 @@ def get_stations(self,
stn_dict = {s_id: s for s_id, s in stn_dict.items() if pd.Timestamp(s['time_range']['to_date']) <= to_date1}

## Spatial query
-stn_ids = spatial_query(stn_dict, geometry, lat, lon, distance)
+stn_ids = utils.spatial_query(stn_dict, geometry, lat, lon, distance)

if isinstance(stn_ids, list):
stn_list1 = [stn_dict[s] for s in stn_ids]
@@ -289,7 +290,7 @@ def _get_stns_rc_key(self, dataset_id: str, key_name, version_date: str = None):
# Check version_date
version_date = self._get_version_date(dataset_id, version_date)

-vd_key = make_run_date_key(version_date)
+vd_key = utils.make_run_date_key(version_date)
stn_key = self._key_patterns[system_version][key_name].format(dataset_id=dataset_id, version_date=vd_key)

return stn_key
@@ -319,7 +320,7 @@ def _get_results_chunks(self, dataset_id: str, version_date: str = None):

rc_list = orjson.loads(decompress_stream_to_object(stn_obj, 'zstd').read())

-update_nested(self._results_chunks, dataset_id, version_date, rc_list)
+utils.update_nested(self._results_chunks, dataset_id, version_date, rc_list)

return rc_list

@@ -433,7 +434,7 @@ def get_results(self,
bands: Union[List[int], int] = None,
squeeze_dims: bool = False,
output_path: Union[str, pathlib.Path] = None,
-compression: str = 'gzip',
+compression: str = 'lzf',
threads: int = 30,
# include_chunk_vars: bool = False
):
@@ -513,14 +514,14 @@
stn_dict = self._stations[dataset_id][vd]

# Run the spatial query
-stn_ids = spatial_query(stn_dict, geometry, lat, lon, None)
+stn_ids = utils.spatial_query(stn_dict, geometry, lat, lon, None)
else:
raise ValueError('A station_id, point geometry or a combination of lat and lon must be passed.')

## Get results chunks
rc_list = self._get_results_chunks(dataset_id, vd)

-chunks = chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands)
+chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands)

if chunks:

@@ -530,39 +531,39 @@
if not 'public_url' in remote:
s3 = s3_client(remote['connection_config'], threads)
remote['s3'] = s3

futures = []
for chunk in chunks:
remote['chunk'] = chunk
remote['from_date'] = from_date
remote['to_date'] = to_date
-f = executor.submit(download_results, **remote)
+f = executor.submit(utils.download_results, **remote)
futures.append(f)
runs = concurrent.futures.wait(futures)

results_list = [r.result() for r in runs[0]]

## Clear xarray cache...because it loves caching everything...
## This is to ensure that xarray will open the file rather than opening a cache
## The next xarray version should have this issue fixed:
## https://github.com/pydata/xarray/pull/4879
xr.backends.file_manager.FILE_CACHE.clear()

## combine results
-xr3 = results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, from_mod_date=from_mod_date, to_mod_date=to_mod_date, compression=compression)
+xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, compression=compression)

## Convert to new version
attrs = xr3.attrs.copy()
if 'version' in attrs:
attrs['system_version'] = attrs.pop('version')

## Extra spatial query if data are stored in blocks
if ('grid' in result_type) and ((geom_type == 'Point') or (isinstance(lat, float) and isinstance(lon, float))):
-xr3 = get_nearest_from_extent(xr3, geometry, lat, lon)
+xr3 = utils.get_nearest_from_extent(xr3, geometry, lat, lon)

## Filters
xr3.attrs['version_date'] = pd.Timestamp(vd).tz_localize(None).isoformat()

if squeeze_dims:
xr3 = xr3.squeeze_dims()

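(The get_results change above swaps the default HDF5 compression from 'gzip' to 'lzf', which writes faster at the cost of a somewhat larger file and is presumably part of the "minor performance improvements" in the commit message. A minimal usage sketch follows; it is not from this commit, and the parameter names outside the visible hunks, such as station_ids, are assumptions.)

from tethysts import Tethys

ts = Tethys()  # with no remotes passed, the public remotes are fetched

dataset_id = ts.datasets[0]['dataset_id']  # hypothetical: pick the first dataset
stations = ts.get_stations(dataset_id)
station_id = stations[0]['station_id']

# When output_path is given, results are saved to HDF5; compression now defaults to 'lzf'.
results = ts.get_results(dataset_id, station_ids=station_id, output_path='results.h5')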
Binary file removed tethysts/tests/test1.h5
