diff --git a/bin/build_pipeline_heka.sh b/bin/build_pipeline_heka.sh index 1176e66..b56884c 100755 --- a/bin/build_pipeline_heka.sh +++ b/bin/build_pipeline_heka.sh @@ -57,7 +57,7 @@ cp -R $BASE/heka/cmd/heka-export ./cmd/ cp -R $BASE/heka/cmd/heka-s3list ./cmd/ cp -R $BASE/heka/cmd/heka-s3cat ./cmd/ -echo 'Installing/updating lua filters/modules/decoders' +echo 'Installing/updating lua filters/modules/decoders/encoders' rsync -vr $BASE/heka/sandbox/ ./sandbox/lua/ echo 'Updating plugins with local changes' diff --git a/heka/sandbox/encoders/combine_telemetry_objects.lua b/heka/sandbox/encoders/combine_telemetry_objects.lua new file mode 100644 index 0000000..855e5ac --- /dev/null +++ b/heka/sandbox/encoders/combine_telemetry_objects.lua @@ -0,0 +1,34 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at http://mozilla.org/MPL/2.0/. + +require "cjson" +local l = require "lpeg" + +local grammar = (l.C"payload" + l.C"environment") * l.P"." * l.C(l.P(1)^1) + +function process_message() + local raw = read_message("raw") + local ok, msg = pcall(decode_message, raw) + if not ok then return -1, msg end + + if type(msg.Fields) ~= "table" then return -1, "missing Fields" end + + local ok, json = pcall(cjson.decode, read_message("Payload")) + if not ok then return -1, json end + + for i=1, #msg.Fields do + local section, name = grammar:match(msg.Fields[i].name) + if section then + local ok, object = pcall(cjson.decode, msg.Fields[i].value[1]) + if ok then + json[section][name] = object + end + end + end + local ok, payload = pcall(cjson.encode, json) + if not ok then return -1, payload end + + inject_payload("txt", "output", msg.clientId, "\t", payload, "\n") + return 0 +end