# geo0930: Insert time series into PostgreSQL/PostGIS and join it with the station info geodata.

The main idea behind this activity is to reformat and merge time series (here we use hourly precipitation) as well as weather station information from the DWD Climate Data Center in such a way that it can be used with the **QGIS TimeManager extension**. But this time the **join** of station info geodata and time series are performed in **PostgreSQL/PostGIS** instead of Pandas and CSV file.

### Connection Parameters

In [1]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Directory Definition and Station Description Filename Pattern

In [2]:
# The topic of interest.
topic_dir = "/hourly/precipitation/historical/"
#topic_dir = "/annual/kl/historical/"

# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

### Define and Create Local Directories

In [3]:
local_ftp_dir         = "data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

local_generated_dir   = "data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallelly merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin


In [4]:
print(local_ftp_dir) #generated in GIS_ExamWiSe2020-21>Repo
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

data/original/DWD/
data/original/DWD//hourly/precipitation/historical/
data/original/DWD//hourly/precipitation/historical/

data/generated/DWD/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/


In [5]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

### FTP Connect

In [6]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [7]:
ret = ftp.cwd(".")


In [8]:
#ftp.quit()

### Generate Pandas Dataframe from FTP Directory Listing

In [10]:
#import my_dwd.py
from my_dwd import gen_df_from_ftp_dir_listing
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
df_ftpdir.head(5)

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,166317,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,161348,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,303009,-
3,3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419296,-
4,20,stundenwerte_RR_00020_20040814_20201231_hist.zip,.zip,432124,-


## Download and Process the Station Description File

In [11]:
import pandas as pd

### Grab the txt File 

In [12]:
from my_dwd import grabFile

In [13]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print("Station description file name:\n%s" % (station_fname))

# ALternative
#station_fname2 = df_ftpdir[df_ftpdir["name"].str.match("^.*Beschreibung_Stationen.*txt$")]["name"].values[0]
#print(station_fname2)

Station description file name:
RR_Stundenwerte_Beschreibung_Stationen.txt


In [14]:
src = ftp_dir + station_fname
dest = local_ftp_station_dir + station_fname
print("grabFile(ftp, src, dest):")
print("FTP source: " + src)
print("Local dest:   " + dest)
grabFile(ftp, src, dest)

grabFile(ftp, src, dest):
FTP source: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt
Local dest:   data/original/DWD//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt


In [15]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    import pandas as pd
    
    file = codecs.open(txtfile,"r","utf-8") 
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    # German-English dictionary
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(txtfile,skiprows=2,names=colnames_en, parse_dates=["date_from","date_to"],index_col = 0, encoding="unicode-escape")
    
    # write CSV file with field separator semicolon
    df.to_csv(csvfile, sep = ";")
    return(df)

### Rename the Column Headers

In [16]:
#from my_dwd import station_desc_txt_to_csv
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
3,1995-09-01,2011-04-01,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
20,2004-08-14,2022-03-03,432,48.9219,9.9129,Abtsgmünd-Untergröningen,Baden-Württemberg
29,2006-01-10,2022-03-03,260,49.7175,10.9101,Adelsdorf (Kläranlage),Bayern
44,2007-04-01,2022-03-03,44,52.9336,8.237,Großenkneten,Niedersachsen
46,2006-01-03,2022-03-03,325,48.945,12.4639,Aholfing,Bayern


### Select only Stations Located in NRW and being Operational in 2018

In [17]:
# Create variable with TRUE if state is Nordrhein-Westfalen
isNRW = df_stations['state'].str.contains("Nordrhein")

# Create variable with TRUE if date_to is latest date (indicates operation up to now) and date_from is 2018 and before.
isOperational = (df_stations['date_to'] >= '2018') &  (df_stations['date_from'] <= '2018')

# select on both conditions
dfNRW = df_stations[isNRW & isOperational]

#print("Number of stations in NRW: \n", dfNRW.count())
dfNRW

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
216,2004-10-01,2022-02-25,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
389,2009-11-01,2022-03-03,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen
390,2004-07-01,2022-03-03,610,50.9837,8.3683,"Berleburg, Bad-Stünzel",Nordrhein-Westfalen
554,1995-09-01,2022-03-03,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen
555,2008-01-01,2018-11-01,101,51.4789,7.2697,Bochum,Nordrhein-Westfalen
...,...,...,...,...,...,...,...
14184,2016-06-01,2022-03-03,126,51.2242,7.1070,Wuppertal-Buchenhofen/Wupper,Nordrhein-Westfalen
14185,2017-08-01,2022-03-03,304,51.6050,8.8175,Lichtenau-Ebbinghausen (HRB),Nordrhein-Westfalen
14186,2017-08-01,2022-03-03,291,51.5319,8.7289,Gollentaler Grund (HRB),Nordrhein-Westfalen
14187,2017-08-01,2021-01-07,226,51.5831,8.8478,Husen-Dalheim (HRB),Nordrhein-Westfalen


## Geopandas - Create a Geo Data Frame

A Geopandas geo data frame is a Pandas data frame enriched with an additional geometry column. Each row in the data frame becomes a location information. Thus a geo-df contains geometry and attributes, i.e. full features. The geo-df is self-contained and complete. It can be easily saved in different vectore file formats, i.e. shapefile or geopackage.

In [18]:
import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame
from shapely.geometry import Point
import fiona
from pyproj import CRS

#df = pd.read_csv('data.csv')
df = dfNRW

geometry = [Point(xy) for xy in zip(df.longitude, df.latitude)]
crs = CRS("epsg:4326") #http://www.spatialreference.org/ref/epsg/2263/
stations_gdf = gpd.GeoDataFrame(df, crs=crs, geometry=geometry)

stations_gdf.head(5)

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state,geometry
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
216,2004-10-01,2022-02-25,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen,POINT (7.88070 51.11430)
389,2009-11-01,2022-03-03,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen,POINT (8.43180 51.01480)
390,2004-07-01,2022-03-03,610,50.9837,8.3683,"Berleburg, Bad-Stünzel",Nordrhein-Westfalen,POINT (8.36830 50.98370)
554,1995-09-01,2022-03-03,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen,POINT (6.53650 51.82930)
555,2008-01-01,2018-11-01,101,51.4789,7.2697,Bochum,Nordrhein-Westfalen,POINT (7.26970 51.47890)


## Connect to the PostGIS database

In [19]:
# PostgreSQL connection parameters -> create connection string (URL) 

param_dic = {
  "user" : "geo_master",
  "pw"   : "xxxxxx",
  "host" : "localhost",
  "db"   : "geo"
}

# https://www.w3schools.com/python/ref_string_format.asp
template = "postgresql://{user}:{pw}@{host}:5432/{db}"

db_connection_url = template.format(**param_dic)
print("Connection URL: ", db_connection_url) 

Connection URL:  postgresql://geo_master:xxxxxx@localhost:5432/geo


## Write Geopandas Data Frame directly into PostGIS Database

In [20]:
from sqlalchemy import create_engine
from sqlalchemy import Numeric, Float, Date, REAL

engine = create_engine(db_connection_url)


# Set data types in PG explicitly.
dtypes = {"station_id": Numeric(6,0), "altitude" : REAL, "date_from" : Date, "date_to" : Date, "longitude" : REAL, "latitude" : REAL}
engine.execute("DROP VIEW IF EXISTS dwd.v_stations_prec")

stations_gdf.to_postgis(name="stations", schema="dwd", if_exists = "replace", index = "station_id", index_label=True, con=engine, dtype=dtypes)

#engine.execute('alter table dwd.stations add constraint my_awesome_pkey primary key (station_id)')
engine.execute('alter table dwd.stations add primary key (station_id)') 

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1e3adcac0a0>

## Download and Process the Time Series Zip Archives

Extract the product file (txt file containing several time series for different variables) from an archive, extract the relevant time series from the product file, limit the time series interval if needed and append it to a dataframe. Finally insert the dataframe to the PostGIS database. 

### Dataframe with TS Zip Files from FTP Directory Listing 

In [21]:
#df_ftpdir["ext"]==".zip"
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.head(5)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419296,-
20,stundenwerte_RR_00020_20040814_20201231_hist.zip,.zip,432124,-
44,stundenwerte_RR_00044_20070401_20201231_hist.zip,.zip,354983,-
53,stundenwerte_RR_00053_20051001_20201231_hist.zip,.zip,385830,-
71,stundenwerte_RR_00071_20041022_20200101_hist.zip,.zip,402875,-


### Download TS Data from FTP Server

In [22]:
# Add the names of the actually downloaded zip files to this list. 
local_zip_list = []

station_ids_selected = list(dfNRW.index)

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname)
        grabFile(ftp, ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)

stundenwerte_RR_00216_20041001_20201231_hist.zip
stundenwerte_RR_00389_20091101_20201231_hist.zip
stundenwerte_RR_00390_20040701_20201231_hist.zip
stundenwerte_RR_00554_19950901_20201231_hist.zip
stundenwerte_RR_00555_20080101_20181101_hist.zip
stundenwerte_RR_00603_19990303_20201231_hist.zip
stundenwerte_RR_00613_20041101_20201231_hist.zip
stundenwerte_RR_00617_20040601_20201231_hist.zip
stundenwerte_RR_00644_20050101_20201231_hist.zip
stundenwerte_RR_00796_20041101_20201231_hist.zip
stundenwerte_RR_00871_20050801_20201231_hist.zip
stundenwerte_RR_00902_20061001_20201231_hist.zip
stundenwerte_RR_00934_20041001_20201231_hist.zip
stundenwerte_RR_00989_20050201_20201231_hist.zip
stundenwerte_RR_01024_20060801_20201231_hist.zip
stundenwerte_RR_01046_20041001_20201231_hist.zip
stundenwerte_RR_01078_19950901_20201231_hist.zip
stundenwerte_RR_01241_20061201_20201231_hist.zip
stundenwerte_RR_01246_20150801_20201231_hist.zip
stundenwerte_RR_01300_20040601_20201231_hist.zip
stundenwerte_RR_0130

In [23]:
len(local_zip_list)

88

In [26]:
from zipfile import ZipFile

In [27]:
# Filter to extract PRECIPITATION from recent and historical RR product files.
def prec_ts_to_df(fname):

    import pandas as pd
    import pytz
    from datetime import datetime
    
    dateparse = lambda dates: [datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="MESS_DATUM", parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999])
    
    # https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd

    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=True).str.replace('(', '', regex=True).str.replace(')', '', regex=True)
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    
    # TIME ZONES: https://stackoverflow.com/questions/22800079/converting-time-zone-pandas-dataframe
    # DWD Prec Data is given in UTC
    df.index = df.index.tz_localize(pytz.utc)
    return(df)

In [194]:
#produce a very large txt file with only 3 rows
csvfname = "data/generated/precipitation_timeseriess_appended_3_cols.csv" #localcopy ot TS table Station_id, timestamp and measurement values

first = False
for elt in local_zip_list:
    ffname = local_ftp_ts_dir + elt
    print("Zip archive: " + ffname)
    with ZipFile(ffname) as myzip:
        # read the time series data from the file starting with "produkt"
        prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
        print("Extract product file: %s" % prodfilename)
        print()
        with myzip.open(prodfilename) as myfile:
            dftmp = prec_ts_to_df(myfile)[["stations_id","r1"]]
            # df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'}, inplace=True)
            dftmp.rename(columns={'stations_id': 'station_id', 'r1': 'val', 'mess_datum': 'ts'}, inplace = True)
            dftmp.rename_axis('ts', inplace = True)
            # dftmp.to_csv(f, header=f.tell()==0)
            if (first):
                first = False
                dftmp.to_csv(csvfname, mode = "w", header = True)
            else:
                dftmp.to_csv(csvfname, mode = "a", header = False)
                

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00216_20041001_20201231_hist.zip
Extract product file: produkt_rr_stunde_20041001_20201231_00216.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00389_20091101_20201231_hist.zip
Extract product file: produkt_rr_stunde_20091101_20201231_00389.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00390_20040701_20201231_hist.zip
Extract product file: produkt_rr_stunde_20040701_20201231_00390.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00554_19950901_20201231_hist.zip
Extract product file: produkt_rr_stunde_19950901_20201231_00554.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00555_20080101_20181101_hist.zip
Extract product file: produkt_rr_stunde_20080101_20181101_00555.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00603_1999

In [195]:
dftmp

Unnamed: 0_level_0,station_id,val
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2011-04-01 00:00:00+00:00,15000,0.0
2011-04-01 01:00:00+00:00,15000,0.0
2011-04-01 02:00:00+00:00,15000,0.0
2011-04-01 03:00:00+00:00,15000,0.0
2011-04-01 04:00:00+00:00,15000,0.0
...,...,...
2020-12-31 19:00:00+00:00,15000,0.0
2020-12-31 20:00:00+00:00,15000,0.1
2020-12-31 21:00:00+00:00,15000,0.0
2020-12-31 22:00:00+00:00,15000,0.1


In [28]:
#Inserts the data directly into postgresql #better option than to create a text file locally
first = True

dtypes = {"station_id": Numeric(6,0), "val" : REAL}

#for elt in local_zip_list[0:1]:
for elt in local_zip_list:
    ffname = local_ftp_ts_dir + elt
    #print("Zip archive: " + ffname)
    with ZipFile(ffname) as myzip:
        # read the time series data from the file starting with "produkt"
        prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
        print("Extract product file: %s" % prodfilename)
        # print()
        with myzip.open(prodfilename) as myfile:
            dftmp = prec_ts_to_df(myfile)[["stations_id","r1"]]
            # df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'}, inplace=True)
            dftmp.rename(columns={'stations_id': 'station_id', 'r1': 'val', 'mess_datum': 'ts'}, inplace = True)
            dftmp.rename_axis('ts', inplace = True)
            # dftmp.to_csv(f, header=f.tell()==0)
            if (first):
                first = False
                # dftmp.to_csv(csvfname, mode = "w", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "replace", index = ["ts"], index_label=True, con=engine, dtype=dtypes)
            else:
                # dftmp.to_csv(csvfname, mode = "a", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "append",  index = ["ts"], index_label=True, con=engine, dtype=dtypes)

# After insert completed: ceate index
print("create index")
engine.execute("ALTER TABLE dwd.prec ADD PRIMARY KEY (ts, station_id)")

Extract product file: produkt_rr_stunde_20041001_20201231_00216.txt
Extract product file: produkt_rr_stunde_20091101_20201231_00389.txt
Extract product file: produkt_rr_stunde_20040701_20201231_00390.txt
Extract product file: produkt_rr_stunde_19950901_20201231_00554.txt
Extract product file: produkt_rr_stunde_20080101_20181101_00555.txt
Extract product file: produkt_rr_stunde_19990303_20201231_00603.txt
Extract product file: produkt_rr_stunde_20041101_20201231_00613.txt
Extract product file: produkt_rr_stunde_20040601_20201231_00617.txt
Extract product file: produkt_rr_stunde_20050101_20201231_00644.txt
Extract product file: produkt_rr_stunde_20041101_20201231_00796.txt
Extract product file: produkt_rr_stunde_20050801_20201231_00871.txt
Extract product file: produkt_rr_stunde_20061001_20201231_00902.txt
Extract product file: produkt_rr_stunde_20041001_20201231_00934.txt
Extract product file: produkt_rr_stunde_20050201_20201231_00989.txt
Extract product file: produkt_rr_stunde_20060801

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2c1866c71c0>

In [197]:
engine.execute("CREATE OR REPLACE VIEW dwd.v_stations_prec as (select t1.station_id, t2.ts, t2.val, t1.geometry from dwd.stations t1, dwd.prec t2 where t1.station_id = t2.station_id)")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x183f37a1120>

In [198]:
import types
def imports():
    for name, val in globals().items():
        if isinstance(val, types.ModuleType):
            yield val.__name__
list(imports())

['builtins',
 'builtins',
 'os',
 'ftplib',
 'pandas',
 'codecs',
 'fiona',
 'geopandas',
 'types']