Skip to content

Commit

Permalink
remove_tag_slice and aggregate out_tag
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed Aug 6, 2014
1 parent f41f87f commit 5183f21
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 24 deletions.
18 changes: 18 additions & 0 deletions README.md
Expand Up @@ -126,6 +126,24 @@ Then, output becomes as below (indented). You can see the `message` field is jo

Remove tag suffix for output message

* remove_tag_slice *min..max*

Keep only the tag parts selected by the given slice and remove the rest. FYI: This option behaves like `tag.split('.').slice(min..max)`.

For example,

remove_tag_slice 0..-2

changes an input tag `foo.bar.host1` to `foo.bar`.

* aggregate

Aggregation unit. One of `all`, `in_tag`, `out_tag` can be specified. Default is `in_tag` (the obsolete value `tag` is still accepted and treated as `in_tag`).

* `all` sums the counts of all input messages and emits one message in each interval.
* `in_tag` sums the counts for each input tag separately.
* `out_tag` sums the counts for each tag *modified* by `add_tag_prefix`, `remove_tag_prefix`, or `remove_tag_slice`.

- delimiter

Output matched messages after `join`ed with the specified delimiter.
Expand Down
84 changes: 60 additions & 24 deletions lib/fluent/plugin/out_grepcounter.rb
Expand Up @@ -32,6 +32,7 @@ def initialize
config_param :remove_tag_prefix, :string, :default => nil
config_param :add_tag_suffix, :string, :default => nil
config_param :remove_tag_suffix, :string, :default => nil
config_param :remove_tag_slice, :string, :default => nil
config_param :output_with_joined_delimiter, :string, :default => nil # obsolete
config_param :delimiter, :string, :default => nil
config_param :aggregate, :string, :default => 'tag'
Expand Down Expand Up @@ -94,19 +95,22 @@ def configure(conf)
end
end

if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil?
if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? and @remove_tag_slice.nil?
@add_tag_prefix = 'count' # not ConfigError to support lower version compatibility
end
@tag_proc = tag_proc

case @aggregate
when 'all'
raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil?
when 'tag'
# raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil?
when 'tag' # obsolete
@aggregate = 'in_tag'
when 'in_tag'
when 'out_tag'
else
raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all"
raise Fluent::ConfigError, "grepcounter: aggregate allows all/in_tag/out_tag"
end
@aggregate_proc = aggregate_proc(@tag_proc)

if @store_file
f = Pathname.new(@store_file)
Expand Down Expand Up @@ -156,12 +160,14 @@ def emit(tag, es, chain)
count += 1
end
end

aggregate_key = @aggregate_proc.call(tag)
# thread safe merge
@counts[tag] ||= 0
@matches[tag] ||= []
@counts[aggregate_key] ||= 0
@matches[aggregate_key] ||= []
@mutex.synchronize do
@counts[tag] += count
@matches[tag] += matches
@counts[aggregate_key] += count
@matches[aggregate_key] += matches
end

chain.next
Expand Down Expand Up @@ -192,22 +198,29 @@ def flush_emit(step)
time = Fluent::Engine.now
flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {}

if @aggregate == 'all'
count = 0; matches = []
flushed_counts.keys.each do |tag|
count += flushed_counts[tag]
matches += flushed_matches[tag]
end
case @aggregate
when 'all'
count = flushed_counts[:all]
matches = flushed_matches[:all]
output = generate_output(count, matches)
Fluent::Engine.emit(@tag, time, output) if output
else
when 'out_tag'
flushed_counts.keys.each do |out_tag|
count = flushed_counts[out_tag]
matches = flushed_matches[out_tag]
output = generate_output(count, matches)
if output
Fluent::Engine.emit(out_tag, time, output)
end
end
else # in_tag
flushed_counts.keys.each do |tag|
count = flushed_counts[tag]
matches = flushed_matches[tag]
output = generate_output(count, matches, tag)
if output
emit_tag = @tag_proc.call(tag)
Fluent::Engine.emit(emit_tag, time, output)
out_tag = @tag_proc.call(tag)
Fluent::Engine.emit(out_tag, time, output)
end
end
end
Expand All @@ -234,24 +247,47 @@ def generate_output(count, matches, tag = nil)
output
end

# Returns the proc that maps an input tag to the key under which its counts
# and matches are accumulated, selected by @aggregate:
#   'all'     - every tag shares the single key :all
#   'in_tag'  - the input tag itself is the key
#   'out_tag' - the key is whatever tag_proc produces for the input tag
# Any other @aggregate value yields nil.
def aggregate_proc(tag_proc)
  key_builders = {
    'all'     => Proc.new { |_tag| :all },
    'in_tag'  => Proc.new { |tag| tag },
    'out_tag' => Proc.new { |tag| tag_proc.call(tag) },
  }
  key_builders[@aggregate]
end

# Builds and returns the proc that rewrites an input tag into the tag used
# for emitting, driven by @remove_tag_slice, @add_tag_prefix,
# @remove_tag_prefix, @add_tag_suffix, @remove_tag_suffix and @tag.
# Raises Fluent::ConfigError when remove_tag_slice is not "<int>..<int>".
def tag_proc
# Proc applying the optional remove_tag_slice setting.
tag_slice_proc =
if @remove_tag_slice
# Split "min..max" into its two bounds; both must be (optionally negative) integers.
lindex, rindex = @remove_tag_slice.split('..', 2)
if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
raise Fluent::ConfigError, "out_grepcounter: remove_tag_slice must be formatted like [num]..[num]"
end
l, r = lindex.to_i, rindex.to_i
# Keep parts l..r of the dot-separated tag; a nil slice result becomes an empty tag.
Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
else
# No slice configured: pass the tag through unchanged.
Proc.new {|tag| tag }
end

# rstrip drops one trailing substr, lstrip drops one leading substr; both leave str alone when it is absent.
rstrip = Proc.new {|str, substr| str.chomp(substr) }
lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str }
# Prefixes get a trailing '.', suffixes a leading '.' (one that is already present is not doubled).
tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix
tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix
tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix
tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix
tag_fixed = @tag if @tag
# NOTE(review): this listing comes from a rendered diff without +/- markers, so the
# branch chain below appears to interleave pre-commit and post-commit lines --
# confirm against the real file before relying on the exact branch order.
if tag_fixed
Proc.new {|tag| tag_fixed }
elsif tag_prefix_match and tag_suffix_match
Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
if tag_prefix_match and tag_suffix_match
Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
elsif tag_prefix_match
Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" }
Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" }
elsif tag_suffix_match
Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" }
Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" }
elsif tag_prefix || @remove_tag_slice || tag_suffix
Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" }
else
Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" }
Proc.new {|tag| tag_fixed }
end
end

Expand Down
49 changes: 49 additions & 0 deletions spec/out_grepcounter_spec.rb
Expand Up @@ -340,6 +340,16 @@ def delete!(key)
it { emit }
end

context 'remove_tag_slice' do
  # With slice 0..-2 the last tag part is dropped, so 'syslog.host1' must be emitted as "syslog".
  let(:tag) { 'syslog.host1' }
  let(:config) do
    CONFIG + %[remove_tag_slice 0..-2]
  end

  before do
    engine = Fluent::Engine
    allow(engine).to receive(:now).and_return(time)
    expect(engine).to receive(:emit).with("syslog", time, expected)
  end

  it { emit }
end

context 'all tag options' do
let(:config) { CONFIG + %[
add_tag_prefix foo
Expand Down Expand Up @@ -439,6 +449,45 @@ def delete!(key)
it { emit }
end

context 'aggregate in_tag' do
  # Configured through the obsolete `aggregate tag` value. Each input tag keeps
  # its own count even though both are emitted under the same sliced tag "foo".
  let(:config) { CONFIG + %[aggregate tag \n remove_tag_slice 0..-2] }
  let(:messages) { ['foobar', 'foobar'] }
  let(:emit) do
    ['foo.bar', 'foo.bar2'].each do |input_tag|
      driver.run do
        messages.each { |message| driver.emit_with_tag({'message' => message}, time, input_tag) }
      end
    end
    driver.instance.flush_emit(0)
  end

  before do
    allow(Fluent::Engine).to receive(:now).and_return(time)
    # One emit is expected per input tag, each carrying that tag's own two messages.
    { 'foo.bar' => 'bar', 'foo.bar2' => 'bar2' }.each do |input_tag, last_part|
      expect(Fluent::Engine).to receive(:emit).with("foo", time, {
        "count" => 2, "message" => ["foobar", "foobar"],
        "input_tag" => input_tag, "input_tag_last" => last_part
      })
    end
  end

  it { emit }
end

context 'aggregate out_tag' do
  # Both input tags are sliced to "foo", so their messages are merged into a single emit.
  let(:config) { CONFIG + %[aggregate out_tag \n remove_tag_slice 0..-2] }
  let(:messages) { ['foobar', 'foobar'] }
  let(:emit) do
    ['foo.bar', 'foo.bar2'].each do |input_tag|
      driver.run do
        messages.each { |message| driver.emit_with_tag({'message' => message}, time, input_tag) }
      end
    end
    driver.instance.flush_emit(0)
  end

  before do
    merged_output = {
      "count" => 4, "message" => ["foobar", "foobar", "foobar", "foobar"]
    }
    allow(Fluent::Engine).to receive(:now).and_return(time)
    expect(Fluent::Engine).to receive(:emit).with("foo", time, merged_output)
  end

  it { emit }
end

context 'replace_invalid_sequence' do
let(:config) { CONFIG + %[regexp WARN \n replace_invalid_sequence true] }
let(:messages) { [ "\xff".force_encoding('UTF-8') ] }
Expand Down

0 comments on commit 5183f21

Please sign in to comment.