In [1]:
import functools
import logging

import numpy as np
import requests
import pandas as pd
import geopandas
import osgeo.osr
import beaker.cache
import geojson
import dateutil.parser

import shapely.geometry

beaker.cache.cache_regions.update({
    'short_term':{
        'expire':60,
        'type':'memory'
    },
    'long_term':{
        'expire':1800,
        'type':'dbm',
        'data_dir':'/tmp',
    }
})
cache = beaker.cache.CacheManager()

wgs84 = osgeo.osr.SpatialReference()
wgs84.ImportFromEPSG(4326)


0

In [2]:
# mapping from Aquo to cf (based on broken csv file)
# TODO: fix source
AQUO2CF = {
    '%O2': 'fractional_saturation_of_oxygen_in_sea_water',
    'CHLFa': 'concentration_of_chlorophyll_in_sea_water',
    'FLUORCTE': 'sea_water_fluorescence',
    'GELDHD': 'sea_water_electrical_conductivity',
    'GELSHD': 'speed_of_sound_in_sea_water',
    'Hm0': 'sea_surface_wave_significant_height',
    'INSLG': 'downwelling_longwave_radiance_in_air',
    'NH4': 'mass_concentration_of_ammonium_in_sea_water',
    'NO2': 'mass_concentration_of_nitrite_in_sea_water',
    'NO3': 'mass_concentration_of_nitrate_in_sea_water',
    'O2': 'mass_concentration_of_oxygen_in_sea_water',
    'P': 'mass_concentration_of_particulate_bound_phosphorus_in_sea_water',
    'POC': 'mass_concentration_of_particulate_bound_organic_carbon_in_sea_water',
    'Q': 'water_volume_transport_into_sea_water_from_rivers',
    'SALNTT': 'sea_water_salinity',
    'SiO2': 'mass_concentration_of_silicate_in_sea_water',
    'T': 'sea_water_temperature',
    'TOC': 'mass_concentration_of_organic_carbon_in_sea_water',
    'TROEBHD': 'sea_water_turbidity',
    'Th0': 'sea_surface_wave_from_direction',
    'Tm02': 'sea_surface_wind_wave_mean_period_from_variance_spectral_density_second_frequency_moment',
    'WATHTE': 'sea_surface_height',
    'ZICHT': 'secchi_depth_of_sea_water',
    'ZS': 'concentration_of_suspended_matter_in_sea_water',
    'pH': 'sea_water_ph_reported_on_total_scale',
    's_NO3NO2': 'mass_concentration_of_nitrate_and_nitrite_in_sea_water',
    'Fp': 'sea_surface_wave_frequency_at_variance_spectral_density_maximum',
    'H1/3': 'sea_surface_wave_mean_height_of_highest_third',
    'LUCHTDK': 'surface_air_pressure',
    'STROOMRTG': 'sea_water_to_direction',
    'STROOMSHD': 'sea_water_velocity',
    'T1/3': 'sea_surface_wave_mean_period_of_longest_third',
    'TH1/3': 'sea_surface_wave_mean_period_of_highest_third',
    'WATHTBRKD': 'sea_surface_height',
    'WINDRTG': 'wind_to_direction',
    'WINDSHD': 'wind_speed'
}

FILTERS = {
    "WATERLEVEL": ['WATHTBRKD', 'WATHTE'],
    "WIND": ['WINDRTG', 'WINDSHD']
}


In [3]:
class CustomEncoder(json.JSONEncoder):
    """Support for data types that JSON default encoder
    does not do.

    This includes:

        * Numpy array or number
        * Pandas Timestamp
        * Pandas DataFrame
        * geojson types
        * Complex number
        * Set
        * Bytes (Python 3)

    Examples
    --------
    >>> import json
    >>> import numpy as np
    >>> from astropy.utils.misc import JsonCustomEncoder
    >>> json.dumps(np.arange(3), cls=JsonCustomEncoder)
    '[0, 1, 2]'

    """

    def default(self, obj):
        # try and import everything
        HAVE_NUMPY = False
        try:
            import numpy as np
            HAVE_NUMPY = True
        except ImportError:
            pass
        HAVE_PANDAS = False
        try:
            import pandas as pd
            HAVE_PANDAS = True
        except ImportError:
            pass

        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes):  # pragma: py3
            return obj.decode()
        if HAVE_NUMPY:
            if isinstance(obj, (np.ndarray, np.number)):
                return obj.tolist()
            elif isinstance(obj, (complex, np.complex)):
                return [obj.real, obj.imag]
        if HAVE_PANDAS:
            if isinstance(obj, pd.DataFrame):
                return obj.to_dict(orient='records')
            if isinstance(obj, pd.Timestamp):
                return obj.isoformat()
        if HAVE_GEOJSON:
            return geojson.GeoJSONEncoder.default(self, obj)
        return json.JSONEncoder.default(self, obj)
    
class NoDataException(BaseException):
    pass

In [4]:
# this takes 6 seconds, which is much too long, so we cache it
@cache.region('long_term', 'rws')
def get_ddl_metadata():
    """extract metadata from ddl"""
    ddl_url = 'https://waterwebservices.rijkswaterstaat.nl/METADATASERVICES_DBO/OphalenCatalogus/'

    request = {
        "CatalogusFilter": {
            "Eenheden": True,
            "Grootheden": True,
            "Hoedanigheden": True
        }
    }
    resp = requests.post(ddl_url, json=request)
    result = resp.json()
    return result


In [5]:
# this function is slow because every record can have a different coordinate system
@cache.region('short_term', 'rws')
def ddl2df(ddl_data, quantity_filter='WATERLEVEL'):
    """parse ddl data and return a data frame"""
    locaties_df = pd.DataFrame.from_dict(ddl_data['LocatieLijst'])
    metadata_locaties_df = pd.DataFrame.from_dict(ddl_data['AquoMetadataLocatieLijst'])
    metadata_df = pd.DataFrame.from_dict(ddl_data['AquoMetadataLijst'])

    # lowercase all columns (inconsistent)
    # should be in mergeable order
    dfs = [locaties_df, metadata_locaties_df, metadata_df]
    for df in dfs:
        df.columns = [x.lower() for x in df.columns]
    merged_df = functools.reduce(
        lambda left, right: pd.merge(left, right), 
        dfs
    )

    # let's hope there's only one coordinate system in use:
    assert len(set(merged_df.coordinatenstelsel)) == 1, 'assuming only 1 coordinate system'
    # get the epsg code
    crs = list(set(merged_df.coordinatenstelsel))[0]
    srs = osgeo.osr.SpatialReference()
    srs.ImportFromEPSG(int(crs))
    srs2wgs84 = osgeo.osr.CoordinateTransformation(srs, wgs84)
    points = srs2wgs84.TransformPoints(np.array(merged_df[['x', 'y']]))
    points = [
        shapely.geometry.Point(pt[0], pt[1])
        for pt 
        in points
    ]
    
    # this is very slow because we check coordinate system for each row
    merged_df['point'] = points
    merged_df['lon'] = merged_df.point.apply(lambda pt: pt.x)
    merged_df['lat'] = merged_df.point.apply(lambda pt: pt.y)
    
    # flatten some columns
    merged_df['units'] = merged_df.eenheid.apply(lambda x: x['Code'])
    merged_df['quantity'] = merged_df.grootheid.apply(lambda x: x['Code'])
    
    merged_df['standard_name'] = merged_df.grootheid.apply(lambda x: AQUO2CF.get(x['Code'], ''))
    # only show stations of relevant quantity
    idx = np.in1d(merged_df.quantity, FILTERS['WATERLEVEL'])
    filtered_df = merged_df[idx]
    # TODO: 
    # convert to standard names using
    # translation_df = pd.read_csv('../data/deltares/wns_en.csv', sep=';', keep_default_na=False)
    return filtered_df

In [6]:
def df2geojson(df):
    def row2feature(row):
        properties = row.to_dict()
        geometry = row.point
        feature = geojson.Feature(id=row.name, geometry=geometry, properties=properties)
        return feature
    features = df.apply(row2feature, axis=1).values
    fc = geojson.FeatureCollection(features)
    return fc

In [7]:
@cache.region('short_term', 'rws')
def get_ddl_response():
    ddl_data = get_ddl_metadata()
    ddl_df = ddl2df(ddl_data)
    fc = df2geojson(ddl_df)
    # serializa/deserialize to get rid of custom types
    response = json.loads(geojson.dumps(fc, cls=CustomEncoder))
    return response

In [8]:
ddl_data = get_ddl_metadata()
ddl_df = ddl2df(ddl_data)

In [9]:
ddl_df.head()

Unnamed: 0,code,coordinatenstelsel,locatie_messageid,naam,x,y,aquometadata_messageid,eenheid,grootheid,hoedanigheid,parameter_wat_omschrijving,point,lon,lat,units,quantity,standard_name
8480,NIEUWGN,25831,71488,Nieuwegein,644977.664118,5765674.0,71,"{'Omschrijving': 'centimeter', 'Code': 'cm'}","{'Omschrijving': 'Waterhoogte', 'Code': 'WATHTE'}",{'Omschrijving': 't.o.v. Normaal Amsterdams Pe...,Waterhoogte Oppervlaktewater t.o.v. Normaal Am...,POINT (5.113007175423572 52.02272937957118),5.113007,52.022729,cm,WATHTE,sea_surface_height
8481,PUTTHK,25831,71501,Puttershoek,607953.756126,5741249.0,71,"{'Omschrijving': 'centimeter', 'Code': 'cm'}","{'Omschrijving': 'Waterhoogte', 'Code': 'WATHTE'}",{'Omschrijving': 't.o.v. Normaal Amsterdams Pe...,Waterhoogte Oppervlaktewater t.o.v. Normaal Am...,POINT (4.565993453697386 51.81164817873146),4.565993,51.811648,cm,WATHTE,sea_surface_height
8482,COLPT,25831,71647,Colijnsplaat,559519.59855,5716999.0,71,"{'Omschrijving': 'centimeter', 'Code': 'cm'}","{'Omschrijving': 'Waterhoogte', 'Code': 'WATHTE'}",{'Omschrijving': 't.o.v. Normaal Amsterdams Pe...,Waterhoogte Oppervlaktewater t.o.v. Normaal Am...,POINT (3.85937966050791 51.60088238357844),3.85938,51.600882,cm,WATHTE,sea_surface_height
8483,ROERMBVN,25831,72431,Roermond boven,708252.606614,5677508.0,71,"{'Omschrijving': 'centimeter', 'Code': 'cm'}","{'Omschrijving': 'Waterhoogte', 'Code': 'WATHTE'}",{'Omschrijving': 't.o.v. Normaal Amsterdams Pe...,Waterhoogte Oppervlaktewater t.o.v. Normaal Am...,POINT (5.981674344429999 51.21094309197117),5.981674,51.210943,cm,WATHTE,sea_surface_height
8484,LINNBNDN,25831,72679,Linne beneden,704491.407033,5672533.0,71,"{'Omschrijving': 'centimeter', 'Code': 'cm'}","{'Omschrijving': 'Waterhoogte', 'Code': 'WATHTE'}",{'Omschrijving': 't.o.v. Normaal Amsterdams Pe...,Waterhoogte Oppervlaktewater t.o.v. Normaal Am...,POINT (5.925066828981741 51.16762248490855),5.925067,51.167622,cm,WATHTE,sea_surface_height


In [15]:

@cache.region('short_term', 'rws')
def get_series(row, start_time, end_time, validated=False):
    """get timeseries for a given row"""
    station = row

    ddl_data_url = 'https://waterwebservices.rijkswaterstaat.nl/ONLINEWAARNEMINGENSERVICES_DBO/OphalenWaarnemingen'
    request = {
        "AquoPlusWaarnemingMetadata": {
            "AquoMetadata": {
                "Grootheid": {
                    "Code": station.grootheid['Code']
                }
            }
        }, 
        "Locatie": {
            "X": station.x, 
            "Y": station.y, 
            "Code": station.code
        },
        "Periode": {
            "Begindatumtijd": start_time,
            "Einddatumtijd": end_time
        }
    }
    resp = requests.post(ddl_data_url, json=request)
    data = resp.json()
    if not data['Succesvol']:
        raise NoDataException(data['Foutmelding'])
    measurements = []
    metadata = {}
    for row in data['WaarnemingenLijst']:
        # metadata is specified per row, just update it so we have 1
        metadata.update(row['AquoMetadata'])
        standard_name = AQUO2CF.get(row['AquoMetadata']['Grootheid']['Code'])
        metadata['standard_name'] = standard_name
        metadata.update(row['Locatie'])
        for measurement in row['MetingenLijst']:
            # only add validated data if requested
            if validated and 'Gecontroleerd' not in measurement['WaarnemingMetadata']['StatuswaardeLijst']:
                continue
            measurements.append(measurement)
    
    timeseries = []
    for measurement in measurements:
        record = {}
        record['v'] = measurement['Meetwaarde']['Waarde_Numeriek']
        UTC = dateutil.tz.tzutc()
        # parse time
        date = dateutil.parser.parse(measurement['Tijdstip'])
        # convert to UTC
        record['t'] = date.astimezone(UTC)
        timeseries.append(record)
    timeseries_df = pd.DataFrame(timeseries)
    series = timeseries_df
    return {
        "timeseries": series,
        "metadata": metadata
    }

In [16]:
def get_ddl_data(station, start_time, end_time):
    ddl_data = get_ddl_metadata()
    ddl_df = ddl2df(ddl_data)
    
    datasets = []
    available_series = ddl_df[ddl_df.code == station]
    for idx, row in available_series.iterrows():
        try:
            series = get_series(row, start_time, end_time)
        except NoDataException as e:
            logging.exception('no data for %s at %s', row.quantity, row.code)
            continue
        datasets.append(series)
    return datasets
    

In [17]:
station = 'HOEKVHLD'
start_time = "2017-3-10T09:00:00.000+01:00"
end_time = "2017-3-14T10:10:00.000+01:00"



In [19]:
ddl_data = get_ddl_data(station, start_time, end_time)

# return response like this
# json.dumps(ddl_data, cls=CustomEncoder)