From 693339af0ddd72bb6ea5ffd398e7ecb664d874b0 Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Fri, 17 Feb 2017 10:26:33 -0800 Subject: [PATCH 1/3] add a heka message decoder config that doesn't clobber the message metadata (default_headers) --- heka/io_modules/decoders/heka/json.lua | 36 +++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/heka/io_modules/decoders/heka/json.lua b/heka/io_modules/decoders/heka/json.lua index 9a92498a3..1b5502085 100644 --- a/heka/io_modules/decoders/heka/json.lua +++ b/heka/io_modules/decoders/heka/json.lua @@ -12,7 +12,21 @@ to inject_message so it needs to decode into a Heka message table described here: https://mozilla-services.github.io/lua_sandbox/heka/message.html ## Decoder Configuration Table -* none + +```lua +decoders_heka_json = { + -- Preserve the default_headers passed to decode by storing the json message + -- in Fields, after flattening the json message with a delimiter. + preserve_metadata = false, -- default + + -- Use the Timestamp from json when preserve_metadata is true. + preserve_metadata_use_timestamp = false, -- default + + -- Delimiter to use when flattening the Fields object of a json message. + -- Used only when preserve_metadata is true. + flatten_delimiter = ".", -- default +} +``` ## Functions @@ -34,6 +48,8 @@ Decode and inject the resulting message --]] -- Imports +local module_name = ... +local module_cfg = require "string".gsub(module_name, "%.", "_") local cjson = require "cjson" local pairs = pairs @@ -41,11 +57,29 @@ local type = type local inject_message = inject_message +local cfg = read_config(module_cfg) or {} +assert(type(cfg) == "table", module_cfg .. " must be a table") +cfg.flatten_delimiter = cfg.flatten_delimiter or "." +assert(type(cfg.flatten_delimiter) == "string", module_cfg .. ".flatten_delimiter must be a string") +fields_prefix = "Fields" .. cfg.flatten_delimiter + local M = {} setfenv(1, M) -- Remove external access to contain everything in the module function decode(data, dh) local msg = cjson.decode(data) + if cfg.preserve_metadata then + if type(msg.Fields) == "table" then + for k,v in pairs(msg.Fields.Fields) do + msg[fields_prefix..k] = v + end + msg.Fields = none + end + msg = {Fields=msg} + if cfg.preserve_metadata_use_timestamp then + msg.Timestamp = msg.Fields.Timestamp + end + end if dh then if not msg.Uuid then msg.Uuid = dh.Uuid end From 6b34eaed766ad67dec528754628042952c5392dd Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Mon, 20 Feb 2017 22:28:28 -0800 Subject: [PATCH 2/3] fix typo'd msg.Fields --- heka/io_modules/decoders/heka/json.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heka/io_modules/decoders/heka/json.lua b/heka/io_modules/decoders/heka/json.lua index 1b5502085..ce61d337b 100644 --- a/heka/io_modules/decoders/heka/json.lua +++ b/heka/io_modules/decoders/heka/json.lua @@ -70,7 +70,7 @@ function decode(data, dh) local msg = cjson.decode(data) if cfg.preserve_metadata then if type(msg.Fields) == "table" then - for k,v in pairs(msg.Fields.Fields) do + for k,v in pairs(msg.Fields) do msg[fields_prefix..k] = v end msg.Fields = none From 7e150dd27b8fd0a7abe794c9144ad4fd53636955 Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Thu, 2 Mar 2017 13:43:49 -0800 Subject: [PATCH 3/3] s/none/nil/ --- heka/io_modules/decoders/heka/json.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heka/io_modules/decoders/heka/json.lua b/heka/io_modules/decoders/heka/json.lua index ce61d337b..b6c941d35 100644 --- a/heka/io_modules/decoders/heka/json.lua +++ b/heka/io_modules/decoders/heka/json.lua @@ -73,7 +73,7 @@ function decode(data, dh) for k,v in pairs(msg.Fields) do msg[fields_prefix..k] = v end - msg.Fields = none + msg.Fields = nil end msg = {Fields=msg} if cfg.preserve_metadata_use_timestamp then