Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle groups #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ When a group reaches its limit and as long as it is not reset, a warning
message with the current log rate of the group is emitted repeatedly. This is
the delay between every repetition.

#### group\_override

Allows overriding the default throttling parameters for specific groups. To use
it, configure a JSON object whose keys are group names and whose values are
objects containing the `group_*` parameters described above. For example:
```
group_override {"group_bucket_1": {
"group_bucket_period_s": 1,
"group_bucket_limit": 7,
"group_drop_logs": true
}}
```
This configures an override for the group named `group_bucket_1`, giving it
throttling limits that differ from the defaults.

## License

Apache License, Version 2.0
Expand Down
120 changes: 84 additions & 36 deletions lib/fluent/plugin/filter_throttle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,86 @@ class ThrottleFilter < Filter
DESC
config_param :group_warning_delay_s, :integer, :default => 10

desc <<~DESC
Override the default rate limit for a specific group. Example hash:
{"group_key1_value,group_key2_value": { # comma separated if multiple group_key values are given
"group_bucket_period_s": 60, # Remaining values match the default value names
"group_bucket_limit": 10000,
"group_drop_logs": true}
}
DESC
config_param :group_override, :hash, :default => {}

# Per-group throttling settings, resolved from the plugin defaults plus any
# matching group_override entry. rate_limit and gc_timeout_s are derived
# (limit / period_s and 2 * period_s respectively), not configured directly.
BucketConfig = Struct.new(
  :period_s,        # length of one throttling window, in seconds
  :limit,           # max records allowed per window
  :drop_logs,       # true: drop records over the limit; false: pass them through
  :rate_limit,      # derived: limit / period_s (records per second)
  :gc_timeout_s,    # derived: idle time (2 * period_s) before a group is evicted
  :reset_rate_s,    # rate the group must drop below before logging resumes; -1 disables the wait
  :warning_delay_s) # minimum seconds between repeated "rate exceeded" warnings


# Mutable per-group throttling state kept in @counters, keyed by the group's
# extracted group_key values.
Group = Struct.new(
  :rate_count,        # records seen since rate_last_reset
  :rate_last_reset,   # Time the rate counter was last zeroed
  :aprox_rate,        # most recently computed records-per-second (NOTE(review): "aprox" is a typo for "approx", but renaming would touch every call site)
  :bucket_count,      # records counted in the current bucket window; -1 means the limit was hit and records are being dropped
  :bucket_last_reset, # Time the bucket window last rolled over
  :last_warning,      # Time of the most recent "rate exceeded" warning, or nil if none yet
  :config)            # this group's resolved BucketConfig

# Registers a BucketConfig in @bucket_configs for every entry of
# +config_hash+ (the parsed group_override parameter). Hash keys are
# comma-separated group_key values; any setting an override omits falls back
# to the corresponding plugin-wide default.
def create_override_bucket_configs(config_hash)
  config_hash.each_pair do |name, overrides|
    period_s  = overrides.fetch("group_bucket_period_s", @group_bucket_period_s)
    limit     = overrides.fetch("group_bucket_limit", @group_bucket_limit)
    drop_logs = overrides.fetch("group_drop_logs", @group_drop_logs)
    # Derived values mirror the ones computed for the default bucket.
    per_second = limit / period_s
    bucket = BucketConfig.new(
      period_s,
      limit,
      drop_logs,
      per_second,
      2 * period_s,
      overrides.fetch("group_reset_rate_s", per_second),
      overrides.fetch("group_warning_delay_s", @group_warning_delay_s))
    @bucket_configs[name.split(',')] = bucket
  end
end

# Validates a resolved BucketConfig +b+, raising a RuntimeError whose message
# is prefixed with the bucket name +n+ when any setting is out of range.
# Called once for the default bucket and once per group_override entry.
#
# Fixes two misleading error messages: the reset_rate_s lower bound is -1
# inclusive (not "> -1"), and the upper-bound message claimed "> limit \ period_s"
# while the condition enforces reset_rate_s <= limit / period_s.
def validate_bucket(n, b)
  raise "#{n} period_s must be > 0" unless b.period_s > 0
  raise "#{n} limit must be > 0" unless b.limit > 0
  # -1 is the sentinel that disables waiting for the rate to drop back down.
  raise "#{n} reset_rate_s must be >= -1" unless b.reset_rate_s >= -1
  raise "#{n} reset_rate_s must be <= limit / period_s" unless b.reset_rate_s <= b.rate_limit
  raise "#{n} warning_delay_s must be >= 1" unless b.warning_delay_s >= 1
end

# Builds the default BucketConfig from the plugin-wide parameters, registers
# any per-group overrides from group_override, validates every resulting
# bucket, and pre-splits the group_key paths used when extracting a record's
# group.
#
# NOTE(review): the rendered diff for this method interleaved the removed
# pre-change lines (inline raises, @group_gc_timeout_s/@group_rate_limit
# assignments, duplicate @group_key_paths assignment) with their replacement.
# This is the reconstructed post-change method without that duplication.
def configure(conf)
  super

  # Set up default bucket & calculate derived values
  default_rate_limit = (@group_bucket_limit / @group_bucket_period_s)
  default_reset_rate_s = @group_reset_rate_s.nil? ? default_rate_limit : @group_reset_rate_s
  default_gc_timeout_s = 2 * @group_bucket_period_s
  default_bucket_config = BucketConfig.new(
    @group_bucket_period_s,
    @group_bucket_limit,
    @group_drop_logs,
    default_rate_limit,
    default_gc_timeout_s,
    default_reset_rate_s,
    @group_warning_delay_s)

  validate_bucket("default", default_bucket_config)

  # Groups without an override fall back to the default bucket config.
  @bucket_configs = Hash.new(default_bucket_config)
  # Parse override configs and add to bucket_configs
  create_override_bucket_configs(@group_override)

  # Make sure the config for each bucket is valid
  @bucket_configs.each do |key_path, config|
    validate_bucket(key_path, config)
  end

  @group_key_paths = group_key.map { |key| key.split(".") }
end

def start
Expand All @@ -83,12 +130,13 @@ def shutdown

def filter(tag, time, record)
now = Time.now
rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record
group = extract_group(record)

# Ruby hashes are ordered by insertion.
bucket_config = @bucket_configs[group]
rate_limit_exceeded = bucket_config.drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record

# Ruby hashes are ordered by insertion.
# Deleting and inserting moves the item to the end of the hash (most recently used)
counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil)
counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil, bucket_config)

counter.rate_count += 1
since_last_rate_reset = now - counter.rate_last_reset
Expand All @@ -101,17 +149,17 @@ def filter(tag, time, record)

# try to evict the least recently used group
lru_group, lru_counter = @counters.first
if !lru_group.nil? && now - lru_counter.rate_last_reset > @group_gc_timeout_s
if !lru_group.nil? && now - lru_counter.rate_last_reset > counter.config.gc_timeout_s
@counters.delete(lru_group)
end

if (now.to_i / @group_bucket_period_s) \
> (counter.bucket_last_reset.to_i / @group_bucket_period_s)
if (now.to_i / counter.config.period_s) \
> (counter.bucket_last_reset.to_i / counter.config.period_s)
# next time period reached.

# wait until rate drops back down (if enabled).
if counter.bucket_count == -1 and @group_reset_rate_s != -1
if counter.aprox_rate < @group_reset_rate_s
if counter.bucket_count == -1 and counter.config.reset_rate_s != -1
if counter.aprox_rate < counter.config.reset_rate_s
log_rate_back_down(now, group, counter)
else
log_rate_limit_exceeded(now, group, counter)
Expand All @@ -133,7 +181,7 @@ def filter(tag, time, record)
counter.bucket_count += 1

# if we are out of credit, we drop logs for the rest of the time period.
if counter.bucket_count > @group_bucket_limit
if counter.bucket_count > counter.config.limit
log_rate_limit_exceeded(now, group, counter)
counter.bucket_count = -1
return rate_limit_exceeded
Expand All @@ -152,7 +200,7 @@ def extract_group(record)

def log_rate_limit_exceeded(now, group, counter)
emit = counter.last_warning == nil ? true \
: (now - counter.last_warning) >= @group_warning_delay_s
: (now - counter.last_warning) >= counter.config.warning_delay_s
if emit
log.warn("rate exceeded", log_items(now, group, counter))
counter.last_warning = now
Expand All @@ -171,10 +219,10 @@ def log_items(now, group, counter)

{'group_key': group,
'rate_s': rate,
'period_s': @group_bucket_period_s,
'limit': @group_bucket_limit,
'rate_limit_s': @group_rate_limit,
'reset_rate_s': @group_reset_rate_s}
'period_s': counter.config.period_s,
'limit': counter.config.limit,
'rate_limit_s': counter.config.rate_limit,
'reset_rate_s': counter.config.reset_rate_s}
end
end
end
54 changes: 54 additions & 0 deletions test/fluent/plugin/filter_throttle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,44 @@ def create_driver(conf='')
assert_equal(5, groups["b"].size)
end

# configure() must reject a group_override entry whose values fail
# validation — here a negative group_bucket_period_s, which validate_bucket
# rejects ("period_s must be > 0"), so building the driver raises.
it 'rejects override configurations with invalid values' do
  assert_raises { create_driver <<~CONF
    group_key "group"
    group_bucket_period_s 1
    group_bucket_limit 5
    group_override {"group_bucket_1":{
      "group_bucket_period_s": -1,
      "group_bucket_limit": 7,
      "group_drop_logs": true
    }}
  CONF
  }
end

# Groups without an override keep the default bucket limit (5 per 1s
# period), while the overridden group "group_bucket_1" gets its own limit
# of 7 within the same period.
it 'throttles with different rates in override configs' do
  driver = create_driver <<~CONF
    group_key "group"
    group_bucket_period_s 1
    group_bucket_limit 5
    group_override {"group_bucket_1":{
      "group_bucket_period_s": 1,
      "group_bucket_limit": 7,
      "group_drop_logs": true
    }}
  CONF

  # Feed 10 records per group; only each group's bucket limit should pass.
  driver.run(default_tag: "test") do
    driver.feed([[event_time, {"msg": "test", "group": "a"}]] * 10)
    driver.feed([[event_time, {"msg": "test", "group": "b"}]] * 10)
    driver.feed([[event_time, {"msg": "test", "group": "group_bucket_1"}]] * 10)
  end

  groups = driver.filtered_records.group_by { |r| r[:group] }
  assert_equal(5, groups["a"].size)               # default limit
  assert_equal(5, groups["b"].size)               # default limit
  assert_equal(7, groups["group_bucket_1"].size)  # overridden limit
end

it 'allows composite group keys' do
driver = create_driver <<~CONF
group_key "group1,group2"
Expand Down Expand Up @@ -117,6 +155,22 @@ def create_driver(conf='')
], messages_per_minute
end

# Exercises LRU group eviction: gc_timeout_s is 2 * period (4s here) and the
# second feed is stubbed to happen 9s after the first, so group "a" should
# be evicted when "b" arrives.
# NOTE(review): this test currently has no assertion — it only verifies the
# run does not raise. Consider asserting on the plugin's internal counters,
# e.g. via driver.instance.instance_variable_get(:@counters), to confirm
# group "a" was actually removed.
it 'removes lru groups after 2*period' do
  driver = create_driver <<~CONF
    group_key "group"
    group_bucket_period_s 2
    group_bucket_limit 6
    group_reset_rate_s 2
  CONF

  driver.run(default_tag: "test") do
    Time.stubs(now: Time.at(1))
    driver.feed([[event_time, {"msg": "test", "group": "a"}]] * 2)
    Time.stubs(now: Time.at(10))
    driver.feed([[event_time, {"msg": "test", "group": "b"}]] * 2)
  end
  # TODO: Figure out how to assert the group was removed from the private variable
end

it 'does not throttle when in log only mode' do
driver = create_driver <<~CONF
Expand Down