 # Download and transformation of German temperature data (1990-2021).

## On this notebook
This notebook describes the download and transformation of historical weather data from Germany. The source of the data is the Climate Data Center (CDC) of Deutscher Wetterdienst (DWD, engl. German Meteorological Service). For more information about this data source, see [this README about the CDC-OpenData area](https://opendata.dwd.de/climate_environment/CDC/Readme_intro_CDC_ftp.pdf).

From several available data sets, historical data with a 10 minute frequency have been choosen. Data start from 1990-01-01 and last until 2021-12-31. However, the coverage of this date range varies strongly between the weather stations contained in the data set.

The download (executed at 2023-01-17) includes 1611 single files, each file containing a time series with 10 minutes frequency for at least one weather station. The data are stored in "long format", resulting in > 500 million rows. As my interest was the temperature values, I converted the "long format" to "wide format", discarding all numerical values except the temperature data and setting the weather stations as columns. The result file contains a time series with temperature data from 513 weather stations. The script can be used to extract the other numerical values (available values see documentation below) with the same sequence of transforming steps.

The transformation faces several problems due to the large number of data points:
* On Google colab, the memory consumption (currently 12 GB for free users) is exceeded.
* On Kaggle, the disk space in the working directory (for free users) is exceeded.
* On my local machine (with 16 GB RAM), the process is running properly, but needs several hours to complete.

The transformation is performed in the following steps to avoid exceeding the memory limit on my machine:
* Download of the original files.
* Read the files, extract the numerical column and write a bunch of intermediate bigger files.
* Finally merge this bigger file into one big file.
* To drop duplicates, isolate the data for each weather station.
* Unstack the weather stations from long to wide format.
* Final cleanup and export:
    * replace the original spaceholders for missing values (-999.0) by nan values
    * sort the columns by ascending weather station id
    * fill the gaps in the time series, making it possible to create a time series with 10 minutes frequency

It is recommended to execute the cells step by step and monitor the process. **The script is not optimized or tested for automation.**

I might add a version of the script that works without writing temporary files on Kaggle, because the bigger memory limit on Kaggle (currently 30 GB) might allow the extraction without writing temporary files.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import missingno as msno
import bs4
import requests
import glob
import shutil

# Set some paths
As the processing of the data requires quite some amount of memory, it will be separated in several steps of writing temporary output files. In the following cell, some paths are defined for this process:
* **download_path**: directory for the original download files
* **extraction_path**: directory for the extracted downloaded files, in case you want to extract before parsing in
* **tmp_path**: directory for writing intermediate files
* **result_path**: directory for the result file

**Caution**: Make sure these directories exist before executing the cells that make use of the directories.

In [None]:
# we use the current directory for writing the downloaded files
download_path   = "download/"
extraction_path = "extracted/"
tmp_path        = "tmp/"
result_path     = "results/"

# Download of the original files
In case you want to download the files using this notebook, you can use the following two cells.

In [None]:
def download_files(url, output_path):
    r = requests.get(url)
    data = bs4.BeautifulSoup(r.text, "html.parser")
    for l in data.find_all("a"):
        filename = l['href']
        if filename != "../":
            print(filename)
            r = requests.get(url + filename)
            open(output_path + filename, 'wb').write(r.content)

In [None]:
# we download the historical data
url = "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/10_minutes/air_temperature/historical/"

# execute the downloads
download_files(url, download_path)

# Read in the original files
* read the original files
* extract the relevant information:
    * measurement date
    * weather station id
    * one of the numerical values

The following numerical values are available:
* **QN**: quality level of next columns coding see paragraph "Quality information"
* **PP_10**: pressure at station height hPa
* **TT_10**: air temperature at 2m height °C
* **TM5_10**: air temperature at 5cm height °C
* **RF_10**: relative humidity at 2m height %
* **TD_10** dew point temperature at 2m height °C

In the following example code, we extract a matrix with the measurement date time series as index and the weather station ids as columns. We use the "air temperature at 2m height °C" (**TT_10**) as the numerical value.

In [None]:
# list of downloaded original data files
filenames = glob.glob(extraction_path + "*.txt")
number_of_files = len(filenames)

# columns of interest in the original data files
target_index_col = 'MESS_DATUM'
target_header_col = 'STATIONS_ID'

### Set the target numeric value for the rest of the notebook
In case you want to extract the same file format for other information, you can change target value here (possible target values see description above).

In [None]:
target_numeric_col = 'TT_10'
usecols = [target_index_col, target_header_col, target_numeric_col]

### Remarks on the data types used for import
* The measurement date (MESS_DATUM) will be imported as string. The reason is that the data contain some duplicates (based on the combination MESS_DATUM/STATIONS_ID, and the pandas built in functions for duplicate detection (`duplicated()` or `drop_duplicates()`) do not work properly with the datetime values.
* Although it would be desirable to use `float16` as import type for the numerical data (to save memory), this does not work properly, as the imported data contain rounding errors when using `float16`. Using just `float` results in `float64` values, which consume quite a lot of memory. However, there seems no way around this.


In [None]:
# data types for import
dtypes_dict = {'MESS_DATUM' : 'str',
               'STATIONS_ID' : 'int16',
               'QN' : 'int8',
               'PP_10' : 'float',
               'TT_10': 'float',
               'TM5_10' : 'float',
               'RF_10' : 'float',
               'TD_10' : 'float'}

In [None]:
# Set the number of original files for preprocessing the original data in one step and saving to intermediate files.
# Lower the stepsize to reduce memory consumption if necessary.
stepsize = 20

# create the bundles with the configured stepsize
start_list = list(range(0, (number_of_files - stepsize), stepsize))
stop_list = list(range(stepsize, number_of_files, stepsize))
start_stop_list = list(zip(start_list, stop_list))

# add the "rest"
start_stop_list.append((stop_list[-1], number_of_files))
print(start_stop_list)

In [None]:
count = 0
for start, stop in start_stop_list:
    count += 1

    if count % 10 == 0:
        print("Reading original files " + str(start) + " to " + str(stop))

    dfs = []
    for i in range(start, stop):
        # several options to optimize memory behaviour:
        # - import only the cols needed
        # - put MESS_DATUM and STATIONS_ID in the index
        #   (if you have enough memory, leaving this out will be faster)
        # - use minimal datatypes
        df = pd.read_csv(filenames[i],
                         usecols=usecols,
                         index_col=[target_index_col, target_header_col],
                         dtype=dtypes_dict,
                         sep=";")

        if df.shape[0] > 0:
            dfs.append(df)

    df = pd.concat(dfs, ignore_index=False)
    df.to_csv(tmp_path + "tmp_" + str(start+1) + "_" + str(stop) + ".csv", index=True)

print("Writing of intermediate files finished.")

# Convert temporary files to one file per weather station.
The data for one weather station (STATIONS_ID) can be delivered in several of the original files (for different measurement time periods).

We therefore now merge the intermediate files to one file and then extract one file per weather station (STATIONS_ID). This is now less memory consuming, because we have reduced the data to only one of the contained numerical values.

In [None]:
def combine_files(tmp_files, tmp_all):
    with open(tmp_all, 'wb') as outfile:
        for i, filename in enumerate(tmp_files):
            print(filename)
            if filename == tmp_all:
                continue
            with open(filename, 'rb') as readfile:
                if i != 0:
                    readfile.readline()
                shutil.copyfileobj(readfile, outfile)

In [None]:
# We merge all outputfiles into one.
del(df)
tmp_all = tmp_path + 'tmp_all.csv'
tmp_files = glob.glob(tmp_path + '*.{}'.format('csv'))
combine_files(tmp_files, tmp_all)

# Drop duplicates per weather station
Duplicates only occur for single weather stations, because a duplicate is a row with identical values in the columns MESS_DATUM and STATIONS_ID.

Duplicate search over the complete dataframe is very inefficient. So we search and delete duplicates per weather station.

The duplicate free data are stored in intermediate files "station_<STATIONS_ID>.csv" in the tmp directory.

In [None]:
# we repeat this in case we resume excecution
tmp_all = tmp_path + 'tmp_all.csv'

df = pd.read_csv(tmp_all,
    index_col='STATIONS_ID',
    dtype=dtypes_dict,
    sep=",")

In [None]:
df.info()

In [None]:
count_deleted_duplicates = 0
for station in df.index.unique():
    # select and count before cleaning
    tmp = df.loc[station, :].copy()
    before = len(tmp)

    # execute removal of duplicates
    tmp.reset_index(inplace=True)
    tmp.drop_duplicates(subset=['MESS_DATUM', 'STATIONS_ID'], keep='first', inplace=True)

    # do the statistics
    after = len(tmp)
    deleted = before - after
    count_deleted_duplicates += deleted

    # write result
    print("Cleaning STATIONS_ID " + str(station) + ": " + str(deleted) + " deleted duplicates.")
    tmp.to_csv(tmp_path + "station_" + str(station) + ".csv", index=False)

print("Deleted duplicates: " + str(count_deleted_duplicates))

In [None]:
# In the above output, we recognize that the weather station 3023
# has extraordinarily many duplicates. Let's check this.

tmp = df.loc[3023, :].copy()
tmp.reset_index(inplace=True)
duplicates = tmp[tmp.duplicated(subset=['MESS_DATUM', 'STATIONS_ID'], keep=False)].copy()
duplicates.sort_values(by=['MESS_DATUM', 'STATIONS_ID'], inplace=True)
duplicates.head(100)

In [None]:
# We see indeed a lot of correctly classified duplicates.
# Let's check another example.

tmp = df.loc[3348, :].copy()
tmp.reset_index(inplace=True)
duplicates = tmp[tmp.duplicated(subset=['MESS_DATUM', 'STATIONS_ID'], keep=False)].copy()
duplicates.sort_values(by=['MESS_DATUM', 'STATIONS_ID'], inplace=True)
duplicates.head(100)

In [None]:
# Ok, we can be quite confident that the duplicate detection worked well.
# So let's clean up.
del(df)
del(tmp)
del(duplicates)

# Unstacking the STATIONS_ID
We use `int64` as type for the MESS_DATUM column here, because the unstacking runs much faster with this type. 

In [None]:
unstacking_dtypes_dict = {'MESS_DATUM': 'int64', 'STATIONS_ID': 'short', target_numeric_col: 'float'}
station_filenames = glob.glob(tmp_path + "station_*.csv")
count_rows = 0
dfs = []
for station_file in station_filenames:
    print(station_file)
    df = pd.read_csv(station_file,
                     index_col=['MESS_DATUM', 'STATIONS_ID'],
                     dtype=unstacking_dtypes_dict,
                     sep=",")

    # We count the rows of the imported data to compare the number
    # to some output from above, just to double check that we lost no data
    count_rows += len(df)
    df = df.unstack()
    dfs.append(df)

In [None]:
# From the output a view cells above, we can see that we had originally 509311968 entries,
# from which we deleted 2238399 duplicates. So we expect to see a total number of
# 509311968 - 2238399 = 507073569 rows in the station_*.csv files.

# We also keep this number in mind for later comparison with the concatenated unstacked version.
count_rows

In [None]:
df = pd.concat(dfs, axis=1)
del(dfs)

# Prepare data for the final export

In [None]:
# Dropping metalevel of index resulting from unstacking.
df = df.droplevel(0, axis=1)

In [None]:
df.info()

In [None]:
# So let's compare ...
df.count().sum()

In [None]:
# We left this step for now to be able to compare the number of
# transformed entries with the original numbers.
df.replace(-999.0, np.nan, inplace=True)

In [None]:
# Let's remember the number of values without the -999.0 data points
df.count().sum()

In [None]:
# convert columns in int type and sort them ascending from left to right
columns = []
for col in df.columns:
    columns.append(int(col))

df.columns = columns
df = df[sorted(df.columns)]

In [None]:
# convert index to datetime index and sort it
df.index = pd.to_datetime(df.index, format='%Y%m%d%H%M')
df.sort_index(inplace=True)

In [None]:
# fill the gaps in the time series
complete_range = pd.date_range(df.index.min(), df.index.max(), freq="10min")
complete_df = pd.DataFrame(index=complete_range)
print("Length of df       range: " + str(len(df.index)))
print("Length of complete range: " + str(len(complete_df.index)))

# Merge and check if the range has the correct length afterwards.
complete_df = complete_df.join(df)
complete_df.index.name = "MESS_DATUM"
print("Length of merged   range: " + str(len(complete_df.index)))

In [None]:
# Are the data still unchanged?
df.count().sum()

In [None]:
result_file = result_path + "/" + target_numeric_col + ".csv"
df.to_csv(result_file, index=True)

In [None]:
del(df)
del(complete_df)
del(complete_range)

# Let's check the extracted data

In [None]:
# we convert to datetime in a separate step, because we can provide the format, which is more efficient
df = pd.read_csv(result_file, index_col="MESS_DATUM")
df.index = pd.to_datetime(df.index, format="%Y-%m-%d %H:%M:%S")

In [None]:
df.head()

In [None]:
df.info()

In [None]:
# we have now a reduced value count due to the replacement of -999.0 values by nan values.
df.count().sum()

In [None]:
df.describe()

In [None]:
msno.bar(df, )
plt.title("Proportion of non-null data per weather station in complete date/time range", fontdict={'size' : '20'});

# Let's view some of the weather station data

In [None]:
def plot_weather_station_data(df, stations_id):
    one_day_window = 10*24

    fig, ax = plt.subplots(figsize=(20, 12))
    df.loc[:, str(stations_id)].plot()
    df.loc[:, str(stations_id)].rolling(one_day_window).mean().plot()

    plt.title("STATIONS_ID == " + str(stations_id), fontdict={"size" : 20}, pad=20)
    plt.xlabel("Years", fontdict={"size" : 20})
    plt.ylabel("Temerature in °C", fontdict={"size" : 20})
    plt.legend(["All values in 10 minute frequency", "Daily mean values"], prop={'size': 14})

    plt.show()

In [None]:
plot_weather_station_data(df, 3)

In [None]:
plot_weather_station_data(df, 298)

In [None]:
plot_weather_station_data(df, 1239)

In [None]:
plot_weather_station_data(df, 19172)