In [1]:
#FTP connection parameters
server = "opendata.dwd.de"
user   = "anonymous"
passwd = "" 

In [2]:
#FTP Directory Definition and Station Description Filename Pattern
topic_dir = "/hourly/precipitation/recent/"
climate_data_dir = "/climate_environment/CDC/observations_germany/climate/"
ftp_dir =  climate_data_dir + topic_dir
print(ftp_dir)

/climate_environment/CDC/observations_germany/climate//hourly/precipitation/recent/


In [3]:
#Local Directories
local_ts_dir = "data/DWD/" + topic_dir
local_station_dir = local_ts_dir
print(local_station_dir)

data/DWD//hourly/precipitation/recent/


In [4]:
import os
os.makedirs(local_ts_dir,exist_ok = True)
os.makedirs(local_station_dir,exist_ok = True)

In [5]:
#FTP Connect
import ftplib
ftp = ftplib.FTP(server)
res = ftp.login(user=user, passwd = passwd)
print(res)

230 Login successful.


In [6]:
ret = ftp.cwd(".")
print(ret)

250 Directory successfully changed.


In [7]:
#FTP download file function
def grabFile(ftpfullname,localfullname):
    try:
        ret = ftp.cwd(".") # A dummy action to chack the connection and to provoke an exception if necessary.
        localfile = open(localfullname, 'wb')
        ftp.retrbinary('RETR ' + ftpfullname, localfile.write, 1024)
        localfile.close()
    
    except ftplib.error_perm:
        print("FTP ERROR. Operation not permitted. File not found?")

    except ftplib.error_temp:
        print("FTP ERROR. Timeout.")

    except ConnectionAbortedError:
        print("FTP ERROR. Connection aborted.")

In [8]:
#Generate Pandas Dataframe from FTP Directory Listing
import pandas as pd
import os

def gen_df_from_ftp_dir_listing(ftp, ftpdir):
    lines = []
    flist = []
    try:    
        res = ftp.retrlines("LIST "+ftpdir, lines.append)
    except:
        print("Error: ftp.retrlines() failed. ftp timeout? Reconnect!")
        return
        
    if len(lines) == 0:
        print("Error: ftp dir is empty")
        return
    
    for line in lines:
#        print(line)
        [ftype, fsize, fname] = [line[0:1], int(line[31:42]), line[56:]]
#        itemlist = [line[0:1], int(line[31:42]), line[56:]]
#        flist.append(itemlist)
        
        fext = os.path.splitext(fname)[-1]
        
        if fext == ".zip":
            station_id = int(fname.split("_")[2])
        else:
            station_id = -1 
        
        flist.append([station_id, fname, fext, fsize, ftype])
        
        

    df_ftpdir = pd.DataFrame(flist,columns=["station_id", "name", "ext", "size", "type"])
    return(df_ftpdir)

In [9]:
df_ftpdir = gen_df_from_ftp_dir_listing(ftp, ftp_dir)
df_ftpdir

Unnamed: 0,station_id,name,ext,size,type
0,-1,BESCHREIBUNG_obsgermany_climate_hourly_precipi...,.pdf,68888,-
1,-1,DESCRIPTION_obsgermany_climate_hourly_precipit...,.pdf,68313,-
2,-1,RR_Stundenwerte_Beschreibung_Stationen.txt,.txt,209079,-
3,20,stundenwerte_RR_00020_akt.zip,.zip,43624,-
4,44,stundenwerte_RR_00044_akt.zip,.zip,43874,-
...,...,...,...,...,...
970,15478,stundenwerte_RR_15478_akt.zip,.zip,41253,-
971,15490,stundenwerte_RR_15490_akt.zip,.zip,42086,-
972,15512,stundenwerte_RR_15512_akt.zip,.zip,41319,-
973,15514,stundenwerte_RR_15514_akt.zip,.zip,42695,-


In [10]:
#Dataframes with .zip extension
df_zips = df_ftpdir[df_ftpdir["ext"]==".zip"]
df_zips

Unnamed: 0,station_id,name,ext,size,type
3,20,stundenwerte_RR_00020_akt.zip,.zip,43624,-
4,44,stundenwerte_RR_00044_akt.zip,.zip,43874,-
5,53,stundenwerte_RR_00053_akt.zip,.zip,42246,-
6,71,stundenwerte_RR_00071_akt.zip,.zip,21157,-
7,73,stundenwerte_RR_00073_akt.zip,.zip,43267,-
...,...,...,...,...,...
970,15478,stundenwerte_RR_15478_akt.zip,.zip,41253,-
971,15490,stundenwerte_RR_15490_akt.zip,.zip,42086,-
972,15512,stundenwerte_RR_15512_akt.zip,.zip,41319,-
973,15514,stundenwerte_RR_15514_akt.zip,.zip,42695,-


In [11]:
#fileName = '/Data/Extracted/NRW_Station_Active_data.csv'
#Need to traverse back to the top level folder if the file is not in the same location as .py file
#Read .csv file extracted from QGIS which contains active stations in NRW
df_active = pd.read_csv('../../../Data/Extracted/NRW_Station_Active_data.csv')
print(df_active)

    station_id   date_from     date_to  altitude  latitude  longitude  \
0          216  2004-10-01  2020-11-04       298   51.1143     7.8807   
1          389  2009-11-01  2020-11-04       436   51.0148     8.4318   
2          390  2004-07-01  2020-11-04       610   50.9837     8.3683   
3          554  1995-09-01  2020-11-04        23   51.8293     6.5365   
4          603  1999-03-03  2020-11-04       147   50.7293     7.2040   
..         ...         ...         ...       ...       ...        ...   
78       13671  2007-12-01  2020-11-04       221   50.9655     7.2753   
79       13696  2007-12-01  2020-11-04        60   51.5966     7.4048   
80       13700  2008-05-01  2020-11-04       205   51.3329     7.3411   
81       13713  2007-11-01  2020-11-04       386   51.0899     7.6289   
82       15000  2011-04-01  2020-11-04       231   50.7983     6.0244   

                            name                state  
0        Attendorn-Neulisternohl  Nordrhein-Westfalen  
1          

In [12]:
#extract only active station id from the df
sid = df_active['station_id']
type(sid)
print(sid)

0       216
1       389
2       390
3       554
4       603
      ...  
78    13671
79    13696
80    13700
81    13713
82    15000
Name: station_id, Length: 83, dtype: int64


In [13]:
#local variable to create a filename structure
fileNameTemp1 = "stundenwerte_RR_"
fileNameTemp2 = "_akt.zip"

type(df_zips['name'])
print(df_zips['name'])

3      stundenwerte_RR_00020_akt.zip
4      stundenwerte_RR_00044_akt.zip
5      stundenwerte_RR_00053_akt.zip
6      stundenwerte_RR_00071_akt.zip
7      stundenwerte_RR_00073_akt.zip
                   ...              
970    stundenwerte_RR_15478_akt.zip
971    stundenwerte_RR_15490_akt.zip
972    stundenwerte_RR_15512_akt.zip
973    stundenwerte_RR_15514_akt.zip
974    stundenwerte_RR_15555_akt.zip
Name: name, Length: 972, dtype: object


In [44]:
#Download all the .zip files from FTP server for all the active station id
for st_id in sid:
    #print(st_id)
    if not df_zips['name'].empty:       
       #print(len(str(st_id)))
       if len(str(st_id)) == 3:
            temp_st_id = "00"+str(st_id)
            #print(temp_st_id)
       elif len(str(st_id)) == 4:
                temp_st_id = "0"+str(st_id)
                #print(temp_st_id)
       elif len(str(st_id)) == 5:
                temp_st_id = str(st_id)
    #print(fileNameTemp1+temp_st_id+fileNameTemp2)
    station_fname = fileNameTemp1+temp_st_id+fileNameTemp2
    #print(type(station_fname))
    #print("grab file: " + station_fname + "\nfrom ftp dir: " + ftp_dir)
    grabFile(ftp_dir + station_fname, local_station_dir + station_fname)

FTP ERROR. Timeout.


EOFError: 

In [15]:
#Iterate through the directory and extract the 'produkt_rr_stunde_*'file from all the .zip file
import glob
from zipfile import ZipFile
# Recursively print
for filepath in glob.iglob(r'data/DWD/hourly/precipitation/recent/*.zip', recursive=True):
    #print(filepath)
    with ZipFile(filepath, 'r') as zipObj:
        listOfFileNames = zipObj.namelist()
        for fileName in listOfFileNames:
            if fileName.startswith('produkt_rr_stunde_'):
                zipObj.extract(fileName, 'temp_percipitation')          
        

In [16]:
os.getcwd()
my_dir = "temp_percipitation/"
os.chdir(my_dir)

In [17]:
df = pd.DataFrame({})
print(df)

Empty DataFrame
Columns: []
Index: []


In [37]:
#dictionary based station data extration, correct way
station_dict = {}
for k,filepath in enumerate(glob.iglob(r'*.txt', recursive=True)):
    file = filepath
    sample_df = pd.read_csv(file,sep=";")
    #print(type(sample_df))
    dfkeys = sample_df.keys();
    key_R1 = dfkeys[3]
    key_station = dfkeys[0]
    station_id = sample_df[key_station][0]
    station_dict[str(station_id)] = sample_df[key_R1];
df_station_dict = pd.DataFrame(station_dict)
print(df_station_dict)

       216  389  390  554  603  613  617  644  796  871  ...  7344  7374  \
0      0.0  0.0  0.0  0.0  0.8  0.0  0.0  0.0  0.4  0.0  ...   0.0   0.0   
1      0.0  0.0  0.0  0.0  1.5  0.0  0.0  0.0  0.0  0.0  ...   0.0   0.0   
2      0.0  0.0  0.0  0.0  2.1  0.0  0.0  0.0  0.0  0.0  ...   0.0   0.0   
3      0.0  0.0  0.0  0.0  0.6  0.0  0.0  0.0  0.0  0.0  ...   0.0   0.0   
4      0.0  0.0  0.0  0.0  0.1  0.0  0.0  0.0  0.0  0.0  ...   0.0   0.0   
...    ...  ...  ...  ...  ...  ...  ...  ...  ...  ...  ...   ...   ...   
13195  NaN  0.6  NaN  0.0  NaN  NaN  NaN  0.0  NaN  NaN  ...   0.0   0.0   
13196  NaN  0.2  NaN  0.0  NaN  NaN  NaN  0.0  NaN  NaN  ...   0.0   0.0   
13197  NaN  0.0  NaN  0.0  NaN  NaN  NaN  0.0  NaN  NaN  ...   0.0   0.0   
13198  NaN  0.0  NaN  0.0  NaN  NaN  NaN  0.0  NaN  NaN  ...   0.0   0.0   
13199  NaN  0.0  NaN  0.0  NaN  NaN  NaN  0.0  NaN  NaN  ...   0.0   0.0   

       7378  13669  13670  13671  13696  13700  13713  15000  
0       0.2    0.0    0.

In [18]:
df = pd.concat([df_zips["name"]]); os.getcwd()

'E:\\Programs\\Megha\\Geoinformatics\\JupyterLab\\Practice\\Pandas\\temp_percipitation'

In [None]:
#test validation - For confirmation of column data length
#No need to run
for key in ts_df.keys():
    #print(len(ts_df),len(df_station_dict))
    n,m = len(ts_df[key]), len(df_station_dict[key])
    val = np.allclose(ts_df[key],df_station_dict[key][:n]);
    print(n,m,val)
    if not val:
        mask = ts_df[key]==df_station_dict[key][:n]
        diff = ts_df[key] - df_station_dict[key][:n]
        print(diff)

In [None]:
#validation code - for confirmation of column data length
#No need to run
for key in ts_df.keys():
    #print(len(ts_df),len(df_station_dict))
    n,m = len(ts_df[key]), len(df_station_dict[key])
    mask1 = ~(np.isnan(ts_df[key])); mask2 = ~(np.isnan(df_station_dict[key][:n]));
    val = np.allclose(ts_df[key][mask1],df_station_dict[key][:n][mask2]);
    print(n,m,val)

In [41]:
ts_df['216'][::4]

0        0.0
4        0.0
8        0.0
12       0.1
16       0.0
        ... 
13104    0.0
13108    0.1
13112    2.3
13116    0.0
13120    0.0
Name: 216, Length: 3281, dtype: float64

In [42]:
df_station_dict['216'][::4]

0        0.0
4        0.0
8        0.0
12       0.1
16       0.0
        ... 
13180    NaN
13184    NaN
13188    NaN
13192    NaN
13196    NaN
Name: 216, Length: 3300, dtype: float64

In [43]:
np.allclose(ts_df['216'],df_station_dict['216'][:13122])

True