From 119a219a7a1c2ef222a35552fa2bd1af561f435e Mon Sep 17 00:00:00 2001
From: Just van den Broecke
Date: Mon, 11 Feb 2019 17:02:46 +0100
Subject: [PATCH] smartemission/smartemission#153 LuftDaten: harvesting history data and refiner for hour-values

---
 config/harvester_luftdaten.cfg        |  37 +++++++
 smartem/devices/airsenseur.py         |  16 ---
 smartem/devices/device.py             |  14 ++-
 smartem/harvester/harvestluftdaten.py | 137 ++++++++++++++++++++++++++
 smartem/harvester/luftdateninput.py   |   6 +-
 smartem/refiner/refiner.py            |  26 ++---
 6 files changed, 206 insertions(+), 30 deletions(-)
 create mode 100644 config/harvester_luftdaten.cfg
 create mode 100644 smartem/harvester/harvestluftdaten.py

diff --git a/config/harvester_luftdaten.cfg b/config/harvester_luftdaten.cfg
new file mode 100644
index 0000000..60ed411
--- /dev/null
+++ b/config/harvester_luftdaten.cfg
@@ -0,0 +1,37 @@
+# Smart Emission Data Harvester - Stetl config
+#
+# Just van den Broecke - 2016
+#
+# This config reads measurements from Luftdaten.info API.
+# It inserts this data as JSON blobs in Postgres.
+
+# The main Stetl ETL chain
+[etl]
+chains = input_luftdaten_1|output_postgres_insert
+
+# Read last hour of averaged LuftDaten
+[input_luftdaten_1]
+class = smartem.harvester.harvestluftdaten.HarvesterLuftdatenInput
+device_type = vanilla
+device_version = 1
+url = http://api.luftdaten.info/static/v2/data.1h.json
+# bboxes={{'Nijmegen': [51.7,5.6,51.9,6.0], 'Amsterdam': [52.3,4.7,52.5,5.1] }}
+bboxes={{'Nijmegen': [51.7,5.6,51.9,6.0] }}
+
+# for testing/debugging
+[output_std]
+class = outputs.standardoutput.StandardOutput
+
+# Insert file records
+[output_postgres_insert]
+class = outputs.dboutput.PostgresInsertOutput
+input_format = record_array
+host = {pg_host}
+database = {pg_database}
+user = {pg_user}
+password = {pg_password}
+schema = {pg_schema_raw}
+table = timeseries
+key=unique_id
+replace=True
+
diff --git a/smartem/devices/airsenseur.py b/smartem/devices/airsenseur.py
index cc97f46..c88bd91 100644
--- a/smartem/devices/airsenseur.py
+++ b/smartem/devices/airsenseur.py
@@ -120,22 +120,6 @@ def check_value(self, name, val_dict, value=None):
 
         return True, '%s OK' % name
 
-    # Get location as lon, lat
-    def get_lon_lat(self, val_dict):
-        lon = None
-        lat = None
-        if 'longitude' in val_dict:
-            lon = val_dict['longitude']
-            if lon < -90.0 or lon > 90.0:
-                return None, None
-
-        if 'latitude' in val_dict:
-            lat = val_dict['latitude']
-            if lat < -180.0 or lat > 180.0:
-                return None, None
-
-        return lon, lat
-
     def get_last_value(self, device_id, name, val_dict):
         try:
             # Best effort
diff --git a/smartem/devices/device.py b/smartem/devices/device.py
index 0d4adea..6a35bfe 100644
--- a/smartem/devices/device.py
+++ b/smartem/devices/device.py
@@ -67,4 +67,16 @@ def set_last_value(self, device_id, name, value, val_dict):
 
     # Get location as lon, lat
     def get_lon_lat(self, val_dict):
-        return None, None
+        lon = None
+        lat = None
+        if 'longitude' in val_dict:
+            lon = float(val_dict['longitude'])
+            if lon < -180.0 or lon > 180.0:  # longitude is only valid within +/-180 degrees
+                return None, None
+
+        if 'latitude' in val_dict:
+            lat = float(val_dict['latitude'])
+            if lat < -90.0 or lat > 90.0:  # latitude is only valid within +/-90 degrees
+                return None, None
+
+        return lon, lat
diff --git a/smartem/harvester/harvestluftdaten.py b/smartem/harvester/harvestluftdaten.py
new file mode 100644
index 0000000..ccf5d0c
--- /dev/null
+++ b/smartem/harvester/harvestluftdaten.py
@@ -0,0 +1,137 @@
+import time
+from datetime import datetime
+import json
+from stetl.util import Util
+from stetl.packet import FORMAT
+from luftdateninput import LuftdatenInput
+
+log = Util.get_log("HarvesterLuftdatenInput")
+
+
+class HarvesterLuftdatenInput(LuftdatenInput):
+    """
+    Luftdaten.info timeseries (last hour) Values fetcher/formatter.
+
+    For now mainly within one or more bbox-es, later other options.
+    API doc: https://github.com/opendata-stuttgart/meta/wiki/APIs
+
+    API Example all last 5 min measurements in greater Nijmegen area
+    http://api.luftdaten.info/v1/filter/box=51.7,5.6,51.9,6.0
+    One sensor:
+    http://api.luftdaten.info/v1/sensor/17008/
+
+    Last hour average all sensors (no bbox query possible):
+    http://api.luftdaten.info/static/v2/data.1h.json
+
+    """
+
+    def __init__(self, configdict, section, produces=FORMAT.record_array):
+        LuftdatenInput.__init__(self, configdict, section, produces)
+        self.current_time_secs = lambda: int(round(time.time()))
+
+        # Init all bbox id's
+        self.bboxes_vals = list()
+        if self.bboxes:
+            self.bboxes_vals = self.bboxes.values()
+
+    # Format all LTD sensor item object to record (overridden from HttpInput).
+    def assemble(self, sensor_items):
+        d = datetime.utcfromtimestamp(self.current_time_secs())
+        day = int(d.strftime('%Y%m%d'))
+        hour = d.hour + 1
+
+        device_records = dict()
+        for sensor_item in sensor_items:
+            location_id = 'unknown'
+            try:
+                for bbox in self.bboxes_vals:
+                    # Single array of floats lowerleft (lat,lon), upperright (lat,lon)
+                    location = sensor_item['location']
+                    location_id = location['id']
+                    longitude = float(location['longitude'])
+                    latitude = float(location['latitude'])
+
+                    if latitude > bbox[0] and longitude > bbox[1] and latitude < bbox[2] and longitude < bbox[3]:
+
+                        sensor_record = self.sensor_item2record(sensor_item)
+                        if not sensor_record:
+                            log.warn('Error sensor_item2record location_id=%s - skipping' % str(location_id))
+                            continue
+
+                        device_name = sensor_record['device_name']
+
+                        if device_name not in device_records:
+                            log.info('Create new raw data record for device_name=%s' % device_name)
+                            #
+                            # -- Map this to
+                            # CREATE TABLE smartem_raw.timeseries (
+                            #   gid serial,
+                            #   unique_id character varying (16),
+                            #   insert_time timestamp with time zone default current_timestamp,
+                            #   device_id integer,
+                            #   day integer,
+                            #   hour integer,
+                            #   data json,
+                            #   complete boolean default false,
+                            #   PRIMARY KEY (gid)
+                            # );
+
+                            # Create record with JSON text blob with metadata
+                            record = dict()
+                            device_id = sensor_record['device_id']
+
+                            # Timestamp of sample
+                            record['device_id'] = device_id
+                            record['device_type'] = self.device_type
+                            record['unique_id'] = '%s-%s-%s' % (str(device_id), str(day), str(hour))
+                            record['device_version'] = self.device_version
+                            record['day'] = day
+                            record['hour'] = hour
+
+                            # Determine if hour is "complete"
+                            record['complete'] = True
+
+                            # Add JSON text blob
+                            for item in sensor_record['data']['timeseries']:
+                                if 'time' in item:
+                                    del (item['time'])
+                                item['latitude'] = latitude
+                                item['longitude'] = longitude
+                            record['data'] = {
+                                'id': device_id,
+                                'date': day,
+                                'hour': hour,
+                                'timeseries': sensor_record['data']['timeseries']
+                            }
+
+                            device_records[device_name] = record
+                        else:
+                            record = device_records[device_name]
+                            for item in sensor_record['data']['timeseries']:
+                                if 'time' in item:
+                                    del (item['time'])
+                                item['latitude'] = latitude
+                                item['longitude'] = longitude
+
+                                record['data']['timeseries'].append(item)
+
+                            log.info('Appending timeseries for device_name=%s' % device_name)
+
+
+            except Exception as e:
+                log.warn('Error location_id=%s, err= %s' % (str(location_id), str(e)))
+                continue
+
+        records = device_records.values()
+        for record in records:
+            # Need json blob format for data field
+            json_data = json.dumps(record['data'])
+            record['data'] = json_data
+
+        return records
+
+    # Format all LTD sensor item object to record (overridden from HttpInput).
+    def format_data(self, data):
+        sensor_items = self.parse_json_str(data)
+        records = self.assemble(sensor_items)
+        return records
diff --git a/smartem/harvester/luftdateninput.py b/smartem/harvester/luftdateninput.py
index 0f64fb6..388c8ff 100644
--- a/smartem/harvester/luftdateninput.py
+++ b/smartem/harvester/luftdateninput.py
@@ -189,7 +189,11 @@ def sensor_item2record(self, sensor_item):
         sensor_meta = '%s-%d' % (sensor_type_name, sensor_id)
         unique_id = '%s-%s' % (device_name, sensor_meta)
 
-        altitude = int(round(float(location['altitude'])))
+        altitude = 0
+        if 'altitude' in location:
+            altitude = int(round(float(location['altitude'])))
+
+        # We MUST have lat/lon, otherwise exception and skip.
         longitude = location['longitude']
         latitude = location['latitude']
         point = 'SRID=4326;POINT(%s %s)' % (longitude, latitude)
diff --git a/smartem/refiner/refiner.py b/smartem/refiner/refiner.py
index 2151541..db70f15 100644
--- a/smartem/refiner/refiner.py
+++ b/smartem/refiner/refiner.py
@@ -173,19 +173,8 @@ def refine(self, record_in, sensor_names):
             record['device_meta'] = device_meta
             record['sensor_meta'] = self.device.get_sensor_meta_id(sensor_name, sensor_vals)
 
-            # Optional fields dependent on input record
-            if 'value_stale' in record_in:
-                record['value_stale'] = record_in['value_stale']
-
-            if 'device_name' in record_in:
-                record['device_name'] = record_in['device_name']
-
-            if 'unique_id' in record_in:
-                record['unique_id'] = '%s-%s' % (str(record_in['unique_id']), sensor_name)
-            else:
-                record['unique_id'] = '%d-%s' % (device_id, sensor_name)
-
             if day > 0:
+                # Refined timeseries table
                 record['day'] = day
                 record['hour'] = hour
 
@@ -201,7 +190,20 @@ def refine(self, record_in, sensor_names):
                 record['time'] = datetime.strptime('%sGMT' % day_hour, '%Y%m%d%HGMT').replace(tzinfo=pytz.utc)
 
             else:
+                # For "last" values table
+
+                # Optional fields dependent on input record
+                if 'value_stale' in record_in:
+                    record['value_stale'] = record_in['value_stale']
+
+                if 'device_name' in record_in:
+                    record['device_name'] = record_in['device_name']
+
                 record['time'] = record_in['time']
+                if 'unique_id' in record_in:
+                    record['unique_id'] = '%s-%s' % (str(record_in['unique_id']), sensor_name)
+                else:
+                    record['unique_id'] = '%d-%s' % (device_id, sensor_name)
 
             record['name'] = sensor_name
             record['label'] = sensor_def['label']