forked from earthobservations/wetterdienst
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Overhaul file index creation based on caching
- Loading branch information
Showing
11 changed files
with
178 additions
and
162 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
""" file index creation for available DWD station data """ | ||
from pathlib import PurePosixPath | ||
import re | ||
from functools import lru_cache | ||
import ftplib | ||
import pandas as pd | ||
|
||
from python_dwd.constants.access_credentials import DWD_PATH, DWD_SERVER | ||
from python_dwd.constants.metadata import ARCHIVE_FORMAT, STATID_REGEX | ||
from python_dwd.download.ftp_handling import FTP | ||
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns | ||
from python_dwd.enumerations.parameter_enumeration import Parameter | ||
from python_dwd.enumerations.period_type_enumeration import PeriodType | ||
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution | ||
|
||
|
||
@lru_cache(maxsize=None)
def create_file_index_for_dwd_server(parameter: Parameter,
                                     time_resolution: TimeResolution,
                                     period_type: PeriodType) -> pd.DataFrame:
    """
    Function (cached) to create a file index of the DWD station data. The file
    index is created for an individual set of parameters.

    The result is memoized per (parameter, time_resolution, period_type) by
    ``lru_cache``, so repeated calls hand out the very same DataFrame object.
    Callers should not modify the returned frame in place.

    Args:
        parameter: parameter of Parameter enumeration
        time_resolution: time resolution of TimeResolution enumeration
        period_type: period type of PeriodType enumeration

    Returns:
        file index in a pandas.DataFrame with the columns parameter,
        time resolution, period type, station id and filename, sorted by
        station id and filename

    Raises:
        one of ftplib.all_errors: if the file listing cannot be retrieved from
            the server. The exception has the same class as the underlying
            failure, which is attached as ``__cause__``.
        IndexError: if an archive file name does not match STATID_REGEX
    """
    filename_column = DWDMetaColumns.FILENAME.value
    station_id_column = DWDMetaColumns.STATION_ID.value

    server_path = PurePosixPath(DWD_PATH) / time_resolution.value / \
        parameter.value / period_type.value

    # todo: replace with global requests.Session creating the index
    try:
        with FTP(DWD_SERVER) as ftp:
            ftp.login()
            files_server = ftp.list_files(
                remote_path=str(server_path), also_subfolders=True)
    except ftplib.all_errors as e:
        # "e" is an exception instance and cannot be called; build a new
        # exception of the same class and keep the original as its cause.
        raise type(e)("Creating file index currently not possible.") from e

    files_server = pd.DataFrame(
        files_server, columns=[filename_column], dtype='str')

    # Add parameters
    files_server[DWDMetaColumns.PARAMETER.value] = parameter.value
    files_server[DWDMetaColumns.TIME_RESOLUTION.value] = time_resolution.value
    files_server[DWDMetaColumns.PERIOD_TYPE.value] = period_type.value

    # Filter for archive files. Copy the selection so the assignments below
    # operate on an independent frame instead of a slice of the unfiltered one.
    is_archive = files_server[filename_column].str.endswith(ARCHIVE_FORMAT)
    files_server = files_server[is_archive].copy()

    # Strip the server base path; it is a literal prefix, not a pattern.
    files_server[filename_column] = files_server[filename_column].str.replace(
        DWD_PATH + '/', '', regex=False)

    # Only the last path component carries the station id
    file_names = files_server[filename_column].str.split("/").str[-1]

    files_server[station_id_column] = file_names.apply(
        lambda name: re.findall(STATID_REGEX, name)[0]).astype(int)

    files_server = files_server.sort_values(
        by=[station_id_column, filename_column])

    selected_file_index_columns = [
        DWDMetaColumns.PARAMETER.value,
        DWDMetaColumns.TIME_RESOLUTION.value,
        DWDMetaColumns.PERIOD_TYPE.value,
        station_id_column,
        filename_column,
    ]

    return files_server.loc[:, selected_file_index_columns]
|
||
|
||
def reset_file_index_cache():
    """
    Discard every cached file index, regardless of the parameter combination
    it was created for.
    """
    cached_index_builder = create_file_index_for_dwd_server
    cached_index_builder.cache_clear()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,46 @@ | ||
""" file list creation for requested files """ | ||
from pathlib import Path | ||
from typing import List, Union | ||
from typing import Union | ||
import pandas as pd | ||
|
||
from python_dwd.additionals.functions import check_parameters | ||
from python_dwd.additionals.helpers import create_fileindex | ||
from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN, DWD_FOLDER_METADATA | ||
from python_dwd.constants.metadata import FILELIST_NAME, DATA_FORMAT | ||
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns | ||
from python_dwd.enumerations.parameter_enumeration import Parameter | ||
from python_dwd.enumerations.period_type_enumeration import PeriodType | ||
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution | ||
from python_dwd.file_path_handling.file_index_creation import create_file_index_for_dwd_server, \ | ||
reset_file_index_cache | ||
|
||
|
||
def create_file_list_for_dwd_server(station_ids: List[int], | ||
def create_file_list_for_dwd_server(station_id: Union[str, int], | ||
parameter: Union[Parameter, str], | ||
time_resolution: Union[TimeResolution, str], | ||
period_type: Union[PeriodType, str], | ||
folder: str = DWD_FOLDER_MAIN, | ||
create_new_filelist=False) -> pd.DataFrame: | ||
create_new_file_index: bool = False) -> pd.DataFrame: | ||
""" | ||
Function for selecting datafiles (links to archives) for given | ||
station_ids, parameter, time_resolution and period_type under consideration of a | ||
created list of files that are | ||
available online. | ||
Args: | ||
station_ids: id(s) for the weather station to ask for data | ||
station_id: id for the weather station to ask for data | ||
parameter: observation measure | ||
time_resolution: frequency/granularity of measurement interval | ||
period_type: recent or historical files | ||
folder: | ||
create_new_filelist: boolean for checking existing file list or not | ||
create_new_file_index: set if new file index is created | ||
Returns: | ||
List of path's to file | ||
""" | ||
if create_new_file_index: | ||
reset_file_index_cache() | ||
|
||
parameter = Parameter(parameter) | ||
time_resolution = TimeResolution(time_resolution) | ||
period_type = PeriodType(period_type) | ||
|
||
# Check type of function parameters | ||
station_ids = [int(statid) for statid in station_ids] | ||
|
||
# Check for the combination of requested parameters | ||
check_parameters(parameter=parameter, | ||
time_resolution=time_resolution, | ||
period_type=period_type) | ||
|
||
# Create name of fileslistfile | ||
filelist_local = f'{FILELIST_NAME}_{parameter.value}_' \ | ||
f'{time_resolution.value}_{period_type.value}' | ||
|
||
# Create filepath to filelist in folder | ||
filelist_local_path = Path(folder, | ||
DWD_FOLDER_METADATA, | ||
filelist_local) | ||
|
||
filelist_local_path = f"{filelist_local_path}{DATA_FORMAT}" | ||
|
||
if create_new_filelist or not Path(filelist_local_path).is_file(): | ||
create_fileindex(parameter=parameter, | ||
time_resolution=time_resolution, | ||
period_type=period_type, | ||
folder=folder) | ||
file_index = create_file_index_for_dwd_server( | ||
parameter, time_resolution, period_type) | ||
|
||
filelist = pd.read_csv(filepath_or_buffer=filelist_local_path, | ||
sep=",", | ||
dtype={DWDMetaColumns.FILEID.value: int, | ||
DWDMetaColumns.STATION_ID.value: int, | ||
DWDMetaColumns.FILENAME.value: str}) | ||
file_index = file_index[ | ||
file_index[DWDMetaColumns.STATION_ID.value] == int(station_id) | ||
] | ||
|
||
return filelist.loc[filelist[DWDMetaColumns.STATION_ID.value].isin(station_ids), :] | ||
return file_index.loc[:, [DWDMetaColumns.FILENAME.value]] |
Oops, something went wrong.