## FTP Connection

### Connection Parameters

In [39]:
server = "opendata.dwd.de"
user   = "anonymous"
passwd = ""

### FTP Directory Definition and Station Description Filename Pattern

In [40]:
# The topic of interest.
topic_dir = "/hourly/precipitation/historical/"
#topic_dir = "/annual/kl/historical/"

# This is the search pattern common to ALL station description file names 
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Below this directory tree node all climate data are stored.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  ftp_climate_data_dir + topic_dir

### Local Directories

In [41]:
local_ftp_dir         = "data/original/DWD/"      # Local directory to store local ftp data copies, the local data source or input data. 
local_ftp_station_dir = local_ftp_dir + topic_dir # Local directory where local station info is located
local_ftp_ts_dir      = local_ftp_dir + topic_dir # Local directory where time series downloaded from ftp are located

local_generated_dir   = "data/generated/DWD/" # The generated of derived data in contrast to local_ftp_dir
local_station_dir     = local_generated_dir + topic_dir # Derived station data, i.e. the CSV file
local_ts_merged_dir   = local_generated_dir + topic_dir # Parallelly merged time series, wide data frame with one TS per column
local_ts_appended_dir = local_generated_dir + topic_dir # Serially appended time series, long data frame for QGIS TimeManager Plugin

In [42]:
print(local_ftp_dir)
print(local_ftp_station_dir)
print(local_ftp_ts_dir)
print()
print(local_generated_dir)
print(local_station_dir)
print(local_ts_merged_dir)
print(local_ts_appended_dir)

data/original/DWD/
data/original/DWD//hourly/precipitation/historical/
data/original/DWD//hourly/precipitation/historical/

data/generated/DWD/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/
data/generated/DWD//hourly/precipitation/historical/


In [43]:
import os
os.makedirs(local_ftp_dir,exist_ok = True) # it does not complain if the dir already exists.
os.makedirs(local_ftp_station_dir,exist_ok = True)
os.makedirs(local_ftp_ts_dir,exist_ok = True)

os.makedirs(local_generated_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)
os.makedirs(local_ts_merged_dir,exist_ok = True)
os.makedirs(local_ts_appended_dir,exist_ok = True)

### FTP Connect

In [44]:
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [45]:
ret = ftp.cwd(".")

In [46]:
#ftp.quit()

### FTP Grab File Function

In [47]:
def grabFile(ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") # A dummy action to chack the connection and to provoke an exception if necessary.
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")



### Generate Pandas Dataframe from FTP Directory Listing

In [48]:
import pandas as pd
import os

def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:    
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
#        itemlist = [line[0:1], int(line[31:42]), line[56:]]
#        flist.append(itemlist)
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
        

    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)

In [49]:
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)

In [50]:
df_ftpdir.head(10)

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,166317,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,161348,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,303009,-
3,3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419296,-
4,20,stundenwerte_RR_00020_20040814_20201231_hist.zip,.zip,432124,-
5,44,stundenwerte_RR_00044_20070401_20201231_hist.zip,.zip,354983,-
6,53,stundenwerte_RR_00053_20051001_20201231_hist.zip,.zip,385830,-
7,71,stundenwerte_RR_00071_20041022_20200101_hist.zip,.zip,402875,-
8,73,stundenwerte_RR_00073_20070401_20201231_hist.zip,.zip,357529,-
9,78,stundenwerte_RR_00078_20041101_20201231_hist.zip,.zip,421522,-


### Dataframe with TS Zip Files

In [51]:
#df_ftpdir["ext"]==".zip"
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips.set_index("station_id", inplace = True)
df_zips.head(10)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
3,stundenwerte_RR_00003_19950901_20110401_hist.zip,.zip,419296,-
20,stundenwerte_RR_00020_20040814_20201231_hist.zip,.zip,432124,-
44,stundenwerte_RR_00044_20070401_20201231_hist.zip,.zip,354983,-
53,stundenwerte_RR_00053_20051001_20201231_hist.zip,.zip,385830,-
71,stundenwerte_RR_00071_20041022_20200101_hist.zip,.zip,402875,-
73,stundenwerte_RR_00073_20070401_20201231_hist.zip,.zip,357529,-
78,stundenwerte_RR_00078_20041101_20201231_hist.zip,.zip,421522,-
87,stundenwerte_RR_00087_20050201_20201231_hist.zip,.zip,403610,-
91,stundenwerte_RR_00091_20040901_20201231_hist.zip,.zip,419182,-
96,stundenwerte_RR_00096_20190409_20201231_hist.zip,.zip,53260,-


### Download the Station Description File

In [52]:
station_fname = df_ftpdir[df_ftpdir['name'].str.contains(station_desc_pattern)]["name"].values[0]
print(station_fname)

RR_Stundenwerte_Beschreibung_Stationen.txt


In [53]:
print("grabFile: ")
print("From: " + ftp_dir + station_fname)
print("To:   " + local_ftp_station_dir + station_fname)
grabFile(ftp_dir + station_fname, local_ftp_station_dir + station_fname)

grabFile: 
From: /climate_environment/CDC/observations_germany/climate//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt
To:   data/original/DWD//hourly/precipitation/historical/RR_Stundenwerte_Beschreibung_Stationen.txt


In [54]:
# extract column names. They are in German (de)
# We have to use codecs because of difficulties with character encoding (German Umlaute)
import codecs

def station_desc_txt_to_csv(txtfile, csvfile):
    file = codecs.open(txtfile,"r","utf-8")
    r = file.readline()
    file.close()
    colnames_de = r.split()
    colnames_de
    
    translate = \
    {'Stations_id':'station_id',
     'von_datum':'date_from',
     'bis_datum':'date_to',
     'Stationshoehe':'altitude',
     'geoBreite': 'latitude',
     'geoLaenge': 'longitude',
     'Stationsname':'name',
     'Bundesland':'state'}
    
    colnames_en = [translate[h] for h in colnames_de]
    
    # Skip the first two rows and set the column names.
    df = pd.read_fwf(txtfile,skiprows=2,names=colnames_en, parse_dates=["date_from","date_to"],index_col = 0)
    
    # write csv
    df.to_csv(csvfile, sep = ";")
    return(df)

In [55]:
basename = os.path.splitext(station_fname)[0]
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, local_station_dir + basename + ".csv")
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
3,1995-09-01,2011-04-01,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
20,2004-08-14,2022-03-26,432,48.9219,9.9129,Abtsgm�nd-Untergr�ningen,Baden-W�rttemberg
29,2006-01-10,2022-03-26,260,49.7175,10.9101,Adelsdorf (Kl�ranlage),Bayern
44,2007-04-01,2022-03-26,44,52.9336,8.237,Gro�enkneten,Niedersachsen
46,2006-01-03,2022-03-26,325,48.945,12.4639,Aholfing,Bayern


### Select Stations Located in NRW from Station Description Dataframe

In [56]:
station_ids_selected = df_stations[df_stations['state'].str.contains("Nordrhein")].index
station_ids_selected

Int64Index([    3,   216,   326,   389,   390,   554,   555,   599,   603,
              613,
            ...
            14179, 14180, 14181, 14182, 14183, 14184, 14185, 14186, 14187,
            15000],
           dtype='int64', name='station_id', length=158)

In [57]:
# Create variable with TRUE if state is Nordrhein-Westfalen
isNRW = df_stations['state'].str.contains("Nordrhein")

isOperational = (df_stations['date_to'] >'2018-06-30 00:00:00')
isNotOperational = (df_stations['date_to'] <'2018-06-30 00:00:00')

dfNRW = df_stations[isNRW & isOperational]
dfNRW_nonOperational = df_stations[isNRW & isNotOperational]

roi = [216,2947,5468,7330,6264,4488,2483,3215,1300]
#dfNRW = dfNRW.loc[roi]
dfNRW
#dfNRW_nonOperational

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
216,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
389,2009-11-01,2022-03-26,436,51.0148,8.4318,"Berleburg, Bad-Arfeld",Nordrhein-Westfalen
390,2004-07-01,2022-03-26,610,50.9837,8.3683,"Berleburg, Bad-St�nzel",Nordrhein-Westfalen
554,1995-09-01,2022-03-26,23,51.8293,6.5365,Bocholt-Liedern (Wasserwerk),Nordrhein-Westfalen
555,2008-01-01,2018-11-01,101,51.4789,7.2697,Bochum,Nordrhein-Westfalen
...,...,...,...,...,...,...,...
14184,2016-06-01,2022-03-26,126,51.2242,7.1070,Wuppertal-Buchenhofen/Wupper,Nordrhein-Westfalen
14185,2017-08-01,2022-03-26,304,51.6050,8.8175,Lichtenau-Ebbinghausen (HRB),Nordrhein-Westfalen
14186,2017-08-01,2022-03-26,291,51.5319,8.7289,Gollentaler Grund (HRB),Nordrhein-Westfalen
14187,2017-08-01,2021-01-07,226,51.5831,8.8478,Husen-Dalheim (HRB),Nordrhein-Westfalen


In [58]:
print(df_zips)

                                                        name   ext    size  \
station_id                                                                   
3           stundenwerte_RR_00003_19950901_20110401_hist.zip  .zip  419296   
20          stundenwerte_RR_00020_20040814_20201231_hist.zip  .zip  432124   
44          stundenwerte_RR_00044_20070401_20201231_hist.zip  .zip  354983   
53          stundenwerte_RR_00053_20051001_20201231_hist.zip  .zip  385830   
71          stundenwerte_RR_00071_20041022_20200101_hist.zip  .zip  402875   
...                                                      ...   ...     ...   
15555       stundenwerte_RR_15555_20160501_20201231_hist.zip  .zip  120197   
15810       stundenwerte_RR_15810_20180601_20201231_hist.zip  .zip   69532   
19140       stundenwerte_RR_19140_20201101_20201231_hist.zip  .zip    9708   
19171       stundenwerte_RR_19171_20200901_20201231_hist.zip  .zip   12514   
19172       stundenwerte_RR_19172_20200901_20201231_hist.zip  .z

### Download TS Data from FTP Server

Problem: Not all stations listed in the station description file are associated with a time series (zip file)! The stations in the description file and the set of stations whoch are TS data provided for (zip files) do not match perfectly.  

In [59]:
list(dfNRW.index)

[216,
 389,
 390,
 554,
 555,
 603,
 613,
 617,
 644,
 796,
 871,
 902,
 934,
 989,
 1024,
 1046,
 1078,
 1241,
 1246,
 1300,
 1303,
 1327,
 1590,
 1595,
 1766,
 2027,
 2110,
 2254,
 2473,
 2483,
 2497,
 2629,
 2667,
 2810,
 2947,
 2968,
 2999,
 3028,
 3031,
 3081,
 3098,
 3215,
 3321,
 3328,
 3339,
 3499,
 3540,
 3591,
 3795,
 3913,
 4063,
 4127,
 4150,
 4154,
 4313,
 4368,
 4371,
 4400,
 4488,
 4692,
 4741,
 4849,
 5064,
 5347,
 5360,
 5468,
 5480,
 5513,
 5619,
 5699,
 5717,
 5733,
 6041,
 6042,
 6043,
 6044,
 6045,
 6046,
 6047,
 6048,
 6049,
 6050,
 6051,
 6052,
 6053,
 6054,
 6055,
 6057,
 6058,
 6059,
 6060,
 6061,
 6064,
 6067,
 6197,
 6264,
 6276,
 6313,
 6337,
 7106,
 7330,
 7344,
 7374,
 7378,
 13669,
 13670,
 13671,
 13696,
 13700,
 13713,
 14096,
 14142,
 14143,
 14144,
 14145,
 14146,
 14147,
 14148,
 14149,
 14150,
 14151,
 14152,
 14153,
 14154,
 14155,
 14156,
 14158,
 14159,
 14164,
 14165,
 14166,
 14167,
 14168,
 14169,
 14170,
 14171,
 14172,
 14173,
 14174,
 14175

In [60]:
# Add the names of the zip files only to a list. 
local_zip_list = []

station_ids_selected = list(dfNRW.index)
for station_id in station_ids_selected:
    try:
        fname = df_zips["name"][station_id]
        print(fname) 
        grabFile(ftp_dir + fname, local_ftp_ts_dir + fname)
        local_zip_list.append(fname)
    except:
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)


stundenwerte_RR_00216_20041001_20201231_hist.zip
stundenwerte_RR_00389_20091101_20201231_hist.zip
stundenwerte_RR_00390_20040701_20201231_hist.zip
stundenwerte_RR_00554_19950901_20201231_hist.zip
stundenwerte_RR_00555_20080101_20181101_hist.zip
stundenwerte_RR_00603_19990303_20201231_hist.zip
stundenwerte_RR_00613_20041101_20201231_hist.zip
stundenwerte_RR_00617_20040601_20201231_hist.zip
stundenwerte_RR_00644_20050101_20201231_hist.zip
stundenwerte_RR_00796_20041101_20201231_hist.zip
stundenwerte_RR_00871_20050801_20201231_hist.zip
stundenwerte_RR_00902_20061001_20201231_hist.zip
stundenwerte_RR_00934_20041001_20201231_hist.zip
stundenwerte_RR_00989_20050201_20201231_hist.zip
stundenwerte_RR_01024_20060801_20201231_hist.zip
stundenwerte_RR_01046_20041001_20201231_hist.zip
stundenwerte_RR_01078_19950901_20201231_hist.zip
stundenwerte_RR_01241_20061201_20201231_hist.zip
stundenwerte_RR_01246_20150801_20201231_hist.zip
stundenwerte_RR_01300_20040601_20201231_hist.zip
stundenwerte_RR_0130

### Join (Merge) the Time Series Columns

https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd


In [61]:
from datetime import datetime
import numpy as np

startdate = pd.to_datetime("2018-04-16").date()
enddate = pd.to_datetime("2018-08-16").date()
fields = ['STATIONS_ID', 'MESS_DATUM','  R1']

def prec_ts_to_df(fname):
    
    dateparse = lambda dates: [datetime.strptime(str(d), '%Y%m%d%H') for d in dates]

    #df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="STATIONS_ID", parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999])
    df = pd.read_csv(fname, delimiter=";", encoding="utf8", index_col="STATIONS_ID", parse_dates = ["MESS_DATUM"], date_parser = dateparse, na_values = [-999.0, -999], usecols=fields)
    mask = (df['MESS_DATUM'] >= '2018-04-16') & (df['MESS_DATUM'] <= '2018-08-16')
    df = df.loc[mask]    
    day = df.loc[:,['MESS_DATUM']]
    df.insert(2, "Day",pd.to_datetime(day["MESS_DATUM"], format='%Y/%m/%d').dt.date, True)
    #df = pd.read_csv(fname, delimiter=";", encoding="iso8859_2",\
    #             index_col="MESS_DATUM", parse_dates = ["MESS_DATUM"], date_parser = dateparse)
    
    # https://medium.com/@chaimgluck1/working-with-pandas-fixing-messy-column-names-42a54a6659cd

    # Column headers: remove leading blanks (strip), replace " " with "_", and convert to lower case.
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')
    df.index.name = df.index.name.strip().lower().replace(' ', '_').replace('(', '').replace(')', '')
    return(df)

In [62]:
from zipfile import ZipFile

In [63]:
# PRECIPITATION
def prec_ts_merge():
    # Very compact code.
    df = pd.DataFrame()
    for elt in local_zip_list:
        ffname = local_ftp_ts_dir + elt
        print("Zip archive: " + ffname)
        with ZipFile(ffname) as myzip:
            # read the time series data from the file starting with "produkt"
            prodfilename = [elt for elt in myzip.namelist() if elt.split("_")[0]=="produkt"][0] 
            print("Extract product file: %s" % prodfilename)
            print()
            with myzip.open(prodfilename) as myfile:
                dftmp = prec_ts_to_df(myfile)
                #print(dftmp)
                # outer merge.
                #df = pd.merge(df, s, left_index=True, right_index=True, how='outer')
                #df = pd.merge(df, dftmp, left_index=True, right_index=True, how='outer')
                df = df.append(dftmp)
    #df.index.names = ["year"]
    df.index.rename(name = "station_id", inplace = True)
    df.rename(columns={'r1':'prec_rate'}, inplace=True)
    return(df)

In [64]:
df_merged_ts = prec_ts_merge()

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00216_20041001_20201231_hist.zip
Extract product file: produkt_rr_stunde_20041001_20201231_00216.txt



  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')


Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00389_20091101_20201231_hist.zip
Extract product file: produkt_rr_stunde_20091101_20201231_00389.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00390_20040701_20201231_hist.zip
Extract product file: produkt_rr_stunde_20040701_20201231_00390.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00554_19950901_20201231_hist.zip
Extract product file: produkt_rr_stunde_19950901_20201231_00554.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00555_20080101_20181101_hist.zip
Extract product file: produkt_rr_stunde_20080101_20181101_00555.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00603_19990303_20201231_hist.zip
Extract product file: produkt_rr_stunde_19990303_20201231_00603.txt

Zip archive: data/original/DWD//hourly/precipitation/historical/stundenwerte_RR_00613_2004

In [68]:
df_merged_prep = pd.merge(df_merged_ts,dfNRW, left_index=True, right_index=True, how='outer')
df_merged_prep

Unnamed: 0_level_0,mess_datum,prec_rate,day,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
216,2018-04-16 00:00:00,0.0,2018-04-16,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
216,2018-04-16 01:00:00,0.0,2018-04-16,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
216,2018-04-16 02:00:00,0.0,2018-04-16,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
216,2018-04-16 03:00:00,0.0,2018-04-16,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
216,2018-04-16 04:00:00,0.0,2018-04-16,2004-10-01,2022-03-26,298,51.1143,7.8807,Attendorn-Neulisternohl,Nordrhein-Westfalen
...,...,...,...,...,...,...,...,...,...,...
15000,2018-08-15 20:00:00,0.0,2018-08-15,2011-04-01,2022-03-26,231,50.7983,6.0244,Aachen-Orsbach,Nordrhein-Westfalen
15000,2018-08-15 21:00:00,0.0,2018-08-15,2011-04-01,2022-03-26,231,50.7983,6.0244,Aachen-Orsbach,Nordrhein-Westfalen
15000,2018-08-15 22:00:00,0.0,2018-08-15,2011-04-01,2022-03-26,231,50.7983,6.0244,Aachen-Orsbach,Nordrhein-Westfalen
15000,2018-08-15 23:00:00,0.0,2018-08-15,2011-04-01,2022-03-26,231,50.7983,6.0244,Aachen-Orsbach,Nordrhein-Westfalen


In [99]:
df_merged_prep = df_merged_prep.reset_index()
filepathname = local_ts_merged_dir + "prec_ts_merged_Ex5_data.csv"
print("df_merged_ts is saved to: %s" % (filepathname))
df_merged_prep[df_merged_prep['station_id'].isin(roi)].to_csv(filepathname3,sep=";")

df_merged_ts is saved to: data/generated/DWD//hourly/precipitation/historical/prec_ts_merged_Ex5_data.csv
