# Write a DWD Station Decription Data Frame as Geopackage, Shapefile and Store it in PostGIS


* FTP: ftp://opendata.dwd.de/climate_environment/CDC/observations_germany/
* HTTPS: https://opendata.dwd.de/climate_environment/CDC/observations_germany/


## Directory Definitions

### Define FTP Directory Definition and Station Description Filename Pattern

In [86]:
# The topic of interest.
topic_dir = "/hourly/precipitation/recent/"
#topic_dir = "/annual/kl/historical/"

# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

### Define Local Directories

In [87]:
local_ftp_dir         = "../data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

local_generated_dir   = "../data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallelly merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

print("local_ftp_dir:         " , local_ftp_dir)
print("local_ftp_station_dir: ", local_ftp_station_dir)
print("local_ftp_ts_dir:      ", local_ftp_ts_dir)
print()
print("local_generated_dir:   ", local_generated_dir)
print("local_station_dir:     ", local_station_dir)
print("local_ts_merged_dir:   ", local_ts_merged_dir)
print("local_ts_appended_dir: ", local_ts_appended_dir)

local_ftp_dir:          ../data/original/DWD/
local_ftp_station_dir:  ../data/original/DWD//hourly/precipitation/recent/
local_ftp_ts_dir:       ../data/original/DWD//hourly/precipitation/recent/

local_generated_dir:    ../data/generated/DWD/
local_station_dir:      ../data/generated/DWD//hourly/precipitation/recent/
local_ts_merged_dir:    ../data/generated/DWD//hourly/precipitation/recent/
local_ts_appended_dir:  ../data/generated/DWD//hourly/precipitation/recent/


### Create Local Directories

In [88]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

## FTP Connection

### Connection Parameters

In [89]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Connect

In [90]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [91]:
# Just a check whether the connections is closed because of timeout ...
ret = ftp.cwd(".")

In [92]:
#ftp.quit()

## Generate Pandas Dataframe from FTP Directory Listing

In [93]:
from my_dwd import gen_df_from_ftp_dir_listing
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
df_ftpdir.head(5)

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,68888,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,68313,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,209079,-
3,20,stundenwerte_RR_00020_akt.zip,.zip,43931,-
4,44,stundenwerte_RR_00044_akt.zip,.zip,44209,-


## Download the Station Description File

In [94]:
import pandas as pd

In [95]:
from my_dwd import grabFile

In [96]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print("Station description file name:\n%s" % (station_fname))

# ALternative
#station_fname2 = df_ftpdir[df_ftpdir["name"].str.match("^.*Beschreibung_Stationen.*txt$")]["name"].values[0]
#print(station_fname2)

Station description file name:
RR_Stundenwerte_Beschreibung_Stationen.txt


In [97]:
src = ftp_dir + station_fname
dest = local_ftp_station_dir + station_fname
print("grabFile(ftp, src, dest):")
print("FTP source: " + src)
print("Local dest:   " + dest)
grabFile(ftp, src, dest)

grabFile(ftp, src, dest):
FTP source: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/recent/RR_Stundenwerte_Beschreibung_Stationen.txt
Local dest:   ../data/original/DWD//hourly/precipitation/recent/RR_Stundenwerte_Beschreibung_Stationen.txt


## Read the Station Descrition File (.txt) into a Data Frame and Translate the Column Names

In [98]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def read_station_desc_txt_translate(txtfile):
    file = codecs.open(txtfile,"r","utf-8")
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(txtfile,skiprows=2,names=colnames_en, parse_dates=["date_from","date_to"],index_col = 0)
    
    return(df)


df_stations = read_station_desc_txt_translate(local_ftp_station_dir + station_fname)
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
3,1995-09-01,2011-04-01,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
20,2004-08-14,2021-02-03,432,48.922,9.9129,Abtsgmünd-Untergröningen,Baden-Württemberg
44,2007-04-01,2021-02-03,44,52.9336,8.237,Großenkneten,Niedersachsen
53,2005-10-01,2021-02-03,60,52.585,13.5634,Ahrensfelde,Brandenburg
71,2004-10-22,2020-01-01,759,48.2156,8.9784,Albstadt-Badkap,Baden-Württemberg


## Write the Clean Full Stations Data Frame to a CSV File

In [99]:
basename = os.path.splitext(station_fname)[0]
dest = local_station_dir + basename + ".csv"
print("Destination: ", dest)
df_stations.to_csv(dest, sep=";")

Destination:  ../data/generated/DWD//hourly/precipitation/recent/RR_Stundenwerte_Beschreibung_Stationen.csv


## Select Stations Located in NRW and Operational 

In [100]:
# Create variable with TRUE if state is Nordrhein-Westfalen

# isNRW = df_stations['state'] == "Nordrhein-Westfalen"
isNRW = df_stations['state'].str.contains("Nordrhein")

# Create variable with TRUE if date_to is latest date (indicates operation up to now)
isOperational = df_stations['date_to'] == df_stations.date_to.max() 

#isBefore1950 = df_stations['date_from'] < '1950'
#dfNRW = df_stations[isNRW & isOperational & isBefore1950]

# select on both conditions

dfNRW = df_stations[isNRW & isOperational]

dfNRW.to_csv(local_station_dir + basename + "_NRW" + ".csv", sep=";")

dfNRW.head(5)

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
216,2004-10-01,2021-02-03,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
389,2009-11-01,2021-02-03,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen
390,2004-07-01,2021-02-03,610,50.9837,8.3683,"Berleburg, Bad-Stünzel",Nordrhein-Westfalen
554,1995-09-01,2021-02-03,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen
603,1999-03-03,2021-02-03,147,50.7293,7.204,Königswinter-Heiderhof,Nordrhein-Westfalen


## Write the NRW Stations Data Frame to a CSV File

This CSV file contains point coordinates of each station (longitude, latitude). The CSV file can be imported in QGIS and the (longitude, latitude) can be associated with geometry x and y coodinates in the import dialog.  

In [101]:
basename = os.path.splitext(station_fname)[0]
dest = local_station_dir + basename + ".csv"
print("Destination: ", dest)
df_stations.to_csv(dest, sep=";")

Destination:  ../data/generated/DWD//hourly/precipitation/recent/RR_Stundenwerte_Beschreibung_Stationen.csv


## Geopandas - Create a Geo Data Frame

A Geopandas geo data frame is a Pandas data frame enriched with an additional geometry column. Each row in the data frame becomes a location information. Thus a geo-df contains geometry and attributes, i.e. full features. The geo-df is self-contained and complete. It can be easily saved in different vectore file formats, i.e. shapefile or geopackage.

In [102]:
import pandas as pd
from geopandas import GeoDataFrame
from shapely.geometry import Point
import fiona
from pyproj import CRS

#df = pd.read_csv('data.csv')
df = dfNRW

geometry = [Point(xy) for xy in zip(df.longitude, df.latitude)]
crs = CRS("epsg:4326") #http://www.spatialreference.org/ref/epsg/2263/
stations_gdf = GeoDataFrame(df, crs=crs, geometry=geometry)

stations_gdf.head(5)

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state,geometry
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
216,2004-10-01,2021-02-03,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen,POINT (7.88070 51.11430)
389,2009-11-01,2021-02-03,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen,POINT (8.43180 51.01480)
390,2004-07-01,2021-02-03,610,50.9837,8.3683,"Berleburg, Bad-Stünzel",Nordrhein-Westfalen,POINT (8.36830 50.98370)
554,1995-09-01,2021-02-03,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen,POINT (6.53650 51.82930)
603,1999-03-03,2021-02-03,147,50.7293,7.204,Königswinter-Heiderhof,Nordrhein-Westfalen,POINT (7.20400 50.72930)


### Write Geo Data Frame in Geopackage Format

In [103]:
# https://geopandas.org/io.html
basename = "DWD_Stations_Prec_Hourly_NRW"
dest = local_station_dir + basename + ".gpkg"
stations_gdf.to_file(driver="GPKG",filename=dest, layer='stations')

### Write Geo Data Frame in ESRI Shape File Format

In [104]:
# -> DriverSupportError: ESRI Shapefile does not support datetime fields
# stations_gdf.to_file(driver='ESRI Shapefile', filename='data.shp')

stations_gdf_esri = stations_gdf.copy() 

stations_gdf_esri["date_to"]=stations_gdf_esri["date_to"].astype(str)
stations_gdf_esri["date_from"]=stations_gdf_esri["date_from"].astype(str)

dest = local_station_dir + basename + ".shp"
stations_gdf_esri.to_file(driver='ESRI Shapefile', filename=dest)

## ATTENTION: The following will only work if you have a PostGIS Database set up properly!

## Write Geo Data Frame directly into PostGIS Database

* https://geopandas.readthedocs.io/en/latest/docs/reference/api/geopandas.GeoDataFrame.to_postgis.html
* https://docs.sqlalchemy.org/en/13/core/types.html
* https://www.postgresqltutorial.com/postgresql-primary-key/
* https://www.postgresql.org/docs/13/sql-altertable.html

In [105]:
# PostgreSQL connection parameters -> create connection string (URL) 

param_dic = {
  "user" : "geo_master",
  "pw"   : "xxxxxx",
  "host" : "localhost",
  "db"   : "geo"
}

db_connection_url = template.format(**param_dic)
print("Connection URL: ", db_connection_url) 

Connection URL:  postgres://geo_master:xxxxxx@localhost:5432/geo


In [106]:
# https://geopandas.readthedocs.io/en/latest/docs/reference/api/geopandas.GeoDataFrame.to_postgis.html
# https://docs.sqlalchemy.org/en/13/core/types.html

from sqlalchemy import create_engine
from sqlalchemy import Numeric, Float, Date, REAL

engine = create_engine(db_connection_url)

# Set data types in PG explicitly.
dtypes = {"station_id": Numeric(6,0), "altitude" : REAL, "date_from" : Date, "date_to" : Date, "longitude" : REAL, "latitude" : REAL}

stations_gdf.to_postgis(name="stations", schema="dwd", if_exists = "replace", index = "station_id", index_label=True, con=engine, dtype=dtypes)

#engine.execute('alter table dwd.stations add constraint my_awesome_pkey primary key (station_id)')
engine.execute('alter table dwd.stations add primary key (station_id)')

<sqlalchemy.engine.result.ResultProxy at 0x1bdbff29a60>

In [107]:
# https://www.w3schools.com/python/ref_string_format.asp
template = "postgres://{user}:{pw}@{host}:5432/{db}"

'postgres://geo_master:xxxxxx@localhost:5432/geo'