From 270f64e24aa7e57615d076fa764419b0a5b5cc8e Mon Sep 17 00:00:00 2001 From: sonots Date: Wed, 14 Aug 2013 15:18:04 +0900 Subject: [PATCH] add store_file option --- lib/fluent/plugin/out_datacounter.rb | 48 ++++++++++++++++++++++++++++ test/plugin/test_out_datacounter.rb | 35 ++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/lib/fluent/plugin/out_datacounter.rb b/lib/fluent/plugin/out_datacounter.rb index c49b719..e543833 100644 --- a/lib/fluent/plugin/out_datacounter.rb +++ b/lib/fluent/plugin/out_datacounter.rb @@ -13,6 +13,7 @@ class Fluent::DataCounterOutput < Fluent::Output config_param :count_key, :string config_param :outcast_unmatched, :bool, :default => false config_param :output_messages, :bool, :default => false + config_param :store_file, :string, :default => nil # pattern0 reserved as unmatched counts config_param :pattern1, :string # string: NAME REGEXP @@ -78,12 +79,20 @@ def configure(conf) @removed_length = @removed_prefix_string.length end + if @store_file + f = Pathname.new(@store_file) + if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) + raise Fluent::ConfigError, "#{@store_file} is not writable" + end + end + @counts = count_initialized @mutex = Mutex.new end def start super + load_from_file start_watch end @@ -91,6 +100,7 @@ def shutdown super @watcher.terminate @watcher.join + store_to_file end def count_initialized(keys=nil) @@ -242,4 +252,42 @@ def emit(tag, es, chain) chain.next end + + def store_to_file + return unless @store_file + + begin + Pathname.new(@store_file).open('wb') do |f| + Marshal.dump({ + :counts => @counts, + :aggregate => @aggregate, + :count_key => @count_key, + :patterns => @patterns, + }, f) + end + rescue => e + $log.warn "out_datacounter: Can't write store_file #{e.class} #{e.message}" + end + end + + def load_from_file + return unless @store_file + return unless (f = Pathname.new(@store_file)).exist? + + begin + f.open('rb') do |f| + stored = Marshal.load(f) + if stored[:aggregate] == @aggregate and + stored[:count_key] == @count_key and + stored[:patterns] == @patterns + @counts = stored[:counts] + else + $log.warn "out_datacounter: configuration param was changed. ignore stored data" + end + end + rescue => e + $log.warn "out_datacounter: Can't load store_file #{e.class} #{e.message}" + end + end + end diff --git a/test/plugin/test_out_datacounter.rb b/test/plugin/test_out_datacounter.rb index 90c3d57..7db8b7a 100644 --- a/test/plugin/test_out_datacounter.rb +++ b/test/plugin/test_out_datacounter.rb @@ -653,4 +653,39 @@ def test_zer_tags_per_tag d.instance.flush_emit(60) assert_equal 2, d.emits.size # +0 end + + def test_store_file + dir = "test/tmp" + Dir.mkdir dir unless Dir.exist? dir + file = "#{dir}/test.dat" + File.unlink file if File.exist? file + + # test store + d = create_driver(CONFIG + %[store_file #{file}]) + d.run do + d.instance.flush_emit(60) + d.emit({'target' => 1}) + d.emit({'target' => 1}) + d.emit({'target' => 1}) + d.instance.shutdown + end + stored_counts = d.instance.counts + assert File.exist? file + + # test load + d = create_driver(CONFIG + %[store_file #{file}]) + d.run do + loaded_counts = d.instance.counts + assert_equal stored_counts, loaded_counts + end + + # test not to load if config is changed + d = create_driver(CONFIG + %[count_key foobar store_file #{file}]) + d.run do + loaded_counts = d.instance.counts + assert_equal({}, loaded_counts) + end + + File.unlink file + end end