/
throttle.rb
320 lines (284 loc) · 12.4 KB
/
throttle.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
require "logstash/filters/base"
require "logstash/namespace"
require "thread_safe"
require "atomic"
# The throttle filter is for throttling the number of events. The filter is
# configured with a lower bound, the "before_count", and upper bound, the "after_count",
# and a period of time. All events passing through the filter will be counted based on
# their key and the event timestamp. As long as the count is less than the "before_count"
# or greater than the "after_count", the event will be "throttled" which means the filter
# will be considered successful and any tags or fields will be added (or removed).
#
# The plugin is thread-safe and properly tracks past events.
#
# For example, if you wanted to throttle events so you only receive an event after 2
# occurrences and you get no more than 3 in 10 minutes, you would use the configuration:
# [source,ruby]
# period => 600
# max_age => 1200
# before_count => 3
# after_count => 5
#
# Which would result in:
# ==========================
# event 1 - throttled (successful filter, period start)
# event 2 - throttled (successful filter)
# event 3 - not throttled
# event 4 - not throttled
# event 5 - not throttled
# event 6 - throttled (successful filter)
# event 7 - throttled (successful filter)
# event x - throttled (successful filter)
# period end
# event 1 - throttled (successful filter, period start)
# event 2 - throttled (successful filter)
# event 3 - not throttled
# event 4 - not throttled
# event 5 - not throttled
# event 6 - throttled (successful filter)
# ...
# ==========================
# Another example is if you wanted to throttle events so you only
# receive 1 event per hour, you would use the configuration:
# [source,ruby]
# period => 3600
# max_age => 7200
# before_count => -1
# after_count => 1
#
# Which would result in:
# ==========================
# event 1 - not throttled (period start)
# event 2 - throttled (successful filter)
# event 3 - throttled (successful filter)
# event 4 - throttled (successful filter)
# event x - throttled (successful filter)
# period end
# event 1 - not throttled (period start)
# event 2 - throttled (successful filter)
# event 3 - throttled (successful filter)
# event 4 - throttled (successful filter)
# ...
# ==========================
# A common use case would be to use the throttle filter to throttle events before 3 and
# after 5 while using multiple fields for the key and then use the drop filter to remove
# throttled events. This configuration might appear as:
# [source,ruby]
# filter {
# throttle {
# before_count => 3
# after_count => 5
# period => 3600
# max_age => 7200
# key => "%{host}%{message}"
# add_tag => "throttled"
# }
# if "throttled" in [tags] {
# drop { }
# }
# }
#
# Another case would be to store all events, but only email non-throttled events
# so the op's inbox isn't flooded with emails in the event of a system error.
# This configuration might appear as:
# [source,ruby]
# filter {
# throttle {
# before_count => 3
# after_count => 5
# period => 3600
# max_age => 7200
# key => "%{message}"
# add_tag => "throttled"
# }
# }
# output {
# if "throttled" not in [tags] {
# email {
# from => "logstash@mycompany.com"
# subject => "Production System Alert"
# to => "ops@mycompany.com"
# via => "sendmail"
# body => "Alert on %{host} from path %{path}:\n\n%{message}"
# options => { "location" => "/usr/sbin/sendmail" }
# }
# }
# elasticsearch_http {
# host => "localhost"
# port => "19200"
# }
# }
#
# When an event is received, the event key is stored in a key_cache. The key references
# a timeslot_cache. The event is allocated to a timeslot (created dynamically) based on
# the timestamp of the event. The timeslot counter is incremented. When the next event is
# received (same key), within the same "period", it is allocated to the same timeslot.
# The timeslot counter is incremented once again.
#
# The timeslot expires if the maximum age has been exceeded. The age is calculated
# based on the latest event timestamp and the max_age configuration option.
#
# ---[::.. DESIGN ..::]---
#
#    +- [key_cache] -+      +-- [timeslot_cache] --+
#    |               |      | @created: 1439839636 |
#                           | @latest:  1439839836 |
#         [a.b.c]       =>  +----------------------+
#                           | [1439839636] => 1    |
#                           | [1439839736] => 3    |
#                           | [1439839836] => 2    |
#                           +----------------------+
#
#                           +-- [timeslot_cache] --+
#                           | @created: eeeeeeeeee |
#                           | @latest:  llllllllll |
#         [x.y.z]       =>  +----------------------+
#                           | [0000000060] => x    |
#                           | [0000000120] => y    |
#    |               |      | [..........] => N    |
#    +---------------+      +----------------------+
#
# Frank de Jong (@frapex)
# Mike Pilone (@mikepilone)
#
# A ThreadSafe::Cache that holds the timeslot counters of a single event key.
# On top of the cached entries it remembers the epoch it was created with and
# the highest epoch that has been assigned through #latest=.
class ThreadSafe::TimeslotCache < ThreadSafe::Cache
  # Epoch handed to the constructor. It never changes afterwards.
  attr_reader :created

  # epoch   - initial value of both #created and #latest.
  # options - forwarded unchanged to ThreadSafe::Cache#initialize.
  # block   - forwarded unchanged to ThreadSafe::Cache#initialize.
  def initialize(epoch, options = nil, &block)
    @created = epoch
    @latest = Atomic.new(epoch)
    super(options, &block)
  end

  # Returns the highest epoch recorded so far.
  def latest
    @latest.value
  end

  # Records val as the latest epoch, but only when it is greater than the
  # value currently stored; smaller or equal values leave it untouched.
  def latest=(val)
    @latest.update { |current| val > current ? val : current }
  end
end
class LogStash::Filters::Throttle < LogStash::Filters::Base
  # The name to use in configuration files.
  config_name "throttle"

  # The memory control mechanism automatically adjusts the maximum age
  # of a timeslot based on the maximum number of counters.
  MC_MIN_PCT = 5 # Lower bound percentage.
  MC_MAX_PCT = 100 # Upper bound percentage.
  MC_INCR_PCT = 80 # Increase if total below percentage.
  MC_STEP_PCT = 5 # Increase/decrease by this percentage at a time.

  # Call the filter flush method at regular interval. It is used by the memory
  # control mechanism. Set to false if you like your VM to go (B)OOM.
  config :periodic_flush, :validate => :boolean, :default => true

  # The key used to identify events. Events with the same key are grouped together.
  # Field substitutions are allowed, so you can combine multiple fields.
  config :key, :validate => :string, :required => true

  # Events less than this count will be throttled. Setting this value to -1, the
  # default, will cause no events to be throttled based on the lower bound.
  config :before_count, :validate => :number, :default => -1, :required => false

  # Events greater than this count will be throttled. Setting this value to -1, the
  # default, will cause no events to be throttled based on the upper bound.
  config :after_count, :validate => :number, :default => -1, :required => false

  # The period in seconds after the first occurrence of an event until a new timeslot
  # is created. This period is tracked per unique key and per timeslot.
  # Field substitutions are allowed in this value. This allows you to specify that
  # certain kinds of events throttle for a specific period of time.
  config :period, :validate => :string, :default => "60", :required => false

  # The maximum age of a timeslot. Higher values allow better tracking of an asynchronous
  # flow of events, but require more memory. As a rule of thumb you should set this value
  # to at least twice the period. Or set this value to period + maximum time offset
  # between unordered events with the same key. Values below the specified period give
  # unexpected results if unordered events are processed simultaneously.
  config :max_age, :validate => :number, :default => 3600, :required => false

  # The maximum number of counters to store before decreasing the maximum age of a timeslot.
  # Setting this value to -1 will prevent an upper bound with no constraint on the
  # number of counters. This configuration value should only be used as a memory
  # control mechanism and can cause early counter expiration if the value is reached.
  # It is recommended to leave the default value and ensure that your key is selected
  # such that it limits the number of counters required (i.e. don't use UUID as the key).
  config :max_counters, :validate => :number, :default => 100000, :required => false

  # Performs initialization of the filter: creates the (initially empty) key
  # cache and remembers the configured max_age, which #flush uses as the
  # reference value when it decreases or increases @max_age.
  def register
    @key_cache = ThreadSafe::Cache.new
    @max_age_orig = @max_age
  end # def register

  # Filters the event: counts it in the timeslot that belongs to its key and
  # timestamp, and marks the filter as matched (i.e. the event is "throttled")
  # when the resulting count is below before_count or above after_count.
  #
  # event - the event to count; its timestamp and the interpolated `key` and
  #         `period` settings decide which counter is incremented.
  def filter(event)
    key = event.sprintf(@key)            # substitute field
    period = event.sprintf(@period).to_i # substitute period
    # Fall back to 60 seconds if the period is unparsable (to_i gives 0) or
    # not positive; a negative divisor would make the modulo below produce
    # timeslot keys that lie after the event itself.
    period = 60 unless period > 0
    epoch = event.timestamp.to_i         # event epoch time

    timeslot_cache = timeslot_cache_for(key, epoch)
    timeslot_cache.latest = epoch        # update to latest epoch

    # find target timeslot: timeslots are aligned on multiples of the period,
    # counted from the creation epoch of the timeslot cache
    timeslot_key = epoch - (epoch - timeslot_cache.created) % period
    timeslot = timeslot_for(timeslot_cache, timeslot_key)

    # Increment the counter and take the count from the value returned by the
    # same atomic update. Reading the counter in a separate step would allow a
    # concurrent increment to slip in between, so that two events see the same
    # (too high) count.
    count = timeslot.update { |v| v + 1 }

    if @logger.debug?
      @logger.debug(
        "filters/#{self.class.name}: counter incremented",
        { key: key, epoch: epoch, timeslot: timeslot_key, count: count }
      )
    end

    # throttle event if counter value not in range
    if (@before_count != -1 && count < @before_count) ||
       (@after_count != -1 && count > @after_count)
      if @logger.debug?
        @logger.debug(
          "filters/#{self.class.name}: throttling event",
          { key: key, epoch: epoch }
        )
      end
      filter_matched(event)
    end

    expire_timeslots(timeslot_cache)
  end # def filter

  # Removes timeslot caches that have not been updated for longer than max_age
  # (measured against the most recent epoch seen over all keys) and then runs
  # the memory control mechanism: when more than max_counters counters are
  # stored, max_age is decreased step by step; once the total drops below
  # MC_INCR_PCT percent of max_counters it is increased again, up to the
  # originally configured value.
  #
  # options - accepted for interface compatibility; not used here.
  #
  # Returns nil.
  def flush(options = {})
    max_latest = 0 # get maximum epoch
    @key_cache.each_value do |tc|
      latest = tc.latest # read once; the value may change concurrently
      max_latest = latest if latest > max_latest
    end

    total_counters = 0
    @key_cache.each_pair do |key, timeslot_cache|
      if timeslot_cache.latest < max_latest - @max_age
        @key_cache.delete(key) # delete expired timeslot cache
      else
        total_counters += timeslot_cache.size # get total number of counters
      end
    end

    if @logger.debug?
      @logger.debug(
        "filters/#{self.class.name}: statistics",
        { total_counters: total_counters, max_age: @max_age }
      )
    end

    # memory control mechanism
    if @max_counters != -1
      over_limit = total_counters - @max_counters

      # decrease max age of timeslot cache by x percent
      if (over_limit > 0) && (@max_age > @max_age_orig * MC_MIN_PCT / 100)
        @max_age -= @max_age_orig * MC_STEP_PCT / 100
        if @logger.warn?
          @logger.warn(
            "filters/#{self.class.name}: Decreased timeslot max_age to #{@max_age} because " +
            "max_counters exceeded by #{over_limit}. Use a better key to prevent too many unique event counters.")
        end

      # increase max age of timeslot cache by x percent
      elsif (@max_age < @max_age_orig * MC_MAX_PCT / 100) && (total_counters < (@max_counters * MC_INCR_PCT / 100))
        @max_age += @max_age_orig * MC_STEP_PCT / 100
        if @logger.warn?
          @logger.warn(
            "filters/#{self.class.name}: Increased timeslot max_age to #{@max_age} because max_counters no longer exceeded.")
        end
      end
    end

    nil # flush produces no events
  end # def flush

  private

  # Returns the timeslot cache stored for key, creating it (with epoch as its
  # creation time) if required. Retries, with a warning, when the cache is
  # removed again before it could be read back.
  def timeslot_cache_for(key, epoch)
    loop do
      # initialise timeslot cache (if required)
      @key_cache.compute_if_absent(key) { ThreadSafe::TimeslotCache.new(epoch) }
      timeslot_cache = @key_cache[key]            # try to get timeslot cache
      return timeslot_cache unless timeslot_cache.nil? # retry until successful

      if @logger.warn?
        @logger.warn(
          "filters/#{self.class.name}: timeslot cache disappeared, increase max_counters to prevent this.")
      end
    end
  end

  # Returns the counter stored under timeslot_key, creating it with a count of
  # zero if required. Retries, with a warning, when the timeslot is removed
  # again before it could be read back.
  def timeslot_for(timeslot_cache, timeslot_key)
    loop do
      # initialise timeslot and counter (if required)
      timeslot_cache.compute_if_absent(timeslot_key) { Atomic.new(0) }
      timeslot = timeslot_cache[timeslot_key] # try to get timeslot
      return timeslot unless timeslot.nil?    # retry until successful

      if @logger.warn?
        @logger.warn(
          "filters/#{self.class.name}: timeslot disappeared, increase max_age to prevent this.")
      end
    end
  end

  # Delete expired timeslots older than the latest. Do not use variable
  # timeslot_cache.latest for this. If used, it might delete the latest timeslot.
  def expire_timeslots(timeslot_cache)
    latest_timeslot = timeslot_cache.keys.max || 0
    oldest_allowed = latest_timeslot - @max_age
    timeslot_cache.each_key { |slot| timeslot_cache.delete(slot) if slot < oldest_allowed }
  end
end # class LogStash::Filters::Throttle