Skip to content

Commit

Permalink
Use meaningful column names, starting with daily climate summary data
Browse files Browse the repository at this point in the history
From "column_name_mapping.py" and "column_names_enumeration.py",
we are seeing the intention to map meteorological short identifiers
to more meaningful English long names.

This complements the current implementation by providing appropriate
mappings for daily climate summary data (kl) and also adds an
appropriate test case for that.

While working on this, we discovered that "_parse_dwd_data" as well as
"collect_dwd_data" would not actually propagate column names,
so we adjusted a few spots in the data frame handling.

Trivia:
- The test case has been marked as "remote" in order to tell
  fixture-based unit tests apart from full integration tests.
- When massaging the data frame after parsing the data from CSV,
  the "EOR" column gets dropped right away, as it has
  no real value for downstream processing.
  • Loading branch information
amotl committed Jun 16, 2020
1 parent ae0d291 commit 2b1c70e
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 73 deletions.
4 changes: 4 additions & 0 deletions pytest.ini
@@ -0,0 +1,4 @@
[pytest]

markers =
remote: Tests accessing the internet.
26 changes: 13 additions & 13 deletions python_dwd/additionals/helpers.py
Expand Up @@ -14,7 +14,7 @@
from python_dwd.constants.metadata import METADATA_COLUMNS, METADATA_MATCHSTRINGS, FILELIST_NAME, FTP_METADATA_NAME, \
ARCHIVE_FORMAT, DATA_FORMAT, METADATA_FIXED_COLUMN_WIDTH, STATIONDATA_SEP, NA_STRING, TRIES_TO_DOWNLOAD_FILE, \
STATID_REGEX, METADATA_1MIN_GEO_PREFIX, METADATA_1MIN_PAR_PREFIX
from python_dwd.enumerations.column_names_enumeration import DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns
from python_dwd.download.download_services import create_remote_file_name
from python_dwd.download.ftp_handling import FTP
from python_dwd.enumerations.parameter_enumeration import Parameter
Expand Down Expand Up @@ -120,7 +120,7 @@ def metaindex_for_1minute_data(parameter: Parameter,

metaindex_df = metaindex_df.astype(METADATA_DTYPE_MAPPING)

return metaindex_df.sort_values(DWDColumns.STATION_ID.value).reset_index(drop=True)
return metaindex_df.sort_values(DWDMetaColumns.STATION_ID.value).reset_index(drop=True)


def download_metadata_file_for_1minute_data(metadatafile: str) -> BytesIO:
Expand Down Expand Up @@ -174,10 +174,10 @@ def combine_geo_and_par_file_to_metadata_df(metadata_file_and_statid: Tuple[Byte
metadata_par_df = metadata_par_df.rename(columns=str.upper).rename(columns=GERMAN_TO_ENGLISH_COLUMNS_MAPPING)

metadata_geo_df = metadata_geo_df.iloc[[-1], :]
metadata_par_df = metadata_par_df.loc[:, [DWDColumns.FROM_DATE.value, DWDColumns.TO_DATE.value]].dropna()
metadata_par_df = metadata_par_df.loc[:, [DWDMetaColumns.FROM_DATE.value, DWDMetaColumns.TO_DATE.value]].dropna()

metadata_geo_df[DWDColumns.FROM_DATE.value] = metadata_par_df[DWDColumns.FROM_DATE.value].min()
metadata_geo_df[DWDColumns.TO_DATE.value] = metadata_par_df[DWDColumns.TO_DATE.value].max()
metadata_geo_df[DWDMetaColumns.FROM_DATE.value] = metadata_par_df[DWDMetaColumns.FROM_DATE.value].min()
metadata_geo_df[DWDMetaColumns.TO_DATE.value] = metadata_par_df[DWDMetaColumns.TO_DATE.value].max()

return metadata_geo_df.reindex(columns=METADATA_COLUMNS)

Expand Down Expand Up @@ -236,27 +236,27 @@ def create_fileindex(parameter: Parameter,
f"{str(e)}")

files_server = pd.DataFrame(files_server,
columns=[DWDColumns.FILENAME.value],
columns=[DWDMetaColumns.FILENAME.value],
dtype='str')

files_server.loc[:, DWDColumns.FILENAME.value] = files_server.loc[:, DWDColumns.FILENAME.value].apply(
files_server.loc[:, DWDMetaColumns.FILENAME.value] = files_server.loc[:, DWDMetaColumns.FILENAME.value].apply(
lambda filename: filename.replace(DWD_PATH + '/', ''))

files_server = files_server[files_server.FILENAME.str.contains(
ARCHIVE_FORMAT)]

files_server.loc[:, DWDColumns.FILEID.value] = files_server.index
files_server.loc[:, DWDMetaColumns.FILEID.value] = files_server.index

file_names = files_server.iloc[:, 0].str.split("/").apply(
lambda string: string[-1])

files_server.loc[:, DWDColumns.STATION_ID.value] = file_names.apply(lambda x: re.findall(STATID_REGEX, x).pop(0))
files_server.loc[:, DWDMetaColumns.STATION_ID.value] = file_names.apply(lambda x: re.findall(STATID_REGEX, x).pop(0))

files_server = files_server.iloc[:, [1, 2, 0]]

files_server.iloc[:, 1] = files_server.iloc[:, 1].astype(int)

files_server = files_server.sort_values(by=[DWDColumns.STATION_ID.value])
files_server = files_server.sort_values(by=[DWDMetaColumns.STATION_ID.value])

remove_old_file(file_type=FILELIST_NAME,
parameter=parameter,
Expand Down Expand Up @@ -293,11 +293,11 @@ def create_stationdata_dtype_mapping(columns: List[str]) -> dict:
""" Possible columns: STATION_ID, DATETIME, EOR, QN_ and other, measured values like rainfall """

for column in columns:
if column == DWDColumns.STATION_ID.value:
if column == DWDMetaColumns.STATION_ID.value:
stationdata_dtype_mapping[column] = int
elif column == DWDColumns.DATE.value:
elif column == DWDMetaColumns.DATE.value:
stationdata_dtype_mapping[column] = "datetime64"
elif column == DWDColumns.EOR.value:
elif column == DWDMetaColumns.EOR.value:
stationdata_dtype_mapping[column] = str
else:
stationdata_dtype_mapping[column] = float
Expand Down
58 changes: 38 additions & 20 deletions python_dwd/constants/column_name_mapping.py
@@ -1,28 +1,46 @@
""" mapping from german column names to english column names"""
from numpy import datetime64
from python_dwd.enumerations.column_names_enumeration import DWDOrigColumns, DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDOrigColumns, DWDMetaColumns, DWDDataColumns

GERMAN_TO_ENGLISH_COLUMNS_MAPPING = {
DWDOrigColumns.STATION_ID.value: DWDColumns.STATION_ID.value,
DWDOrigColumns.DATE.value: DWDColumns.DATE.value,
DWDOrigColumns.FROM_DATE.value: DWDColumns.FROM_DATE.value,
DWDOrigColumns.TO_DATE.value: DWDColumns.TO_DATE.value,
DWDOrigColumns.STATIONHEIGHT.value: DWDColumns.STATIONHEIGHT.value,
DWDOrigColumns.LATITUDE.value: DWDColumns.LATITUDE.value,
DWDOrigColumns.LATITUDE_ALTERNATIVE.value: DWDColumns.LATITUDE.value,
DWDOrigColumns.LONGITUDE.value: DWDColumns.LONGITUDE.value,
DWDOrigColumns.LONGITUDE_ALTERNATIVE.value: DWDColumns.LONGITUDE.value,
DWDOrigColumns.STATIONNAME.value: DWDColumns.STATIONNAME.value,
DWDOrigColumns.STATE.value: DWDColumns.STATE.value
DWDOrigColumns.STATION_ID.value: DWDMetaColumns.STATION_ID.value,
DWDOrigColumns.DATE.value: DWDMetaColumns.DATE.value,
DWDOrigColumns.FROM_DATE.value: DWDMetaColumns.FROM_DATE.value,
DWDOrigColumns.TO_DATE.value: DWDMetaColumns.TO_DATE.value,
DWDOrigColumns.STATIONHEIGHT.value: DWDMetaColumns.STATIONHEIGHT.value,
DWDOrigColumns.LATITUDE.value: DWDMetaColumns.LATITUDE.value,
DWDOrigColumns.LATITUDE_ALTERNATIVE.value: DWDMetaColumns.LATITUDE.value,
DWDOrigColumns.LONGITUDE.value: DWDMetaColumns.LONGITUDE.value,
DWDOrigColumns.LONGITUDE_ALTERNATIVE.value: DWDMetaColumns.LONGITUDE.value,
DWDOrigColumns.STATIONNAME.value: DWDMetaColumns.STATIONNAME.value,
DWDOrigColumns.STATE.value: DWDMetaColumns.STATE.value,
}

GERMAN_TO_ENGLISH_COLUMNS_MAPPING_HUMANIZED = {
# Daily climate summary
DWDOrigColumns.FX.value: DWDDataColumns.FX.value,
DWDOrigColumns.FM.value: DWDDataColumns.FM.value,
DWDOrigColumns.RSK.value: DWDDataColumns.RSK.value,
DWDOrigColumns.RSKF.value: DWDDataColumns.RSKF.value,
DWDOrigColumns.SDK.value: DWDDataColumns.SDK.value,
DWDOrigColumns.SHK_TAG.value: DWDDataColumns.SHK_TAG.value,
DWDOrigColumns.NM.value: DWDDataColumns.NM.value,
DWDOrigColumns.VPM.value: DWDDataColumns.VPM.value,
DWDOrigColumns.PM.value: DWDDataColumns.PM.value,
DWDOrigColumns.TMK.value: DWDDataColumns.TMK.value,
DWDOrigColumns.UPM.value: DWDDataColumns.UPM.value,
DWDOrigColumns.TXK.value: DWDDataColumns.TXK.value,
DWDOrigColumns.TNK.value: DWDDataColumns.TNK.value,
DWDOrigColumns.TGK.value: DWDDataColumns.TGK.value,
}

METADATA_DTYPE_MAPPING = {
DWDColumns.STATION_ID.value: int,
DWDColumns.FROM_DATE.value: datetime64,
DWDColumns.TO_DATE.value: datetime64,
DWDColumns.STATIONHEIGHT.value: float,
DWDColumns.LATITUDE.value: float,
DWDColumns.LONGITUDE.value: float,
DWDColumns.STATIONNAME.value: str,
DWDColumns.STATE.value: str
DWDMetaColumns.STATION_ID.value: int,
DWDMetaColumns.FROM_DATE.value: datetime64,
DWDMetaColumns.TO_DATE.value: datetime64,
DWDMetaColumns.STATIONHEIGHT.value: float,
DWDMetaColumns.LATITUDE.value: float,
DWDMetaColumns.LONGITUDE.value: float,
DWDMetaColumns.STATIONNAME.value: str,
DWDMetaColumns.STATE.value: str
}
18 changes: 9 additions & 9 deletions python_dwd/constants/metadata.py
@@ -1,14 +1,14 @@
""" metdata constants """
from python_dwd.enumerations.column_names_enumeration import DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns

METADATA_COLUMNS = [DWDColumns.STATION_ID.value,
DWDColumns.FROM_DATE.value,
DWDColumns.TO_DATE.value,
DWDColumns.STATIONHEIGHT.value,
DWDColumns.LATITUDE.value,
DWDColumns.LONGITUDE.value,
DWDColumns.STATIONNAME.value,
DWDColumns.STATE.value]
METADATA_COLUMNS = [DWDMetaColumns.STATION_ID.value,
DWDMetaColumns.FROM_DATE.value,
DWDMetaColumns.TO_DATE.value,
DWDMetaColumns.STATIONHEIGHT.value,
DWDMetaColumns.LATITUDE.value,
DWDMetaColumns.LONGITUDE.value,
DWDMetaColumns.STATIONNAME.value,
DWDMetaColumns.STATE.value]

METADATA_MATCHSTRINGS = ['beschreibung', '.txt']
METADATA_1MIN_GEO_PREFIX = "Metadaten_Geographie_"
Expand Down
13 changes: 11 additions & 2 deletions python_dwd/data_collection.py
Expand Up @@ -4,6 +4,7 @@
from typing import List, Union
import pandas as pd

from python_dwd.constants.column_name_mapping import GERMAN_TO_ENGLISH_COLUMNS_MAPPING_HUMANIZED
from python_dwd.enumerations.parameter_enumeration import Parameter
from python_dwd.enumerations.period_type_enumeration import PeriodType
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution
Expand All @@ -24,7 +25,8 @@ def collect_dwd_data(station_ids: List[int],
prefer_local: bool = False,
parallel_download: bool = False,
write_file: bool = False,
create_new_filelist: bool = False) -> pd.DataFrame:
create_new_filelist: bool = False,
humanize_column_names: bool = False) -> pd.DataFrame:
"""
Function that organizes the complete pipeline of data collection, either
from the internet or from a local file. It therefor goes through every given
Expand All @@ -42,6 +44,7 @@ def collect_dwd_data(station_ids: List[int],
parallel_download: boolean if to use parallel download when downloading files
write_file: boolean if to write data to local storage
create_new_filelist: boolean if to create a new filelist for the data selection
humanize_column_names: boolean to yield column names better for human consumption
Returns:
a pandas DataFrame with all the data given by the station ids
Expand Down Expand Up @@ -84,4 +87,10 @@ def collect_dwd_data(station_ids: List[int],

data.append(station_data)

return pd.concat(data, axis=1, ignore_index=True)
data = pd.concat(data)

# Assign meaningful column names (humanized).
if humanize_column_names:
data = data.rename(columns=GERMAN_TO_ENGLISH_COLUMNS_MAPPING_HUMANIZED)

return data
4 changes: 2 additions & 2 deletions python_dwd/download/download.py
Expand Up @@ -11,14 +11,14 @@
from python_dwd.constants.metadata import STATIONDATA_MATCHSTRINGS
from python_dwd.download.download_services import create_remote_file_name
from python_dwd.additionals.functions import find_all_matchstrings_in_string
from python_dwd.enumerations.column_names_enumeration import DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns


def download_dwd_data(remote_files: pd.DataFrame,
parallel_download: bool = False) -> List[Tuple[str, BytesIO]]:
""" wrapper for _download_dwd_data to provide a multiprocessing feature"""

remote_files: List[str] = remote_files[DWDColumns.FILENAME.value].to_list()
remote_files: List[str] = remote_files[DWDMetaColumns.FILENAME.value].to_list()

if parallel_download:
return list(
Expand Down
16 changes: 10 additions & 6 deletions python_dwd/dwd_station_request.py
Expand Up @@ -15,7 +15,7 @@
from python_dwd.exceptions.start_date_end_date_exception import StartDateEndDateError

from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN
from python_dwd.enumerations.column_names_enumeration import DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns

log = logging.getLogger(__name__)

Expand All @@ -30,7 +30,8 @@ def __init__(self,
time_resolution: Union[str, TimeResolution],
period_type: Union[None, str, list, PeriodType] = None,
start_date: Union[None, str, Timestamp] = None,
end_date: Union[None, str, Timestamp] = None) -> None:
end_date: Union[None, str, Timestamp] = None,
humanized_column_names: bool = False) -> None:

if not (period_type or (start_date and end_date)):
raise ValueError("Define either a 'time_resolution' or both the 'start_date' and 'end_date' and "
Expand Down Expand Up @@ -93,6 +94,8 @@ def __init__(self,
raise ValueError("No combination for parameter, time_resolution "
"and period_type could be found.")

self.humanized_column_names = humanized_column_names

def __eq__(self, other):
return [self.station_ids,
self.parameter,
Expand Down Expand Up @@ -145,12 +148,13 @@ def collect_data(self,
parallel_download=parallel_download,
write_file=write_file,
create_new_filelist=create_new_filelist,
humanize_column_names=self.humanized_column_names
)

# Filter out values which already are in the dataframe
try:
period_df = period_df[
~period_df[DWDColumns.DATE.value].isin(df_of_station_id[DWDColumns.DATE.value])]
~period_df[DWDMetaColumns.DATE.value].isin(df_of_station_id[DWDMetaColumns.DATE.value])]
except KeyError:
pass

Expand All @@ -159,9 +163,9 @@ def collect_data(self,
# Filter for dates range if start_date and end_date are defined
if self.start_date:
df_of_station_id = df_of_station_id[
(df_of_station_id[DWDColumns.DATE.value] >= self.start_date) &
(df_of_station_id[DWDColumns.DATE.value] <= self.end_date)
]
(df_of_station_id[DWDMetaColumns.DATE.value] >= self.start_date) &
(df_of_station_id[DWDMetaColumns.DATE.value] <= self.end_date)
]

# Empty dataframe should be skipped
if df_of_station_id.empty:
Expand Down
40 changes: 38 additions & 2 deletions python_dwd/enumerations/column_names_enumeration.py
Expand Up @@ -24,9 +24,25 @@ class DWDOrigColumns(Enum):
STATIONNAME = "STATIONSNAME"
STATE = "BUNDESLAND"

# Daily climate summary
FX = "FX"
FM = "FM"
RSK = "RSK"
RSKF = "RSKF"
SDK = "SDK"
SHK_TAG = "SHK_TAG"
NM = "NM"
VPM = "VPM"
PM = "PM"
TMK = "TMK"
UPM = "UPM"
TXK = "TXK"
TNK = "TNK"
TGK = "TGK"

class DWDColumns(Enum):
""" Overhauled column names for the library """

class DWDMetaColumns(Enum):
""" Overhauled column names for metadata fields """
STATION_ID = "STATION_ID"
DATE = "DATE"
FROM_DATE = "FROM_DATE"
Expand All @@ -41,3 +57,23 @@ class DWDColumns(Enum):
FILENAME = "FILENAME"
HAS_FILE = "HAS_FILE"
FILEID = "FILEID"


class DWDDataColumns(Enum):
""" Overhauled column names for data fields """

# Daily climate summary
FX = "WIND_GUST_MAX"
FM = "WIND_VELOCITY"
RSK = "PRECIPITATION_HEIGHT"
RSKF = "PRECIPITATION_FORM"
SDK = "SUNSHINE_DURATION"
SHK_TAG = "SNOW_DEPTH"
NM = "CLOUD_COVER"
VPM = "VAPOR_PRESSURE"
PM = "PRESSURE"
TMK = "TEMPERATURE"
UPM = "HUMIDITY"
TXK = "TEMPERATURE_MAX_200"
TNK = "TEMPERATURE_MIN_200"
TGK = "TEMPERATURE_MIN_005"
10 changes: 5 additions & 5 deletions python_dwd/file_path_handling/file_list_creation.py
Expand Up @@ -7,7 +7,7 @@
from python_dwd.additionals.helpers import create_fileindex
from python_dwd.constants.access_credentials import DWD_FOLDER_MAIN, DWD_FOLDER_METADATA
from python_dwd.constants.metadata import FILELIST_NAME, DATA_FORMAT
from python_dwd.enumerations.column_names_enumeration import DWDColumns
from python_dwd.enumerations.column_names_enumeration import DWDMetaColumns
from python_dwd.enumerations.parameter_enumeration import Parameter
from python_dwd.enumerations.period_type_enumeration import PeriodType
from python_dwd.enumerations.time_resolution_enumeration import TimeResolution
Expand Down Expand Up @@ -68,8 +68,8 @@ def create_file_list_for_dwd_server(station_ids: List[int],

filelist = pd.read_csv(filepath_or_buffer=filelist_local_path,
sep=",",
dtype={DWDColumns.FILEID.value: int,
DWDColumns.STATION_ID.value: int,
DWDColumns.FILENAME.value: str})
dtype={DWDMetaColumns.FILEID.value: int,
DWDMetaColumns.STATION_ID.value: int,
DWDMetaColumns.FILENAME.value: str})

return filelist.loc[filelist[DWDColumns.STATION_ID.value].isin(station_ids), :]
return filelist.loc[filelist[DWDMetaColumns.STATION_ID.value].isin(station_ids), :]

0 comments on commit 2b1c70e

Please sign in to comment.