## 2. DWD Precipitation Measurements in North Rhine-Westphalia (NRW): Create a Movie with QGIS Temporal Controller

##### Spatio-Temporal Precipitation Anmination using PostGIS together with QGIS Temporal Controller

Produce a rainfall video over a 7-day period in 2021 that covers the heavy rains that led to the catastrophic flooding. The animation should cover all DWD stations in NRW with hourly precipitation. Create a point layer in QGIS showing the stations in NRW with time dependent precipitation rate data (mm per hour) in the attribute table. This layer has to come from a PostGIS view, which joins the two tables with 1) static station info including coordinates and 2) time dependent precipitation rates. Use the QGIS Temporal Controller to produce an animation.

#### Sub-Task 2.1:

Create and fill your own geodatabase
Create and fill your own geodatabase with DWD precipitation station data as well as hourly precipitation time series. Follow the Jupyter Notebook tutorial of geo0930_PostGIS_Insert_DWD_Stations_and_TS (main notebook geo0930_PostGIS_DWD_Stations_and_TS_V002.ipynb) together with the respective YouTube tutorial here

(Remark: The general opengeo repository is under construction. I am using it to reorganize my teaching material independent from specific courses. It will be filled step by step.)

The last part of the video tutorial (from March 2021) is meanwhile outdated, because it explains how to produce an animation based on the QGIS TimaManager plugin. The TimeManager plugin is deprecated! The new way is to use the QGIS Temporal Controller, which is integrated in recent QGIS versions. I have no video on that, yet.

The following video shows an example of a precipitation animation generated with the QGIS TimeManager. It is meant to give you an idea about the activity.

In [88]:
# The topic of interest
topic_dir = "/hourly/precipitation/historical/"
print("Subdirectory on FTP Server:", topic_dir)

Subdirectory on FTP Server: /hourly/precipitation/historical/


## Local Directories

In [89]:
#local_ftp_dir         = "../data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_dir         = "data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

#local_generated_dir   = "../data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_generated_dir   = "data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallel merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

In [90]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

In [91]:
print(local_ftp_dir)
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

data/original/DWD/
data/original/DWD//hourly/precipitation/historical/
data/original/DWD//hourly/precipitation/historical/

data/generated/DWD/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/


## FTP Connection

In [92]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Directory Definition and Station Description Filename Pattern


In [93]:
# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"

# The absolute ftp directory with the data (topic) of concern
ftp_dir =  ftp_climate_data_dir + topic_dir
print("Absolte FTP directory path with data of concern:", ftp_dir)

Absolte FTP directory path with data of concern: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/historical/


### FTP Connect

In [94]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [95]:
ret = ftp.cwd(".")

### FTP Grab File Function

In [96]:
def grabFile(ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") # A dummy action to chack the connection and to provoke an exception if necessary.
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")


## Generate Pandas Dataframe from FTP Directory Listing

In [97]:
import pandas as pd
import os

def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:    
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
#        itemlist = [line[0:1], int(line[31:42]), line[56:]]
#        flist.append(itemlist)
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
        

    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)

In [98]:
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)

In [99]:
df_ftpdir.head(10)

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,166317,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,161348,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,310685,-
3,3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,418905,-
4,20,stundenwerte_RR_00020_20040814_20211231_hist.zip,.zip,456263,-
5,44,stundenwerte_RR_00044_20070401_20211231_hist.zip,.zip,378416,-
6,53,stundenwerte_RR_00053_20051001_20211231_hist.zip,.zip,409591,-
7,71,stundenwerte_RR_00071_20041022_20200101_hist.zip,.zip,402406,-
8,73,stundenwerte_RR_00073_20070401_20211231_hist.zip,.zip,380526,-
9,78,stundenwerte_RR_00078_20041101_20211231_hist.zip,.zip,445888,-


### Dataframe with TS Zip Files

In [100]:
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.head(10)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,418905,-
20,stundenwerte_RR_00020_20040814_20211231_hist.zip,.zip,456263,-
44,stundenwerte_RR_00044_20070401_20211231_hist.zip,.zip,378416,-
53,stundenwerte_RR_00053_20051001_20211231_hist.zip,.zip,409591,-
71,stundenwerte_RR_00071_20041022_20200101_hist.zip,.zip,402406,-
73,stundenwerte_RR_00073_20070401_20211231_hist.zip,.zip,380526,-
78,stundenwerte_RR_00078_20041101_20211231_hist.zip,.zip,445888,-
87,stundenwerte_RR_00087_20050201_20211231_hist.zip,.zip,426930,-
91,stundenwerte_RR_00091_20040901_20211231_hist.zip,.zip,443441,-
96,stundenwerte_RR_00096_20190409_20211231_hist.zip,.zip,79249,-


## Download the Station Description File

In [101]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print(station_fname)

RR_Stundenwerte_Beschreibung_Stationen.txt


In [102]:
print("grabFile: ")
print("From: " + ftp_dir + station_fname)
print("To:   " + local_ftp_station_dir + station_fname)
grabFile(ftp_dir + station_fname, local_ftp_station_dir + station_fname)

grabFile: 
From: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt
To:   data/original/DWD//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt


In [103]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    file = codecs.open(txtfile, "r", encoding="latin1")
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(\
        txtfile, skiprows=2, names=colnames_en, \
        parse_dates=["date_from","date_to"], index_col = 0, \
        encoding="latin1")
    
    # write csv
    df.to_csv(csvfile, sep = ";")
    return(df)

In [104]:
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
3,1995-09-01,2011-04-01,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
20,2004-08-14,2022-09-04,432,48.9219,9.9129,Abtsgmünd-Untergröningen,Baden-Württemberg
29,2006-01-10,2022-09-04,260,49.7175,10.9101,Adelsdorf (Kläranlage),Bayern
44,2007-04-01,2022-09-04,44,52.9336,8.237,Großenkneten,Niedersachsen
46,2006-01-03,2022-09-04,325,48.945,12.4639,Aholfing,Bayern


### Select Stations Located in North Rhine-Westphalia from Station Description Dataframe

In [105]:
# Create variable with TRUE if state is Nordrhein-Westfalen
isNRW = (df_stations['state'] == "Nordrhein-Westfalen").values

# Create variable with TRUE if date_to is latest date (indicates operation up to now)
isOperational = df_stations['date_to'] == df_stations.date_to.max()

# select on both conditions
dfNRW = df_stations[isNRW & isOperational]

print(f"Stations located in North Rhine-Westphalia and still active till the end of the historical data: \n{list(dfNRW)}")

Stations located in North Rhine-Westphalia and still active till the end of the historical data: 
['date_from', 'date_to', 'altitude', 'latitude', 'longitude', 'name', 'state']


In [106]:
df_stations.loc[station_ids_selected].head()

ValueError: Cannot index with multidimensional key

In [48]:
# Add the names of the zip files only to a list. 
local_zip_list = []

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname)
        grabFile(ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)

stundenwerte_RR_00216_20041001_20211231_hist.zip
stundenwerte_RR_00389_20091101_20211231_hist.zip
stundenwerte_RR_00390_20040701_20211231_hist.zip
stundenwerte_RR_00554_19950901_20211231_hist.zip
stundenwerte_RR_00603_19990303_20211231_hist.zip
stundenwerte_RR_00613_20041101_20211231_hist.zip
stundenwerte_RR_00617_20040601_20211231_hist.zip
stundenwerte_RR_00644_20050101_20211231_hist.zip
stundenwerte_RR_00796_20041101_20211231_hist.zip
stundenwerte_RR_00871_20050801_20211231_hist.zip
stundenwerte_RR_00902_20061001_20211231_hist.zip
stundenwerte_RR_00934_20041001_20211231_hist.zip
stundenwerte_RR_00989_20050201_20211231_hist.zip
stundenwerte_RR_01024_20060801_20211231_hist.zip
stundenwerte_RR_01046_20041001_20211231_hist.zip
stundenwerte_RR_01078_19950901_20211231_hist.zip
stundenwerte_RR_01241_20061201_20211231_hist.zip
stundenwerte_RR_01246_20150801_20211231_hist.zip
stundenwerte_RR_01300_20040601_20211231_hist.zip
stundenwerte_RR_01303_19950901_20211231_hist.zip
stundenwerte_RR_0132

In [49]:
print(len(local_zip_list))

85


## Selected Period for Time Series in Data Frames

These parameters are used to limit the time period of the series added to the Pandas data frames. 
<br>To select all dates in the series you can set a very broad interval, e.g.

In [50]:
date_from = '2021-07-12 00:00:00'
date_to   = '2021-07-19 00:00:00'
#working on hourly time frame during 7 days only when the flood occured in NRW in July 2021

## Join (Merge) the Time Series Columns

The goal is to create a data frame with column oriented time series. Each column contuins a time series for one station. Column name is the station number.

### Precipitation Time Series Data (text file) to Data Frame 
Read a text file with precipitation time series (CSV file) and make it a data frame. <br>
Only add measuerments which lie within the given time period.

In [51]:
import datetime as dt

In [52]:
def prec_ts_to_df(fname, date_from='2021-07-12 00:00:00', date_to='2021-07-12 00:00:00'):
    import datetime as dt
    
    dateparse = lambda dates: [dt.datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="MESS_DATUM",
                     parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999])

    # Attention: Selecting df with given dates may lead to empty result!
    df = df[(df.index >= date_from) & (df.index <= date_to)]
    
    # Code inspired by: https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd
    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.

    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    return(df)

### Climate Time Series Data (text file) to Data Frame 
The KL data format for annual temperatures differs significantly from the RR format for hourly precipitation.

Tha hourly data uses a unique time stamp in hourly resolution `[MESS_DATUM]`, e.g. ['2021071410'], as time reference for the measurements.

In [53]:
def kl_ts_to_df(fname, date_from='2021-07-12', date_to='2021-07-19'):
    import datetime as dt
    
    dateparse = lambda dates: [dt.datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="MESS_DATUM", date_parser = dateparse, na_values = [-999.0, -999])

    # Attention: Selecting df with given dates may lead to empty result!
    df = df[(df.index >= date_from) & (df.index <= date_to)]
    
    # Code inspired by: https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd
    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.

    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False).str.replace('(', '', regex=False).str.replace(')', '', regex=False)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    return(df)

### Merge Columnwise

In [54]:
from zipfile import ZipFile
def ts_merge(date_from='2021-07-12 00:00:00', date_to='2021-07-19 00:00:00'):
    # Very compact code.
    df = pd.DataFrame()
    for elt in local_zip_list:
        ffname = local_ftp_ts_dir + elt
        print("Zip archive: " + ffname)
        with ZipFile(ffname) as myzip:
            # read the time series data from the file starting with "produkt"
            prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
            print("Extract product file: %s" % prodfilename)
            print()
            with myzip.open(prodfilename) as myfile:
                dftmp = kl_ts_to_df(myfile, date_from, date_to)

                #if len(dftmp) > 0: # Check if cropped df is empty, i.e. no values in given period.
                 #   s = dftmp["R1"].rename(dftmp["stations_id"][0]).to_frame()
                  #  df = pd.merge(df, s, left_index=True, right_index=True, how='outer')
                #else:
                 #   print("WARNING: data file", prodfilename, "does not contain data for given period.")


    df.index.rename(name = "time", inplace = True)
    return(df)

In [55]:
df_merged_ts = ts_merge(date_from, date_to)

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00216_20041001_20211231_hist.zip
Extract product file: produkt_rr_stunde_20041001_20211231_00216.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00389_20091101_20211231_hist.zip
Extract product file: produkt_rr_stunde_20091101_20211231_00389.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00390_20040701_20211231_hist.zip
Extract product file: produkt_rr_stunde_20040701_20211231_00390.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00554_19950901_20211231_hist.zip
Extract product file: produkt_rr_stunde_19950901_20211231_00554.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00603_19990303_20211231_hist.zip
Extract product file: produkt_rr_stunde_19990303_20211231_00603.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00613_2004

In [61]:
df_plt = df_stations [(( df_stations['name'].str.contains("Meinerzhagen-Redlendorf")) |
                       (df_stations['name'].str.contains("Aachen-Orsbach")))]
df_plt

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
13713,2007-11-01,2022-09-04,386,51.0899,7.6289,Meinerzhagen-Redlendorf,Nordrhein-Westfalen
15000,2011-04-01,2022-09-04,231,50.7983,6.0244,Aachen-Orsbach,Nordrhein-Westfalen
