Skip to content

Commit

Permalink
Merge pull request #8 from cosmo0920/add-filter-plugin
Browse files Browse the repository at this point in the history
Add filter plugin
  • Loading branch information
kentaro committed Nov 9, 2016
2 parents bb334cd + 4e494c6 commit 1dddbdb
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 81 deletions.
49 changes: 49 additions & 0 deletions README.md
Expand Up @@ -220,6 +220,55 @@ will be replaced by the second rule as usual.
{ "path" => "/baz" }
```

### RewriteFilter

Filter plugin to modify messages' values along with pattern
matching and filter them.

Note that filter version of rewrite plugin does not have append/add tags functionality.

Thus, this filter version does not able to specify `append_to_tag`, `tag`, and `fallback` rules.

## Synopsis

```
<filter apache.log.**>
type rewrite
<rule>
key path
pattern \\?.+$
replace
</rule>
<rule>
key path
pattern (/[^/]+)\\?([^=]+)=(\\d)
replace \\1/\\2/\\3
</rule>
<rule>
key status
pattern ^500$
ignore true
</rule>
</match>
```

## Configuration

Note: This filter version of rewrite plugin does not have `remove_prefix` and `add_prefix` configuration.

### rule: replace

Same as OutputRewrite section's [rule: replace](#rule-replace).

### rule: ignore

Same as OutputRewrite section's [rule: ignore](#rule-ignore).

### rule: last

Same as OutputRewrite section's [rule: last](#rule-last).

## Installation

Add this line to your application's Gemfile:
Expand Down
26 changes: 26 additions & 0 deletions lib/fluent/plugin/filter_rewrite.rb
@@ -0,0 +1,26 @@
module Fluent
class RewriteFilter < Filter
Fluent::Plugin.register_filter('rewrite', self)

attr_reader :rewrite_rule

def configure(conf)
require 'fluent/plugin/rewrite_rule'

super

@rewrite_rule = RewriteRule.new(self, conf)
end

def filter_stream(tag, es)
new_es = MultiEventStream.new

es.each do |time, record|
record = @rewrite_rule.rewrite(record)
new_es.add(time, record) if record
end

new_es
end
end if defined?(Filter)
end
66 changes: 5 additions & 61 deletions lib/fluent/plugin/out_rewrite.rb
Expand Up @@ -11,9 +11,11 @@ class RewriteOutput < Output
config_param :add_prefix, :string, :default => nil
config_param :enable_warnings, :bool, :default => false

attr_reader :rules
attr_reader :rewrite_rule

def configure(conf)
require 'fluent/plugin/rewrite_rule'

super

if @remove_prefix
Expand All @@ -24,15 +26,7 @@ def configure(conf)
@added_prefix_string = @add_prefix + '.'
end

@rules = conf.elements.select {|element| element.name == 'rule' }.map do |element|
rule = {}
element.keys.each do |key|
# read and throw away to supress unread configuration warning
rule[key] = element[key]
end
rule["regex"] = Regexp.new(element["pattern"]) if element.has_key?("pattern")
rule
end
@rewrite_rule = RewriteRule.new(self, conf)
end

def start
Expand All @@ -56,7 +50,7 @@ def emit(tag, es, chain)
end

es.each do |time, record|
filtered_tag, record = rewrite(tag, record)
filtered_tag, record = @rewrite_rule.rewrite(tag, record)
if filtered_tag && record && _tag != filtered_tag
router.emit(filtered_tag, time, record)
else
Expand All @@ -68,55 +62,5 @@ def emit(tag, es, chain)

chain.next
end

def rewrite(tag, record)
rules.each do |rule|
tag, record, last = apply_rule(rule, tag, record)

break if last
return if !tag && !record
end

[tag, record]
end

def apply_rule(rule, tag, record)
tag_prefix = tag && tag.length > 0 ? "." : ""
key = rule["key"]
pattern = rule["pattern"]
last = nil

return [tag, record] if !key || !record.has_key?(key)
return [tag, record] unless pattern

if matched = record[key].match(rule["regex"])
return if rule["ignore"]

if rule["replace"]
replace = rule["replace"]
record[key] = record[key].gsub(rule["regex"], replace)
end

if rule["append_to_tag"]
if rule["tag"]
tag += (tag_prefix + rule["tag"])
else
matched.captures.each do |m|
tag += (tag_prefix + "#{m}")
end
end
end

if rule["last"]
last = true
end
else
if rule["append_to_tag"] && rule["fallback"]
tag += (tag_prefix + rule["fallback"])
end
end

[tag, record, last]
end
end
end
73 changes: 73 additions & 0 deletions lib/fluent/plugin/rewrite_rule.rb
@@ -0,0 +1,73 @@
module Fluent
class RewriteRule
attr_reader :rules

def initialize(plugin, conf)
@plugin = plugin
@rules = conf.elements.select {|element| element.name == 'rule' }.map do |element|
rule = {}
element.keys.each do |key|
# read and throw away to supress unread configuration warning
rule[key] = element[key]
end
rule["regex"] = Regexp.new(element["pattern"]) if element.has_key?("pattern")
rule
end
end

def rewrite(tag=nil, record)
@rules.each do |rule|
tag, record, last = apply_rule(rule, tag, record)

break if last
if @plugin.is_a?(Fluent::Output)
return if !tag && !record
else
return if !record
end
end

return [record] if not @plugin.is_a?(Fluent::Output)
return [tag, record] if @plugin.is_a?(Fluent::Output)
end

def apply_rule(rule, tag=nil, record)
tag_prefix = tag && tag.length > 0 ? "." : ""
key = rule["key"]
pattern = rule["pattern"]
last = nil

return [tag, record] if !key || !record.has_key?(key)
return [tag, record] unless pattern

if matched = record[key].match(rule["regex"])
return if rule["ignore"]

if rule["replace"]
replace = rule["replace"]
record[key] = record[key].gsub(rule["regex"], replace)
end

if rule["append_to_tag"] && @plugin.is_a?(Fluent::Output)
if rule["tag"]
tag += (tag_prefix + rule["tag"])
else
matched.captures.each do |m|
tag += (tag_prefix + "#{m}")
end
end
end

if rule["last"]
last = true
end
else
if rule["append_to_tag"] && rule["fallback"] && @plugin.is_a?(Fluent::Output)
tag += (tag_prefix + rule["fallback"])
end
end

return [tag, record, last]
end
end
end

0 comments on commit 1dddbdb

Please sign in to comment.