diff --git a/README.md b/README.md index 95bce52..69a7d16 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ A generic [fluentd][1] output plugin for sending logs to an HTTP endpoint read_timeout 10 # default: 60 raise_on_error false # default: true raise_on_http_failure true # default: false + ignore_http_status_code 300,400..499 # default: nil # do not raise on these http_hstatus codes authentication basic # default: none username alice # default: '' password bobpop # default: '', secret: true diff --git a/lib/fluent/plugin/out_http_ext.rb b/lib/fluent/plugin/out_http_ext.rb index d6bdeaa..04bec61 100644 --- a/lib/fluent/plugin/out_http_ext.rb +++ b/lib/fluent/plugin/out_http_ext.rb @@ -1,3 +1,12 @@ +require 'set' + +class Array + def to_set + Set.new(self) + end +end + + class Hash """ each traverse in hash @@ -19,6 +28,39 @@ def each_deep_detail(directory, &proc) end +class StatusCodeParser + """ + parse status code string to array of codes + """ + def self.range?(str) + # i.e. 200..399 => return true + return /^\d{3}..\d{3}$/ =~ str ? true : false + end + + def self.number?(str) + return /^\d{3}$/ =~ str ? true : false + end + + def self.get_array(str) + if self.range?(str) + ends = str.split('..').map{|d| Integer(d)} + return (ends[0]..ends[1]).to_a + elsif self.number?(str) + return [str.to_i] + else + raise "invalid status code range format" + end + end + + def self.convert(range_str) + elems = range_str.split(',') + status_codes = elems.flat_map do |elem| + self.get_array(elem) + end + return status_codes.to_set + end +end + class Fluent::HTTPOutput < Fluent::Output Fluent::Plugin.register_output('http_ext', self) @@ -27,6 +69,7 @@ def initialize require 'net/http' require 'uri' require 'yajl' + require 'set' end # Endpoint URL ex. localhost.local/api/ @@ -53,7 +96,7 @@ def initialize # Raise errors when HTTP response code was not successful. config_param :raise_on_http_failure, :bool, :default => false - + config_param :ignore_http_status_code, :string, :default => nil # nil | 'none' | 'basic' config_param :authentication, :string, :default => nil config_param :username, :string, :default => '' @@ -76,6 +119,12 @@ def configure(conf) :post end + @ignore_http_status_code = if @ignore_http_status_code.nil? + [].to_set + else + StatusCodeParser.convert(@ignore_http_status_code) + end + @auth = case @authentication when 'basic' then :basic else @@ -178,7 +227,14 @@ def send_request(req, uri) end warning = "failed to #{req.method} #{uri} (#{res_summary})" $log.warn warning - raise warning if @raise_on_http_failure + if @raise_on_http_failure + unless @ignore_http_status_code.include?(res.code.to_i) + raise warning + else + $log.debug "ignore http status code #{req.method}" + end + end + end #end unless end # end begin end # end send_request diff --git a/test/plugin/test_out_http_ext.rb b/test/plugin/test_out_http_ext.rb index 8708eab..b04cd11 100644 --- a/test/plugin/test_out_http_ext.rb +++ b/test/plugin/test_out_http_ext.rb @@ -69,6 +69,12 @@ def setup res.status = 200 res.body = 'slow_10' } + srv.mount_proc('/status_code') { |req,res| + r = Yajl.load(req.body) + code = r["code"] + res.status = code.to_s + res.body = '' + } srv.start ensure @@ -180,14 +186,36 @@ class HTTPOutputTest < HTTPOutputTestBase rate_limit_msec #{RATE_LIMIT_MSEC} ] + CONFIG_NOT_READ_TIMEOUT = %[ + endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/slow_5/ + read_timeout 7 + ] CONFIG_READ_TIMEOUT = %[ endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/slow_10/ read_timeout 7 ] - - CONFIG_NOT_READ_TIMEOUT = %[ - endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/slow_5/ - read_timeout 7 + CONFIG_IGNORE_NONE = %[ + endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/status_code/ + serializer json + raise_on_http_failure true + ] + CONFIG_IGNORE_409 = %[ + endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/status_code/ + serializer json + raise_on_http_failure true + ignore_http_status_code 409 + ] + CONFIG_IGNORE_4XX = %[ + endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/status_code/ + serializer json + raise_on_http_failure true + ignore_http_status_code 400..499 + ] + CONFIG_IGNORE_4XX_5XX = %[ + endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/status_code/ + serializer json + raise_on_http_failure true + ignore_http_status_code 400..599 ] def create_driver(conf=CONFIG, tag='test.metrics') @@ -342,6 +370,74 @@ def test_not_read_timeout end end + def test_ignore_none + d = create_driver CONFIG_IGNORE_NONE + assert_equal [].to_set, d.instance.ignore_http_status_code + + assert_raise do + d.emit({:code=> 409}) + d.run + end + + assert_raise do + d.emit({:code => 500}) + d.run + end + end + + def test_ignore_409 + d = create_driver CONFIG_IGNORE_409 + assert_equal [409].to_set, d.instance.ignore_http_status_code + + assert_nothing_raised do + d.emit({:code => 409}) + d.run + end + assert_raise do + d.emit({:code => 404}) + d.run + end + assert_raise do + d.emit({:code => 500}) + d.run + end + end + + def test_ignore_4XX + d = create_driver CONFIG_IGNORE_4XX + assert_equal (400..499).to_a.to_set, d.instance.ignore_http_status_code + + assert_nothing_raised do + d.emit({:code => 409}) + d.run + end + assert_nothing_raised do + d.emit({:code => 404}) + d.run + end + assert_raise do + d.emit({:code => 500}) + d.run + end + end + + def test_ignore_4XX_5XX + d = create_driver CONFIG_IGNORE_4XX_5XX + assert_equal (400..599).to_a.to_set, d.instance.ignore_http_status_code + assert_nothing_raised do + d.emit({:code => 409}) + d.run + end + assert_nothing_raised do + d.emit({:code => 404}) + d.run + end + assert_nothing_raised do + d.emit({:code => 500}) + d.run + end + end + def _current_msec Time.now.to_f * 1000 end @@ -379,4 +475,37 @@ def test_auth assert_equal 2, @prohibited end + def test_status_code_parser() + assert_equal (400..409).to_a.to_set, StatusCodeParser.convert("400..409") + assert_equal ((400..409).to_a + [300]).to_set, StatusCodeParser.convert("400..409,300") + assert_equal ((400..409).to_a + [300]).to_set, StatusCodeParser.convert("300,400..409") + assert_equal [404, 409].to_set, StatusCodeParser.convert("404,409") + assert_equal [404, 409, 300, 301, 302, 303].to_set, StatusCodeParser.convert("404,409,300..303") + assert_equal [409].to_set, StatusCodeParser.convert("409") + assert_equal [].to_set, StatusCodeParser.convert("") + assert_raise do + StatusCodeParser.convert("400...499") + end + assert_raise do + StatusCodeParser.convert("10..20") + end + assert_raise do + StatusCodeParser.convert("4XX") + end + assert_raise do + StatusCodeParser.convert("4XX..5XX") + end + assert_raise do + StatusCodeParser.convert("200.0..400") + end + assert_raise do + StatusCodeParser.convert("-200..400") + end + + end + + def test_array_extend + assert_equal [].to_set, Set.new([]) + assert_equal [1, 2].to_set, Set.new([1, 2]) + end end