Skip to content

Commit

Permalink
Bugzilla 1368196 Create a moz_ingest generic JSON decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
trink committed Sep 19, 2017
1 parent 35860f7 commit 3b71e3b
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 9 deletions.
8 changes: 7 additions & 1 deletion moz_ingest/io_modules/decoders/moz_ingest/common.lua
Expand Up @@ -12,7 +12,8 @@ handling before passing the data off to the correct subdecoder.
```lua
decoders_moz_ingest_common = {
sub_decoders = { -- required
-- _namespace_ (string) - Decoder module name
-- _namespace_ (string) = Decoder module name
-- a namespace of "*" can be used to specify a default decoder
telemetry = "decoders.moz_ingest.telemetry",
},
Expand Down Expand Up @@ -72,6 +73,7 @@ local table = require "table"
local assert = assert
local pairs = pairs
local pcall = pcall
local setmetatable = setmetatable
local create_stream_reader = create_stream_reader
local decode_message = decode_message
Expand Down Expand Up @@ -123,6 +125,10 @@ local function load_decoder_cfg()
local tm = require(v).transform_message
assert(type(tm) == "function", "sub_decoders, no transform_message function defined: " .. k)
sub_decoders[k] = tm
if k == "*" then
local mt = {__index = function(t, k) return tm end }
setmetatable(sub_decoders, mt);
end
sd_cnt = sd_cnt + 1
elseif t == "boolean" and t and k == "test" then
sd_cnt = 1
Expand Down
138 changes: 138 additions & 0 deletions moz_ingest/io_modules/decoders/moz_ingest/json.lua
@@ -0,0 +1,138 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
# Mozilla Ingestion Generic JSON Decoder Module
## Decoder Configuration Table
```lua
decoders_moz_ingest_json = {
-- String used to specify the root namespace directory. The path should
-- contain one directory for each namespace. Each namespace directory should
-- contain one directory for each docType and the files in the directory
-- must be named <docType>.<version>.schema.json. If the schema file is not
-- found for a namespace/docType/version combination, an error is generated.
namespace_path = "/mnt/work/mozilla-pipeline-schemas/schemas",
-- array of namespace directories to ignore
-- namespace_ignore = {"heka", "metadata", "pioneer-study", "telemetry"},
}
```
## Functions
### transform_message
Transform and inject the message using the provided stream reader.
*Arguments*
- hsr (hsr) - stream reader with the message to process
*Return*
- throws on error
### decode
Decode and inject the message given as argument, using a module-internal stream reader.
*Arguments*
- msg (string) - Heka protobuf string to decode
*Return*
- throws on error
--]]
-- Imports
local module_name = ...
local string = require "string"
local module_cfg = string.gsub(module_name, "%.", "_")
local rjson = require "rjson"
local miu = require "moz_ingest.util"
local lfs = require "lfs"
local read_config = read_config
local assert = assert
local error = error
local create_stream_reader = create_stream_reader
local inject_message = inject_message
local type = type
local pcall = pcall
local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module
-- Load this module's configuration table; its name is the module name with
-- every "." replaced by "_" (see module_cfg above).
local cfg = read_config(module_cfg)
assert(type(cfg) == "table", module_cfg .. " must be a table")
assert(type(cfg.namespace_path) == "string", "namespace_path must be set")
-- Apply the default ignore list only when the option is absent, so any other
-- non-table value still fails the type assertion below.
if cfg.namespace_ignore == nil then
cfg.namespace_ignore = {"heka", "metadata", "pioneer-study", "telemetry"}
end
assert(type(cfg.namespace_ignore) == "table", "namespace_ignore must be a table")
-- Loads the JSON schemas for every namespace directory found under `path`.
--
-- Hidden entries (names starting with ".") and namespaces listed in
-- cfg.namespace_ignore are skipped. The ignore list is configured as an array
-- of names, so it is converted to a lookup set first: indexing the array
-- directly by directory name never matches, which meant nothing was ignored.
-- A set-style table ({name = true}) is still honoured for compatibility.
--
-- *Arguments*
-- - path (string) - root directory containing one directory per namespace
--
-- *Return*
-- - t (table) - namespace name => value returned by miu.load_json_schemas
local function load_namespaces(path)
    local list = cfg.namespace_ignore
    local ignore = {}
    -- numeric loop: pairs/ipairs are not reachable after setfenv(1, M)
    for i = 1, #list do
        local name = list[i]
        if name ~= nil then
            ignore[name] = true
        end
    end

    local t = {}
    for dn in lfs.dir(path) do
        local fqdn = string.format("%s/%s", path, dn)
        local mode = lfs.attributes(fqdn, "mode")
        local ignored = ignore[dn] or list[dn]
        if mode == "directory" and not dn:match("^%.") and not ignored then
            t[dn] = miu.load_json_schemas(fqdn)
        end
    end
    return t
end
-- Schemas for every namespace under the configured root, loaded once when
-- this module is loaded.
local namespaces = load_namespaces(cfg.namespace_path)
-- NOTE(review): submissionField is not referenced anywhere in the visible part
-- of this file - confirm whether it can be removed.
local submissionField = {value = nil, representation = "json"}
local doc = rjson.parse("{}") -- reuse this object to avoid creating a lot of GC
-- Parses the submission stored in Fields[content] into the shared `doc`
-- object and validates it against the schema registered for the message's
-- namespace (msg.Logger), msg.Fields.docType and msg.Fields.docVersion.
--
-- *Arguments*
-- - hsr (hsr) - stream reader holding the message to parse
-- - msg (table) - message table providing Logger and Fields
--
-- *Return*
-- - none, throws a "json\t<detail>" string (no position information) when
--   the content cannot be parsed, no schema is found, or validation fails
local function process_json(hsr, msg)
    local parsed, parse_err = pcall(doc.parse_message, doc, hsr, "Fields[content]", nil, nil, true)
    if not parsed then
        error(string.format("json\tinvalid submission: %s", parse_err), 0)
    end

    -- Walk namespace -> docType -> docVersion, stopping at the first level
    -- that is missing.
    local schema = namespaces[msg.Logger]
    schema = schema and schema[msg.Fields.docType]
    schema = schema and schema[msg.Fields.docVersion]

    local failure = "schema not found"
    if schema then
        local _valid
        _valid, failure = doc:validate(schema)
    end

    if failure then
        error(string.format("json\tnamespace: %s schema: %s version: %d error: %s",
                            msg.Logger, msg.Fields.docType, msg.Fields.docVersion, failure), 0)
    end
end
-- Validates the JSON submission and injects the resulting message.
--
-- *Arguments*
-- - hsr (hsr) - stream reader with the message to process
-- - msg (table, optional) - message under construction; when omitted a new
--   one is created with miu.new_message(hsr)
--
-- *Return*
-- - none, throws on error ("inject_message\t<detail>" when injection fails)
function transform_message(hsr, msg)
    msg = msg or miu.new_message(hsr)

    process_json(hsr, msg)

    -- The original message data is copied only once validation has passed,
    -- which avoids Field duplication in the error message.
    msg.Hostname = hsr:read_message("Hostname")
    msg.Fields.Host = hsr:read_message("Fields[Host]")

    local injected, inject_err = pcall(inject_message, msg)
    if injected then
        return
    end
    error("inject_message\t" .. inject_err, 0)
end
-- Module-internal stream reader used by decode().
local reader = create_stream_reader("decoders.moz_ingest.json")

-- Decodes the given message into the module-internal stream reader, then
-- transforms and injects it.
--
-- *Arguments*
-- - msg (string) - Heka protobuf string to decode
--
-- *Return*
-- - none, throws on error
function decode(msg)
    reader:decode_message(msg)
    transform_message(reader)
end
return M
14 changes: 6 additions & 8 deletions moz_ingest/tests/hindsight/input.hpb
@@ -1,34 +1,32 @@
`
 ���PDܤ�
|ܝDaȵ������
:�E�ӛIߣs�S>u�����۽��
common.raw"
moz_ingestJ example.comR
remote_addr"192.30.255.112p
����O-���
���ٍ������
6�.��mCu�pN�~������۽��
common.raw"
moz_ingestJ example.comR
uri"/foobarR
remote_addr"192.30.255.112�
�l�-`H1����<(��������
�틩��I,���V�-j4����۽��
common.raw"
moz_ingestJ example.comRC
uri"</submit/common/foobar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A01R
remote_addr"192.30.255.112�
ڼ"���Ln��xE������������
ӘӥKߧi��X�[����۽��
common.raw"
moz_ingestJ example.comRC
uri"</submit/common/foobar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A01R
remote_addr"192.30.255.112�
sY���A����r� L��������
FJ��K�{]��I|���۽��
common.raw"
moz_ingestJ example.comR
geoCity"HalifaxR
remote_addr"192.30.255.112RD
uri"=/submit/common/widget/99/0055FAC4-8A1A-4FCA-B380-EBFDC8571A02R

geoCountry"CA�
�,�e�DëL����V�Ɣ�����
_!e3<D����+L~������۽��
common.raw"
moz_ingestJ example.comR
remote_addr"192.30.255.112RD
Expand Down
Binary file added moz_ingest/tests/hindsight/json.hpb
Binary file not shown.
16 changes: 16 additions & 0 deletions moz_ingest/tests/hindsight/namespaces/foo/bar/bar.1.schema.json
@@ -0,0 +1,16 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"properties": {
"exampleString": {
"description": "An example string.",
"maxLength": 1024,
"minLength": 1,
"type": "string"
}
},
"required": [
"exampleString"
],
"title": "foo",
"type": "object"
}
@@ -0,0 +1,3 @@
-- Analysis plugin that checks the messages produced by the JSON decoder test.
filename = "verify_json_decoder.lua"
-- Only match messages whose Logger is neither 'moz_ingest' nor 'common'.
message_matcher = "Logger != 'moz_ingest' && Logger != 'common'"
-- NOTE(review): presumably 0 disables the periodic timer_event calls so the
-- final count check only runs at shutdown - confirm against the Hindsight docs.
ticker_interval = 0
87 changes: 87 additions & 0 deletions moz_ingest/tests/hindsight/run/analysis/verify_json_decoder.lua
@@ -0,0 +1,87 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
# Verifies the test data for the moz_ingest JSON decoder
--]]
require "string"
-- Expected messages, in arrival order: process_message compares the n-th
-- received message against messages[n].
local messages = {
-- 1: valid submission, passes schema validation
{Logger = "foo", Type = "validated", Hostname = "example.com", Fields = {
docVersion = 1,
docType = "bar",
geoCity = "New York",
geoCountry = "US",
documentId = "0055FAC4-8A1A-4FCA-B380-EBFDC8571A01"
}
},
-- 2: empty content, fails JSON parsing
{Logger = "foo", Type = "error", Hostname = "example.com", Fields = {
uri = "/submit/foo/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A02",
documentId = "0055FAC4-8A1A-4FCA-B380-EBFDC8571A02",
docType = "bar",
DecodeErrorType = "json",
DecodeError = "invalid submission: failed to parse offset:0 Invalid value.",
geoCity = "New York",
geoCountry = "US",
docVersion = 1,
content = ""
}
},
-- 3: content lacks the required "exampleString" property, fails validation
{Logger = "foo", Type = "error", Hostname = "example.com", Fields = {
uri = "/submit/foo/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A03",
documentId = "0055FAC4-8A1A-4FCA-B380-EBFDC8571A03",
docType = "bar",
DecodeErrorType = "json",
DecodeError = "namespace: foo schema: bar version: 1 error: SchemaURI: # Keyword: required DocumentURI: #",
geoCity = "New York",
geoCountry = "US",
docVersion = 1,
content = [[{"xString":"string one"}]]
}
},
-- 4: namespace "bar" has no schema, fails the schema lookup
{Logger = "bar", Type = "error", Hostname = "example.com", Fields = {
uri = "/submit/bar/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A01",
documentId = "0055FAC4-8A1A-4FCA-B380-EBFDC8571A01",
docType = "bar",
DecodeErrorType = "json",
DecodeError = "namespace: bar schema: bar version: 1 error: schema not found",
geoCity = "New York",
geoCountry = "US",
docVersion = 1,
content = "{}"
}
},
}

-- Compares every expected field with the value read from the current message.
--
-- *Arguments*
-- - idx (number) - test number, used only in the failure message
-- - fields (table) - expected field name => expected value
--
-- *Return*
-- - none, raises an assertion error naming the first mismatching field and
--   the value actually read
local function verify_fields(idx, fields)
    for field, expected in pairs(fields) do
        local actual = read_message(string.format("Fields[%s]", field))
        local report = string.format("Test %d Fields[%s] = %s", idx, field, tostring(actual))
        assert(expected == actual, report)
    end
end

-- Verifies the current message against the expected entry messages[idx].
--
-- Header values are compared with read_message(<header name>); the "Fields"
-- entry is delegated to verify_fields.
--
-- *Arguments*
-- - idx (number) - 1-based position of the message in the expected list
--
-- *Return*
-- - none, raises an assertion error on the first mismatch, or when more
--   messages arrive than there are expected entries
local function verify_message(idx)
    local msg = messages[idx]
    -- Without this guard an extra message fails inside pairs(nil) with a
    -- "bad argument" error that says nothing about the actual problem.
    assert(msg, string.format("Test %d: unexpected message, only %d expected", idx, #messages))
    for k, v in pairs(msg) do
        if k == "Fields" then
            verify_fields(idx, v)
        else
            local r = read_message(k)
            assert(v == r, string.format("Test %d %s = %s", idx, k, tostring(r)))
        end
    end
end

-- Number of messages processed so far; also read by timer_event.
local cnt = 0
-- Verifies each incoming message against the expected entry at the same
-- position in `messages`. Returns 0; a mismatch raises an assertion error.
function process_message()
cnt = cnt + 1
verify_message(cnt)
return 0
end

-- Asserts that the number of processed messages equals the number of expected
-- entries. The `ns` argument is not used.
-- NOTE(review): presumably only invoked at shutdown for this test - confirm
-- against the plugin's ticker_interval setting.
function timer_event(ns)
assert(cnt == #messages, string.format("%d of %d tests ran", cnt, #messages))
end
21 changes: 21 additions & 0 deletions moz_ingest/tests/hindsight/run/input/generate_data.lua
Expand Up @@ -44,5 +44,26 @@ function process_message()
msg.Fields["X-Forwarded-For"] = "72.229.28.185, 127.0.0.1"
inject_message(msg)
-- valid
msg.Type = "json.raw"
msg.Fields.uri = "/submit/foo/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A01"
msg.Fields.content = [[{"exampleString":"string one"}]]
inject_message(msg)

-- fails parsing
msg.Fields.uri = "/submit/foo/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A02"
msg.Fields.content = ""
inject_message(msg)

-- fails validation
msg.Fields.uri = "/submit/foo/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A03"
msg.Fields.content = [[{"xString":"string one"}]]
inject_message(msg)

-- fails schema lookup
msg.Fields.uri = "/submit/bar/bar/1/0055FAC4-8A1A-4FCA-B380-EBFDC8571A01"
msg.Fields.content = "{}"
inject_message(msg)

return 0
end
21 changes: 21 additions & 0 deletions moz_ingest/tests/hindsight/run/input/test_json_decoder.cfg
@@ -0,0 +1,21 @@
filename = "test_json_decoder.lua"

decoders_moz_ingest_common = {
sub_decoders = {
-- "*" registers the JSON decoder as the default sub decoder
["*"] = "decoders.moz_ingest.json"
},
error_on_missing_sub_decoder = true,

-- String used to specify GeoIP city database location on disk.
city_db_file = "/usr/share/geoip/city.db", -- optional, if not specified no geoip lookup is performed

-- number of items in the de-duping cuckoo filter
cf_items = 1000, -- optional, if not provided de-duping is disabled

-- interval size in minutes for cuckoo filter pruning
cf_interval_size = 1
}

decoders_moz_ingest_json = {
-- root directory containing one directory per namespace
-- NOTE(review): relative path, presumably resolved against the test's
-- working directory - confirm.
namespace_path = "namespaces",
}

0 comments on commit 3b71e3b

Please sign in to comment.