Skip to content

Commit

Permalink
Add tests for v0.14
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed Nov 24, 2017
1 parent 3805aa6 commit 4d6719f
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Rakefile
Expand Up @@ -4,7 +4,7 @@ require "bundler/gem_tasks"
require 'rake/testtask'
Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.pattern = 'test/**/*.rb'
test.pattern = 'test/**/test_*.rb'
test.verbose = true
test.warning = false
end
Expand Down
File renamed without changes.
4 changes: 3 additions & 1 deletion lib/fluent/plugin/out_copy_ex/v12.rb
@@ -1,9 +1,11 @@
require 'fluent/plugin/out_copy'

module Fluent
class CopyOutputEx < CopyOutput
class CopyExOutput < CopyOutput
Plugin.register_output('copy_ex', self)

attr_reader :ignore_errors

def initialize
super
@ignore_errors = []
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/plugin/out_copy_ex/v14.rb
@@ -1,9 +1,11 @@
require 'fluent/plugin/out_copy'

module Fluent::Plugin
class CopyOutputEx < CopyOutput
class CopyExOutput < CopyOutput
Fluent::Plugin.register_output('copy_ex', self)

attr_reader :ignore_errors

def initialize
super
@ignore_errors = []
Expand Down
68 changes: 23 additions & 45 deletions test/plugin/test_out_copy_ex.rb → test/out_copy_ex/v12.rb
@@ -1,10 +1,9 @@
require 'fluent/test'
require_relative '../helper'

class CopyExOutputTest < Test::Unit::TestCase
class << self
def startup
spec = Gem::Specification.find { |s| s.name == 'fluentd' }
$LOAD_PATH.unshift File.join(spec.full_gem_path, 'test', 'scripts')
$LOAD_PATH.unshift "#{Gem.loaded_specs['fluentd'].full_gem_path}/test/scripts"
require 'fluent/plugin/out_test'
end

Expand All @@ -17,10 +16,6 @@ def setup
Fluent::Test.setup
end

def config_element(name = 'test', argument = '', params = {}, elements = [])
Fluent::Config::Element.new(name, argument, params, elements)
end

CONFIG = %[
<store>
type test
Expand Down Expand Up @@ -60,9 +55,9 @@ def test_configure

outputs = d.instance.outputs
assert_equal 3, outputs.size
assert_equal Fluent::Plugin::TestOutput, outputs[0].class
assert_equal Fluent::Plugin::TestOutput, outputs[1].class
assert_equal Fluent::Plugin::TestOutput, outputs[2].class
assert_equal Fluent::TestOutput, outputs[0].class
assert_equal Fluent::TestOutput, outputs[1].class
assert_equal Fluent::TestOutput, outputs[2].class
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
Expand All @@ -88,39 +83,35 @@ def test_emit

d.instance.outputs.each {|o|
assert_equal [
[time, {"a"=>1}],
[time, {"a"=>2}],
], o.events
[time, {"a"=>1}],
[time, {"a"=>2}],
], o.events
}

d.instance.outputs.each {|o|
assert_not_nil o.router
}
end

def test_msgpack_es_emit_bug
d = Fluent::Test::OutputTestDriver.new(Fluent::CopyExOutput)
d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)

emit_procs = []
outputs = %w(p1 p2).map do |pname|
p = Fluent::Plugin.new_output('test')
p.configure(config_element('ROOT', '', {'name' => pname}))
p.configure('name' => pname)
p.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
super(tag, [[time, record]], chain)
end
end
emit_proc = if p.respond_to?(:emit_events)
Proc.new {|p, tag, es, _chain| p.emit_events(tag, es)}
else
Proc.new {|p, tag, es, _chain| p.emit(tag, es, NullOutputChain.instance)}
end
emit_procs << emit_proc
p
end

d.instance.instance_eval { @outputs = outputs }
d.instance.instance_eval { @emit_procs = emit_procs }

es = if defined?(MessagePack::Packer)
time = Time.parse("2013-05-26 06:37:22 UTC").to_i
packer = MessagePack::Packer.new
packer = Fluent::Engine.msgpack_factory.packer
packer.pack([time, {"a" => 1}])
packer.pack([time, {"a" => 2}])
Fluent::MessagePackEventStream.new(packer.to_s)
Expand All @@ -142,42 +133,30 @@ def test_msgpack_es_emit_bug
def create_event_test_driver(is_deep_copy = false)
deep_copy_config = %[
deep_copy true
]
]

output1 = Fluent::Plugin.new_output('test')
output1.configure(config_element('ROOT', '', {'name' => 'output1'}))
output1.define_singleton_method(:emit_events) do |tag, es|
output1.configure('name' => 'output1')
output1.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
record['foo'] = 'bar'
super(tag, [[time, record]])
super(tag, [[time, record]], chain)
end
end
proc1 = if output1.respond_to?(:emit_events)
Proc.new {|output1, tag, es, _chain| output1.emit_events(tag, es)}
else
Proc.new {|output1, tag, es, _chain| output1.emit(tag, es, NullOutputChain.instance)}
end

output2 = Fluent::Plugin.new_output('test')
output2.configure(config_element('ROOT', '', {'name' => 'output2'}))
output2.define_singleton_method(:emit_events) do |tag, es|
output2.configure('name' => 'output2')
output2.define_singleton_method(:emit) do |tag, es, chain|
es.each do |time, record|
super(tag, [[time, record]])
super(tag, [[time, record]], chain)
end
end
proc2 = if output2.respond_to?(:emit_events)
Proc.new {|output2, tag, es, _chain| output2.emit_events(tag, es)}
else
Proc.new {|output2, tag, es, _chain| output2.emit(tag, es, NullOutputChain.instance)}
end

outputs = [output1, output2]
emit_procs = [proc1, proc2]

d = Fluent::Test::OutputTestDriver.new(Fluent::CopyExOutput)
d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)
d = d.configure(deep_copy_config) if is_deep_copy
d.instance.instance_eval { @outputs = outputs }
d.instance.instance_eval { @emit_procs = emit_procs }
d
end

Expand Down Expand Up @@ -241,4 +220,3 @@ def test_ignore_error
assert_nothing_raised { d.emit({"a"=>1}, time) }
end
end

203 changes: 203 additions & 0 deletions test/out_copy_ex/v14.rb
@@ -0,0 +1,203 @@
require_relative '../helper'
require "fluent/test/driver/multi_output"

class CopyExOutputTest < Test::Unit::TestCase
class << self
def startup
$LOAD_PATH.unshift "#{Gem.loaded_specs['fluentd'].full_gem_path}/test/scripts"
require 'fluent/plugin/out_test'
require 'fluent/plugin/out_test2'
end

def shutdown
$LOAD_PATH.shift
end
end

def setup
Fluent::Test.setup
end

def config_element(name = 'test', argument = '', params = {}, elements = [])
Fluent::Config::Element.new(name, argument, params, elements)
end

CONFIG = %[
<store>
@type test
name c0
</store>
<store>
@type test2
name c1
</store>
<store>
@type test
name c2
</store>
]

IGNORE_ERROR_CONFIG = %[
<store ignore_error>
@type test
name c0
</store>
<store ignore_error>
@type test
name c1
</store>
<store>
@type test
name c2
</store>
]

def create_driver(conf = CONFIG)
Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyExOutput).configure(conf)
end

def test_configure
d = create_driver

outputs = d.instance.outputs
assert_equal 3, outputs.size
assert_equal Fluent::Plugin::TestOutput, outputs[0].class
assert_equal Fluent::Plugin::Test2Output, outputs[1].class
assert_equal Fluent::Plugin::TestOutput, outputs[2].class
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
end

def test_configure_ignore_error
d = create_driver(IGNORE_ERROR_CONFIG)

outputs = d.instance.outputs
ignore_errors = d.instance.ignore_errors
assert_equal outputs.size, ignore_errors.size
assert_equal true, ignore_errors[0]
assert_equal true, ignore_errors[1]
assert_equal false, ignore_errors[2]
end

def test_feed_events
d = create_driver

assert !d.instance.outputs[0].has_router?
assert_not_nil d.instance.outputs[1].router
assert !d.instance.outputs[2].has_router?

time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
d.run(default_tag: 'test') do
d.feed(time, {"a" => 1})
d.feed(time, {"a" => 2})
end

d.instance.outputs.each {|o|
assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
}
end

def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
d = create_driver

time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ])
es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream)

d.run(default_tag: 'test') do
d.feed(es)
end

d.instance.outputs.each { |o|
assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
}
end

def create_event_test_driver(does_deep_copy = false)
config = %[
deep_copy #{does_deep_copy}
<store>
@type test
name output1
</store>
<store>
@type test
name output2
</store>
]

d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config)
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
es.each do |time, record|
record['foo'] = 'bar'
end
super(tag, es)
end
d
end

time = Fluent::EventTime.parse("2013-05-26 06:37:22 UTC")
mes0 = Fluent::MultiEventStream.new
mes0.add(time, {"a" => 1})
mes0.add(time, {"b" => 1})
mes1 = Fluent::MultiEventStream.new
mes1.add(time, {"a" => 1})
mes1.add(time, {"b" => 1})

data(
"OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})],
"OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})],
"ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
"ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
"MultiEventStream without deep_copy" => [false, mes0],
"MultiEventStream with deep_copy" => [true, mes1],
)
def test_deep_copy_controls_shallow_or_deep_copied(data)
does_deep_copy, es = data

d = create_event_test_driver(does_deep_copy)

d.run(default_tag: 'test') do
d.feed(es)
end

events = d.instance.outputs.map(&:events)

if does_deep_copy
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last

assert{ record0.object_id != record1.object_id }
assert_equal "bar", record0["foo"]
assert !record1.has_key?("foo")
end
else
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last

assert{ record0.object_id == record1.object_id }
assert_equal "bar", record0["foo"]
assert_equal "bar", record1["foo"]
end
end
end

def test_ignore_error
d = create_driver(IGNORE_ERROR_CONFIG)

# override to raise an error
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
raise ArgumentError, 'Failed'
end

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
assert_nothing_raised do
d.run(default_tag: 'test') do
d.feed(time, {"a"=>1})
end
end
end
end
9 changes: 9 additions & 0 deletions test/test_out_copy_ex.rb
@@ -0,0 +1,9 @@
require_relative "helper"

require 'fluent/version'
major, minor, patch = Fluent::VERSION.split('.').map(&:to_i)
if major > 0 || (major == 0 && minor >= 14)
require_relative 'out_copy_ex/v14'
else
require_relative 'out_copy_ex/v12'
end

0 comments on commit 4d6719f

Please sign in to comment.