Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
82 lines (74 sloc) 2.44 KB
# Smart Emission Data Refiner ETL - Stetl config
#
# Just van den Broecke - 2016-2018
#
# This config reads the raw timeseries measurements from the DB, refines these
# and writes results in the measurements DB.
# The main Stetl ETL chain
[etl]
chains = input_raw_sensor_db | refine_filter| (output_postgres_insert) (component_sieve|output_influxdb_write)
# chains = input_raw_sensor_db|refine_filter|output_postgres_insert
# read raw data from timeseries table
[input_raw_sensor_db]
class = smartem.rawdbinput.RawDbInput
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_raw}
table = timeseries
output_format = record
last_gid_query = SELECT gid_raw from smartem_refined.refiner_progress
max_input_records = {refiner_max_input_records}
gids_query = SELECT gid from timeseries WHERE gid > %d AND gid <= %d AND complete = TRUE ORDER BY gid
data_query = SELECT * from timeseries WHERE gid = %d
read_once = {refiner_raw_read_once}
# Refines raw records for specified sensor names
[refine_filter]
class = smartem.refiner.refinefilter.RefineFilter
sensor_names = temperature,humidity,pressure,noiseavg,noiselevelavg,co2,o3,co,no,no2,o3raw,coraw,noraw,no2raw,pm10,pm2_5
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_calibrated}
process_name = refiner
# Sieve out only gas-components
[component_sieve]
class = filters.sieve.AttrValueRecordSieve
input_format = record_array
output_format = record_array
attr_name = name
attr_values = o3,co,no,no2,co2
# for testing/debugging
[output_std]
class = outputs.standardoutput.StandardOutput
# Insert file records
[output_postgres_insert]
class = outputs.dboutput.PostgresInsertOutput
input_format = record_array
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_refined}
table = timeseries
replace=True
# Write records to InfluxDB server
[output_influxdb_write]
class = smartem.influxdboutput.InfluxDbOutput
input_format = record_array
method = POST
list_fanout = False
content_type = application/x-www-form-urlencoded
host = {influx_host}
port = {influx_port}
database = {influx_se_database}
measurement = {influx_se_measurement_refined}
tags_map = {{'station': 'device_id', 'component': 'name' }}
fields_map = {{'value': 'value'}}
time_attr = time
# geohash_map = {{'lat': 'lat', 'lon': 'lon' }}
geohash_wkt_attr = point
user = {influx_se_writer}
password = {influx_se_writer_password}
You can’t perform that action at this time.