Skip to content

Commit

Permalink
Merge pull request earthobservations#57 from gutzbenj/add_collect_met…
Browse files Browse the repository at this point in the history
…hod_to_station_request

Add method to get data for a defined station request
  • Loading branch information
gutzbenj committed Jun 11, 2020
2 parents e5374dd + 1ef26c9 commit d13a904
Showing 1 changed file with 100 additions and 22 deletions.
122 changes: 100 additions & 22 deletions python_dwd/dwd_station_request.py
@@ -1,10 +1,9 @@
from pathlib import Path
from typing import List, Union, Optional, Dict, Generator
import pandas as pd
from pandas import Timestamp

from python_dwd.file_path_handling.file_list_creation import create_file_list_for_dwd_server
from python_dwd.download.download import download_dwd_data
from python_dwd.parsing_data.parse_data_from_files import parse_dwd_data
from python_dwd import collect_dwd_data
from python_dwd.additionals.time_handling import parse_date
from python_dwd.constants.parameter_mapping import PARAMETER_WORDLIST_MAPPING, TIMERESOLUTION_WORDLIST_MAPPING, \
PERIODTYPE_WORDLIST_MAPPING
Expand All @@ -23,7 +22,7 @@ class DWDStationRequest:
The DWDStationRequest class represents a request for station data as provided by the DWD service
"""
def __init__(self,
station_id: Union[str, int, List[Union[int, str]]],
station_ids: Union[str, int, List[Union[int, str]]],
parameter: Union[str, Parameter],
time_resolution: Union[str, TimeResolution],
period_type: Union[None, str, list, PeriodType] = None,
Expand All @@ -34,20 +33,38 @@ def __init__(self,
raise ValueError("Define either a 'time_resolution' or both the 'start_date' and 'end_date' and "
"leave the other one empty!")

if not all(isinstance(x, int) for x in station_id):
raise ValueError("List of station id's contains none integer values or is at least not given as a list")

self.station_id = [int(s) for s in cast_to_list(station_id)]

self.parameter = parameter if isinstance(parameter, Parameter) \
else _parse_parameter_from_value(parameter, PARAMETER_WORDLIST_MAPPING)

self.time_resolution = time_resolution if isinstance(time_resolution, TimeResolution) \
else _parse_parameter_from_value(time_resolution, TIMERESOLUTION_WORDLIST_MAPPING)

self.period_type = cast_to_list(period_type) if isinstance(period_type, (PeriodType, type(None))) \
else [_parse_parameter_from_value(period_type, PERIODTYPE_WORDLIST_MAPPING)
for period_type in cast_to_list(period_type)]
try:
self.station_ids = [int(station_id) for station_id in cast_to_list(station_ids)]
except ValueError:
raise ValueError("List of station id's can not be parsed to integers.")

try:
self.parameter = Parameter(parameter)
except ValueError:
self.parameter = _parse_parameter_from_value(
parameter, PARAMETER_WORDLIST_MAPPING)

try:
self.time_resolution = TimeResolution(time_resolution)
except ValueError:
self.time_resolution = _parse_parameter_from_value(
time_resolution, TIMERESOLUTION_WORDLIST_MAPPING)

self.period_type = []
for pt in cast_to_list(period_type):
if pt is None:
self.period_type.append(None)
continue

try:
self.period_type.append(PeriodType(pt))
except ValueError:
self.period_type.append(
_parse_parameter_from_value(period_type, PERIODTYPE_WORDLIST_MAPPING))

# Additional sorting required for self.period_type to ensure that for multiple
# periods the data is first sourced from historical
self.period_type = sorted(self.period_type)

self.start_date = parse_date(start_date)
self.end_date = parse_date(end_date)
Expand All @@ -57,7 +74,7 @@ def __init__(self,
self.period_type = [PeriodType.HISTORICAL, PeriodType.RECENT, PeriodType.NOW]

if not self.start_date <= self.end_date:
raise StartDateEndDateError
raise StartDateEndDateError("Error: 'start_date' must be smaller or equal to 'end_date'.")

for period_type in self.period_type.copy():
if not check_parameters(parameter=self.parameter,
Expand All @@ -70,24 +87,85 @@ def __init__(self,

# Use the clean up of self.period_type to identify if there's any data with those parameters
if not self.period_type:
raise ValueError("Error: no combination for parameter, time_resolution and period_type could be found.")
raise ValueError("No combination for parameter, time_resolution "
"and period_type could be found.")

def __eq__(self, other):
return [self.station_id,
return [self.station_ids,
self.parameter,
self.time_resolution,
self.period_type,
self.start_date,
self.end_date] == other

def __str__(self):
return ", ".join([f"station_ids {'& '.join([str(stat_id) for stat_id in self.station_id])}",
return ", ".join([f"station_ids {'& '.join([str(station_id) for station_id in self.station_ids])}",
self.parameter.value,
self.time_resolution.value,
"& ".join([period_type.value for period_type in self.period_type]),
self.start_date.value,
self.end_date.value])

def collect_data(self,
prefer_local: bool = False,
write_file: bool = False,
folder: Union[str, Path] = DWD_FOLDER_MAIN,
parallel_download: bool = False,
create_new_filelist: bool = False) -> Generator[pd.DataFrame, None, None]:
"""
Method to collect data for a defined request. The function is build as generator in
order to not cloak the memory thus if the user wants the data as one pandas DataFrame
the generator has to be casted to a DataFrame manually via
pd.concat(list(request.collect_data([...])).
Args:
prefer_local: definition if data should rather be taken from a local source
write_file: should data be written to a local file
folder: place where filelists (and station data) are stored
parallel_download: definition if data is downloaded in parallel
create_new_filelist: definition if the fileindex should be recreated
Returns:
via a generator per station a pandas.DataFrame
"""
for station_id in self.station_ids:
df_of_station_id = pd.DataFrame()

for period_type in self.period_type:
period_df = collect_dwd_data(
station_ids=[station_id],
parameter=self.parameter,
time_resolution=self.time_resolution,
period_type=period_type,
folder=folder,
prefer_local=prefer_local,
parallel_download=parallel_download,
write_file=write_file,
create_new_filelist=create_new_filelist,
)

# Filter out values which already are in the dataframe
try:
period_df = period_df[
~period_df[DWDColumns.DATE.value].isin(df_of_station_id[DWDColumns.DATE.value])]
except KeyError:
pass

df_of_station_id = df_of_station_id.append(period_df)

# Filter for dates range if start_date and end_date are defined
if self.start_date:
df_of_station_id = df_of_station_id[
(df_of_station_id[DWDColumns.DATE.value] >= self.start_date) &
(df_of_station_id[DWDColumns.DATE.value] <= self.end_date)
]

# Empty dataframe should be skipped
if df_of_station_id.empty:
continue

yield df_of_station_id


def _find_any_one_word_from_wordlist(string_list: List[str],
word_list: List[List[str]]) -> bool:
Expand Down

0 comments on commit d13a904

Please sign in to comment.