From 3ee70c093cc504131efd93023f624e1ebdf807c5 Mon Sep 17 00:00:00 2001
From: Ji Oh Yoo
Date: Mon, 7 Dec 2015 04:05:06 -0800
Subject: [PATCH] changed to retry put_record_batch when the server gives response with error

---
Review notes:
- A retry now resends the raw payloads. Before, the already wrapped
  {:data => ...} hashes were passed back in and wrapped a second time.
- calculate_sleep_duration computes the one delay it needs instead of
  building an array of all of them.
- Indentation of context lines was rebuilt from a whitespace-collapsed copy
  of this patch; use `git apply --ignore-whitespace` if context mismatches.

 lib/fluent/plugin/out_kinesis_firehose.rb | 52 +++++++++++++++++++++++++++---
 1 file changed, 49 insertions(+), 3 deletions(-)

diff --git a/lib/fluent/plugin/out_kinesis_firehose.rb b/lib/fluent/plugin/out_kinesis_firehose.rb
index 597fac1..9cc8a4f 100644
--- a/lib/fluent/plugin/out_kinesis_firehose.rb
+++ b/lib/fluent/plugin/out_kinesis_firehose.rb
@@ -24,6 +24,8 @@ class Fluent::KinesisFirehoseOutput < Fluent::BufferedOutput
   config_param :delivery_stream_name, :string
   config_param :data_key, :string, :default => nil
   config_param :append_new_line, :bool, :default => true
+  # Upper bound on re-sends of records that a put_record_batch call rejected.
+  config_param :retries_on_putrecordbatch, :integer, :default => 3
 
   def initialize
     super
@@ -86,19 +88,52 @@ def put_records(data_list)
         false
       end
     }.each {|chunk|
-      put_record_batch(chunk)
+      put_record_batch_with_retry(chunk)
     }
   end
 
-  def put_record_batch(data_list)
+  # Sends data_list to the delivery stream with a single put_record_batch call
+  # and re-sends only the entries the response marks with an error code.
+  #
+  # data_list   - Array of raw record payloads.
+  # retry_count - Number of retries already performed (callers omit this).
+  def put_record_batch_with_retry(data_list, retry_count=0)
     records = data_list.map do |data|
       {:data => data}
     end
 
-    client.put_record_batch(
+    response = client.put_record_batch(
       :delivery_stream_name => @delivery_stream_name,
       :records => records
     )
+
+    if response[:failed_put_count] && response[:failed_put_count] > 0
+      failed_records = []
+      # Assumes request_responses lines up index-for-index with records.
+      response[:request_responses].each_with_index{|record, index|
+        if record[:error_code]
+          failed_records.push({body: records[index], error_code: record[:error_code]})
+        end
+      }
+
+      if retry_count < @retries_on_putrecordbatch
+        sleep(calculate_sleep_duration(retry_count))
+        retry_count += 1
+        log.warn sprintf('Retrying to put records. Retry count: %d', retry_count)
+        # Pass the raw payloads back in: this method wraps every entry in
+        # {:data => ...} itself, so handing it record[:body] would nest them.
+        put_record_batch_with_retry(failed_records.map{|record| record[:body][:data]}, retry_count)
+      else
+        failed_records.each{|record|
+          log.error sprintf(
+            'Could not put record, Error: %s, Record: %s',
+            record[:error_code],
+            JSON.dump(record[:body])
+          )
+        }
+      end
+
+    end
   end
 
   def client
@@ -127,4 +162,15 @@ def client
 
     @client = Aws::Firehose::Client.new(options)
   end
+
+  # Seconds to wait before retry number current_retry (0-based): the jittered
+  # base delay from scaling_factor, doubled for each retry already made.
+  def calculate_sleep_duration(current_retry)
+    (2 ** current_retry) * scaling_factor
+  end
+
+  # Base delay in seconds, between 0.5 and 0.6, picked at random per call.
+  def scaling_factor
+    0.5 + Kernel.rand * 0.1
+  end
 end