Skip to content

Commit

Permalink
Merge pull request #3 from yoojioh/master
Browse files Browse the repository at this point in the history
Retry put_record_batch when the server gives response with error.
  • Loading branch information
Genki Sugawara committed Dec 9, 2015
2 parents 20ca49f + 3ee70c0 commit 0fa2d74
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions lib/fluent/plugin/out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Fluent::KinesisFirehoseOutput < Fluent::BufferedOutput
config_param :delivery_stream_name, :string
config_param :data_key, :string, :default => nil
config_param :append_new_line, :bool, :default => true
config_param :retries_on_putrecordbatch, :integer,:default => 3

def initialize
super
Expand Down Expand Up @@ -86,19 +87,44 @@ def put_records(data_list)
false
end
}.each {|chunk|
put_record_batch(chunk)
put_record_batch_with_retry(chunk)
}
end

def put_record_batch(data_list)
def put_record_batch_with_retry(data_list, retry_count=0)
records = data_list.map do |data|
{:data => data}
end

client.put_record_batch(
response = client.put_record_batch(
:delivery_stream_name => @delivery_stream_name,
:records => records
)

if response[:failed_put_count] && response[:failed_put_count] > 0
failed_records = []
response[:request_responses].each_with_index{|record, index|
if record[:error_code]
failed_records.push({body: records[index], error_code: record[:error_code]})
end
}

if retry_count < @retries_on_putrecordbatch
sleep(calculate_sleep_duration(retry_count))
retry_count += 1
log.warn sprintf('Retrying to put records. Retry count: %d', retry_count)
put_record_batch_with_retry(failed_records.map{|record| record[:body]}, retry_count)
else
failed_records.each{|record|
log.error sprintf(
'Could not put record, Error: %s, Record: %s',
record[:error_code],
JSON.dump(record[:body])
)
}
end

end
end

def client
Expand Down Expand Up @@ -127,4 +153,12 @@ def client

@client = Aws::Firehose::Client.new(options)
end

def calculate_sleep_duration(current_retry)
Array.new(@retries_on_putrecordbatch){|n| ((2 ** n) * scaling_factor)}[current_retry]
end

def scaling_factor
0.5 + Kernel.rand * 0.1
end
end

0 comments on commit 0fa2d74

Please sign in to comment.