This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
/
stat_graph.lua
152 lines (126 loc) · 5.43 KB
/
stat_graph.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[[
Converts stat values extracted from statmetric messages (see
:ref:`config_stat_accum_input`) to circular buffer data and periodically emits
messages containing this data to be graphed by a DashboardOutput. Note that
this filter expects the stats data to be available in the message fields, so
the StatAccumInput *must* be configured with `emit_in_fields` set to true for
this filter to work correctly.
Config:
- title (string, optional, default "Stats"):
Title for the graph output generated by this filter.
- rows (uint, optional, default 300):
The number of rows to store in our circular buffer. Each row represents
one time interval.
- sec_per_row (uint, optional, default 1):
The number of seconds in each circular buffer time interval.
- stats (string):
Space separated list of stat names. Each specified stat will be expected
to be found in the fields of the received statmetric messages, and will be
extracted and inserted into its own column in the accumulated circular
buffer.
- stat_labels (string):
Space separated list of header label names to use for the extracted stats.
Must be in the same order as the specified stats. Any label longer than 15
characters will be truncated.
- anomaly_config (string, optional):
Anomaly detection configuration, see :ref:`sandbox_anomaly_module`.
- preservation_version (uint, optional, default 0):
If `preserve_data = true` is set in the SandboxFilter configuration, then
this value should be incremented every time any edits are made to your
`rows`, `sec_per_row`, `stats`, or `stat_labels` values, or else Heka
will fail to start because the preserved data will no longer match the
filter's data structure.
- stat_aggregation (string, optional, default "sum"):
Controls how the column data is aggregated when combining multiple circular buffers.
"sum" - The total is computed for the time/column (default).
"min" - The smallest value is retained for the time/column.
"max" - The largest value is retained for the time/column.
"none" - No aggregation will be performed on the column.
- stat_unit (string, optional, default "count"):
The unit of measure (maximum 7 characters). Alphanumeric, '/', and '*'
characters are allowed; everything else will be converted to underscores,
e.g. KiB, Hz, m/s (default: count).
*Example Heka Configuration*
.. code-block:: ini
[stat-graph]
type = "SandboxFilter"
filename = "lua_filters/stat_graph.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'heka.statmetric'"
[stat-graph.config]
title = "Hits and Misses"
rows = 1440
stat_aggregation = "none"
stat_unit = "count"
sec_per_row = 10
stats = "stats.counters.hits.count stats.counters.misses.count"
stat_labels = "hits misses"
anomaly_config = 'roc("Hits and Misses", 1, 15, 0, 1.5, true, false) roc("Hits and Misses", 2, 15, 0, 1.5, true, false)'
preservation_version = 0
--]]
-- Optional "preservation_version" setting (defaults to 0). Per the header
-- comment above, it should be incremented whenever rows, sec_per_row, stats,
-- or stat_labels change so that preserved data keeps matching the filter's
-- data structure.
-- NOTE(review): presumably the host sandbox looks this global up by name;
-- confirm before renaming it or making it local.
_PRESERVATION_VERSION = read_config("preservation_version") or 0
require("circular_buffer")
require("string")
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"
-- Returns the value configured under `key`, or `fallback` when that value is
-- nil or false.
local function config_or(key, fallback)
    return read_config(key) or fallback
end

-- Graph settings; each optional setting falls back to its documented default.
local title = config_or("title", "Stats")
local rows = config_or("rows", 300)
local sec_per_row = config_or("sec_per_row", 1)

-- Both stat lists are mandatory: abort loading when either one is missing.
local stats_config = read_config("stats")
if not stats_config then
    error("stats configuration must be specified")
end
local stat_labels_config = read_config("stat_labels")
if not stat_labels_config then
    error("stat_labels configuration must be specified")
end

local stat_aggregation = config_or("stat_aggregation", "sum")
local stat_unit = config_or("stat_unit", "count")

-- anomaly_config is optional, so parse_config may be handed nil here; the
-- result is only used when it is truthy.
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

-- Set the annotation prune value for this title to the full span covered by
-- the buffer: rows * sec_per_row seconds, scaled by 1e9 to nanoseconds.
annotation.set_prune(title, rows * sec_per_row * 1e9)
-- Splits `text` on whitespace and returns the words as a sequence, in the
-- order they appear.
local function split_words(text)
    local words = {}
    for word in string.gmatch(text, "%S+") do
        words[#words + 1] = word
    end
    return words
end

-- The raw config strings are set to nil as soon as they have been split.
local stats = split_words(stats_config)
stats_config = nil

local stat_labels = split_words(stat_labels_config)
stat_labels_config = nil

-- Every stat needs exactly one label, matched by position.
if #stats ~= #stat_labels then
    error("stats and stat_labels configuration must have the same number of items")
end
-- One buffer column per configured stat.
-- NOTE(review): cbuf is assigned as a global, presumably so the sandbox can
-- preserve it (see preserve_data in the header comment) -- confirm before
-- making it local.
cbuf = circular_buffer.new(rows, #stats, sec_per_row)

-- field_names[n] is the message field expression read for column n; it is
-- built once here instead of once per message.
local field_names = {}
for column = 1, #stats do
    cbuf:set_header(column, stat_labels[column], stat_unit, stat_aggregation)
    field_names[column] = string.format("Fields[%s]", stats[column])
end
-- Copies the configured stats from the current message into the circular
-- buffer.
--
-- Reads "Fields[timestamp]" and multiplies it by 1e9 to get the row time
-- passed to cbuf:add. Each configured stat whose field holds a number is
-- added to its own column; stats with a missing or non-numeric field are
-- skipped.
--
-- Returns -1 when the timestamp field is not a number, 0 otherwise.
function process_message()
    local seconds = read_message("Fields[timestamp]")
    if type(seconds) ~= "number" then
        return -1
    end

    local ns = seconds * 1e9
    for column = 1, #stats do
        local value = read_message(field_names[column])
        if type(value) == "number" then
            cbuf:add(ns, column, value)
        end
    end
    return 0
end
-- Periodic tick: emits the circular buffer as a "cbuf" payload under `title`.
--
-- Without an anomaly configuration only the buffer is injected. With one,
-- anomaly detection runs first unless alert.throttled(ns) is true; when
-- detection returns a message, its annotations are concatenated for this
-- title and the message is sent as an alert. The payload then carries the
-- result of annotation.prune(title, ns) ahead of the buffer.
--
-- ns: the value handed to alert, anomaly, and annotation calls as the
--     current time.
function timer_event(ns)
    if not anomaly_config then
        inject_payload("cbuf", title, cbuf)
        return
    end

    if not alert.throttled(ns) then
        local message, annotations = anomaly.detect(ns, title, cbuf, anomaly_config)
        if message then
            annotation.concat(title, annotations)
            alert.send(ns, message)
        end
    end

    -- Injected on every tick with a config, including throttled ones.
    inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
end