How to convert a PayloadRegex MultiDecoder to a SandboxDecoder using an LPeg Grammar

rafrombrc edited this page Oct 23, 2014 · 1 revision

Below is a user's configuration file that parses entries from a syslog file using a MultiDecoder setup. It extracts three unique message types and leaves everything else in the message payload. This tutorial grew out of a question posed on IRC about how to turn that setup into a single SandboxDecoder using LPeg. The regexes are typical of what we see in the real world, so they are left as-is even though some optimizations are possible.

[LogstreamerInput]
log_directory = "/var/log"
file_match = "syslog"
decoder = "ServiceDecoder"
 
[ServiceDecoder]
type = "MultiDecoder"
subs = ['SphinxRequestDecoder', 'SphinxErrorDecoder', 'GearmanAdminDecoder', 'ElseDecoder']
cascade_strategy = "first-wins"
 
[SphinxRequestDecoder]
type = "PayloadRegexDecoder"
match_regex = '.+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] REQUEST: path=(?P<Path>\S+) remoteaddr=(?P<Remoteaddr>\S+) (?P<Headers>.+)'
timestamp_layout = "2006/01/02 15:04:05"
 
[SphinxRequestDecoder.message_fields]
Type = "SphinxRequest"
dsn = "{{ pillar.sentry_dsn }}"
Hostname = "%Hostname%"
Uuid = "%Uuid%"
Path = "%Path%"
Remoteaddr|ipv4 = "%Remoteaddr%"
Headers = "%Headers%"
Payload = ""
 
[SphinxErrorDecoder]
type = "PayloadRegexDecoder"
match_regex = '.+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] ERROR: (?P<Message>.+)'
timestamp_layout = "2006/01/02 15:04:05"
 
[SphinxErrorDecoder.message_fields]
Type = "SphinxError"
dsn = "{{ pillar.sentry_dsn }}"
Hostname = "%Hostname%"
Uuid = "%Uuid%"
Message = "%Message%"
Payload = ""
 
[GearmanAdminDecoder]
type = "PayloadRegexDecoder"
match_regex = 'Job \S+ (?:finished|failed), marking complete: (?P<Data>.+)'
 
[GearmanAdminDecoder.message_fields]
Type = "GearmanAdmin"
Data|json = "%Data%"
Payload = ""
  
# Just to prevent logspam - otherwise heka outputs a log line for everything it can't match
[ElseDecoder]
type = "PayloadRegexDecoder"
match_regex = '(.*)'
  
[ElseDecoder.message_fields]
Type = "Ignore"

SphinxRequestDecoder

Sample input (we are only interested in the syslog message part): Feb 9 14:17:01 trink-x230 sphinx: 2006/01/02 15:04:05 [BD48B609-8922-4E59-A358-C242075CE088] REQUEST: path=/var/tmp remoteaddr=192.168.1.1 header data

Regex (from above): .+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] REQUEST: path=(?P<Path>\S+) remoteaddr=(?P<Remoteaddr>\S+) (?P<Headers>.+)

LPeg Grammar (read the asterisk as "followed by")

timestamp * sp * uuid * sp * request * sp * path * sp * remoteaddr * sp * headers

timestamp

  1. Use the strftime grammar builder: dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S")
  2. Convert the time to a Unix nanosecond timestamp: dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns
  3. Capture the value as "Timestamp": l.Cg(dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns, "Timestamp")

sp

  1. l.space

uuid

  1. Define a block of four hex digits (x4): l.xdigit * l.xdigit * l.xdigit * l.xdigit
  2. Construct the UUID grammar, an opening bracket followed by a UUID captured as "Uuid" followed by a closing bracket: l.P"[" * l.Cg(x4 * x4 * "-" * x4 * "-" * x4 * "-" * x4 * "-" * x4 * x4 * x4, "Uuid") * "]"
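The two uuid steps can be tried outside Heka with nothing but the lpeg module. The following is a minimal sketch, assuming a stock Lua interpreter with lpeg installed; the wrapping l.Ct and the variable names grammar, ok, and bad are only there for illustration.

```lua
-- Sketch only: plain LPeg, no Heka modules required.
local l = require "lpeg"
l.locale(l)  -- adds l.xdigit, l.space, l.digit, ... to the lpeg table

-- Four hex digits, reused to build the 8-4-4-4-12 UUID layout.
local x4   = l.xdigit * l.xdigit * l.xdigit * l.xdigit
local uuid = l.P"[" * l.Cg(x4 * x4 * "-" * x4 * "-" * x4 * "-" * x4 * "-" * x4 * x4 * x4, "Uuid") * "]"

-- l.Ct collects the named group capture into a table field.
local grammar = l.Ct(uuid)

local ok  = grammar:match("[BD48B609-8922-4E59-A358-C242075CE088]")
local bad = grammar:match("[not-a-uuid]")

print(ok.Uuid)  --> BD48B609-8922-4E59-A358-C242075CE088
print(bad)      --> nil
```

Unlike the `.+` in the original regex, this grammar rejects anything between the brackets that is not shaped like a UUID.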

request

  1. A literal "REQUEST:" followed by a constant capture setting the "Type": l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type")

path

  1. A literal 'path=' followed by 1 or more printable US ASCII values captured into "Path": l.P"path=" * l.Cg(l.R"!~"^1, "Path")

remoteaddr

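  1. A literal 'remoteaddr=' followed by an IPv4 address matched by ip.v4 from the ip_address module, captured into "Remoteaddr" with an "ipv4" representation: l.P"remoteaddr=" * l.Cg(l.Ct(l.Cg(ip.v4, "value") * l.Cg(l.Cc"ipv4", "representation")), "Remoteaddr")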

headers

  1. Capture zero or more characters until the end of the line into "Headers": l.Cg(l.P(1)^0, "Headers")
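The request, path, remoteaddr, and headers pieces can be exercised together on the tail of the sample input. This is a rough sketch under two assumptions: the ipv4 pattern below is a simplified stand-in for ip.v4 (it does not range-check octets), and "Remoteaddr" is captured as a plain string rather than a value/representation table, so that only the lpeg module is needed.

```lua
-- Sketch only: plain LPeg; ipv4 is a hypothetical stand-in for Heka's ip.v4.
local l = require "lpeg"
l.locale(l)

local sp      = l.space
local request = l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type")
local path    = l.P"path=" * l.Cg(l.R"!~"^1, "Path")

-- Dotted quad of 1 to 3 digits per octet, with no 0-255 range check.
local octet      = l.digit * l.digit^-2
local ipv4       = octet * "." * octet * "." * octet * "." * octet
local remoteaddr = l.P"remoteaddr=" * l.Cg(ipv4, "Remoteaddr")

local headers = l.Cg(l.P(1)^0, "Headers")

local request_type = l.Ct(request * sp * path * sp * remoteaddr * sp * headers)

local m = request_type:match("REQUEST: path=/var/tmp remoteaddr=192.168.1.1 header data")
print(m.Type, m.Path, m.Remoteaddr, m.Headers)
```

The "Path" capture stops at the first space because the range "!" to "~" excludes it, which mirrors the `\S+` in the original regex.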

Test the grammar

http://lpeg.trink.com/share/6395413193107056095

SphinxErrorDecoder

Sample input: Feb 9 14:17:01 trink-x230 sphinx: 2006/01/02 15:04:05 [BD48B609-8922-4E59-A358-C242075CE088] ERROR: bad things happened

Regex (from above): .+ (?P<Hostname>\S+) sphinx: (?P<Timestamp>.+) \[(?P<Uuid>.+)\] ERROR: (?P<Message>.+)

LPeg Grammar

timestamp * sp * uuid * sp * err * sp * message

timestamp, sp, and uuid are the same as above (they will be turned into a 'sphinx_header' grammar)

err

  1. A literal "ERROR:" followed by a constant capture setting the "Type": l.P"ERROR:" * l.Cg(l.Cc"SphinxError", "Type")

message

  1. Capture zero or more characters until the end of the line into "Message": l.Cg(l.P(1)^0, "Message")

Test the grammar

http://lpeg.trink.com/share/12793449786352980261

GearmanAdminDecoder

Sample input: Feb 9 14:17:01 trink-x230 sphinx: Job DoSomething finished, marking complete: 100 widgets were processed

Regex (from above): Job \S+ (?:finished|failed), marking complete: (?P<Data>.+)

LPeg Grammar

job * sp * status * data_header * sp * data

job

  1. A literal 'Job' followed by a space, followed by 1 or more non-space characters, with a constant capture setting the "Type": l.P"Job" * sp * (l.P(1) - sp)^1 * l.Cg(l.Cc("GearmanAdmin"), "Type")

status

  1. A literal 'finished' or 'failed': l.P"finished" + "failed"

data_header

  1. A literal ', marking complete:': l.P", marking complete:"

data

  1. Capture zero or more characters until the end of the line into "Data": l.Cg(l.P(1)^0, "Data")
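The job grammar uses no Heka modules, so it can be run as-is with plain lpeg. A small sketch, assuming the syslog prefix has already been stripped from the line (the variable names job_type, m, and miss are illustrative):

```lua
-- Sketch only: the GearmanAdmin pieces assembled and run against the message part.
local l = require "lpeg"
l.locale(l)

local sp          = l.space
local job         = l.P"Job" * sp * (l.P(1) - sp)^1 * l.Cg(l.Cc("GearmanAdmin"), "Type")
local status      = l.P"finished" + "failed"
local data_header = l.P", marking complete:"
local data        = l.Cg(l.P(1)^0, "Data")

local job_type = l.Ct(job * sp * status * data_header * sp * data)

local m    = job_type:match("Job DoSomething finished, marking complete: 100 widgets were processed")
local miss = job_type:match("Job DoSomething crashed, marking complete: nothing")

print(m.Type, m.Data)  --> GearmanAdmin	100 widgets were processed
print(miss)            --> nil
```

Note that an LPeg match is anchored at the start of the subject, whereas the original regex could match 'Job' anywhere in the payload.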

Test the grammar

http://lpeg.trink.com/share/18152294687818003697

Putting it all together

Configuration

[SphinxDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/sphinx.lua"

	[SphinxDecoder.config]
	dsn = "{{ pillar.sentry_dsn }}"
	tz = "America/Los_Angeles"

Lua Decoder

local dt     = require "date_time"
local ip     = require "ip_address"
local l      = require 'lpeg'
local syslog = require "syslog"
l.locale(l)

local msg = {
    Timestamp   = nil,
    Hostname    = nil,
    Payload     = nil,
    Pid         = nil,
    Fields      = nil
}

local dsn = read_config("dsn") or ""

local syslog_grammar = syslog.build_rsyslog_grammar("%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n")

local sp           = l.space
local timestamp    = l.Cg(dt.build_strftime_grammar("%Y/%m/%d %H:%M:%S") / dt.time_to_ns, "Timestamp")
local x4           = l.xdigit * l.xdigit * l.xdigit * l.xdigit
local uuid         = l.P"[" * l.Cg(x4 * x4 * "-" * x4 * "-" * x4 * "-" * x4 * "-" * x4 * x4 * x4, "Uuid") * "]"
local request      = l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type")
local err          = l.P"ERROR:" * l.Cg(l.Cc"SphinxError", "Type")
local path         = l.P"path=" * l.Cg(l.R"!~"^1, "Path")
local remoteaddr   = l.P"remoteaddr=" * l.Cg(l.Ct(l.Cg(ip.v4, "value") * l.Cg(l.Cc"ipv4", "representation")), "Remoteaddr")
local headers      = l.Cg(l.P(1)^0, "Headers")
local message      = l.Cg(l.P(1)^0, "Message")
local job          = l.P"Job" * sp * (l.P(1) - sp)^1 * l.Cg(l.Cc("GearmanAdmin"), "Type")
local status       = l.P"finished" + "failed"
local data_header  = l.P", marking complete:"
local data         = l.Cg(l.P(1)^0, "Data")

local sphinx_header = timestamp * sp * uuid * sp
local request_type  = request * sp * path * sp * remoteaddr * sp * headers
local error_type    = err * sp * message
local job_type      = job * sp * status * data_header * sp * data

local msg_grammar   = l.Ct((sphinx_header * (request_type + error_type)) + job_type)

function process_message ()
    local log = read_message("Payload")
    local fields = syslog_grammar:match(log)
    if not fields then return -1 end

    msg.Timestamp = fields.timestamp
    fields.timestamp = nil

    fields.programname = fields.syslogtag.programname
    msg.Pid = fields.syslogtag.pid or nil
    fields.syslogtag = nil

    msg.Hostname = fields.hostname
    fields.hostname = nil

    local m = msg_grammar:match(fields.msg)
    if m then
        msg.Type = m.Type
        msg.Payload = nil
        if m.Type == "SphinxRequest" then
            msg.Timestamp = m.Timestamp
            fields.Uuid = m.Uuid
            fields.Path = m.Path
            fields.Remoteaddr = m.Remoteaddr
            fields.Headers = m.Headers
            fields.dsn = dsn
        elseif m.Type == "SphinxError" then
            msg.Timestamp = m.Timestamp
            fields.Uuid = m.Uuid
            fields.Message = m.Message
            fields.dsn = dsn
        elseif m.Type == "GearmanAdmin" then
            fields.Data = {}
            fields.Data.value = m.Data
            fields.Data.representation = "json"
        end
    else
        msg.Type = "Ignore"
        msg.Payload = fields.msg
    end
    fields.msg = nil

    msg.Fields = fields
    inject_message(msg)
    return 0
end
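The way msg_grammar picks a message type relies on LPeg's ordered choice (+): the Sphinx alternatives are tried first and the job alternative only if they fail. The sketch below illustrates that dispatch with plain lpeg. The header pattern and the classify helper are hypothetical simplifications (the real decoder uses the timestamp and uuid grammars and builds a full message), so treat it as an illustration of the control flow only.

```lua
-- Sketch only: ordered-choice dispatch with simplified stand-in patterns.
local l = require "lpeg"
l.locale(l)

local sp = l.space
-- Stand-in header (assumption): one run of non-space characters, then a space.
local header       = (l.P(1) - sp)^1 * sp
local request_type = l.P"REQUEST:" * l.Cg(l.Cc"SphinxRequest", "Type") * sp * l.Cg(l.P(1)^0, "Rest")
local error_type   = l.P"ERROR:" * l.Cg(l.Cc"SphinxError", "Type") * sp * l.Cg(l.P(1)^0, "Message")
local job_type     = l.P"Job" * sp * l.Cg(l.Cc"GearmanAdmin", "Type") * l.Cg(l.P(1)^0, "Data")

-- Same shape as the decoder's msg_grammar.
local msg_grammar = l.Ct((header * (request_type + error_type)) + job_type)

-- Hypothetical helper: returns the matched Type, or "Ignore" when nothing matches.
local function classify(s)
    local m = msg_grammar:match(s)
    return m and m.Type or "Ignore"
end

print(classify("[id] REQUEST: path=/x"))            --> SphinxRequest
print(classify("[id] ERROR: bad things happened"))  --> SphinxError
print(classify("Job DoSomething finished"))         --> GearmanAdmin
print(classify("something else entirely"))          --> Ignore
```

When the first alternative fails partway through, LPeg backtracks to the start of the subject before trying job_type, so a line beginning with 'Job' is still classified correctly.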

Comparison

The output isn't identical, but it is very close. The SandboxDecoder adds the Uuid as a message field instead of placing it in the message header. Also, the SandboxDecoder always parses the syslog portion of the line, so the 'Job' message receives the correct timestamp and hostname.

MultiDecoder

9436 messages per second

SandboxDecoder

31257 messages per second (roughly 3.3 times the MultiDecoder throughput)