Skip to content

Commit

Permalink
add simple sniffer for simple proxy/lb cases
Browse files Browse the repository at this point in the history
When going through a proxy or load balancer to Elasticsearch,
do not try to use the _nodes discovery mechanism - instead, just
assume the given host(s) are the only hostnames to use.
This is useful e.g. in the case of Kubernetes going through an
Elasticsearch Service, where you want to periodically reconnect
the long-lived http connections to evenly spread the load throughout
the cluster.
  • Loading branch information
richm committed Aug 10, 2018
1 parent aba3318 commit 1ffe376
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
25 changes: 25 additions & 0 deletions README.md
Expand Up @@ -56,6 +56,8 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws-
+ [Buffered output options](#buffered-output-options)
+ [Hash flattening](#hash-flattening)
+ [Generate Hash ID](#generate-hash-id)
+ [sniffer_class_name](#sniffer_class_name)
+ [reload_after](#reload_after)
+ [Not seeing a config you need?](#not-seeing-a-config-you-need)
+ [Dynamic configuration](#dynamic-configuration)
* [Contact](#contact)
Expand Down Expand Up @@ -576,6 +578,29 @@ Here is a sample config:
</match>
```

### Sniffer Class Name

The default Sniffer used by the `Elasticsearch::Transport` class works well when Fluentd has a direct connection
to all of the Elasticsearch servers and can make effective use of the `_nodes` API. This doesn't work well
when Fluentd must connect through a load balancer or proxy. The parameter `sniffer_class_name` gives you the
ability to provide your own Sniffer class to implement whatever connection reload logic you require. In addition,
there is a new `Fluent::ElasticsearchSimpleSniffer` class which reuses the hosts given in the configuration, which
is typically the hostname of the load balancer or proxy. For example, a configuration like this would cause
connections to `logging-es` to reload every 100 operations:

```
host logging-es
port 9200
reload_connections true
sniffer_class_name Fluent::ElasticsearchSimpleSniffer
reload_after 100
```

### Reload After

When `reload_connections true`, this is the integer number of operations after which the plugin will
reload the connections. The default value is 10000.

### Not seeing a config you need?

We try to keep the scope of this plugin small and not add too many configuration options. If you think an option would be useful to others, feel free to open an issue or contribute a Pull Request.
Expand Down
10 changes: 10 additions & 0 deletions lib/fluent/plugin/elasticsearch_simple_sniffer.rb
@@ -0,0 +1,10 @@
require 'elasticsearch'

class Fluent::ElasticsearchSimpleSniffer < Elasticsearch::Transport::Transport::Sniffer

def hosts
@transport.logger.debug "In Fluent::ElasticsearchSimpleSniffer hosts #{@transport.hosts}" if @transport.logger
@transport.hosts
end

end
18 changes: 16 additions & 2 deletions lib/fluent/plugin/out_elasticsearch.rb
Expand Up @@ -86,6 +86,8 @@ def initialize(retry_stream)
config_param :pipeline, :string, :default => nil
config_param :with_transporter_log, :bool, :default => false
config_param :emit_error_for_missing_id, :bool, :default => false
config_param :sniffer_class_name, :string, :default => nil
config_param :reload_after, :integer, :default => -1

include Fluent::ElasticsearchIndexTemplate
include Fluent::ElasticsearchConstants
Expand Down Expand Up @@ -147,6 +149,13 @@ def configure(conf)
log.warn "Consider to specify log_level with @log_level." unless log_level
end

@sniffer_class = nil
begin
@sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
rescue Exception => ex
raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
end

end

def create_meta_config_map
Expand Down Expand Up @@ -189,9 +198,13 @@ def client
@_es ||= begin
excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
adapter_conf = lambda {|f| f.adapter :excon, excon_options }
local_reload_connections = @reload_connections
if local_reload_connections && @reload_after > -1
local_reload_connections = @reload_after
end
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
options: {
reload_connections: @reload_connections,
reload_connections: local_reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
retry_on_failure: 5,
Expand All @@ -204,7 +217,8 @@ def client
http: {
user: @user,
password: @password
}
},
sniffer_class: @sniffer_class,
}), &adapter_conf)
es = Elasticsearch::Client.new transport: transport

Expand Down
16 changes: 16 additions & 0 deletions test/plugin/test_out_elasticsearch.rb
Expand Up @@ -1716,4 +1716,20 @@ def test_create_should_write_create_op
assert(index_cmds[0].has_key?("create"))
end

def test_use_simple_sniffer
require 'fluent/plugin/elasticsearch_simple_sniffer'
driver.configure("sniffer_class_name Fluent::ElasticsearchSimpleSniffer
log_level debug
with_transporter_log true
reload_connections true
reload_after 1")
stub_elastic_ping
stub_elastic
driver.emit(sample_record)
driver.run
log = driver.instance.router.emit_error_handler.log
# 2 - one for the ping, one for the _bulk
assert_logs_include(log.out.logs, /In Fluent::ElasticsearchSimpleSniffer hosts/, 2)
end

end

0 comments on commit 1ffe376

Please sign in to comment.