FiltersProgrammable

Brian L. Troutwine edited this page Oct 5, 2017 · 1 revision

The programmable filter is a facility for arbitrary in-flight manipulation of telemetry and logs, programmable by end-users. The programming environment is Lua 5.3. The full API is documented below.

Configuration

The programmable filter configuration options are as follows:

  • script :: the Lua program to read into cernan
  • forwards :: the filters and/or sinks to forward into [default: []]

The value for script must map to a file in the globally set scripts-directory. Read more about that here.

The forward concept is described in detail here.

You may configure multiple programmable filters.

Example

Let's say we want to write a filter to strip instance metadata from collectd metrics. Recall that collectd metric names look like so:

collectd.computer_name.interface-lo.if_errors.tx

and what we'd like to do is avoid emitting the 'computer_name' to our sinks. Enabling filters in configuration is much like enabling a sink or source.

scripts-directory = "examples/scripts/"

[sources]
  [sources.graphite.primary]
  enabled = true
  port = 2004
  forwards = ["filters.programmable.collectd_scrub"]

[filters]
  [filters.programmable.collectd_scrub]
  script = "collectd_scrub.lua"
  forwards = ["sinks.console"]

[sinks]
  [sinks.console]
  bin_width = 1

Note that scripts-directory must exist and contain the file collectd_scrub.lua. The scrub program:

count_per_tick = 0

function process_metric(pyld)
   count_per_tick = count_per_tick + 1

   local old_name = payload.metric_name(pyld, 1)
   local collectd, rest = string.match(old_name, "^(collectd)[%.@][%w_]+(.*)")
   if collectd ~= nil then
      local new_name = string.format("%s%s", collectd, rest)
      payload.set_metric_name(pyld, 1, new_name)
   end
end

function process_log(pyld)
end

function tick(pyld)
   payload.push_metric(pyld, "count_per_tick", count_per_tick)
   payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
   count_per_tick = 0
end

Cernan passes every metric through process_metric. This program inspects the metric name, strips the offending bit of metadata and writes the shortened name back into the payload.
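To see what the pattern in process_metric does to a name, it can help to run it outside cernan. The sketch below lifts the same string.match call into a standalone helper; scrub_name is a hypothetical name used only for illustration and is not part of the filter API.

```lua
-- Hypothetical helper: the same pattern used in process_metric above,
-- pulled out so it can be exercised without a cernan payload.
function scrub_name(name)
   local prefix, rest = string.match(name, "^(collectd)[%.@][%w_]+(.*)")
   if prefix == nil then
      return name -- not a collectd metric, leave it untouched
   end
   return prefix .. rest
end

print(scrub_name("collectd.computer_name.interface-lo.if_errors.tx"))
-- prints collectd.interface-lo.if_errors.tx
print(scrub_name("statsd.requests.count"))
-- prints statsd.requests.count
```

Note that [%w_]+ does not match hyphens, so a host name such as web-01 would only be partly removed by this pattern.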

There's a little more than filtering going on here. Once every flush-interval, cernan calls the filter's tick function. Here collectd_scrub uses tick to push a new metric called count_per_tick into the payload, as well as a new log line.

Payloads are indexed from 1. Negative offsets are also supported, as in Lua. Indexing from 1 is a break from similar systems like heka, which index from 0. Metrics and logs are indexed separately.
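As a rough illustration of the indexing rule, the sketch below maps an index as a filter would write it onto a 1-based position. resolve_index is a hypothetical helper, not part of the cernan API, and it assumes that -1 refers to the last element, which is the usual Lua convention.

```lua
-- Hypothetical helper, not part of the cernan API: maps an index as a filter
-- would write it onto a 1-based position in an array of length len.
-- Assumes -1 means "last element", the usual Lua convention.
function resolve_index(idx, len)
   if idx < 0 then
      return len + idx + 1
   end
   return idx
end

local metrics = { "first", "second", "third" }
print(metrics[resolve_index(1, #metrics)])   -- prints first
print(metrics[resolve_index(-1, #metrics)])  -- prints third
```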

API

Filters are currently programmed in Lua. The API is split between manipulation of telemetry and log lines. For historical reasons--to be resolved with postmates/cernan#184--the functions that manipulate telemetry are named metric_*. Those that manipulate log lines are named log_*.

Every filter script must have three basic functions. This is the most minimal script:

function process_metric(pyld)
end

function process_log(pyld)
end

function tick(pyld)
end

Each function is passed an opaque payload structure. This structure is, in effect, two arrays: one of log lines and one of telemetry structures. Each array can be independently indexed. The filters API indexes from one. The process_metric and process_log functions are called when a telemetry or log line structure is available for the filter. tick is called once every flush-interval. See the configuration page in this wiki for more information.

You can find example scripts in examples/scripts/ in the root of this project.

The functions available for manipulation and querying follow.

log_remove_tag

This function removes a tag--if it exists--from the referenced log line structure. In the following function we remove the tag with key "bizz" from the log line at index 1 in the payload:

function process_log(pyld)
   payload.log_remove_tag(pyld, 1, "bizz")
end

The old value will be returned if it existed, else nil.

log_set_tag

This function adds a tag to the referenced log line structure. In the following function we insert the key/value pair "bizz"/"bazz" into the log line at index 1 in the payload:

function process_log(pyld)
   payload.log_set_tag(pyld, 1, "bizz", "bazz")
end

The old value will be returned if it existed, else nil.

log_tag_value

This function returns the value of a tag in the referenced log line structure. In the following function we query the log line at index 1 for the value of key "bizz":

function process_log(pyld)
   payload.log_tag_value(pyld, 1, "bizz")
end

The value will be returned if it exists, else nil.
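Because log_tag_value returns nil for a missing tag, it can be combined with log_set_tag to supply a default. The following is a minimal sketch using only the functions documented above; the key "env" and the default "unknown" are hypothetical values chosen for illustration.

```lua
-- "env" and "unknown" are hypothetical; substitute your own key and default.
function process_log(pyld)
   -- log_tag_value returns nil when the tag is absent
   if payload.log_tag_value(pyld, 1, "env") == nil then
      payload.log_set_tag(pyld, 1, "env", "unknown")
   end
end

function process_metric(pyld)
end

function tick(pyld)
end
```

Log lines that already carry the tag pass through unchanged.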

metric_name

This function returns the name of the referenced telemetry structure. It always returns a string.

function process_metric(pyld)
   print(payload.metric_name(pyld, 1))
end

metric_query

This function performs a percentile query against the stored samples of the telemetry structure. A number is returned if there are values in the telemetry, else nil.

function process_metric(pyld)
   print(payload.metric_query(pyld, 0.999, 1))
end
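Since metric_query can return nil, a filter should check the result before comparing it. The sketch below tags telemetry whose 99th percentile exceeds a threshold. The threshold value and the "slow" tag are hypothetical, and the argument order (payload, percentile, index) is inferred from the example above.

```lua
-- Threshold and tag name are hypothetical values chosen for illustration.
SLOW_THRESHOLD = 500

function process_metric(pyld)
   -- Argument order (payload, percentile, index) follows the example above.
   local p99 = payload.metric_query(pyld, 0.99, 1)
   -- metric_query returns nil when the telemetry holds no values
   if p99 ~= nil and p99 > SLOW_THRESHOLD then
      payload.metric_set_tag(pyld, 1, "slow", "true")
   end
end

function process_log(pyld)
end

function tick(pyld)
end
```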

metric_remove_tag

This function removes a tag--if it exists--from the referenced telemetry structure. In the following function we remove the tag with key "bizz" from the telemetry at index 1 in the payload:

function process_metric(pyld)
   payload.metric_remove_tag(pyld, 1, "bizz")
end

The old value will be returned if it existed, else nil.

metric_set_tag

This function adds a tag to the referenced telemetry structure. In the following function we insert the key/value pair "bizz"/"bazz" into the telemetry at index 1 in the payload:

function process_metric(pyld)
   payload.metric_set_tag(pyld, 1, "bizz", "bazz")
end

The old value will be returned if it existed, else nil.

metric_tag_value

This function returns the value of a tag in the referenced telemetry structure. In the following function we query the telemetry at index 1 for the value of key "bizz":

function process_metric(pyld)
   payload.metric_tag_value(pyld, 1, "bizz")
end

The value will be returned if it exists, else nil.
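The tag functions compose. Because metric_remove_tag returns the old value, a tag can be renamed in two calls. This is a minimal sketch; the keys "host" and "source_host" are hypothetical.

```lua
-- "host" and "source_host" are hypothetical tag keys.
function process_metric(pyld)
   -- metric_remove_tag returns the old value, or nil if the tag was absent
   local old = payload.metric_remove_tag(pyld, 1, "host")
   if old ~= nil then
      payload.metric_set_tag(pyld, 1, "source_host", old)
   end
end

function process_log(pyld)
end

function tick(pyld)
end
```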

metric_value

This function performs a value query against the stored samples of the telemetry structure, obeying its AggregationMethod. A number is returned if there are values in the telemetry, else nil.

function process_metric(pyld)
   print(payload.metric_value(pyld, 1))
end

push_log

This function appends a log line structure to the payload. This always succeeds and there is no return value.

count_per_tick = 0

function process_metric(pyld)
   count_per_tick = count_per_tick + 1
end

function process_log(pyld)
   count_per_tick = count_per_tick + 1
end

function tick(pyld)
   payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
   count_per_tick = 0
end

In the above we count the number of telemetry and log lines that come through this filter and periodically write a log line describing how many have come through.

push_metric

This function appends a telemetry structure to the payload. This always succeeds and there is no return value.

count_per_tick = 0

function process_metric(pyld)
   count_per_tick = count_per_tick + 1
end

function process_log(pyld)
   count_per_tick = count_per_tick + 1
end

function tick(pyld)
   payload.push_metric(pyld, "count_per_tick", count_per_tick)
   count_per_tick = 0
end

In the above we count the number of telemetry and log lines that come through this filter and periodically emit a telemetry structure named count_per_tick describing how many have come through.

set_metric_name

This function sets the name of the referenced telemetry structure. It always succeeds.

function process_metric(pyld)
   print(payload.set_metric_name(pyld, 1, "new_metric_name"))
end
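In practice set_metric_name is usually paired with metric_name, reading the current name and writing a derived one. The sketch below prepends a prefix to every metric name; the prefix "staging." is a hypothetical value.

```lua
-- Hypothetical prefix, chosen for illustration.
PREFIX = "staging."

function process_metric(pyld)
   local name = payload.metric_name(pyld, 1)
   -- Skip names that already carry the prefix so the filter is idempotent.
   if string.sub(name, 1, #PREFIX) ~= PREFIX then
      payload.set_metric_name(pyld, 1, PREFIX .. name)
   end
end

function process_log(pyld)
end

function tick(pyld)
end
```

The prefix check matters when several filters are chained, since the same telemetry could otherwise be renamed more than once.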