/
metrics.rb
257 lines (229 loc) · 8.16 KB
/
metrics.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# encoding: utf-8
require "securerandom"
require "logstash/filters/base"
require "logstash/namespace"
# The metrics filter is useful for aggregating metrics.
#
# IMPORTANT: Elasticsearch 2.0 no longer allows field names with dots. Version 3.0
# of the metrics filter plugin changes behavior to use nested fields rather than
# dotted notation to avoid colliding with versions of Elasticsearch 2.0+. Please
# note the changes in the documentation (underscores and sub-fields used).
#
# For example, if you have a field 'response' that is
# an HTTP response code, and you want to count each
# kind of response, you can do this:
# [source,ruby]
# filter {
# metrics {
# meter => [ "http_%{response}" ]
# add_tag => "metric"
# }
# }
#
# Metrics are flushed every 5 seconds by default or according to
# 'flush_interval'. Metrics appear as
# new events in the event stream and go through any filters
# that occur after as well as outputs.
#
# In general, you will want to add a tag to your metrics and have an output
# explicitly look for that tag.
#
# The event that is flushed will include every 'meter' and 'timer'
# metric in the following way:
#
# #### 'meter' values
#
# For a `meter => "thing"` you will receive the following fields:
#
# * "[thing][count]" - the total count of events
# * "[thing][rate_1m]" - the 1-minute rate (sliding)
# * "[thing][rate_5m]" - the 5-minute rate (sliding)
# * "[thing][rate_15m]" - the 15-minute rate (sliding)
#
# #### 'timer' values
#
# For a `timer => [ "thing", "%{duration}" ]` you will receive the following fields:
#
# * "[thing][count]" - the total count of events
# * "[thing][rate_1m]" - the 1-minute rate of events (sliding)
# * "[thing][rate_5m]" - the 5-minute rate of events (sliding)
# * "[thing][rate_15m]" - the 15-minute rate of events (sliding)
# * "[thing][min]" - the minimum value seen for this metric
# * "[thing][max]" - the maximum value seen for this metric
# * "[thing][stddev]" - the standard deviation for this metric
# * "[thing][mean]" - the mean for this metric
# * "[thing][pXX]" - the XXth percentile for this metric (see `percentiles`)
#
# #### Example: computing event rate
#
# For a simple example, let's track how many events per second are running
# through logstash:
# [source,ruby]
# ----
# input {
# generator {
# type => "generated"
# }
# }
#
# filter {
# if [type] == "generated" {
# metrics {
# meter => "events"
# add_tag => "metric"
# }
# }
# }
#
# output {
# # only emit events with the 'metric' tag
# if "metric" in [tags] {
# stdout {
# codec => line {
# format => "rate: %{[events][rate_1m]}"
# }
# }
# }
# }
# ----
#
# Running the above:
# [source,ruby]
# % bin/logstash -f example.conf
# rate: 23721.983566819246
# rate: 24811.395722536377
# rate: 25875.892745934525
# rate: 26836.42375967113
#
# We see the output includes our 'events' 1-minute rate.
#
# In the real world, you would emit this to graphite or another metrics store,
# like so:
# [source,ruby]
# output {
# graphite {
# metrics => [ "events.rate_1m", "%{[events][rate_1m]}" ]
# }
# }
class LogStash::Filters::Metrics < LogStash::Filters::Base
config_name "metrics"
# Names of the meters to mark once for every event passing through the filter.
# Each name is expanded with the event's fields (`%{field}` references).
#
# syntax: `meter => [ "name of metric", "name of metric" ]`
config :meter, :validate => :array, :default => []
# Timers to update for every event, as pairs of metric name and value.
# Both are expanded with the event's fields; the expanded value is converted
# to a float before it is recorded.
#
# syntax: `timer => [ "name of metric", "%{time_value}" ]`
config :timer, :validate => :hash, :default => {}
# Don't track events that have @timestamp older than some number of seconds.
# The default of 0 disables this check.
#
# This is useful if you want to only include events that are near real-time
# in your metrics.
#
# For example, to only count events that are within 10 seconds of real-time,
# you would do this:
#
#     filter {
#       metrics {
#         meter => [ "hits" ]
#         ignore_older_than => 10
#       }
#     }
config :ignore_older_than, :validate => :number, :default => 0
# The flush interval, in seconds: how often the metrics event is created.
# Must be a multiple of 5s, because the elapsed-time counter advances in
# steps of 5 seconds.
config :flush_interval, :validate => :number, :default => 5
# The clear interval, in seconds: how often all counters are reset.
#
# If set to -1, the default value, the metrics will never be cleared
# (any value that is not greater than 0 disables clearing).
# Otherwise, should be a multiple of 5s.
config :clear_interval, :validate => :number, :default => -1
# The rates that should be measured, in minutes.
# Possible values are 1, 5, and 15; any other value is rejected with a
# configuration error when the plugin is registered.
config :rates, :validate => :array, :default => [1, 5, 15]
# The percentiles that should be measured for timers. Each one is emitted
# as a "[thing][pXX]" field.
config :percentiles, :validate => :array, :default => [1, 5, 10, 90, 95, 99, 100]
# Prepares the plugin for use: loads the metric libraries, resets the
# flush/clear counters, validates `rates`, and creates the meter and timer
# caches.
#
# Raises LogStash::ConfigurationError when `rates` contains anything other
# than 1, 5 or 15.
def register
  require "metriks"
  require "socket"
  require "atomic"
  require "thread_safe"

  # How many seconds ago the metrics were flushed / cleared.
  @last_flush = Atomic.new(0)
  @last_clear = Atomic.new(0)
  # Random prefix that metric_key puts in front of every metric name.
  @random_key_preffix = SecureRandom.hex

  unsupported_rates = @rates - [1, 5, 15]
  unless unsupported_rates.empty?
    raise LogStash::ConfigurationError, "Invalid rates configuration. possible rates are 1, 5, 15. Rates: #{rates}."
  end

  # A name that is not cached yet gets its Metriks meter/timer created on
  # first access.
  @metric_meters = ThreadSafe::Cache.new do |cache, name|
    cache[name] = Metriks.meter(metric_key(name))
  end
  @metric_timers = ThreadSafe::Cache.new do |cache, name|
    cache[name] = Metriks.timer(metric_key(name))
  end
end # def register
# Records one event in every configured meter and timer.
#
# Events whose timestamp is more than `ignore_older_than` seconds in the past
# are skipped (only when `ignore_older_than` is greater than 0).
#
# event - the event to account for; metric names and timer values are
#         expanded against it with event.sprintf.
def filter(event)
  # TODO(piavlo): This should probably be moved to base filter class.
  if @ignore_older_than > 0
    event_age = Time.now - event.timestamp.time
    if event_age > @ignore_older_than
      @logger.debug("Skipping metriks for old event", :event => event)
      return
    end
  end

  @meter.each { |meter_name| @metric_meters[event.sprintf(meter_name)].mark }

  @timer.each do |timer_name, timer_value|
    timer_metric = @metric_timers[event.sprintf(timer_name)]
    # A value that does not parse as a number is recorded as 0.0 (String#to_f).
    timer_metric.update(event.sprintf(timer_value).to_f)
  end
end # def filter
# Periodic flush hook: advances the flush/clear counters and, when a flush is
# due, builds one event carrying every meter and timer metric.
#
# options - accepted for compatibility with the caller; not used here.
#
# Returns a one-element array holding the metrics event, or nil when nothing
# is due to be flushed.
def flush(options = {})
  # Add 5 seconds to @last_flush and @last_clear counters
  # since this method is called every 5 seconds.
  @last_flush.update { |v| v + 5 }
  @last_clear.update { |v| v + 5 }

  # Do nothing if there's nothing to do ;)
  return unless should_flush?

  # Decide once whether this flush also clears, so every metric and the
  # counter reset below act on the same decision.
  clearing = should_clear?

  event = LogStash::Event.new
  event.set("message", Socket.gethostname)

  @metric_meters.each_pair do |name, metric|
    flush_rates event, name, metric
    metric.clear if clearing
  end

  @metric_timers.each_pair do |name, metric|
    flush_rates event, name, metric
    # These 4 values are not sliding, so they probably are not useful.
    event.set("[#{name}][min]", metric.min)
    event.set("[#{name}][max]", metric.max)
    # timer's stddev currently returns variance, fix it.
    event.set("[#{name}][stddev]", metric.stddev ** 0.5)
    event.set("[#{name}][mean]", metric.mean)

    # Take one snapshot per timer and read every percentile from it, instead
    # of building a new snapshot for each percentile. No snapshot is taken
    # when no percentiles are configured.
    unless @percentiles.empty?
      snapshot = metric.snapshot
      @percentiles.each do |percentile|
        event.set("[#{name}][p#{percentile}]", snapshot.value(percentile / 100.0))
      end
    end

    metric.clear if clearing
  end

  # Reset counter since metrics were flushed
  @last_flush.value = 0

  if clearing
    # Reset counter since metrics were cleared
    @last_clear.value = 0
    @metric_meters.clear
    @metric_timers.clear
  end

  filter_matched(event)
  [event]
end
# this is a temporary fix to enable periodic flushes without using the plugin config:
# config :periodic_flush, :validate => :boolean, :default => true
# because this is not optional here and should not be configurable.
# this is until we refactor the periodic_flush mechanism per
# https://github.com/elasticsearch/logstash/issues/1839
def periodic_flush
  # Always enabled: the metrics events are only ever produced by `flush`,
  # so this is deliberately not exposed as a plugin option.
  true
end
private
# Copies the count and the configured rates of one metric onto the event.
#
# event  - the event being built; written through event.set.
# name   - metric name, used as the top-level field ("[name][count]", ...).
# metric - object answering count, one_minute_rate, five_minute_rate and
#          fifteen_minute_rate.
#
# Only the rates listed in `rates` are written.
def flush_rates(event, name, metric)
  event.set("[#{name}][count]", metric.count)

  if @rates.include?(1)
    event.set("[#{name}][rate_1m]", metric.one_minute_rate)
  end

  if @rates.include?(5)
    event.set("[#{name}][rate_5m]", metric.five_minute_rate)
  end

  if @rates.include?(15)
    event.set("[#{name}][rate_15m]", metric.fifteen_minute_rate)
  end
end
# Builds the name under which a metric is handed to Metriks: the random
# prefix generated in `register`, an underscore, then the metric name.
# NOTE(review): the prefix presumably keeps separate plugin instances from
# sharing Metriks entries - confirm.
def metric_key(key)
  format("%s_%s", @random_key_preffix, key)
end
# True when at least `flush_interval` seconds have accumulated since the last
# flush and at least one meter or timer exists.
def should_flush?
  return false unless @last_flush.value >= @flush_interval

  !(@metric_meters.empty? && @metric_timers.empty?)
end
# True when clearing is enabled (`clear_interval` greater than 0) and at least
# that many seconds have accumulated since the last clear.
def should_clear?
  return false unless @clear_interval > 0

  @last_clear.value >= @clear_interval
end
end # class LogStash::Filters::Metrics