# harvestluftdaten.py
# 137 lines (110 loc) · 5.71 KB
import time
from datetime import datetime
import json
from stetl.util import Util
from stetl.packet import FORMAT
from luftdateninput import LuftdatenInput
# Module-level logger, named after the harvester class defined below.
log = Util.get_log('HarvesterLuftdatenInput')
class HarvesterLuftdatenInput(LuftdatenInput):
    """
    Luftdaten.info timeseries (last hour) values fetcher/formatter.

    For now selection is mainly within one or more bboxes; other options later.

    API doc: https://github.com/opendata-stuttgart/meta/wiki/APIs

    API example, all measurements of the last 5 minutes in the greater
    Nijmegen area:
    http://api.luftdaten.info/v1/filter/box=51.7,5.6,51.9,6.0

    One sensor:
    http://api.luftdaten.info/v1/sensor/17008/

    Last-hour average of all sensors (no bbox query possible):
    http://api.luftdaten.info/static/v2/data.1h.json
    """

    def __init__(self, configdict, section, produces=FORMAT.record_array):
        LuftdatenInput.__init__(self, configdict, section, produces)

        # Clock in whole epoch seconds; kept as an attribute so it can be
        # replaced (e.g. by a fixed clock).
        self.current_time_secs = lambda: int(round(time.time()))

        # Configured bboxes as a concrete list (under Python 3 .values() is a
        # view). Each bbox is a sequence of floats:
        # lower-left (lat, lon), upper-right (lat, lon).
        self.bboxes_vals = list()
        if self.bboxes:
            self.bboxes_vals = list(self.bboxes.values())

    @staticmethod
    def _in_bbox(latitude, longitude, bbox):
        """Return True if (latitude, longitude) lies strictly inside ``bbox``.

        :param bbox: ``[lower_lat, lower_lon, upper_lat, upper_lon]``.
        """
        return bbox[0] < latitude < bbox[2] and bbox[1] < longitude < bbox[3]

    @staticmethod
    def _locate_timeseries(timeseries, latitude, longitude):
        """Drop each sample's 'time' key and tag it with the location, in place.

        :return: the same ``timeseries`` list, for convenience.
        """
        for item in timeseries:
            item.pop('time', None)
            item['latitude'] = latitude
            item['longitude'] = longitude
        return timeseries

    def _new_record(self, device_id, day, hour, timeseries):
        """Create the raw record for one device and one hour.

        The record is intended to map onto:

            CREATE TABLE smartem_raw.timeseries (
              gid serial,
              unique_id character varying (16),
              insert_time timestamp with time zone default current_timestamp,
              device_id integer,
              day integer,
              hour integer,
              data json,
              complete boolean default false,
              PRIMARY KEY (gid)
            );

        ``data`` is left as a dict here; ``assemble`` serializes it to JSON
        text once all items have been merged.
        """
        return {
            'device_id': device_id,
            'device_type': self.device_type,
            'unique_id': '%s-%s-%s' % (device_id, day, hour),
            'device_version': self.device_version,
            'day': day,
            'hour': hour,
            # The hour is always marked "complete".
            'complete': True,
            'data': {
                'id': device_id,
                'date': day,
                'hour': hour,
                'timeseries': timeseries,
            },
        }

    def assemble(self, sensor_items):
        """Group sensor items located inside the configured bboxes per device.

        Each item is used at most once, even when configured bboxes overlap.
        Items mapping to the same ``device_name`` are merged into a single
        record whose timeseries is the concatenation of their timeseries.

        :param sensor_items: iterable of sensor item dicts; each needs a
            ``location`` with ``id``, ``latitude`` and ``longitude``.
        :return: list of record dicts; each record's ``data`` is a JSON string.

        Items that fail to parse or convert are logged and skipped
        (best-effort harvesting).
        """
        if not self.bboxes_vals:
            # Nothing can match without a bbox.
            return []

        now = time.gmtime(self.current_time_secs())
        day = int(time.strftime('%Y%m%d', now))
        # UTC hour + 1, i.e. 1..24. NOTE(review): presumably the "hour ending"
        # convention used downstream - confirm.
        hour = now.tm_hour + 1

        device_records = dict()
        for sensor_item in sensor_items:
            location_id = 'unknown'
            try:
                location = sensor_item['location']
                location_id = location['id']
                longitude = float(location['longitude'])
                latitude = float(location['latitude'])

                # Stop at the first containing bbox, so overlapping bboxes
                # cannot add the same measurements twice.
                if not any(self._in_bbox(latitude, longitude, bbox)
                           for bbox in self.bboxes_vals):
                    continue

                sensor_record = self.sensor_item2record(sensor_item)
                if not sensor_record:
                    log.warning('Error sensor_item2record location_id=%s - skipping', location_id)
                    continue

                device_name = sensor_record['device_name']
                timeseries = self._locate_timeseries(
                    sensor_record['data']['timeseries'], latitude, longitude)

                record = device_records.get(device_name)
                if record is None:
                    log.info('Create new raw data record for device_name=%s', device_name)
                    device_records[device_name] = self._new_record(
                        sensor_record['device_id'], day, hour, timeseries)
                else:
                    # extend() instead of iterate-and-append: safe even if
                    # both names refer to the same list object.
                    record['data']['timeseries'].extend(timeseries)
                    log.info('Appending timeseries for device_name=%s', device_name)
            except Exception as e:
                log.warning('Error location_id=%s, err= %s', location_id, e)

        records = list(device_records.values())
        for record in records:
            # The target column holds JSON text, so serialize the blob.
            record['data'] = json.dumps(record['data'])
        return records

    def format_data(self, data):
        """Parse the raw JSON payload and assemble per-device records.

        Overrides the HttpInput formatting hook.

        :param data: raw JSON text as fetched from the Luftdaten API.
        :return: list of records as produced by ``assemble``.
        """
        sensor_items = self.parse_json_str(data)
        return self.assemble(sensor_items)