add a heka message decoder config that doesn't clobber the message metadata #79

Closed
36 changes: 35 additions & 1 deletion heka/io_modules/decoders/heka/json.lua
@@ -12,7 +12,21 @@ to inject_message so it needs to decode into a Heka message table described
here: https://mozilla-services.github.io/lua_sandbox/heka/message.html

## Decoder Configuration Table
* none

```lua
decoders_heka_json = {
    -- Preserve the default_headers passed to decode by storing the json message
    -- in Fields, after flattening the json message with a delimiter.
    preserve_metadata = false, -- default

    -- Use the Timestamp from json when preserve_metadata is true.
    preserve_metadata_use_timestamp = false, -- default

    -- Delimiter to use when flattening the Fields object of a json message.
    -- Used only when preserve_metadata is true.
    flatten_delimiter = ".", -- default
}
```

## Functions

@@ -34,18 +48,38 @@ Decode and inject the resulting message
--]]

-- Imports
local module_name = ...
local module_cfg = require "string".gsub(module_name, "%.", "_")
local cjson = require "cjson"

local pairs = pairs
local type = type

local inject_message = inject_message

local cfg = read_config(module_cfg) or {}
assert(type(cfg) == "table", module_cfg .. " must be a table")
cfg.flatten_delimiter = cfg.flatten_delimiter or "."
assert(type(cfg.flatten_delimiter) == "string", module_cfg .. ".flatten_delimiter must be a string")
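-- Must be local: after setfenv(1, M) below, a global set here is no longer visible inside decode().
local fields_prefix = "Fields" .. cfg.flatten_delimiter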

local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module

function decode(data, dh)
    local msg = cjson.decode(data)
    if cfg.preserve_metadata then
Member Author

I don't understand what this comment means as you have access to everything in the current message and the current sandbox environment.

dh.Type and msg.Type may have different values here.

With the previous behavior, there was no way to preserve dh.Type so that it could be used in a message matcher on an output.

With the behavior proposed here, setting cfg.preserve_metadata allows both values to be referenced in a message matcher, as Type and Fields[Type] respectively. The caveat is that msg.Fields.agent (or similar) would have to be referenced in a message matcher as Fields[Fields.agent].
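
To make the resulting message shape concrete, here is a minimal standalone sketch of the flattening described above. It is not the module's actual decode(): the helper name `preserve_metadata`, the sample input, and the way the default headers are merged are assumptions for illustration only, and it builds a new table rather than modifying the decoded message in place.

```lua
-- Hypothetical helper, for illustration only; it does not use cjson or the sandbox API.
-- decoded:   table produced from the json message
-- dh:        default headers supplied by the input (may be nil)
-- delimiter: string placed between "Fields" and each flattened key
local function preserve_metadata(decoded, dh, delimiter)
    delimiter = delimiter or "."
    local flat = {}
    for k, v in pairs(decoded) do
        if k == "Fields" and type(v) == "table" then
            -- Flatten the json message's own Fields table into prefixed keys.
            for fk, fv in pairs(v) do
                flat["Fields" .. delimiter .. fk] = fv
            end
        else
            flat[k] = v
        end
    end
    -- The whole json message now lives under Fields; headers come from dh.
    local msg = {Fields = flat}
    if dh then
        for k, v in pairs(dh) do
            if k ~= "Fields" and msg[k] == nil then msg[k] = v end
        end
    end
    return msg
end

local out = preserve_metadata(
    {Type = "json.type", Fields = {agent = "curl"}},  -- sample decoded json
    {Type = "input.type", Logger = "input"})          -- sample default headers
```

With this sample input, `out.Type` holds the input's value while `out.Fields.Type` holds the value from the json message, and the nested `agent` entry ends up under the key `Fields.agent`, which is why a matcher would have to spell it `Fields[Fields.agent]`.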

        if type(msg.Fields) == "table" then
            for k,v in pairs(msg.Fields.Fields) do
Contributor

I need to see your original input, since this implies it does not conform to the Heka message schema: there is no nested Fields hash inside Fields.

Member Author

That is a typo; it is supposed to be msg.Fields.

                msg[fields_prefix..k] = v
            end
            msg.Fields = nil
        end
        msg = {Fields=msg}
        if cfg.preserve_metadata_use_timestamp then
            msg.Timestamp = msg.Fields.Timestamp
        end
    end

    if dh then
        if not msg.Uuid then msg.Uuid = dh.Uuid end