Overhaul file index creation based on caching
gutzbenj committed Jun 21, 2020
1 parent adc824d commit 95aae9a
Showing 11 changed files with 178 additions and 162 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -34,7 +34,7 @@ Third those variables are also available in different tenses, which are:
The toolset provides different functions which are:

- metadata_for_dwd_data
- create_file_list_for_dwd_server
- create_file_list_for_dwd_server (+ reset_file_index_cache)
- download_dwd_data
- parse_dwd_data
- get_nearest_station
@@ -46,7 +46,9 @@ All those functions have one same argument which is **folder**. It can be used t

**metadata_for_dwd_data** is used to discover what data is available for a set of parameters (**var**, **res**, **per**), specifically which stations can be found for the requested variable, resolution and period. With **write_file**, a boolean value, it can also be defined whether the resulting **DataFrame** should be written as **csv** to the given folder. Furthermore, with **create_new_filelist**, set to **False** by default, the function can be forced to retrieve a new list of files from the FTP server, which is usually avoided if a file already exists in the given folder.

**create_file_list_for_dwd_server** uses the **information of the metadata** to retrieve file links to files that represent a **set of parameters** in combination with the requested **statid**. Here, too, **create_new_filelist** can be set to **True** if the user is sure that the **file for a certain statid** is available but the old **filelist** somehow doesn't contain the corresponding information.
**create_file_list_for_dwd_server** uses the **information of the metadata** to retrieve file links to files that represent a **set of parameters** in combination with the requested **statid**. Here, too, **create_new_file_index** can be set to **True** if the user is sure that the **file for a certain station id** is available but the old **file index** somehow doesn't contain the corresponding information.

**reset_file_index_cache** can be used to remove the old file index in order to get the latest state of the files on the remote server.

**download_dwd_data** is used with the created file links of **create_file_list_for_dwd_server** to **download and store** the data in the given folder. For that it connects to the FTP server and writes the corresponding file to the hard disk as defined. Furthermore it returns the local file link, i.e. the path where the file is saved on the hard drive.

3 changes: 3 additions & 0 deletions python_dwd/__init__.py
@@ -1,6 +1,9 @@
from python_dwd.version import __version__

from python_dwd.metadata_dwd import metadata_for_dwd_data
from python_dwd.file_path_handling.file_list_creation import \
create_file_list_for_dwd_server
from python_dwd.file_path_handling.file_index_creation import reset_file_index_cache
from python_dwd.download.download import download_dwd_data
from python_dwd.parsing_data.parse_data_from_files import parse_dwd_data
from python_dwd.additionals.geo_location import get_nearest_station
25 changes: 12 additions & 13 deletions python_dwd/data_collection.py
@@ -1,14 +1,15 @@
""" Data collection pipeline """
import logging
from pathlib import Path
from typing import List, Union, Optional
from typing import List, Union
import pandas as pd

from python_dwd.constants.column_name_mapping import GERMAN_TO_ENGLISH_COLUMNS_MAPPING_HUMANIZED
from python_dwd.enumerations.parameter_enumeration import Parameter
from python_dwd.enumerations.period_type_enumeration import PeriodType
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution
from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN
from python_dwd.file_path_handling.file_index_creation import reset_file_index_cache
from python_dwd.file_path_handling.file_list_creation import create_file_list_for_dwd_server
from python_dwd.download.download import download_dwd_data
from python_dwd.parsing_data.parse_data_from_files import parse_dwd_data
@@ -25,16 +26,14 @@ def collect_dwd_data(station_ids: List[int],
prefer_local: bool = False,
parallel_download: bool = False,
write_file: bool = False,
create_new_filelist: bool = False,
humanize_column_names: bool = False,
run_download_only: bool = False) -> Optional[pd.DataFrame]:
create_new_file_index: bool = False,
humanize_column_names: bool = False) -> pd.DataFrame:
"""
Function that organizes the complete pipeline of data collection, either
from the internet or from a local file. It therefore goes through every given
station id and, depending on the parameters, either tries to get data from the local
store and/or, if that fails, tries to get data from the internet. Finally, if wanted,
it will try to store the data in an hdf file.
Args:
station_ids: station ids that are trying to be loaded
parameter: parameter as enumeration
@@ -44,17 +43,20 @@ def collect_dwd_data(station_ids: List[int],
prefer_local: boolean for if local data should be preferred
parallel_download: boolean if to use parallel download when downloading files
write_file: boolean if to write data to local storage
create_new_filelist: boolean if to create a new filelist for the data selection
create_new_file_index: boolean if to create a new file index for the data selection
humanize_column_names: boolean to yield column names better for human consumption
run_download_only: boolean to run only the download and storing process
Returns:
a pandas DataFrame with all the data given by the station ids
"""
if create_new_file_index:
reset_file_index_cache()

parameter = Parameter(parameter)
time_resolution = TimeResolution(time_resolution)
period_type = PeriodType(period_type)

# todo check parameters and if combination not existing, print something and return empty DataFrame

# List for collected pandas DataFrames per each station id
data = []
for station_id in set(station_ids):
@@ -77,7 +79,7 @@
log.info(f"Data for {request_string} will be collected from internet.")

remote_files = create_file_list_for_dwd_server(
[station_id], parameter, time_resolution, period_type, folder, create_new_filelist)
station_id, parameter, time_resolution, period_type)

filenames_and_files = download_dwd_data(remote_files, parallel_download)

@@ -88,10 +90,7 @@
station_data, station_id, parameter, time_resolution, period_type, folder)

data.append(station_data)

if run_download_only:
return None


data = pd.concat(data)

# Assign meaningful column names (humanized).
13 changes: 8 additions & 5 deletions python_dwd/dwd_station_request.py
@@ -13,9 +13,9 @@
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution
from python_dwd.additionals.functions import check_parameters, cast_to_list
from python_dwd.exceptions.start_date_end_date_exception import StartDateEndDateError

from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns
from python_dwd.file_path_handling.file_index_creation import reset_file_index_cache

log = logging.getLogger(__name__)

@@ -117,7 +117,7 @@ def collect_data(self,
write_file: bool = False,
folder: Union[str, Path] = DWD_FOLDER_MAIN,
parallel_download: bool = False,
create_new_filelist: bool = False) -> Generator[pd.DataFrame, None, None]:
create_new_file_index: bool = False) -> Generator[pd.DataFrame, None, None]:
"""
Method to collect data for a defined request. The function is built as a generator in
order to not clog the memory; thus if the user wants the data as one pandas DataFrame
@@ -127,13 +127,16 @@
Args:
prefer_local: definition if data should rather be taken from a local source
write_file: should data be written to a local file
folder: place where filelists (and station data) are stored
folder: place where file lists (and station data) are stored
parallel_download: definition if data is downloaded in parallel
create_new_filelist: definition if the fileindex should be recreated
create_new_file_index: definition if the file index should be recreated
Returns:
one pandas.DataFrame per station, yielded by a generator
"""
if create_new_file_index:
reset_file_index_cache()

for station_id in self.station_ids:
df_of_station_id = pd.DataFrame()

@@ -147,7 +150,7 @@
prefer_local=prefer_local,
parallel_download=parallel_download,
write_file=write_file,
create_new_filelist=create_new_filelist,
create_new_file_index=False,
humanize_column_names=self.humanize_column_names
)

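The `collect_data` method above is deliberately a generator: it yields one DataFrame per station so callers can stream results instead of holding everything in memory. A minimal stdlib sketch of the same pattern (names are illustrative; dicts stand in for pandas DataFrames):

```python
# Sketch of the per-station generator pattern used by collect_data:
# yield one result per station so callers can stream results or
# concatenate them at the end. Dicts stand in for pandas DataFrames.
def collect_data_sketch(station_ids):
    for station_id in station_ids:
        # In the real pipeline this step would call collect_dwd_data(...)
        yield {"station_id": station_id, "rows": station_id * 10}

# Callers can consume lazily, or materialize everything at once:
frames = list(collect_data_sketch([44, 1048]))
```

Materializing with `list()` gives the same result as a non-generator implementation, but a consumer that only needs one station at a time never pays the memory cost for the rest.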
84 changes: 84 additions & 0 deletions python_dwd/file_path_handling/file_index_creation.py
@@ -0,0 +1,84 @@
""" file index creation for available DWD station data """
from pathlib import PurePosixPath
import re
from functools import lru_cache
import ftplib
import pandas as pd

from python_dwd.constants.access_credentials import DWD_PATH, DWD_SERVER
from python_dwd.constants.metadata import ARCHIVE_FORMAT, STATID_REGEX
from python_dwd.download.ftp_handling import FTP
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns
from python_dwd.enumerations.parameter_enumeration import Parameter
from python_dwd.enumerations.period_type_enumeration import PeriodType
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution


@lru_cache(maxsize=None)
def create_file_index_for_dwd_server(parameter: Parameter,
time_resolution: TimeResolution,
period_type: PeriodType) -> pd.DataFrame:
"""
Function (cached) to create a file index of the DWD station data. The file index
is created for an individual set of parameters.
Args:
parameter: parameter of Parameter enumeration
time_resolution: time resolution of TimeResolution enumeration
period_type: period type of PeriodType enumeration
Returns:
file index in a pandas.DataFrame with sets of parameters and station id
"""
server_path = PurePosixPath(DWD_PATH) / time_resolution.value / \
parameter.value / period_type.value

# todo: replace with global requests.Session creating the index
try:
with FTP(DWD_SERVER) as ftp:
ftp.login()
files_server = ftp.list_files(
remote_path=str(server_path), also_subfolders=True)

except ftplib.all_errors as e:
raise ConnectionError("Creating file index currently not possible.") from e

files_server = pd.DataFrame(
files_server, columns=[DWDMetaColumns.FILENAME.value], dtype='str')

# Add parameters
files_server[DWDMetaColumns.PARAMETER.value] = parameter.value
files_server[DWDMetaColumns.TIME_RESOLUTION.value] = time_resolution.value
files_server[DWDMetaColumns.PERIOD_TYPE.value] = period_type.value

# Filter for .zip files
files_server = files_server[files_server.FILENAME.str.endswith(
ARCHIVE_FORMAT)]

files_server.loc[:, DWDMetaColumns.FILENAME.value] = files_server.loc[:, DWDMetaColumns.FILENAME.value].\
str.replace(DWD_PATH + '/', '')

file_names = files_server.loc[:, DWDMetaColumns.FILENAME.value].str.split("/").apply(
lambda strings: strings[-1])

files_server.loc[:, DWDMetaColumns.STATION_ID.value] = file_names.apply(
lambda x: re.findall(STATID_REGEX, x).pop(0))

files_server.loc[:, DWDMetaColumns.STATION_ID.value] = files_server.loc[:, DWDMetaColumns.STATION_ID.value].\
astype(int)

files_server = files_server.sort_values(
by=[DWDMetaColumns.STATION_ID.value, DWDMetaColumns.FILENAME.value])

selected_file_index_columns = [
DWDMetaColumns.PARAMETER.value,
DWDMetaColumns.TIME_RESOLUTION.value,
DWDMetaColumns.PERIOD_TYPE.value,
DWDMetaColumns.STATION_ID.value,
DWDMetaColumns.FILENAME.value
]

return files_server.loc[:, selected_file_index_columns]


def reset_file_index_cache():
""" Function to reset the cached file index for all kinds of parameters """
create_file_index_for_dwd_server.cache_clear()
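The caching introduced in this file rests on `functools.lru_cache`: the first call per parameter set triggers the expensive FTP listing, later calls are served from memory, and `reset_file_index_cache` simply calls `cache_clear()`. A self-contained sketch of that pattern, with the FTP listing replaced by a call counter:

```python
from functools import lru_cache

calls = {"n": 0}  # counts how often the expensive work actually runs

@lru_cache(maxsize=None)
def build_file_index(parameter: str) -> tuple:
    # Stands in for the FTP directory listing, the expensive part.
    calls["n"] += 1
    return tuple(f"{parameter}_{i:05d}.zip" for i in range(3))

build_file_index("kl")
build_file_index("kl")          # second call is served from the cache
first_round = calls["n"]        # still 1

build_file_index.cache_clear()  # what reset_file_index_cache does internally
build_file_index("kl")          # forces a fresh "listing"
second_round = calls["n"]       # now 2
```

Note that the cached function returns an immutable tuple here; the real function returns a DataFrame, so callers should avoid mutating the cached object in place.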
61 changes: 16 additions & 45 deletions python_dwd/file_path_handling/file_list_creation.py
@@ -1,75 +1,46 @@
""" file list creation for requested files """
from pathlib import Path
from typing import List, Union
from typing import Union
import pandas as pd

from python_dwd.additionals.functions import check_parameters
from python_dwd.additionals.helpers import create_fileindex
from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN, DWD_FOLDER_METADATA
from python_dwd.constants.metadata import FILELIST_NAME, DATA_FORMAT
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns
from python_dwd.enumerations.parameter_enumeration import Parameter
from python_dwd.enumerations.period_type_enumeration import PeriodType
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution
from python_dwd.file_path_handling.file_index_creation import create_file_index_for_dwd_server, \
reset_file_index_cache


def create_file_list_for_dwd_server(station_ids: List[int],
def create_file_list_for_dwd_server(station_id: Union[str, int],
parameter: Union[Parameter, str],
time_resolution: Union[TimeResolution, str],
period_type: Union[PeriodType, str],
folder: str = DWD_FOLDER_MAIN,
create_new_filelist=False) -> pd.DataFrame:
create_new_file_index: bool = False) -> pd.DataFrame:
"""
Function for selecting data files (links to archives) for a given
station_id, parameter, time_resolution and period_type under consideration of an
index of the files that are
available online.
Args:
station_ids: id(s) for the weather station to ask for data
station_id: id for the weather station to ask for data
parameter: observation measure
time_resolution: frequency/granularity of measurement interval
period_type: recent or historical files
folder:
create_new_filelist: boolean for checking existing file list or not
create_new_file_index: set if new file index is created
Returns:
List of paths to the files
"""
if create_new_file_index:
reset_file_index_cache()

parameter = Parameter(parameter)
time_resolution = TimeResolution(time_resolution)
period_type = PeriodType(period_type)

# Check type of function parameters
station_ids = [int(statid) for statid in station_ids]

# Check for the combination of requested parameters
check_parameters(parameter=parameter,
time_resolution=time_resolution,
period_type=period_type)

# Create name of fileslistfile
filelist_local = f'{FILELIST_NAME}_{parameter.value}_' \
f'{time_resolution.value}_{period_type.value}'

# Create filepath to filelist in folder
filelist_local_path = Path(folder,
DWD_FOLDER_METADATA,
filelist_local)

filelist_local_path = f"{filelist_local_path}{DATA_FORMAT}"

if create_new_filelist or not Path(filelist_local_path).is_file():
create_fileindex(parameter=parameter,
time_resolution=time_resolution,
period_type=period_type,
folder=folder)
file_index = create_file_index_for_dwd_server(
parameter, time_resolution, period_type)

filelist = pd.read_csv(filepath_or_buffer=filelist_local_path,
sep=",",
dtype={DWDMetaColumns.FILEID.value: int,
DWDMetaColumns.STATION_ID.value: int,
DWDMetaColumns.FILENAME.value: str})
file_index = file_index[
file_index[DWDMetaColumns.STATION_ID.value] == int(station_id)
]

return filelist.loc[filelist[DWDMetaColumns.STATION_ID.value].isin(station_ids), :]
return file_index.loc[:, [DWDMetaColumns.FILENAME.value]]
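The station id used for this filtering is parsed out of each file name with `STATID_REGEX` in `file_index_creation.py`. A small sketch of that extraction and filtering step; the file names and the regex `r"_(\d{5})_"` are assumed stand-ins, since the real `STATID_REGEX` constant lives in `python_dwd.constants.metadata`:

```python
import re

# Assumed stand-in for the project's STATID_REGEX constant.
STATID_REGEX = r"_(\d{5})_"

# Hypothetical file names shaped like entries of the DWD file index.
file_index = [
    "tageswerte_KL_00044_akt.zip",
    "tageswerte_KL_01048_akt.zip",
]

def station_id_of(filename: str) -> int:
    # First zero-padded 5-digit group between underscores is the station id.
    return int(re.findall(STATID_REGEX, filename)[0])

# Equivalent of filtering the file index DataFrame down to one station:
files_for_44 = [f for f in file_index if station_id_of(f) == 44]
```

Casting to `int` mirrors the `astype(int)` step in the index creation, so a caller-supplied `"44"` or `44` both match the zero-padded `"00044"` in the file name.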
