Skip to content

Commit

Permalink
Add support to generate hash id for preventing duplicate records
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmo0920 committed Nov 16, 2017
1 parent fca9e5c commit ff2ad5b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 1 deletion.
36 changes: 36 additions & 0 deletions lib/fluent/plugin/generate_hash_id_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'digest'
require 'securerandom'

module Fluent
  module Plugin
    # Mixin that stores a hex digest of freshly generated random data in each
    # record, under a configurable key.
    #
    # Including classes are expected to provide the `config_section` /
    # `config_param` class-level DSL; the section declared here is read back
    # through the `@hash_config` instance variable.
    module GenerateHashIdSupport
      # Declares the optional, single `hash` section on the including class.
      #
      # @param klass [Class] the class that includes this module
      def self.included(klass)
        klass.instance_eval {
          config_section :hash, param_name: :hash_config, required: false, multi: false do
            config_param :hash_type, :enum, list: [:md5, :sha1, :sha256, :sha512], default: :sha256
            config_param :hash_id_key, :string, default: '_id'
          end
        }
      end

      # Hex-encodes the digest of +str+ using the algorithm selected by
      # `@hash_config.hash_type`.
      #
      # @param str [String] data to digest
      # @return [String] lowercase hexadecimal digest
      # @raise [ArgumentError] if the configured hash_type is not one of
      #   :md5, :sha1, :sha256 or :sha512
      def hash_hex(str)
        hash_type = @hash_config.hash_type
        case hash_type
        when :md5    then Digest::MD5.hexdigest(str)
        when :sha1   then Digest::SHA1.hexdigest(str)
        when :sha256 then Digest::SHA256.hexdigest(str)
        when :sha512 then Digest::SHA512.hexdigest(str)
        else
          # Fail loudly here instead of returning nil and letting the caller
          # trip over it later with an unrelated-looking error.
          raise ArgumentError, "unsupported hash_type: #{hash_type.inspect}"
        end
      end

      # Writes a newly generated id into +record+ under
      # `@hash_config.hash_id_key`. The id is the digest of 20 random bytes
      # (hex-encoded by SecureRandom), so it is different on every call.
      #
      # @param record [Hash] the event record; mutated in place
      # @return [Hash] the same +record+ object
      def generate_hash_id_key(record)
        record[@hash_config.hash_id_key] = hash_hex(SecureRandom.hex(20))
        record
      end
    end
  end
end
6 changes: 6 additions & 0 deletions lib/fluent/plugin/out_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

require 'fluent/plugin/output'
require_relative 'elasticsearch_index_template'
require_relative 'generate_hash_id_support'

module Fluent::Plugin
class ElasticsearchOutput < Output
Expand Down Expand Up @@ -79,6 +80,7 @@ class ConnectionFailure < StandardError; end
end

include Fluent::ElasticsearchIndexTemplate
include Fluent::Plugin::GenerateHashIdSupport

def initialize
super
Expand Down Expand Up @@ -340,6 +342,10 @@ def write(chunk)
record = flatten_record(record)
end

if @hash_config
record = generate_hash_id_key(record)
end

dt = nil
if @logstash_format || @include_timestamp
if record.has_key?(TIMESTAMP_FIELD)
Expand Down
6 changes: 6 additions & 0 deletions lib/fluent/plugin/out_elasticsearch_dynamic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class ElasticsearchOutputDynamic < ElasticsearchOutput
DYNAMIC_PARAM_NAMES = %W[hosts host port include_timestamp logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation]
DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym }

include Fluent::Plugin::GenerateHashIdSupport

attr_reader :dynamic_config

def configure(conf)
Expand Down Expand Up @@ -130,6 +132,10 @@ def write(chunk)
chunk.msgpack_each do |time, record|
next unless record.is_a? Hash

if @hash_config
record = generate_hash_id_key(record)
end

begin
# evaluate all configurations here
DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
Expand Down
41 changes: 41 additions & 0 deletions test/plugin/test_out_elasticsearch.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
require 'helper'
require 'date'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'flexmock/test_unit'

class ElasticsearchOutput < Test::Unit::TestCase
include FlexMock::TestCase
include Fluent::Test::Helpers

attr_accessor :index_cmds, :index_command_counts

Expand Down Expand Up @@ -438,6 +440,45 @@ def test_writes_to_speficied_index
assert_equal('myindex', index_cmds.first['index']['_index'])
end

class AdditionalHashIdMechanismTest < self
  # Each case pairs a configured hash_type with the Digest class that is
  # expected to produce the generated id.
  data(
    "default" => {"hash_type" => :sha256, "method" => Digest::SHA256, "hash_id_key" => '_id'},
    "md5"     => {"hash_type" => :md5,    "method" => Digest::MD5,    "hash_id_key" => '_id'},
    "sha1"    => {"hash_type" => :sha1,   "method" => Digest::SHA1,   "hash_id_key" => '_id'},
    "sha512"  => {"hash_type" => :sha512, "method" => Digest::SHA512, "hash_id_key" => '_id'},
  )
  def test_writes_with_genrate_hash(data)
    id_key = data["hash_id_key"]

    # NOTE(review): 'keys' is not among the parameters declared by the hash
    # config section added in this commit (hash_type, hash_id_key) - confirm
    # it is intentional.
    hash_section = Fluent::Config::Element.new(
      'hash', '',
      {
        'keys' => ['request_id'],
        'hash_type' => data["hash_type"],
        'hash_id_key' => id_key,
      },
      []
    )
    root = Fluent::Config::Element.new(
      'ROOT', '',
      {
        '@type' => 'elasticsearch',
        'id_key' => id_key,
      },
      [hash_section]
    )
    driver.configure(root)

    stub_elastic_ping
    stub_elastic

    # Pin SecureRandom.hex(20) so the expected digest can be recomputed below
    # from the very same value.
    flexmock(SecureRandom)
      .should_receive(:hex)
      .with(20)
      .and_return("191cb6a04822a8098b920e07881f682dbf710525")

    time = event_time("2017-10-15 15:00:23.34567890 UTC")
    driver.run(default_tag: 'test') do
      driver.feed(time, sample_record.merge('request_id' => 'elastic'))
    end

    expected_id = data["method"].hexdigest(SecureRandom.hex(20))
    assert_equal(expected_id, index_cmds[1]["#{id_key}"])
  end
end

class IndexNamePlaceholdersTest < self
def test_writes_to_speficied_index_with_tag_placeholder
driver.configure("index_name myindex.${tag}\n")
Expand Down
43 changes: 42 additions & 1 deletion test/plugin/test_out_elasticsearch_dynamic.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
require 'helper'
require 'date'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'flexmock/test_unit'

class ElasticsearchOutputDynamic < Test::Unit::TestCase
include FlexMock::TestCase
include Fluent::Test::Helpers

attr_accessor :index_cmds, :index_command_counts

Expand Down Expand Up @@ -316,6 +318,45 @@ def test_writes_to_multi_hosts
assert_equal(2000, total)
end

class AdditionalHashIdMechanismTest < self
  # Each case pairs a configured hash_type with the Digest class that is
  # expected to produce the generated id.
  data(
    "default" => {"hash_type" => :sha256, "method" => Digest::SHA256, "hash_id_key" => '_id'},
    "md5"     => {"hash_type" => :md5,    "method" => Digest::MD5,    "hash_id_key" => '_id'},
    "sha1"    => {"hash_type" => :sha1,   "method" => Digest::SHA1,   "hash_id_key" => '_id'},
    "sha512"  => {"hash_type" => :sha512, "method" => Digest::SHA512, "hash_id_key" => '_id'},
  )
  def test_writes_with_genrate_hash(data)
    id_key = data["hash_id_key"]

    hash_section = Fluent::Config::Element.new(
      'hash', '',
      {
        'keys' => ['request_id'],
        'hash_type' => data["hash_type"],
        'hash_id_key' => id_key,
      },
      []
    )
    # NOTE(review): '@type' is 'elasticsearch' although this is the dynamic
    # output's test file - confirm that is intended.
    root = Fluent::Config::Element.new(
      'ROOT', '',
      {
        '@type' => 'elasticsearch',
        'id_key' => id_key,
      },
      [hash_section]
    )
    driver.configure(root)

    stub_elastic_ping
    stub_elastic

    # Pin SecureRandom.hex(20) so the expected digest can be recomputed below
    # from the very same value.
    flexmock(SecureRandom)
      .should_receive(:hex)
      .with(20)
      .and_return("49609b8df6693fd244d3b2c6e6170d04841457cf")

    time = event_time("2017-10-15 15:00:23.34567890 UTC")
    driver.run(default_tag: 'test') do
      driver.feed(time, sample_record.merge('request_id' => 'elastic'))
    end

    expected_id = data["method"].hexdigest(SecureRandom.hex(20))
    assert_equal(expected_id, index_cmds[1]["#{id_key}"])
  end
end

def test_makes_bulk_request
stub_elastic_ping
stub_elastic
Expand Down Expand Up @@ -734,7 +775,7 @@ def test_reconnect_on_error_disabled
stub_request(:post, "http://localhost:9200/_bulk").with do |req|
raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
end

driver.configure("reconnect_on_error false\n")

assert_raise(ZeroDivisionError) {
Expand Down

0 comments on commit ff2ad5b

Please sign in to comment.