Skip to content

Commit

Permalink
Merge pull request earthobservations#169 from panodata/improve-caching
Browse files Browse the repository at this point in the history
Add TTL-based persistent caching using dogpile.cache
  • Loading branch information
gutzbenj committed Sep 21, 2020
2 parents daee29d + 47e6863 commit eaca235
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -4,6 +4,7 @@ Changelog
Development
===========

- Add TTL-based persistent caching using dogpile.cache
- Add ``example/radolan.py`` and adjust documentation

0.7.0 (16.09.2020)
Expand Down
67 changes: 44 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Expand Up @@ -78,8 +78,10 @@ dateparser = "^0.7.4"
beautifulsoup4 = "^4.9.1"
requests = "^2.24.0"
python-dateutil = "^2.8.0"
"dogpile.cache" = "^1.0.2"
appdirs = "^1.4.4"

importlib_metadata = {version = "1.6.1", python = "<3.8"}
importlib_metadata = {version = "1.7.0", python = "<3.8"}

# Optional dependencies aka. "extras"
ipython = { version = "^7.10.1", optional = true }
Expand Down
47 changes: 47 additions & 0 deletions wetterdienst/additionals/cache.py
@@ -0,0 +1,47 @@
import logging
import os
import platform

import appdirs
from dogpile.cache import make_region


# Module-level logger (named after this module) rather than the root logger,
# so cache messages can be filtered/configured independently.
log = logging.getLogger(__name__)

# Python on Windows has no "fcntl", which is required by the dbm backend.
# TODO: Make backend configurable, e.g. better use Redis.
# NOTE: do not rebind the name "platform" — that would shadow the stdlib module.
backend = "dogpile.cache.dbm"
if platform.system() == "Windows":
    backend = "dogpile.cache.memory"

# Compute and create the per-user cache directory.
# exist_ok=True avoids the check-then-create race of os.path.exists + makedirs.
cache_dir = appdirs.user_cache_dir(appname="wetterdienst")
os.makedirs(cache_dir, exist_ok=True)
log.info("Cache directory is %s", cache_dir)


def _make_ttl_region(filename: str, ttl: int):
    """
    Create a dogpile.cache region with the given expiration time in seconds,
    persisted (for the dbm backend) to *filename* inside the cache directory.
    """
    return make_region().configure(
        backend,
        expiration_time=ttl,
        arguments={"filename": os.path.join(cache_dir, filename)},
    )


# Define cache regions.
metaindex_cache = _make_ttl_region("metaindex_1min.dbm", 1 * 60)
fileindex_cache_five_minutes = _make_ttl_region("fileindex_5min.dbm", 5 * 60)
fileindex_cache_one_hour = _make_ttl_region("fileindex_60min.dbm", 60 * 60)
payload_cache_one_hour = _make_ttl_region("payload_60min.dbm", 60 * 60)
6 changes: 2 additions & 4 deletions wetterdienst/data_collection.py
Expand Up @@ -125,7 +125,7 @@ def collect_climate_observations_data(

continue

log.info(f"Data for {request_string} will be collected from internet.")
log.info(f"Acquiring observations data for {request_string}")

remote_files = create_file_list_for_climate_observations(
[station_id], parameter, time_resolution, period_type
Expand Down Expand Up @@ -273,9 +273,7 @@ def collect_radolan_data(

continue
except FileNotFoundError:
log.info(
f"RADOLAN data for {str(date_time)} will be collected from internet"
)
log.info(f"Acquiring RADOLAN data for {str(date_time)}")

remote_radolan_file_path = create_filepath_for_radolan(
date_time, time_resolution
Expand Down
19 changes: 10 additions & 9 deletions wetterdienst/download/download.py
Expand Up @@ -9,10 +9,10 @@
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import lru_cache

from requests.exceptions import InvalidURL

from wetterdienst.additionals.cache import payload_cache_one_hour
from wetterdienst.constants.access_credentials import DWDCDCBase
from wetterdienst.download.download_services import download_file_from_dwd
from wetterdienst.enumerations.datetime_format_enumeration import DatetimeFormat
Expand All @@ -32,16 +32,12 @@ def download_climate_observations_data_parallel(
"""

with ThreadPoolExecutor() as executor:
files_in_bytes = executor.map(
_download_climate_observations_data_parallel, remote_files
)
files_in_bytes = executor.map(_download_climate_observations_data, remote_files)

return list(zip(remote_files, files_in_bytes))


def _download_climate_observations_data_parallel(
remote_file: Union[str, Path]
) -> BytesIO:
def _download_climate_observations_data(remote_file: Union[str, Path]) -> BytesIO:
"""
This function downloads the station data for which the link is
provided by the 'select_dwd' function. It checks the shortened filepath (just
Expand All @@ -56,6 +52,11 @@ def _download_climate_observations_data_parallel(
stores data on local file system
"""
return BytesIO(__download_climate_observations_data(remote_file=remote_file))


@payload_cache_one_hour.cache_on_arguments()
def __download_climate_observations_data(remote_file: Union[str, Path]) -> bytes:
try:
zip_file = download_file_from_dwd(remote_file, DWDCDCBase.CLIMATE_OBSERVATIONS)
except InvalidURL as e:
Expand All @@ -74,7 +75,7 @@ def _download_climate_observations_data_parallel(
for file in archive_files:
# If found file load file in bytes, close zipfile and return bytes
if file.startswith(PRODUCT_FILE_IDENTIFIER):
file_in_bytes = BytesIO(zip_file_opened.open(file).read())
file_in_bytes = zip_file_opened.open(file).read()

zip_file_opened.close()

Expand Down Expand Up @@ -113,7 +114,7 @@ def download_radolan_data(
return _extract_radolan_data(date_time, archive_in_bytes)


@lru_cache(maxsize=750)
@payload_cache_one_hour.cache_on_arguments()
def _download_radolan_data(remote_radolan_filepath: str) -> BytesIO:
"""
Function (cached) that downloads the RADOLAN file
Expand Down
7 changes: 6 additions & 1 deletion wetterdienst/download/download_services.py
@@ -1,6 +1,7 @@
"""
**DWD download utilities**
"""
import logging
from io import BytesIO
from pathlib import PurePosixPath
from typing import Union
Expand All @@ -9,6 +10,8 @@
from wetterdienst.download.https_handling import create_dwd_session
from wetterdienst.file_path_handling.path_handling import build_dwd_cdc_data_path

logger = logging.getLogger(__name__)


def download_file_from_dwd(
filepath: Union[PurePosixPath, str], cdc_base: DWDCDCBase
Expand All @@ -24,7 +27,9 @@ def download_file_from_dwd(
"""
dwd_session = create_dwd_session()

r = dwd_session.get(build_dwd_cdc_data_path(filepath, cdc_base))
url = build_dwd_cdc_data_path(filepath, cdc_base)
logger.info(f"Downloading resource {url}")
r = dwd_session.get(url)
r.raise_for_status()

return BytesIO(r.content)

0 comments on commit eaca235

Please sign in to comment.