In [1]:
import geopandas
import pandas as pd
import os

In [2]:
# Note: `import geopandas` raised an error when Jupyter was started from a PowerShell prompt;
# it worked after launching the notebook from Anaconda Navigator instead.

In [3]:
# Sub-path (below the DWD climate directory) of the data set this notebook works on.
topic_dir = "/annual/kl/historical/"
print(f"Subdirectory on FTP Server: {topic_dir}")

Subdirectory on FTP Server: /annual/kl/historical/


In [4]:
# Local mirror of the files fetched from the DWD FTP server (the raw input data).
# Paths are relative to the working directory (an earlier variant used "../data/...").
local_ftp_dir = "data/original/DWD/"
# The station description and the downloaded time series share the topic sub-path.
local_ftp_station_dir = local_ftp_ts_dir = local_ftp_dir + topic_dir

# Generated (derived) data, as opposed to the raw copies below local_ftp_dir.
local_generated_dir = "data/generated/DWD/"
# Station CSV, parallel-merged time series (wide frame, one TS per column) and
# serially appended time series (long frame for the QGIS TimeManager plugin)
# all live under the same topic sub-path.
local_station_dir = local_ts_merged_dir = local_ts_appended_dir = local_generated_dir + topic_dir

In [5]:
# Create every local working directory; exist_ok=True keeps re-runs from failing.
for directory in (
    local_ftp_dir,
    local_ftp_station_dir,
    local_ftp_ts_dir,
    local_generated_dir,
    local_station_dir,
    local_ts_merged_dir,
    local_ts_appended_dir,
):
    os.makedirs(directory, exist_ok=True)

In [6]:
# Echo the configured paths (raw-data group, blank line, derived-data group) for a visual check.
print(local_ftp_dir, local_ftp_station_dir, local_ftp_ts_dir, sep="\n")
print()
print(local_generated_dir, local_station_dir, local_ts_merged_dir, local_ts_appended_dir, sep="\n")

data/original/DWD/
data/original/DWD//annual/kl/historical/
data/original/DWD//annual/kl/historical/

data/generated/DWD/
data/generated/DWD//annual/kl/historical/
data/generated/DWD//annual/kl/historical/
data/generated/DWD//annual/kl/historical/


### FTP Connection

## Connection parameters

In [7]:
# Anonymous login to the DWD open-data FTP server (no password required).
server, user, passwd = "opendata.dwd.de", "anonymous", ""

In [8]:
# Root of the climate-data tree on the FTP server.
ftp_climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"

# Suffix shared by the file names of all station description files.
station_desc_pattern = "_Beschreibung_Stationen.txt"

# Absolute FTP path of the topic directory this notebook works on.
ftp_dir = ftp_climate_data_dir + topic_dir
print(f"Absolute FTP directory path with data of concern: {ftp_dir}")

Absolute FTP directory path with data of concern: /climate_environment/CDC/observations_germany/climate//annual/kl/historical/


### FTP Connect

In [9]:
import ftplib

# Open the session and log in; the server's reply string is printed for confirmation.
ftp = ftplib.FTP(host=server)
res = ftp.login(user, passwd)
print(res)

230 Login successful.


In [10]:
# Harmless "change to the current directory" command, used as a connection check.
current_dir = "."
ret = ftp.cwd(current_dir)

In [11]:
#ftp.quit()

### FTP Grab File Function

In [12]:
def grabFile(ftpfullname, localfullname, ftp_conn=None):
    """Download one file from the FTP server in binary mode.

    Parameters
    ----------
    ftpfullname : str
        Absolute path of the file on the FTP server.
    localfullname : str
        Local destination path; an existing file there is overwritten once
        the transfer starts.
    ftp_conn : ftplib.FTP, optional
        Connection to use. Defaults to the notebook-wide ``ftp`` connection.

    Returns
    -------
    bool
        True if the transfer completed, False if one of the handled FTP
        errors occurred. Handled errors are printed rather than raised, so a
        batch of downloads keeps going. If a transfer fails after the local
        file was opened, the partial file is removed so it cannot be mistaken
        for a complete download.
    """
    conn = ftp if ftp_conn is None else ftp_conn
    opened = False      # True once localfullname has been truncated for writing
    completed = False
    try:
        conn.cwd(".")  # dummy command: fails early if the connection has dropped
        with open(localfullname, 'wb') as localfile:
            opened = True
            conn.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        completed = True

    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")

    finally:
        # Never leave a truncated file behind. A pre-existing file is only
        # touched when we actually opened (and therefore truncated) it.
        if opened and not completed and os.path.exists(localfullname):
            os.remove(localfullname)

    return completed

## Generate Pandas Dataframe from FTP Directory Listing

In [13]:
def _station_id_from_zip_name(fname):
    """Return the station id (third ``_``-separated token, as int) of a zip name, or -1."""
    try:
        return int(fname.split("_")[2])
    except (IndexError, ValueError):
        # A zip that does not follow the jahreswerte_KL_<id>_... naming pattern.
        return -1


def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    """Return a DataFrame describing the entries of an FTP directory.

    Parameters
    ----------
    ftp : ftplib.FTP
        Logged-in connection (any object with a compatible ``retrlines``).
    ftpdir : str
        Path of the directory on the server.

    Returns
    -------
    pandas.DataFrame or None
        One row per directory entry with the columns ``station_id``,
        ``name``, ``ext``, ``size`` and ``type``. ``type`` is the first
        character of the permission field. ``station_id`` is parsed from
        ``.zip`` names and is -1 otherwise. ``None`` is returned (after
        printing an error) when the listing fails or is empty.
    """
    import ftplib  # for ftplib.all_errors

    lines = []
    try:
        ftp.retrlines("LIST " + ftpdir, lines.append)
    except ftplib.all_errors as err:
        # Only FTP/network errors are expected here; anything else should surface.
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        print("  ->", err)
        return

    if not lines:
        print("Error: ftp dir is empty")
        return

    records = []
    for line in lines:
        # Assumes Unix-style LIST lines: perms, links, owner, group, size,
        # month, day, time/year, name. Splitting on whitespace (max 8 splits)
        # is robust to column-width changes and keeps spaces inside the name.
        fields = line.split(None, 8)
        if len(fields) < 9 or not fields[4].isdigit():
            continue  # not a directory entry (e.g. a "total NN" summary line)

        ftype, fsize, fname = fields[0][:1], int(fields[4]), fields[8]
        fext = os.path.splitext(fname)[-1]
        station_id = _station_id_from_zip_name(fname) if fext == ".zip" else -1

        records.append([station_id, fname, fext, fsize, ftype])

    return pd.DataFrame(records, columns=["station_id", "name", "ext", "size", "type"])

In [14]:
# Build the listing table for the topic directory. The helper prints a reason
# and returns None when the listing fails, so stop here with a clear message
# instead of failing later with an AttributeError on None.
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
if df_ftpdir is None:
    raise RuntimeError("FTP directory listing failed for " + ftp_dir + " - reconnect and re-run.")

In [15]:
# Preview: the first ten entries of the directory listing.
df_ftpdir.iloc[:10]

Unnamed: 0,station_id,name,ext,size,type
0,-1,KL_Jahreswerte_Beschreibung_Stationen.txt,.txt,240187,-
1,1,jahreswerte_KL_00001_19310101_19860630_hist.zip,.zip,12989,-
2,3,jahreswerte_KL_00003_18510101_20110331_hist.zip,.zip,20125,-
3,44,jahreswerte_KL_00044_19710301_20211231_hist.zip,.zip,16176,-
4,52,jahreswerte_KL_00052_19730101_20011231_hist.zip,.zip,13759,-
5,61,jahreswerte_KL_00061_19750701_19780831_hist.zip,.zip,9130,-
6,70,jahreswerte_KL_00070_19730601_19860930_hist.zip,.zip,9680,-
7,71,jahreswerte_KL_00071_19861101_20191231_hist.zip,.zip,14916,-
8,72,jahreswerte_KL_00072_19781001_19950531_hist.zip,.zip,12892,-
9,73,jahreswerte_KL_00073_19530101_20211231_hist.zip,.zip,17379,-


DataFrame of the time-series (TS) zip files, indexed by station id

In [16]:
# Keep only the zipped time-series files and index them by station id, so a
# station's file can be looked up directly. This builds a new frame with a
# chained set_index instead of calling set_index(..., inplace=True) on the
# filtered result.
df_zips = df_ftpdir[df_ftpdir["ext"] == ".zip"].set_index("station_id")
df_zips.head(10)

Unnamed: 0_level_0,name,ext,size,type
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,jahreswerte_KL_00001_19310101_19860630_hist.zip,.zip,12989,-
3,jahreswerte_KL_00003_18510101_20110331_hist.zip,.zip,20125,-
44,jahreswerte_KL_00044_19710301_20211231_hist.zip,.zip,16176,-
52,jahreswerte_KL_00052_19730101_20011231_hist.zip,.zip,13759,-
61,jahreswerte_KL_00061_19750701_19780831_hist.zip,.zip,9130,-
70,jahreswerte_KL_00070_19730601_19860930_hist.zip,.zip,9680,-
71,jahreswerte_KL_00071_19861101_20191231_hist.zip,.zip,14916,-
72,jahreswerte_KL_00072_19781001_19950531_hist.zip,.zip,12892,-
73,jahreswerte_KL_00073_19530101_20211231_hist.zip,.zip,17379,-
78,jahreswerte_KL_00078_19610101_20211231_hist.zip,.zip,13962,-


Download the Station Description File

In [17]:
# Pick the station description file out of the directory listing.
# regex=False: the pattern contains "." and must match literally, not as a
# regular-expression wildcard.
is_station_desc = df_ftpdir["name"].str.contains(station_desc_pattern, regex=False)
station_desc_names = df_ftpdir.loc[is_station_desc, "name"]
if station_desc_names.empty:
    raise FileNotFoundError(f"No file matching {station_desc_pattern!r} in {ftp_dir}")
station_fname = station_desc_names.iloc[0]
print(station_fname)

KL_Jahreswerte_Beschreibung_Stationen.txt


In [18]:
# Copy the station description file from the FTP server into the local mirror.
remote_station_path = ftp_dir + station_fname
local_station_path = local_ftp_station_dir + station_fname

print("grabFile: ")
print("From: " + remote_station_path)
print("To:   " + local_station_path)
grabFile(remote_station_path, local_station_path)

grabFile: 
From: /climate_environment/CDC/observations_germany/climate//annual/kl/historical/KL_Jahreswerte_Beschreibung_Stationen.txt
To:   data/original/DWD//annual/kl/historical/KL_Jahreswerte_Beschreibung_Stationen.txt


In [19]:
# Convert the DWD station description text file into a semicolon-separated CSV.
# The column headers in the file are German (de) and are translated to English.
# The file is read as Latin-1 because names contain German umlauts.
import codecs  # no longer needed by this cell; kept in case other cells use it


def station_desc_txt_to_csv(txtfile, csvfile):
    """Read a DWD station description file, write it as CSV and return it.

    Parameters
    ----------
    txtfile : str
        Path of the fixed-width, Latin-1 encoded description file. Its first
        line holds the German column headers; the first two lines are skipped
        when the data rows are read.
    csvfile : str
        Destination path of the ``;``-separated CSV (overwritten).

    Returns
    -------
    pandas.DataFrame
        Stations indexed by the first column (``station_id``), with
        ``date_from``/``date_to`` parsed as dates and English column names.
        Headers without a known translation keep their original name.
    """
    # Only the header line is needed here; `with` closes the file even on error.
    with open(txtfile, "r", encoding="latin1") as header_file:
        colnames_de = header_file.readline().split()

    translate = {
        'Stations_id': 'station_id',
        'von_datum': 'date_from',
        'bis_datum': 'date_to',
        'Stationshoehe': 'altitude',
        'geoBreite': 'latitude',
        'geoLaenge': 'longitude',
        'Stationsname': 'name',
        'Bundesland': 'state',
    }
    # Fall back to the original header instead of raising KeyError on new columns.
    colnames_en = [translate.get(h, h) for h in colnames_de]

    # Skip the header line and the line after it; column names are given explicitly.
    df = pd.read_fwf(
        txtfile, skiprows=2, names=colnames_en,
        parse_dates=["date_from", "date_to"], index_col=0,
        encoding="latin1")

    df.to_csv(csvfile, sep=";")
    return df

In [20]:
# Convert the downloaded station description into a CSV in the derived-data folder, then preview it.
basename, _ext = os.path.splitext(station_fname)
station_csv_path = local_station_dir + basename + ".csv"
df_stations = station_desc_txt_to_csv(local_ftp_station_dir + station_fname, station_csv_path)
df_stations.head()

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,1931-01-01,1986-06-30,478,47.8413,8.8493,Aach,Baden-Württemberg
3,1851-01-01,2011-03-31,202,50.7827,6.0941,Aachen,Nordrhein-Westfalen
44,1971-03-01,2022-12-31,44,52.9336,8.237,Großenkneten,Niedersachsen
52,1973-01-01,2001-12-31,46,53.6623,10.199,Ahrensburg-Wulfsdorf,Schleswig-Holstein
61,1975-07-01,1978-08-31,339,48.8443,12.6171,Aiterhofen,Bayern


### Select Stations Located in NRW from Station Description Dataframe

In [32]:
# Selection criteria, kept together so they are easy to adjust.
SELECT_STATE = "Nordrhein-Westfalen"
ACTIVE_FROM = pd.Timestamp("2022-01-01")     # date_to on/after this -> station reported in 2022
STARTED_BEFORE = pd.Timestamp("1950-01-01")  # date_from before this -> record starts before 1950

# Boolean masks, all as Series aligned on df_stations' index.
isNRW = df_stations["state"] == SELECT_STATE
isActive2022 = df_stations["date_to"] >= ACTIVE_FROM
isBefore1950 = df_stations["date_from"] < STARTED_BEFORE

# Keep the stations that satisfy all three conditions.
station_ids_selected = df_stations[isNRW & isActive2022 & isBefore1950].index

print(f"Stations located in NRW, still active in 2022, and started before 1950: \n{list(station_ids_selected)}")


Stations located in NRW, still active in 2022, and started before 1950: 
[555, 1078, 1300, 1303, 1327, 1590, 2110, 2483, 2497, 2629, 2968, 4371, 5717]


In [33]:
# Number of selected stations.
station_ids_selected.size

13

In [24]:
# Show every selected station; the row count follows the selection instead of a fixed head(13).
df_stations.loc[station_ids_selected]

Unnamed: 0_level_0,date_from,date_to,altitude,latitude,longitude,name,state
station_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
555,1925-01-01,2023-01-31,110,51.5026,7.2289,Bochum,Nordrhein-Westfalen
1078,1940-01-01,2022-12-31,37,51.296,6.7686,Düsseldorf,Nordrhein-Westfalen
1300,1931-01-01,2022-12-31,351,51.254,8.1565,Eslohe,Nordrhein-Westfalen
1303,1888-01-01,2022-12-31,150,51.4041,6.9677,Essen-Bredeney,Nordrhein-Westfalen
1327,1937-01-01,2022-12-31,147,50.7119,6.7905,Weilerswist-Lommersum,Nordrhein-Westfalen
1590,1937-01-01,2022-12-31,37,51.4942,6.2463,Geldern-Walbeck,Nordrhein-Westfalen
2110,1938-01-01,2022-12-31,57,51.0411,6.1042,Heinsberg-Schleiden,Nordrhein-Westfalen
2483,1926-11-01,2022-12-31,839,51.1803,8.4891,Kahler Asten,Nordrhein-Westfalen
2497,1937-01-01,2022-12-31,505,50.5014,6.5264,Kall-Sistig,Nordrhein-Westfalen
2629,1851-01-01,2022-12-31,46,51.7612,6.0954,Kleve,Nordrhein-Westfalen


### Download TS Data from FTP Server

In [23]:
# Download the zipped time series of every selected station and collect the
# names of the files that arrived complete in local_zip_list.
local_zip_list = []

for station_id in station_ids_selected:
    try:
        fname = df_zips.loc[station_id, "name"]
        expected_size = df_zips.loc[station_id, "size"]
    except KeyError:
        # Only a station without a TS file is expected here; other errors must surface.
        print("WARNING: TS file for key %d not found in FTP directory." % station_id)
        continue

    print(fname)
    local_path = local_ftp_ts_dir + fname
    grabFile(ftp_dir + fname, local_path)

    # grabFile reports FTP errors only by printing, so verify the result
    # against the byte size announced in the FTP directory listing.
    if os.path.isfile(local_path) and os.path.getsize(local_path) == expected_size:
        local_zip_list.append(fname)
    else:
        print("WARNING: download of %s failed or is incomplete." % fname)

jahreswerte_KL_00555_19250101_20181031_hist.zip
jahreswerte_KL_01078_19400101_20211231_hist.zip
jahreswerte_KL_01300_19310101_20211231_hist.zip
jahreswerte_KL_01303_18880101_20211231_hist.zip
jahreswerte_KL_01327_19370101_20211231_hist.zip
jahreswerte_KL_01590_19370101_20211231_hist.zip
jahreswerte_KL_02110_19380101_20211231_hist.zip
jahreswerte_KL_02483_19261101_20211231_hist.zip
jahreswerte_KL_02497_19370101_20211231_hist.zip
jahreswerte_KL_02629_18510101_20211231_hist.zip
jahreswerte_KL_02968_19030101_20211231_hist.zip
jahreswerte_KL_04371_19310101_20211231_hist.zip
jahreswerte_KL_05717_19070101_20211231_hist.zip
