In [1]:
import httpx
import json
import pandas
import tqdm
import zipfile

### UDP-Konfiguration

In [2]:
# Target platform settings: `udp_domain` is the base domain for the IdM and API
# gateway hosts, `realm` selects the IdM realm in the token URL, and
# `fiware_service` is sent as the `Fiware-Service` header on every API request.
udp_domain = "udp-kn.de"
realm = "konstanz"
fiware_service = "nodered3"

# Secrets are read from the `pass` password manager below; set them manually if you don't use it.

import subprocess
def password(path):
    """Return the first line printed by `pass <path>`.

    `pass` conventionally stores the secret itself on the first line of an
    entry. Raises subprocess.CalledProcessError if `pass` exits non-zero and
    IndexError if it prints nothing.
    """
    completed = subprocess.run(
        ['pass', path],
        capture_output=True,
        text=True,
        check=True,
    )
    lines = completed.stdout.splitlines()
    return lines[0]

# IdM client and user credentials, sent by get_token() in its password-grant token request.
idm_client = 'api-access'
idm_client_secret = password(f'work/stadtkn/udp/idm/client/{idm_client}')
# NOTE(review): a personal account name is hard-coded in a shared notebook;
# consider reading it from the environment instead.
idm_username = 'patrik.keller@konstanz.de'
idm_password = password(f'work/stadtkn/udp/idm/user/{idm_username}')

### UDP-Schnittstelle

In [3]:
# One shared HTTP client, reused by every request helper in this cell.
# NOTE(review): per the httpx docs, transport-level `retries` only re-attempt
# failed connection attempts, not HTTP error responses — confirm that is enough.
transport = httpx.HTTPTransport(retries=3)
client = httpx.Client(transport=transport)

# Most recently issued access token and the monotonic time after which it must
# not be reused. Re-running this cell resets it (e.g. after changing credentials).
_token_cache = dict(token=None, expires_at=0.0)

def get_token():
    """Return a bearer access token for the UDP API, fetching a new one only when needed.

    Performs an OAuth2 password grant against the realm's OpenID Connect token
    endpoint on `idm.{udp_domain}` using the module-level `idm_*` credentials.
    The token is cached and reused until 30 s before the `expires_in` lifetime
    reported by the IdM runs out, so batch uploads no longer re-authenticate
    on every single request. If no `expires_in` is reported, nothing is reused.

    Raises httpx.HTTPStatusError if the IdM rejects the request.
    """
    import time

    now = time.monotonic()
    if _token_cache['token'] is not None and now < _token_cache['expires_at']:
        return _token_cache['token']

    r = client.request(
        'POST',
        f'https://idm.{udp_domain}/auth/realms/{realm}/protocol/openid-connect/token',
        data=dict(
            client_id = idm_client,
            client_secret = idm_client_secret,
            grant_type = 'password',
            username = idm_username,
            password = idm_password,
            scope = "api:read api:write api:delete",
        ),
        timeout = 10,
    )
    r.raise_for_status()
    body = r.json()
    token = body['access_token']
    # `now` was taken before the request, so the computed expiry is conservative;
    # the extra 30 s margin keeps a token from expiring between fetch and use.
    _token_cache['token'] = token
    _token_cache['expires_at'] = now + float(body.get('expires_in', 0)) - 30
    return token

def delete_entity_type(ty):
    """Delete all QuantumLeap data stored for entity type `ty` on the UDP.

    A 404 response (nothing stored for this type yet, e.g. on the first upload)
    is accepted. Any other error status raises httpx.HTTPStatusError instead of
    being silently ignored, so a failed delete (expired token, missing scope,
    server error) cannot go unnoticed before new data is posted.

    Returns the httpx response.
    """
    r = client.request(
        'DELETE',
        f'https://apim.{udp_domain}/gateway/quantumleap/v2/types/{ty}',
        headers = {
            'Authorization' : f"Bearer {get_token()}",
            'Fiware-Service' : fiware_service,
            'Fiware-ServicePath' : '/',
        },
        timeout = 10,
    )
    # 404 only means there is nothing to delete; everything else must succeed.
    if r.status_code != 404:
        r.raise_for_status()
    return r

def delete_existing_data(entity_updates):
    """Wipe stored data for every entity type occurring in `entity_updates`.

    Each distinct `type` value is deleted exactly once via delete_entity_type();
    the list of types is printed first as a progress message.
    """
    types_present = set(update['type'] for update in entity_updates)
    print(f'Delete entity types {list(types_present)} on UDP ...')
    for entity_type in types_present:
        delete_entity_type(entity_type)

def post_entity_update_batch(lst):
    """POST one batch of at most 128 entity updates to QuantumLeap's notify endpoint.

    The updates are wrapped as `{"data": lst}` and the `time_index` attribute is
    declared as the time index via the `Fiware-TimeIndex-Attribute` header.
    Returns the response; raises httpx.HTTPStatusError on an error status and
    AssertionError if the batch is too large.
    """
    assert len(lst) <= 128
    url = f'https://apim.{udp_domain}/gateway/quantumleap/v2/notify'
    headers = {
        'Authorization': f"Bearer {get_token()}",
        'Fiware-Service': fiware_service,
        'Fiware-ServicePath': '/',
        'Fiware-TimeIndex-Attribute': 'time_index',
    }
    response = client.request('POST', url, headers=headers, json={'data': lst}, timeout=10)
    response.raise_for_status()
    return response

def post_entity_updates(lst):
    """Upload `lst` to the UDP in batches of at most 128 entity updates.

    A single batch is sent without a progress bar; larger inputs show a tqdm
    progress bar over the batches. An empty list sends nothing (previously an
    empty notification was POSTed).
    """
    print(f"Upload {len(lst)} entity updates to UDP ...")
    batch_size = 128  # must not exceed the limit asserted in post_entity_update_batch
    if not lst:
        return
    if len(lst) <= batch_size:
        post_entity_update_batch(lst)
    else:
        for i in tqdm.tqdm(range(0, len(lst), batch_size)):
            post_entity_update_batch(lst[i:i+batch_size])

def upload_entity_updates(x):
    """Replace the UDP data for the entity types in `x` with the updates in `x`.

    First deletes all stored data of every entity type occurring in `x`, then
    uploads `x` in batches. `x` is traversed twice, so it must be a list rather
    than a one-shot iterator.
    """
    for step in (delete_existing_data, post_entity_updates):
        step(x)

### UDP-Upload: Wasseroberflächentemperaturen

In [4]:
# Satellite lake surface water temperature observations; `time` is parsed as datetime.
lswt = pandas.read_csv('satellite-lake-water-temperature.csv', sep=',', encoding='utf8', parse_dates=['time'])
lswt

Unnamed: 0,time,lake_surface_water_temperature,lswt_uncertainty,lswt_quality_level,lswt_obs_instr,lswt_flag_bias_correction,lakeid_CCI,lakeid_GloboLakes
0,1995-06-02 12:00:00,286.45110,0.458000,3.000000,1.000000,1.0,352,352
1,1995-06-08 12:00:00,289.06854,0.481524,3.333333,1.000000,1.0,352,352
2,1995-06-24 12:00:00,285.28000,0.511000,1.000000,1.000000,1.0,352,352
3,1995-06-27 12:00:00,291.51250,0.393062,3.812500,1.000000,1.0,352,352
4,1995-06-30 12:00:00,293.44970,0.247692,4.179487,1.000000,1.0,352,352
...,...,...,...,...,...,...,...,...
5244,2023-12-17 12:00:00,276.01170,0.243333,1.166667,85.333336,,352,352
5245,2023-12-19 12:00:00,278.72247,0.210750,3.500000,32.000000,,352,352
5246,2023-12-27 12:00:00,278.06708,0.209583,3.583333,52.000000,,352,352
5247,2023-12-30 12:00:00,278.48135,0.186614,3.454546,32.000000,,352,352


In [5]:
# Every column except the timestamp becomes a numeric attribute of the entity.
# NOTE: later cells rebind `data_cols`; map_row below reads it when called.
data_cols = list(lswt.columns)
data_cols.remove('time')

def map_row(i, row, cols=None):
    """Convert one row of the LSWT frame into an NGSI v2 entity update.

    `i` is the row label from `iterrows()` and is unused. All rows of the same
    lake (`lakeid_CCI`) share one entity id and differ only in `time_index`.
    `cols` lists the columns sent as `Number` attributes; it defaults to the
    global `data_cols`, but passing it explicitly avoids depending on whichever
    cell last rebound that global.

    Missing values (NaN) are sent as JSON `null`, because NaN is not valid JSON.
    """
    if cols is None:
        cols = data_cols
    upd = dict(
        id = f"urn:raw:cds:lswt:cci_lake_id:{row.lakeid_CCI}",
        type = "raw_cds_lswt",
        time_index = dict(
            type = 'DateTime',
            value = row.time.isoformat(),
        ),
    )
    for c in cols:
        value = row[c]
        upd[c] = dict(
            type = 'Number',
            value = None if pandas.isna(value) else value,
        )
    return upd

# One entity update per CSV row; rows of the same lake share an entity id and differ in time_index.
lswt_entity_updates = [ map_row(i, r) for i, r in lswt.iterrows() ]
lswt_entity_updates[:2]

[{'id': 'urn:raw:cds:lswt:cci_lake_id:352',
  'type': 'raw_cds_lswt',
  'time_index': {'type': 'DateTime', 'value': '1995-06-02T12:00:00'},
  'lake_surface_water_temperature': {'type': 'Number', 'value': 286.4511},
  'lswt_uncertainty': {'type': 'Number', 'value': 0.45800003},
  'lswt_quality_level': {'type': 'Number', 'value': 3.0},
  'lswt_obs_instr': {'type': 'Number', 'value': 1.0},
  'lswt_flag_bias_correction': {'type': 'Number', 'value': 1.0},
  'lakeid_CCI': {'type': 'Number', 'value': 352},
  'lakeid_GloboLakes': {'type': 'Number', 'value': 352}},
 {'id': 'urn:raw:cds:lswt:cci_lake_id:352',
  'type': 'raw_cds_lswt',
  'time_index': {'type': 'DateTime', 'value': '1995-06-08T12:00:00'},
  'lake_surface_water_temperature': {'type': 'Number', 'value': 289.06854},
  'lswt_uncertainty': {'type': 'Number', 'value': 0.48152387},
  'lswt_quality_level': {'type': 'Number', 'value': 3.3333333},
  'lswt_obs_instr': {'type': 'Number', 'value': 1.0},
  'lswt_flag_bias_correction': {'type': 

In [6]:
# Replace all stored raw_cds_lswt data with the updates built above.
upload_entity_updates(lswt_entity_updates)

Delete entity types ['raw_cds_lswt'] on UDP ...
Upload 5249 entity updates to UDP ...


100%|█████████████████████████████████████████████████████████| 42/42 [00:13<00:00,  3.12it/s]


### UDP-Upload: Hitzewellentage

In [7]:
# Heat-wave-day statistics; reset_index() turns the index (presumably time/latitude/longitude,
# judging by the columns shown below) into regular columns.
# NOTE(review): unpickling can execute arbitrary code — only load this file from a trusted source.
hwt = pandas.read_pickle('sis-heat-and-cold-spells.pkl.gz').reset_index()
hwt

Unnamed: 0,time,latitude,longitude,rcp85-mean,rcp85-stdev,rcp45-mean,rcp45-stdev
0,1986-01-01,47.7,9.2,0.664428,0.864754,0.664428,1.237293
1,1987-01-01,47.7,9.2,0.820577,0.864754,0.820577,1.237293
2,1988-01-01,47.7,9.2,0.956149,0.864754,0.956149,1.237293
3,1989-01-01,47.7,9.2,0.972930,0.864754,0.972930,1.237293
4,1990-01-01,47.7,9.2,0.993763,0.864754,0.993763,1.237293
...,...,...,...,...,...,...,...
95,2081-01-01,47.7,9.2,14.117562,6.499632,5.402597,2.383281
96,2082-01-01,47.7,9.2,14.437883,6.499632,5.410567,2.383281
97,2083-01-01,47.7,9.2,14.551274,6.499632,5.481069,2.383281
98,2084-01-01,47.7,9.2,14.922153,6.499632,5.383126,2.383281


In [8]:
# Time and coordinates are mapped to time_index/location; all other columns become numeric attributes.
# NOTE: this rebinds the `data_cols` global used by the previous cell's map_row.
data_cols = list(hwt.columns)
data_cols.remove('time')
data_cols.remove('latitude')
data_cols.remove('longitude')

def map_row(i, row, cols=None):
    """Convert one row of the heat-wave-day frame into an NGSI v2 entity update.

    `i` is the row label from `iterrows()` and is unused. All rows share the
    fixed entity id `urn:raw:cds:hwt` and differ in `time_index`; the row's
    coordinates are attached as a GeoJSON Point (longitude first, as GeoJSON
    requires). `cols` lists the columns sent as `Number` attributes; it defaults
    to the global `data_cols`, but passing it explicitly avoids depending on
    whichever cell last rebound that global.

    Missing values (NaN) are sent as JSON `null`, because NaN is not valid JSON.
    """
    if cols is None:
        cols = data_cols
    upd = dict(
        id = "urn:raw:cds:hwt",
        type = "raw_cds_hwt",
        time_index = dict(
            type = 'DateTime',
            value = row.time.isoformat(),
        ),
        location = dict(
            type = "geo:json",
            value = dict(
                type = "Point",
                coordinates = [ row.longitude, row.latitude ],
            ),
        ),
    )
    for c in cols:
        value = row[c]
        upd[c] = dict(
            type = 'Number',
            value = None if pandas.isna(value) else value,
        )
    return upd

# One entity update per row (all under the single id urn:raw:cds:hwt, distinguished by time_index).
hwt_entity_updates = [ map_row(i, r) for i, r in hwt.iterrows() ]
hwt_entity_updates[:2]

[{'id': 'urn:raw:cds:hwt',
  'type': 'raw_cds_hwt',
  'time_index': {'type': 'DateTime', 'value': '1986-01-01T00:00:00'},
  'location': {'type': 'geo:json',
   'value': {'type': 'Point', 'coordinates': [9.200000000000003, 47.7]}},
  'rcp85-mean': {'type': 'Number', 'value': 0.664428174495697},
  'rcp85-stdev': {'type': 'Number', 'value': 0.8647542595863342},
  'rcp45-mean': {'type': 'Number', 'value': 0.664428174495697},
  'rcp45-stdev': {'type': 'Number', 'value': 1.2372934818267822}},
 {'id': 'urn:raw:cds:hwt',
  'type': 'raw_cds_hwt',
  'time_index': {'type': 'DateTime', 'value': '1987-01-01T00:00:00'},
  'location': {'type': 'geo:json',
   'value': {'type': 'Point', 'coordinates': [9.200000000000003, 47.7]}},
  'rcp85-mean': {'type': 'Number', 'value': 0.8205769658088684},
  'rcp85-stdev': {'type': 'Number', 'value': 0.8647542595863342},
  'rcp45-mean': {'type': 'Number', 'value': 0.8205769658088684},
  'rcp45-stdev': {'type': 'Number', 'value': 1.2372934818267822}}]

In [9]:
# Replace all stored raw_cds_hwt data with the updates built above.
upload_entity_updates(hwt_entity_updates)

Delete entity types ['raw_cds_hwt'] on UDP ...
Upload 100 entity updates to UDP ...


### UDP-Upload: Klimaindikatoren

In [10]:
# Load every climate-indicator table (*.csv, `date` parsed as datetime) and its
# metadata (*.json) from the local archive, keyed by file name without extension.
ci_data = dict()
ci_meta = dict()
ci_areas = None
with zipfile.ZipFile("sis-ecde-climate-indicators.zip", 'r') as zf:
    for name in zf.namelist():
        if name.endswith('.csv'):
            with zf.open(name) as f:
                ci_data[name.removesuffix('.csv')] = pandas.read_csv(f, sep=',', parse_dates=['date'])
        elif name.endswith('.json'):
            with zf.open(name) as f:
                obj = json.load(f)
            areas = obj.pop('areas')
            # `areas` is expected to be identical in every metadata file, so it is
            # stored only once; verify that instead of silently keeping the last copy.
            assert ci_areas is None or areas == ci_areas, f"{name}: 'areas' differs from the other metadata files"
            ci_areas = areas
            ci_meta[name.removesuffix('.json')] = obj

In [11]:
def ci_entity_update_gen(data=None, meta=None):
    """Yield one NGSI v2 entity update per row of every climate-indicator table.

    `data` maps an indicator name to its DataFrame (must contain a `date`
    column) and `meta` maps the same name to its metadata dict; they default to
    the globals `ci_data` / `ci_meta`. Each update has the entity id
    `urn:raw:cds:climate_indicator:<name>`, every non-date column as a `Number`
    attribute (NaN sent as JSON `null`, since NaN is not valid JSON), and every
    metadata entry as a `Text` attribute. A metadata key that clashes with an
    existing attribute is prefixed with `_`.

    Raises KeyError if a table has no metadata entry and AssertionError on
    attribute-name collisions that the `_` prefix cannot resolve.
    """
    if data is None:
        data = ci_data
    if meta is None:
        meta = ci_meta
    for name, df in data.items():
        data_cols = list(df.columns)
        data_cols.remove('date')
        for _, row in df.iterrows():
            upd = dict(
                id = f"urn:raw:cds:climate_indicator:{name}",
                type = "raw_cds_climate_indicator",
                time_index = dict(
                    type = 'DateTime',
                    value = row.date.isoformat(),
                ),
            )
            for c in data_cols:
                assert c not in upd
                value = row[c]
                upd[c] = dict(
                    type = 'Number',
                    value = None if pandas.isna(value) else value,
                )

            # NOTE(review): metadata values are sent unchanged as 'Text'; non-string
            # JSON values (lists/objects) may need serializing — confirm with the UDP.
            for key, value in meta[name].items():
                if key in upd:
                    key = '_' + key
                assert key not in upd
                upd[key] = dict(
                    type = 'Text',
                    value = value,
                )

            yield upd
        
# Materialize the generator: upload_entity_updates traverses the updates twice,
# and post_entity_updates needs len() and slicing.
ci_entity_updates = list(ci_entity_update_gen())
ci_entity_updates[:2]

[{'id': 'urn:raw:cds:climate_indicator:02_growing_degree_days-reanalysis-yearly-grid-1940-2023-v1.0-t2m',
  'type': 'raw_cds_climate_indicator',
  'time_index': {'type': 'DateTime', 'value': '1940-01-01T00:00:00'},
  'aalen': {'type': 'Number', 'value': 1665.878},
  'bamberg': {'type': 'Number', 'value': 1800.916},
  'berlin': {'type': 'Number', 'value': 1874.234},
  'frankfurt': {'type': 'Number', 'value': 2033.723},
  'freiburg': {'type': 'Number', 'value': 1904.395},
  'hamburg': {'type': 'Number', 'value': 1706.64},
  'heidelberg': {'type': 'Number', 'value': 1999.37},
  'heidenheim': {'type': 'Number', 'value': 1724.329},
  'innsbruck': {'type': 'Number', 'value': 534.2343},
  'karlsruhe': {'type': 'Number', 'value': 2123.29},
  'kempten': {'type': 'Number', 'value': 1433.82},
  'koeln': {'type': 'Number', 'value': 1983.607},
  'konstanz': {'type': 'Number', 'value': 1863.127},
  'leipzig': {'type': 'Number', 'value': 1906.222},
  'madrid': {'type': 'Number', 'value': 3029.502},
 

In [12]:
# Replace all stored raw_cds_climate_indicator data with the updates built above.
upload_entity_updates(ci_entity_updates)

Delete entity types ['raw_cds_climate_indicator'] on UDP ...
Upload 7514 entity updates to UDP ...


100%|█████████████████████████████████████████████████████████| 59/59 [00:49<00:00,  1.19it/s]
