In [1]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

In [2]:
# The topic of interest.
topic_dir = "/hourly/precipitation/historical/"
#topic_dir = "/annual/kl/historical/"

# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

In [3]:
local_ftp_dir         = "../data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

local_generated_dir   = "../data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallelly merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

In [4]:
print(local_ftp_dir)
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

../data/original/DWD/
../data/original/DWD//hourly/precipitation/historical/
../data/original/DWD//hourly/precipitation/historical/

../data/generated/DWD/
../data/generated/DWD//hourly/precipitation/historical/
../data/generated/DWD//hourly/precipitation/historical/
../data/generated/DWD//hourly/precipitation/historical/


In [5]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

In [6]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [7]:

ret = ftp.cwd(".")

In [8]:
from my_dwd import gen_df_from_ftp_dir_listing
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
df_ftpdir.head(5)

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,71445,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,69716,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,209079,-
3,3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419265,-
4,20,stundenwerte_RR_00020_20040814_20191231_hist.zip,.zip,407378,-


In [9]:
import pandas as pd

In [10]:
from my_dwd import grabFile

In [11]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print("Station description file name:\n%s" % (station_fname))

# ALternative
#station_fname2 = df_ftpdir[df_ftpdir["name"].str.match("^.*Beschreibung_Stationen.*txt$")]["name"].values[0]
#print(station_fname2)

Station description file name:
RR_Stundenwerte_Beschreibung_Stationen.txt


In [12]:
src = ftp_dir + station_fname
dest = local_ftp_station_dir + station_fname
print("grabFile(ftp, src, dest):")
print("FTP source: " + src)
print("Local dest:   " + dest)
grabFile(ftp, src, dest)

grabFile(ftp, src, dest):
FTP source: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt
Local dest:   ../data/original/DWD//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt


In [13]:
from my_dwd import station_desc_txt_to_csv
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
3,1995-09-01,2011-04-01,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
20,2004-08-14,2021-03-20,432,48.922,9.9129,Abtsgm�nd-Untergr�ningen,Baden-W�rttemberg
44,2007-04-01,2021-03-20,44,52.9336,8.237,Gro�enkneten,Niedersachsen
53,2005-10-01,2021-03-20,60,52.585,13.5634,Ahrensfelde,Brandenburg
71,2004-10-22,2020-01-01,759,48.2156,8.9784,Albstadt-Badkap,Baden-W�rttemberg


In [14]:
# Create variable with TRUE if state is Nordrhein-Westfalen

# isNRW = df_stations['state'] == "Nordrhein-Westfalen"
isNRW = df_stations['state'].str.contains("Nordrhein")

# Create variable with TRUE if date_to is latest date (indicates operation up to now)
isOperational = df_stations['date_to'] == df_stations.date_to.max() 

#isBefore1950 = df_stations['date_from'] < '1950'
#dfNRW = df_stations[isNRW & isOperational & isBefore1950]

# select on both conditions

dfNRW = df_stations[isNRW & isOperational]

#print("Number of stations in NRW: \n", dfNRW.count())
dfNRW

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
216,2004-10-01,2021-03-20,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
389,2009-11-01,2021-03-20,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen
390,2004-07-01,2021-03-20,610,50.9837,8.3683,"Berleburg, Bad-St�nzel",Nordrhein-Westfalen
554,1995-09-01,2021-03-20,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen
613,2004-11-01,2021-03-20,206,51.5677,9.2324,Borgentreich,Nordrhein-Westfalen
...,...,...,...,...,...,...,...
13671,2007-12-01,2021-03-20,221,50.9655,7.2753,Overath-B�ke,Nordrhein-Westfalen
13696,2007-12-01,2021-03-20,60,51.5966,7.4048,Waltrop-Abdinghof,Nordrhein-Westfalen
13700,2008-05-01,2021-03-20,205,51.3329,7.3411,Gevelsberg-Oberbr�king,Nordrhein-Westfalen
13713,2007-11-01,2021-03-20,386,51.0899,7.6289,Meinerzhagen-Redlendorf,Nordrhein-Westfalen


In [15]:
import pandas as pd
from geopandas import GeoDataFrame
from shapely.geometry import Point
import fiona
from pyproj import CRS

#df = pd.read_csv('data.csv')
df = dfNRW

geometry = [Point(xy) for xy in zip(df.longitude, df.latitude)]
crs = CRS("epsg:4326") #http://www.spatialreference.org/ref/epsg/2263/
stations_gdf = GeoDataFrame(df, crs=crs, geometry=geometry)

stations_gdf.head(5)

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state,geometry
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
216,2004-10-01,2021-03-20,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen,POINT (7.88070 51.11430)
389,2009-11-01,2021-03-20,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen,POINT (8.43180 51.01480)
390,2004-07-01,2021-03-20,610,50.9837,8.3683,"Berleburg, Bad-St�nzel",Nordrhein-Westfalen,POINT (8.36830 50.98370)
554,1995-09-01,2021-03-20,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen,POINT (6.53650 51.82930)
613,2004-11-01,2021-03-20,206,51.5677,9.2324,Borgentreich,Nordrhein-Westfalen,POINT (9.23240 51.56770)


In [16]:
# PostgreSQL connection parameters -> create connection string (URL) 

param_dic = {
  "user" : "geo_master",
  "pw"   : "xxxxxx",
  "host" : "localhost",
  "db"   : "geo"
}

# https://www.w3schools.com/python/ref_string_format.asp
template = "postgres://{user}:{pw}@{host}:5432/{db}"

db_connection_url = template.format(**param_dic)
print("Connection URL: ", db_connection_url)

Connection URL:  postgres://geo_master:xxxxxx@localhost:5432/geo


In [17]:
# https://geopandas.readthedocs.io/en/latest/docs/reference/api/geopandas.GeoDataFrame.to_postgis.html
# https://docs.sqlalchemy.org/en/13/core/types.html

from sqlalchemy import create_engine
from sqlalchemy import Numeric, Float, Date, REAL

engine = create_engine(db_connection_url)

# Set data types in PG explicitly.
dtypes = {"station_id": Numeric(6,0), "altitude" : REAL, "date_from" : Date, "date_to" : Date, "longitude" : REAL, "latitude" : REAL}

stations_gdf.to_postgis(name="stations", schema="dwd", if_exists = "replace", index = "station_id", index_label=True, con=engine, dtype=dtypes)

#engine.execute('alter table dwd.stations add constraint my_awesome_pkey primary key (station_id)')
engine.execute('alter table dwd.stations add primary key (station_id)')

<sqlalchemy.engine.result.ResultProxy at 0x14c77828190>

In [18]:
#df_ftpdir["ext"]==".zip"
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.head(5)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419265,-
20,stundenwerte_RR_00020_20040814_20191231_hist.zip,.zip,407378,-
44,stundenwerte_RR_00044_20070401_20191231_hist.zip,.zip,320516,-
53,stundenwerte_RR_00053_20051001_20191231_hist.zip,.zip,361931,-
71,stundenwerte_RR_00071_20041022_20191231_hist.zip,.zip,402880,-


In [19]:
# Add the names of the actually downloaded zip files to this list. 
local_zip_list = []

station_ids_selected = list(dfNRW.index)

for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname)
        grabFile(ftp, ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)

stundenwerte_RR_00216_20041001_20191231_hist.zip
stundenwerte_RR_00389_20091101_20191231_hist.zip
stundenwerte_RR_00390_20040701_20191231_hist.zip
stundenwerte_RR_00554_19950901_20191231_hist.zip
stundenwerte_RR_00613_20041101_20191231_hist.zip
stundenwerte_RR_00617_20040601_20191231_hist.zip
stundenwerte_RR_00644_20050101_20191231_hist.zip
stundenwerte_RR_00796_20041101_20191231_hist.zip
stundenwerte_RR_00871_20050801_20191231_hist.zip
stundenwerte_RR_00902_20061001_20191231_hist.zip
stundenwerte_RR_00934_20041001_20191231_hist.zip
stundenwerte_RR_00989_20050201_20191231_hist.zip
stundenwerte_RR_01024_20060801_20191231_hist.zip
stundenwerte_RR_01046_20041001_20191231_hist.zip
stundenwerte_RR_01078_19950901_20191231_hist.zip
stundenwerte_RR_01241_20061201_20191231_hist.zip
stundenwerte_RR_01246_20150801_20191231_hist.zip
stundenwerte_RR_01300_20040601_20191231_hist.zip
stundenwerte_RR_01303_19950901_20191231_hist.zip
stundenwerte_RR_01327_20040801_20191231_hist.zip
stundenwerte_RR_0159

In [20]:
from zipfile import ZipFile
from my_dwd import prec_ts_to_df

In [21]:
csvfname = "prec_ts_appended_3_cols.csv"

first = False

for elt in local_zip_list:
    ffname = local_ftp_ts_dir + elt
    print("Zip archive: " + ffname)
    with ZipFile(ffname) as myzip:
        # read the time series data from the file starting with "produkt"
        prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
        print("Extract product file: %s" % prodfilename)
        print()
        with myzip.open(prodfilename) as myfile:
            dftmp = prec_ts_to_df(myfile)[["stations_id","r1"]]
            # df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'}, inplace=True)
            dftmp.rename(columns={'stations_id': 'station_id', 'r1': 'val', 'mess_datum': 'ts'}, inplace = True)
            dftmp.rename_axis('ts', inplace = True)
            # dftmp.to_csv(f, header=f.tell()==0)
            if (first):
                first = False
                dftmp.to_csv(csvfname, mode = "w", header = True)
            else:
                dftmp.to_csv(csvfname, mode = "a", header = False)

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00216_20041001_20191231_hist.zip
Extract product file: produkt_rr_stunde_20041001_20191231_00216.txt



  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')


Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00389_20091101_20191231_hist.zip
Extract product file: produkt_rr_stunde_20091101_20191231_00389.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00390_20040701_20191231_hist.zip
Extract product file: produkt_rr_stunde_20040701_20191231_00390.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00554_19950901_20191231_hist.zip
Extract product file: produkt_rr_stunde_19950901_20191231_00554.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00613_20041101_20191231_hist.zip
Extract product file: produkt_rr_stunde_20041101_20191231_00613.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00617_20040601_20191231_hist.zip
Extract product file: produkt_rr_stunde_20040601_20191231_00617.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenw

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_03913_20040701_20191231_hist.zip
Extract product file: produkt_rr_stunde_20040701_20191231_03913.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_04063_20030701_20191231_hist.zip
Extract product file: produkt_rr_stunde_20030701_20191231_04063.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_04127_20050101_20191231_hist.zip
Extract product file: produkt_rr_stunde_20050101_20191231_04127.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_04150_20051201_20191231_hist.zip
Extract product file: produkt_rr_stunde_20051201_20191231_04150.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_04313_20040801_20191231_hist.zip
Extract product file: produkt_rr_stunde_20040801_20191231_04313.txt

Zip archive: ../data/original/DWD//hourly/precipitation/historical/stundenw

In [22]:
dftmp

Unnamed: 0_level_0,station_id,val
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2011-04-01 00:00:00+00:00,15000,0.0
2011-04-01 01:00:00+00:00,15000,0.0
2011-04-01 02:00:00+00:00,15000,0.0
2011-04-01 03:00:00+00:00,15000,0.0
2011-04-01 04:00:00+00:00,15000,0.0
...,...,...
2019-12-31 19:00:00+00:00,15000,0.0
2019-12-31 20:00:00+00:00,15000,0.0
2019-12-31 21:00:00+00:00,15000,0.0
2019-12-31 22:00:00+00:00,15000,0.0


In [23]:
first = True

dtypes = {"station_id": Numeric(6,0), "val" : REAL}

#for elt in local_zip_list[0:1]:
for elt in local_zip_list:
    ffname = local_ftp_ts_dir + elt
    #print("Zip archive: " + ffname)
    with ZipFile(ffname) as myzip:
        # read the time series data from the file starting with "produkt"
        prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
        print("Extract product file: %s" % prodfilename)
        # print()
        with myzip.open(prodfilename) as myfile:
            dftmp = prec_ts_to_df(myfile)[["stations_id","r1"]]
            # df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'}, inplace=True)
            dftmp.rename(columns={'stations_id': 'station_id', 'r1': 'val', 'mess_datum': 'ts'}, inplace = True)
            dftmp.rename_axis('ts', inplace = True)
            # dftmp.to_csv(f, header=f.tell()==0)
            if (first):
                first = False
                # dftmp.to_csv(csvfname, mode = "w", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "replace", index = ["ts"], index_label=True, con=engine, dtype=dtypes)
            else:
                # dftmp.to_csv(csvfname, mode = "a", header = False)
                dftmp.to_sql(name="prec", schema="dwd", if_exists = "append",  index = ["ts"], index_label=True, con=engine, dtype=dtypes)

# After insert completed: ceate index
print("create index")
engine.execute("ALTER TABLE dwd.prec ADD PRIMARY KEY (ts, station_id)")

Extract product file: produkt_rr_stunde_20041001_20191231_00216.txt


  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')


Extract product file: produkt_rr_stunde_20091101_20191231_00389.txt
Extract product file: produkt_rr_stunde_20040701_20191231_00390.txt
Extract product file: produkt_rr_stunde_19950901_20191231_00554.txt
Extract product file: produkt_rr_stunde_20041101_20191231_00613.txt
Extract product file: produkt_rr_stunde_20040601_20191231_00617.txt
Extract product file: produkt_rr_stunde_20050101_20191231_00644.txt
Extract product file: produkt_rr_stunde_20041101_20191231_00796.txt
Extract product file: produkt_rr_stunde_20050801_20191231_00871.txt
Extract product file: produkt_rr_stunde_20061001_20191231_00902.txt
Extract product file: produkt_rr_stunde_20041001_20191231_00934.txt
Extract product file: produkt_rr_stunde_20050201_20191231_00989.txt
Extract product file: produkt_rr_stunde_20060801_20191231_01024.txt
Extract product file: produkt_rr_stunde_20041001_20191231_01046.txt
Extract product file: produkt_rr_stunde_19950901_20191231_01078.txt
Extract product file: produkt_rr_stunde_20061201

<sqlalchemy.engine.result.ResultProxy at 0x14c77bdd1c0>