Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: muddydixon/fluent-plugin-datacalculator
base: 8f4d3a1dfc
...
head fork: muddydixon/fluent-plugin-datacalculator
compare: c7438f0fb4
  • 2 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
24 example.conf
View
@@ -1,15 +1,29 @@
<source>
type forward
</source>
-<match payment.**>
+
+<match payment.quest>
+ type datacalculator
+ tag result.quest
+ count_interval 5s
+ aggregate keys area_id, mission_id
+ formulas sum = amount * price, cnt = 1, total = amount
+ finalizer ave = cnt > 0 ? 1.00 * sum / cnt : 0
+ <unmatched>
+ type file
+ path unmatched
+ </unmatched>
+</match>
+
+<match payment.shop>
type datacalculator
- tag result
- count_interval 5s
+ tag result.shop
+ count_interval 5s
aggregate all
formulas sum = amount * price, cnt = 1, total = amount
- finalizer ave = cnt > 0 ? 1.00 * sum / cnt : 0
+ finalizer ave = cnt > 0 ? 1.00 * sum / cnt : 0
</match>
-<match result>
+<match result.**>
type stdout
</match>
8 example.json
View
@@ -0,0 +1,8 @@
+{"area_id": 1, "mission_id":1, "amount": 3, "price": 100}
+{"area_id": 2, "mission_id":2, "amount": 2, "price": 200}
+{"area_id": 3, "mission_id":1, "amount": 3, "price": 100}
+{"area_id": 4, "mission_id":1, "amount": 4, "price": 300}
+{"area_id": 5, "mission_id":2, "amount": 5, "price": 200}
+{"area_id": 1, "mission_id":1, "amount": 1, "price": 400}
+{"area_id": 4, "mission_id":1, "amount": 2, "price": 200}
+{"area_id": 3, "mission_id":2, "amount": 1, "price": 300}
2  fluent-plugin-datacalculator.gemspec
View
@@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__)
Gem::Specification.new do |s|
s.name = "fluent-plugin-datacalculator"
- s.version = "0.0.1"
+ s.version = "0.0.2"
s.authors = ["Muddy Dixon"]
s.email = ["muddydixon@gmail.com"]
s.homepage = "https://github.com/muddydixon/fluent-plugin-datacalculator"
138 lib/fluent/plugin/out_datacalculator.rb
View
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
class Fluent::DataCalculatorOutput < Fluent::Output
Fluent::Plugin.register_output('datacalculator', self)
@@ -8,11 +9,11 @@ class Fluent::DataCalculatorOutput < Fluent::Output
config_param :input_tag_remove_prefix, :string, :default => nil
config_param :formulas, :string
config_param :finalizer, :string, :default => nil
- config_param :outcast_unmatched, :bool, :default => false
attr_accessor :tick
attr_accessor :counts
attr_accessor :last_checked
+ attr_accessor :aggregate_keys
attr_accessor :_formulas
attr_accessor :_finalizer
@@ -30,19 +31,47 @@ def configure(conf)
raise RuntimeError, "@unit must be one of minute/hour/day"
end
end
+
+
+ conf.elements.each do |element|
+ element.keys.each do |k|
+ element[k]
+ end
+
+ case element.name
+ when 'unmatched'
+ @unmatched = element
+ end
+ end
+ # TODO: unmatchedの時に別のタグを付けて、ふってあげないといけない気がする
+ # unmatchedの定義
+ # 1. aggregate_keys を持たないレコードが入ってきた時
+ # 2. formulaで必要な要素がなかったレコードが入ってきた時
+ # 3. formulaで集計可能な数値でない場合(文字列や真偽値、正規表現、ハッシュ、配列など)
+ @aggregate_keys = []
@aggregate = case @aggregate
when 'tag' then :tag
when 'all' then :all
else
- raise Fluent::ConfigError, "flowcounter aggregate allows tag/all"
+ if @aggregate.index('keys') == 0
+ @aggregate_keys = @aggregate.split(/\s/, 2)[1]
+ unless @aggregate_keys
+ raise Fluent::ConfigError, "aggregate_keys require in keys"
+ end
+ @aggregate_keys = @aggregate_keys.split(/\s*,\s*/)
+ @aggregate = 'keys'
+ else
+ raise Fluent::ConfigError, "flowcounter aggregate allows tag/all"
+ end
end
def createFunc (cnt, str)
str.strip!
left, right = str.split(/\s*=\s*/, 2)
- rights = right.scan(/[a-zA-Z][\w\d_\.\$]*/).uniq
-
+ # Fluent moduleだけはOK
+ rights = right.scan(/[a-zA-Z][\w\d_\.\$\:\@]*/).uniq.select{|x| x.index('Fluent') != 0}
+
begin
f = eval('lambda {|'+rights.join(',')+'| '+right + '}')
rescue SyntaxError
@@ -57,7 +86,7 @@ def execFunc (tag, obj, argv, formula)
tag = stripped_tag (tag)
end
_argv = []
-
+
argv.each {|arg|
if tag != nil and tag != 'all'
arg = tag + '_' + arg
@@ -67,11 +96,11 @@ def execFunc (tag, obj, argv, formula)
formula.call(*_argv)
end
- @_formulas = [[0, 'unmatched', nil, nil]]
+ @_formulas = []
if conf.has_key?('formulas')
fs = conf['formulas'].split(/\s*,\s*/)
fs.each_with_index { |str,i |
- @_formulas.push( createFunc(i + 1, str) )
+ @_formulas.push( createFunc(i, str) )
}
end
@@ -129,7 +158,7 @@ def countups(tag, counts)
if @aggregate == :all
tag = 'all'
end
-
+
@mutex.synchronize {
@counts[tag] ||= [0] * @_formulas.length
counts.each_with_index do |count, i|
@@ -146,14 +175,8 @@ def stripped_tag(tag)
end
def generate_output(counts, step)
- output = {}
if @aggregate == :all
- # index 0 is unmatched
- sum = if @outcast_unmatched
- counts['all'][1..-1].inject(:+)
- else
- counts['all'].inject(:+)
- end
+ output = {}
counts['all'].each_with_index do |count,i|
name = @_formulas[i][1]
output[name] = count
@@ -163,16 +186,37 @@ def generate_output(counts, step)
output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3])
end
- return output
+ return [output]
end
+ if @aggregate == 'keys'
+ outputs = []
+
+ counts.keys.each do |pat|
+ output = {}
+ pat_val = pat.split('_').map{|x| x.to_i }
+ counts[pat].each_with_index do |count, i|
+ name = @_formulas[i][1]
+ output[name] = count
+ end
+
+ @aggregate_keys.each_with_index do |key, i|
+ output[@aggregate_keys[i]] = pat_val[i]
+ end
+
+ if @_finalizer
+ output[@_finalizer[1]] = execFunc('all', output, @_finalizer[2], @_finalizer[3])
+ end
+
+ outputs.push(output)
+ end
+
+ return outputs
+ end
+
+ output = {}
counts.keys.each do |tag|
t = stripped_tag(tag)
- sum = if @outcast_unmatched
- counts[tag][1..-1].inject(:+)
- else
- counts[tag].inject(:+)
- end
counts[tag].each_with_index do |count,i|
name = @_formulas[i][1]
output[t + '_' + name] = count
@@ -181,16 +225,19 @@ def generate_output(counts, step)
output[t + '_' + @_finalizer[1]] = execFunc(tag, output, @_finalizer[2], @_finalizer[3])
end
end
- output
+ [output]
end
def flush(step)
- flushed,@counts = @counts,count_initialized(@counts.keys.dup)
+ flushed, @counts = @counts,count_initialized(@counts.keys.dup)
generate_output(flushed, step)
end
def flush_emit(step)
- Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step))
+ data = flush(step)
+ data.each do |dat|
+ Fluent::Engine.emit(@tag, Fluent::Engine.now, dat)
+ end
end
def start_watch
@@ -220,9 +267,45 @@ def checkArgs (obj, inkeys)
return true
end
- def emit(tag, es, chain)
- c = [0] * @_formulas.length
+ def emit (tag, es, chain)
+
+ if @aggregate == 'keys'
+ emit_aggregate_keys(tag, es, chain)
+ else
+ emit_single_tag(tag, es, chain)
+ end
+ end
+
+ def emit_aggregate_keys (tag, es, chain)
+ cs = {}
+ es.each do |time, record|
+ matched = false
+ pat = @aggregate_keys.map{ |key| record[key] }.join('_')
+ cs[pat] = [0] * @_formulas.length unless cs.has_key?(pat)
+ if @_formulas.length > 0
+ @_formulas.each do | index, outkey, inkeys, formula|
+ next unless formula and checkArgs(record, inkeys)
+
+ cs[pat][index] += execFunc('all', record, inkeys, formula)
+ matched = true
+ end
+ else
+ $log.warn index
+ end
+ cs[pat][0] += 1 unless matched
+ end
+
+ cs.keys.each do |pat|
+ countups(pat, cs[pat])
+ end
+
+ chain.next
+ end
+
+ def emit_single_tag (tag, es, chain)
+ c = [0] * @_formulas.length
+
es.each do |time,record|
matched = false
if @_formulas.length > 0
@@ -230,13 +313,14 @@ def emit(tag, es, chain)
next unless formula and checkArgs(record, inkeys)
c[index] += execFunc(nil, record, inkeys, formula)
- matched = true
+ matched = true
end
else
$log.warn index
end
c[0] += 1 unless matched
end
+
countups(tag, c)
chain.next
88 test/plugin/test_out_datacalculator.rb
View
@@ -28,6 +28,12 @@ def test_configure
formulas sum = 10 ab
]
}
+ # aggregateに必要な要素がない
+ assert_raise(Fluent::ConfigError) {
+ d = create_driver %[
+ aggregate keys
+ ]
+ }
# finalizerに必要な要素がない
assert_raise(Fluent::ConfigError) {
d = create_driver %[
@@ -51,7 +57,6 @@ def test_configure
assert_nil d.instance.input_tag_remove_prefix
assert_equal 'sum = amount * price, cnt = amount', d.instance.formulas
assert_equal 'ave = cnt > 0 ? sum / cnt : 0', d.instance.finalizer
- assert_equal false, d.instance.outcast_unmatched
end
@@ -60,7 +65,7 @@ def test_count_initialized
aggregate all
formulas sum = amount * price, cnt = amount
]
- assert_equal [0,0,0], d.instance.counts['all']
+ assert_equal [0,0], d.instance.counts['all']
end
def test_create_formula
@@ -69,25 +74,22 @@ def test_create_formula
formulas sum = amount * price, cnt = amount
]
assert_equal 0, d.instance._formulas[0][0]
- assert_equal 'unmatched', d.instance._formulas[0][1]
- assert_equal nil, d.instance._formulas[0][2]
+ assert_equal 'sum', d.instance._formulas[0][1]
+ assert_equal ['amount', 'price'], d.instance._formulas[0][2]
assert_equal 1, d.instance._formulas[1][0]
- assert_equal 'sum', d.instance._formulas[1][1]
- assert_equal ['amount', 'price'], d.instance._formulas[1][2]
- assert_equal 2, d.instance._formulas[2][0]
- assert_equal 'cnt', d.instance._formulas[2][1]
- assert_equal ['amount'], d.instance._formulas[2][2]
+ assert_equal 'cnt', d.instance._formulas[1][1]
+ assert_equal ['amount'], d.instance._formulas[1][2]
end
def test_countups
d = create_driver
assert_nil d.instance.counts['test.input']
- d.instance.countups('test.input', [0, 0, 0, 0])
- assert_equal [0,0,0,0], d.instance.counts['test.input']
- d.instance.countups('test.input', [1, 1, 1, 0])
- assert_equal [1,1,1,0], d.instance.counts['test.input']
- d.instance.countups('test.input', [0, 5, 1, 0])
- assert_equal [1,6,2,0], d.instance.counts['test.input']
+ d.instance.countups('test.input', [0, 0, 0])
+ assert_equal [0,0,0], d.instance.counts['test.input']
+ d.instance.countups('test.input', [1, 1, 0])
+ assert_equal [1,1,0], d.instance.counts['test.input']
+ d.instance.countups('test.input', [5, 1, 0])
+ assert_equal [6,2,0], d.instance.counts['test.input']
end
def test_stripped_tag
@@ -97,23 +99,29 @@ def test_stripped_tag
assert_equal 'input', d.instance.stripped_tag('input')
end
+ def test_aggregate_keys
+ d = create_driver %[
+ aggregate keys area_id, mission_id
+ formulas sum = amount * price, cnt = amount
+ ]
+ assert_equal 'keys', d.instance.aggregate
+ assert_equal ['area_id', 'mission_id'], d.instance.aggregate_keys
+ end
+
def test_generate_output
d = create_driver
- r1 = d.instance.generate_output({'test.input' => [60,240,120,180], 'test.input2' => [0,600,0,0]}, 60)
+ r1 = d.instance.generate_output({'test.input' => [240,120,180], 'test.input2' => [600,0,0]}, 60)[0]
- assert_equal 60, r1['input_unmatched']
assert_equal 240, r1['input_sum']
assert_equal 120, r1['input_amounts']
assert_equal 180, r1['input_record']
assert_equal 2, r1['input_ave']
- assert_equal 0, r1['input2_unmatched']
assert_equal 600, r1['input2_sum']
assert_equal 0, r1['input2_amounts']
assert_equal 0, r1['input2_record']
assert_equal 0, r1['input2_ave']
-
d = create_driver %[
aggregate all
input_tag_remove_prefix test
@@ -121,12 +129,12 @@ def test_generate_output
finalizer ave = amounts > 0 ? sum / amounts : 0
]
- r2 = d.instance.generate_output({'all' => [60,240,120,180]}, 60)
- assert_equal 60, r2['unmatched']
+ r2 = d.instance.generate_output({'all' => [240,120,180]}, 60)[0]
assert_equal 240, r2['sum']
assert_equal 120, r2['amounts']
assert_equal 180, r2['record']
assert_equal 2, r2['ave']
+
end
def test_emit
@@ -139,8 +147,7 @@ def test_emit
d1.emit({'amount' => 10, 'price' => 100})
end
end
- r1 = d1.instance.flush(60)
- assert_equal 0, r1['input_unmatched']
+ r1 = d1.instance.flush(60)[0]
assert_equal 132000, r1['input_sum']
assert_equal 1320, r1['input_amounts']
assert_equal 240, r1['input_record']
@@ -162,11 +169,42 @@ def test_emit
d2.emit({'amount' => 10, 'price' => 100})
end
end
- r2 = d2.instance.flush(60)
- assert_equal 0, r2['unmatched']
+ r2 = d2.instance.flush(60)[0]
assert_equal 132000, r2['sum']
assert_equal 1320, r2['amounts']
assert_equal 240, r2['record']
assert_equal 100.0, r2['ave']
+
+ d3 = create_driver(%[
+ unit minute
+ aggregate keys area_id, mission_id
+ formulas sum = amount * price, count = 1
+ <unmatched>
+ type stdout
+ </unmatched>
+ ], 'test.input3')
+
+ sums = {}
+ counts = {}
+ d3.run do
+ 240.times do
+ area_id = rand(5)
+ mission_id = rand(5)
+ amount = rand(10)
+ price = rand(5) * 100
+ pat = [area_id, mission_id].join(',')
+ d3.emit({'amount' => amount, 'price' => price, 'area_id' => area_id, 'mission_id' => mission_id})
+ sums[pat] = 0 unless sums.has_key?(pat)
+ counts[pat] = 0 unless counts.has_key?(pat)
+ sums[pat] += amount * price
+ counts[pat] += 1
+ end
+ end
+ r3 = d3.instance.flush(60)
+ r3.each do |r|
+ pat = [r['area_id'], r['mission_id']].join(',')
+ assert_equal sums[pat], r['sum']
+ assert_equal counts[pat], r['count']
+ end
end
end

No commit comments for this range

Something went wrong with that request. Please try again.