From 4d6719ffe0be7ffa0619695d7c478782c95587de Mon Sep 17 00:00:00 2001 From: sonots Date: Fri, 24 Nov 2017 22:18:00 +0900 Subject: [PATCH] Add tests for v0.14 --- Rakefile | 2 +- ...ble_stdout.conf => stdout_and_stdout.conf} | 0 lib/fluent/plugin/out_copy_ex/v12.rb | 4 +- lib/fluent/plugin/out_copy_ex/v14.rb | 4 +- .../v12.rb} | 68 ++---- test/out_copy_ex/v14.rb | 203 ++++++++++++++++++ test/test_out_copy_ex.rb | 9 + 7 files changed, 242 insertions(+), 48 deletions(-) rename example/{double_stdout.conf => stdout_and_stdout.conf} (100%) rename test/{plugin/test_out_copy_ex.rb => out_copy_ex/v12.rb} (69%) create mode 100644 test/out_copy_ex/v14.rb create mode 100644 test/test_out_copy_ex.rb diff --git a/Rakefile b/Rakefile index eee31c4..2031c1d 100644 --- a/Rakefile +++ b/Rakefile @@ -4,7 +4,7 @@ require "bundler/gem_tasks" require 'rake/testtask' Rake::TestTask.new(:test) do |test| test.libs << 'lib' << 'test' - test.pattern = 'test/**/*.rb' + test.pattern = 'test/**/test_*.rb' test.verbose = true test.warning = false end diff --git a/example/double_stdout.conf b/example/stdout_and_stdout.conf similarity index 100% rename from example/double_stdout.conf rename to example/stdout_and_stdout.conf diff --git a/lib/fluent/plugin/out_copy_ex/v12.rb b/lib/fluent/plugin/out_copy_ex/v12.rb index 297a5d1..245869f 100644 --- a/lib/fluent/plugin/out_copy_ex/v12.rb +++ b/lib/fluent/plugin/out_copy_ex/v12.rb @@ -1,9 +1,11 @@ require 'fluent/plugin/out_copy' module Fluent - class CopyOutputEx < CopyOutput + class CopyExOutput < CopyOutput Plugin.register_output('copy_ex', self) + attr_reader :ignore_errors + def initialize super @ignore_errors = [] diff --git a/lib/fluent/plugin/out_copy_ex/v14.rb b/lib/fluent/plugin/out_copy_ex/v14.rb index 9806ac2..b30ffb1 100644 --- a/lib/fluent/plugin/out_copy_ex/v14.rb +++ b/lib/fluent/plugin/out_copy_ex/v14.rb @@ -1,9 +1,11 @@ require 'fluent/plugin/out_copy' module Fluent::Plugin - class CopyOutputEx < CopyOutput + class CopyExOutput < CopyOutput Fluent::Plugin.register_output('copy_ex', self) + attr_reader :ignore_errors + def initialize super @ignore_errors = [] diff --git a/test/plugin/test_out_copy_ex.rb b/test/out_copy_ex/v12.rb similarity index 69% rename from test/plugin/test_out_copy_ex.rb rename to test/out_copy_ex/v12.rb index 2e0c44c..45f7803 100644 --- a/test/plugin/test_out_copy_ex.rb +++ b/test/out_copy_ex/v12.rb @@ -1,10 +1,9 @@ -require 'fluent/test' +require_relative '../helper' class CopyExOutputTest < Test::Unit::TestCase class << self def startup - spec = Gem::Specification.find { |s| s.name == 'fluentd' } - $LOAD_PATH.unshift File.join(spec.full_gem_path, 'test', 'scripts') + $LOAD_PATH.unshift "#{Gem.loaded_specs['fluentd'].full_gem_path}/test/scripts" require 'fluent/plugin/out_test' end @@ -17,10 +16,6 @@ def setup Fluent::Test.setup end - def config_element(name = 'test', argument = '', params = {}, elements = []) - Fluent::Config::Element.new(name, argument, params, elements) - end - CONFIG = %[ type test @@ -60,9 +55,9 @@ def test_configure outputs = d.instance.outputs assert_equal 3, outputs.size - assert_equal Fluent::Plugin::TestOutput, outputs[0].class - assert_equal Fluent::Plugin::TestOutput, outputs[1].class - assert_equal Fluent::Plugin::TestOutput, outputs[2].class + assert_equal Fluent::TestOutput, outputs[0].class + assert_equal Fluent::TestOutput, outputs[1].class + assert_equal Fluent::TestOutput, outputs[2].class assert_equal "c0", outputs[0].name assert_equal "c1", outputs[1].name assert_equal "c2", outputs[2].name @@ -88,39 +83,35 @@ def test_emit d.instance.outputs.each {|o| assert_equal [ - [time, {"a"=>1}], - [time, {"a"=>2}], - ], o.events + [time, {"a"=>1}], + [time, {"a"=>2}], + ], o.events + } + + d.instance.outputs.each {|o| + assert_not_nil o.router } end def test_msgpack_es_emit_bug - d = Fluent::Test::OutputTestDriver.new(Fluent::CopyExOutput) + d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput) - emit_procs = [] outputs = %w(p1 p2).map do |pname| p = Fluent::Plugin.new_output('test') - p.configure(config_element('ROOT', '', {'name' => pname})) + p.configure('name' => pname) p.define_singleton_method(:emit) do |tag, es, chain| es.each do |time, record| super(tag, [[time, record]], chain) end end - emit_proc = if p.respond_to?(:emit_events) - Proc.new {|p, tag, es, _chain| p.emit_events(tag, es)} - else - Proc.new {|p, tag, es, _chain| p.emit(tag, es, NullOutputChain.instance)} - end - emit_procs << emit_proc p end d.instance.instance_eval { @outputs = outputs } - d.instance.instance_eval { @emit_procs = emit_procs } es = if defined?(MessagePack::Packer) time = Time.parse("2013-05-26 06:37:22 UTC").to_i - packer = MessagePack::Packer.new + packer = Fluent::Engine.msgpack_factory.packer packer.pack([time, {"a" => 1}]) packer.pack([time, {"a" => 2}]) Fluent::MessagePackEventStream.new(packer.to_s) @@ -142,42 +133,30 @@ def test_msgpack_es_emit_bug def create_event_test_driver(is_deep_copy = false) deep_copy_config = %[ deep_copy true -] + ] output1 = Fluent::Plugin.new_output('test') - output1.configure(config_element('ROOT', '', {'name' => 'output1'})) - output1.define_singleton_method(:emit_events) do |tag, es| + output1.configure('name' => 'output1') + output1.define_singleton_method(:emit) do |tag, es, chain| es.each do |time, record| record['foo'] = 'bar' - super(tag, [[time, record]]) + super(tag, [[time, record]], chain) end end - proc1 = if output1.respond_to?(:emit_events) - Proc.new {|output1, tag, es, _chain| output1.emit_events(tag, es)} - else - Proc.new {|output1, tag, es, _chain| output1.emit(tag, es, NullOutputChain.instance)} - end output2 = Fluent::Plugin.new_output('test') - output2.configure(config_element('ROOT', '', {'name' => 'output2'})) - output2.define_singleton_method(:emit_events) do |tag, es| + output2.configure('name' => 'output2') + output2.define_singleton_method(:emit) do |tag, es, chain| es.each do |time, record| - super(tag, [[time, record]]) + super(tag, [[time, record]], chain) end end - proc2 = if output2.respond_to?(:emit_events) - Proc.new {|output2, tag, es, _chain| output2.emit_events(tag, es)} - else - Proc.new {|output2, tag, es, _chain| output2.emit(tag, es, NullOutputChain.instance)} - end outputs = [output1, output2] - emit_procs = [proc1, proc2] - d = Fluent::Test::OutputTestDriver.new(Fluent::CopyExOutput) + d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput) d = d.configure(deep_copy_config) if is_deep_copy d.instance.instance_eval { @outputs = outputs } - d.instance.instance_eval { @emit_procs = emit_procs } d end @@ -241,4 +220,3 @@ def test_ignore_error assert_nothing_raised { d.emit({"a"=>1}, time) } end end - diff --git a/test/out_copy_ex/v14.rb b/test/out_copy_ex/v14.rb new file mode 100644 index 0000000..95a3c78 --- /dev/null +++ b/test/out_copy_ex/v14.rb @@ -0,0 +1,203 @@ +require_relative '../helper' +require "fluent/test/driver/multi_output" + +class CopyExOutputTest < Test::Unit::TestCase + class << self + def startup + $LOAD_PATH.unshift "#{Gem.loaded_specs['fluentd'].full_gem_path}/test/scripts" + require 'fluent/plugin/out_test' + require 'fluent/plugin/out_test2' + end + + def shutdown + $LOAD_PATH.shift + end + end + + def setup + Fluent::Test.setup + end + + def config_element(name = 'test', argument = '', params = {}, elements = []) + Fluent::Config::Element.new(name, argument, params, elements) + end + + CONFIG = %[ + + @type test + name c0 + + + @type test2 + name c1 + + + @type test + name c2 + + ] + + IGNORE_ERROR_CONFIG = %[ + + @type test + name c0 + + + @type test + name c1 + + + @type test + name c2 + + ] + + def create_driver(conf = CONFIG) + Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyExOutput).configure(conf) + end + + def test_configure + d = create_driver + + outputs = d.instance.outputs + assert_equal 3, outputs.size + assert_equal Fluent::Plugin::TestOutput, outputs[0].class + assert_equal Fluent::Plugin::Test2Output, outputs[1].class + assert_equal Fluent::Plugin::TestOutput, outputs[2].class + assert_equal "c0", outputs[0].name + assert_equal "c1", outputs[1].name + assert_equal "c2", outputs[2].name + end + + def test_configure_ignore_error + d = create_driver(IGNORE_ERROR_CONFIG) + + outputs = d.instance.outputs + ignore_errors = d.instance.ignore_errors + assert_equal outputs.size, ignore_errors.size + assert_equal true, ignore_errors[0] + assert_equal true, ignore_errors[1] + assert_equal false, ignore_errors[2] + end + + def test_feed_events + d = create_driver + + assert !d.instance.outputs[0].has_router? + assert_not_nil d.instance.outputs[1].router + assert !d.instance.outputs[2].has_router? + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a" => 1}) + d.feed(time, {"a" => 2}) + end + + d.instance.outputs.each {|o| + assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events + } + end + + def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream + d = create_driver + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ]) + es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream) + + d.run(default_tag: 'test') do + d.feed(es) + end + + d.instance.outputs.each { |o| + assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events + } + end + + def create_event_test_driver(does_deep_copy = false) + config = %[ + deep_copy #{does_deep_copy} + + @type test + name output1 + + + @type test + name output2 + + ] + + d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config) + d.instance.outputs[0].define_singleton_method(:process) do |tag, es| + es.each do |time, record| + record['foo'] = 'bar' + end + super(tag, es) + end + d + end + + time = Fluent::EventTime.parse("2013-05-26 06:37:22 UTC") + mes0 = Fluent::MultiEventStream.new + mes0.add(time, {"a" => 1}) + mes0.add(time, {"b" => 1}) + mes1 = Fluent::MultiEventStream.new + mes1.add(time, {"a" => 1}) + mes1.add(time, {"b" => 1}) + + data( + "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})], + "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})], + "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], + "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], + "MultiEventStream without deep_copy" => [false, mes0], + "MultiEventStream with deep_copy" => [true, mes1], + ) + def test_deep_copy_controls_shallow_or_deep_copied(data) + does_deep_copy, es = data + + d = create_event_test_driver(does_deep_copy) + + d.run(default_tag: 'test') do + d.feed(es) + end + + events = d.instance.outputs.map(&:events) + + if does_deep_copy + events[0].each_with_index do |entry0, i| + record0 = entry0.last + record1 = events[1][i].last + + assert{ record0.object_id != record1.object_id } + assert_equal "bar", record0["foo"] + assert !record1.has_key?("foo") + end + else + events[0].each_with_index do |entry0, i| + record0 = entry0.last + record1 = events[1][i].last + + assert{ record0.object_id == record1.object_id } + assert_equal "bar", record0["foo"] + assert_equal "bar", record1["foo"] + end + end + end + + def test_ignore_error + d = create_driver(IGNORE_ERROR_CONFIG) + + # override to raise an error + d.instance.outputs[0].define_singleton_method(:process) do |tag, es| + raise ArgumentError, 'Failed' + end + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + assert_nothing_raised do + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + end + end +end diff --git a/test/test_out_copy_ex.rb b/test/test_out_copy_ex.rb new file mode 100644 index 0000000..78a0148 --- /dev/null +++ b/test/test_out_copy_ex.rb @@ -0,0 +1,9 @@ +require_relative "helper" + +require 'fluent/version' +major, minor, patch = Fluent::VERSION.split('.').map(&:to_i) +if major > 0 || (major == 0 && minor >= 14) + require_relative 'out_copy_ex/v14' +else + require_relative 'out_copy_ex/v12' +end