In [21]:
import requests
import json

import numpy as np
import pandas as pd

from datetime import datetime
from dateutil.relativedelta import relativedelta
import pytz

from multiprocessing.pool import ThreadPool
from tqdm import tqdm_notebook as tqdm
from time import time as timer
from IPython.display import clear_output

%matplotlib inline

De distributielaag van de waterwebservices van rijkswaterstaat is minimaal gedocumenteerd in het document <a href="https://www.rijkswaterstaat.nl/rws/opendata/DistributielaagWebservices-SUM-2v7.pdf">DistributielaagWebservices-SUM-2v7.pdf</a>. 

Er zijn vier verschillende services met verschillende type request binnen elke service. De services zijn:
- MetadataServices
- OnlinewaarnemingenServices
- BulkwaarnemingServices
- WebFeatureServices

Hieronder is toegepast hoe deze services gebruikt kunnen worden om zeespiegelwaterstanden mee op te vragen

In [3]:
OphalenCatalogus           = 'https://waterwebservices.rijkswaterstaat.nl/METADATASERVICES_DBO/OphalenCatalogus/'
OphalenWaarnemingen        = 'https://waterwebservices.rijkswaterstaat.nl/ONLINEWAARNEMINGENSERVICES_DBO/OphalenWaarnemingen'
OphalenLaatsteWaarnemingen = 'https://waterwebservices.rijkswaterstaat.nl/ONLINEWAARNEMINGENSERVICES_DBO/OphalenLaatsteWaarnemingen'

Begin met een metadata request via de `OpenhalenCatalogus`

In [4]:
# get station information from DDL (metadata uit Catalogus)
request = {
    "CatalogusFilter": {
        "Eenheden": True,
        "Grootheden": True,
        "Hoedanigheden": True
    }
}
resp = requests.post(OphalenCatalogus, json=request)
result = resp.json()
# print alle variabelen in de catalogus
#print(result)

dfLocaties = pd.DataFrame(result['LocatieLijst']).set_index('Code')
# load normalized JSON object (since it contains nested JSON)
dfMetadata = pd.io.json.json_normalize(result['AquoMetadataLijst']).set_index('AquoMetadata_MessageID')

In [5]:
# note that there are two stations for IJmuiden. 
# The station was moved from the sluices to outside of the harbor in 1981.
ids = ['DELFZL', 'DENHDR', 'HARLGN', 'HOEKVHLD', 'IJMDBTHVN', 'IJMDNDSS', 'VLISSGN']
dfLocaties.loc[ids]

Unnamed: 0_level_0,Coordinatenstelsel,Locatie_MessageID,Naam,X,Y
Code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
DELFZL,25831,84020,Delfzijl,761899.77096,5915790.0
DENHDR,25831,84011,Den Helder,617198.393684,5869731.0
HARLGN,25831,84096,Harlingen,661021.585505,5894519.0
HOEKVHLD,25831,72345,Hoek van Holland,576917.669784,5759136.0
IJMDBTHVN,25831,84132,IJmuiden buitenhaven,605633.035699,5813598.0
IJMDNDSS,25831,84560,IJmuiden Noordersluis,609033.673551,5814221.0
VLISSGN,25831,84387,Vlissingen,541425.983215,5699182.0


Vervolg met het ophalen van waarnemingen request via de `OpenhalenWaarnemingen`

De waterwebservices werkt door middel van een POST query van een JSON object met daarin 3 elementen gedefineerd:
- Locatie
- AquoPlusWaarnemingMetadata
- Periode

Een voorbeeld ziet er als volgt uit:

In [6]:
request_handmatig = {
  'Locatie': {
    'X': 761899.770959577,
    'Y': 5915790.48491405,
    'Code': 'DELFZL'
  },
  'AquoPlusWaarnemingMetadata': {
    'AquoMetadata': {
      'Eenheid': {
        'Code': 'cm'
      },
      'Grootheid': {
        'Code': 'WATHTE'
      },
      'Hoedanigheid': {
        'Code': 'NAP'
      }
    }
  },
  'Periode': {
    'Einddatumtijd': '2012-01-27T09:30:00.000+01:00',
    'Begindatumtijd': '2012-01-27T09:00:00.000+01:00'
  }
}

Een dynamisch object kunnen we als volgt maken

In [7]:
def strftime(date):
    """
    hopelijk heeft iemand nog een beter idee hoe je makkelijk een isoformat 
    zie uitstaande vraag https://stackoverflow.com/q/45610753/2459096
    """
    (dt, micro,tz) = date.strftime('%Y-%m-%dT%H:%M:%S.%f%Z:00').replace('+','.').split('.')
    dt = "%s.%03d+%s" % (dt, int(micro) / 1000,tz)
    return dt

def POSTOphalenWaarnemingen(beginDatumTijd, dfLocatie, dfAquoMetadata):
    """
    maak een JSON object aan voor een POST request voor het ophalen van waarnemingen
    Parameters
    ---
    beginDatumTijd   :  datetime object inc tzinfo 
                        (eindDatumTijd staat nu hard in code op 1 maand na beginDatumTijd)
    dfLocatie        :  dataframe met enkel station locatie info
    dfAquoMetaData   :  dataframe object met enkel eenheid/grootheid/hoedanigheid info
    
    returns JSON object
    """
    # empty json object
    request_dynamisch = {}

    request_dynamisch['Locatie'] = {}
    rd_Locatie = request_dynamisch['Locatie']
    rd_Locatie['X'] = dfLocatie.X#float("{:.9f}".format(dfLocatie.loc[ids[0]].X))
    rd_Locatie['Y'] = dfLocatie.Y#float("{:.8f}".format(dfLocatie.loc[ids[0]].Y))
    rd_Locatie['Code'] = dfLocatie.name

    request_dynamisch['AquoPlusWaarnemingMetadata'] = {}
    rd_APWM = request_dynamisch['AquoPlusWaarnemingMetadata']
    rd_APWM['AquoMetadata'] = {}
    rd_AM = rd_APWM['AquoMetadata']
    rd_AM['Eenheid']      = {'Code':dfAquoMetadata['Eenheid.Code'].values[0]}
    rd_AM['Grootheid']    = {'Code':dfAquoMetadata['Grootheid.Code'].values[0]}
    rd_AM['Hoedanigheid'] = {'Code':dfAquoMetadata['Hoedanigheid.Code'].values[0]}

    request_dynamisch['Periode'] = {}
    rd_Periode = request_dynamisch['Periode']
    rd_Periode['Begindatumtijd'] = strftime(beginDatumTijd)
    #strftime(datetime(year=2012, month=1, day=27, hour=9, minute=0,tzinfo=pytz.timezone('Etc/GMT-1')))
    rd_Periode['Einddatumtijd']  = strftime(beginDatumTijd+relativedelta(months=1))
    
    return request_dynamisch

In [8]:
# maak handmatig een lange lijst van data objecten
# alleen begin-data sinds eind-data steeds 1 maand na begin-data is
startDates = []
for year in np.arange(1850,2018):
    for month in np.arange(1,13):
        startDates.append(datetime(year=year, month=month, day=1, hour=0, minute=0,tzinfo=pytz.timezone('Etc/GMT-1')))
startDates = pd.Series(startDates)
startDates.head()

0   1850-01-01 00:00:00+01:00
1   1850-02-01 00:00:00+01:00
2   1850-03-01 00:00:00+01:00
3   1850-04-01 00:00:00+01:00
4   1850-05-01 00:00:00+01:00
dtype: datetime64[ns, Etc/GMT-1]

In [9]:
# selDates = startDates[(startDates > '1879-01-01') & (startDates < '1879-06-01')]
# selDates

In [10]:
# selecteer een enkel station
for station in ids[0:1]:
    dfLocatie = dfLocaties.loc[station]
dfLocatie.head()

Coordinatenstelsel          25831
Locatie_MessageID           84020
Naam                     Delfzijl
X                          761900
Y                     5.91579e+06
Name: DELFZL, dtype: object

In [11]:
# selecteer een metadata object op basis van eenheid/grootheid/hoedanigheid
df_WATHTE_NAP = dfMetadata[(dfMetadata['Grootheid.Code']=='WATHTE')&(dfMetadata['Hoedanigheid.Code']=='NAP')]
df_WATHTE_NAP.T.head()

AquoMetadata_MessageID,71
Eenheid.Code,cm
Eenheid.Omschrijving,centimeter
Grootheid.Code,WATHTE
Grootheid.Omschrijving,Waterhoogte
Hoedanigheid.Code,NAP


In [12]:
# request_dynamisch = POSTOphalenWaarnemingen(beginDatumTijd=selDates[3],dfLocatie=dfLocatie,dfAquoMetadata=df_WATHTE_NAP)
# request_dynamisch

In [13]:
# try:
#     resp = requests.post(OphalenWaarnemingen, json=request_dynamisch)
#     df_out = pd.io.json.json_normalize(resp.json()['WaarnemingenLijst'][0]['MetingenLijst'])[['Meetwaarde.Waarde_Numeriek','Tijdstip']]
#     df_out['Tijdstip'] = pd.to_datetime(df_out['Tijdstip'])
#     df_out.set_index('Tijdstip', inplace=True)
#     df_out.columns=['zeespiegel_mmNAP']
#     df_out.loc[df_out['zeespiegel_mmNAP'] == 999999999.0] = np.nan
#     df_out.plot()
# except Exception as e:
#     print (e)

Open een HDFStore om de maandelijkse 10 minuten data weg te schrijven in een PyTables object.  

Maak een functie van de OphalenWaarnemingen in combinatie met schrijf actie

In [29]:
def fetch_OphalenWaarnemingen(startDate):
    try:
        # prepare the POST object
        request_dynamisch=POSTOphalenWaarnemingen(beginDatumTijd = startDate,
                                                  dfLocatie = dfLocatie,
                                                  dfAquoMetadata = df_WATHTE_NAP)
        # do the query
        resp = requests.post(OphalenWaarnemingen, json=request_dynamisch)
        
        # parse the result to DataFrame
        df_out = pd.io.json.json_normalize(resp.json()['WaarnemingenLijst'][0]['MetingenLijst'])[['Meetwaarde.Waarde_Numeriek','Tijdstip']]
        df_out['Tijdstip'] = pd.to_datetime(df_out['Tijdstip'])
        df_out.set_index('Tijdstip', inplace=True)
        df_out.columns=['zeespiegel_mmNAP']
        df_out.loc[df_out['zeespiegel_mmNAP'] == 999999999.0] = np.nan
        # add to HDFStore        
        hdf.append(key = dfLocatie.name+'/year'+str(startDate.year), 
                   value = df_out, format='table')
        
        return startDate, None
    except Exception as e:
        return startDate, e

In [None]:
# hdf = pd.HDFStore('stationData.h5') # depends on PyTables

# start = timer()
# results = ThreadPool(20).imap_unordered(fetch_OphalenWaarnemingen, startDates)

# for startDate, error in results:
#     if error is None:
#         print("%r fetched en verwerkt in %ss" % (startDate, timer() - start))
#     else:
#         print("error fetching %r: %s" % (startDate, error))
# print("Vestreken tijd: %s" % (timer() - start,))

Het gebruik van een ThreadPool om de data op te vragen lukt wel, maar het lijkt alsof ik niet in parallel kan schrijven naar een HDFStore. 

Zie https://github.com/pandas-dev/pandas/issues/4409#issuecomment-21906827: `'yes concurrent writing from thread/process or even multi processing can cause havoc, [..] really just avoid it'`

Dan maar met een dubbel lusje over stations en dates.

In [25]:
hdf = pd.HDFStore('stationData.h5') # depends on PyTables
start = timer()

# itereer over stations
for station in tqdm(ids):
    dfLocatie = dfLocaties.loc[station]    

    for startDate in tqdm(startDates):
        startDate, error = fetch_OphalenWaarnemingen(startDate)

        if error is None:
            print("%r fetched en verwerkt in %ss" % (startDate, timer() - start))
        else:
            print("error fetching %r: %s" % (startDate, error))
        clear_output(wait=True)
print("Vestreken tijd: %s" % (timer() - start,))    


Vestreken tijd: 7272.551747560501


In [27]:
hdf.close()

In [28]:
hdf.

<class 'pandas.io.pytables.HDFStore'>
File path: stationData.h5
File is CLOSED

In [30]:
hdf.is_open

False