fix: free resources on close #173

Merged — 3 commits merged on Sep 8, 2023
Changes from 2 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.15.3
- Fixes a memory leak that occurs when a pipeline containing this filter terminates, which could become significant if the pipeline is cycled repeatedly.

## 3.15.2
- Added checking for `query` and `query_template`. [#171](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/171)

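For context, here is a reconstructed sketch of the pre-3.15.3 behavior the changelog entry refers to, simplified from the code removed in the diff below; the `new_client` stand-in and the thread count are illustrative, not the plugin's real values:

```ruby
# JRuby-only sketch: the filter kept one client per worker thread, keyed by the
# Thread object itself. Nothing removed the entries or closed the clients when
# a pipeline terminated, so cycling the pipeline accumulated clients.
require "java"

clients_pool = java.util.concurrent.ConcurrentHashMap.new
new_client   = -> { Object.new } # stand-in for the real Elasticsearch client

3.times do
  Thread.new do
    clients_pool.computeIfAbsent(Thread.current, ->(_thread) { new_client.call })
  end.join
end

puts clients_pool.size # => 3; the dead threads and their clients stay referenced
```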
10 changes: 6 additions & 4 deletions lib/logstash/filters/elasticsearch.rb
@@ -4,6 +4,7 @@
require "logstash/json"
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
require "logstash/plugin_mixins/normalize_config_support"
require "monitor"

require_relative "elasticsearch/client"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
@@ -139,7 +140,8 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base

include LogStash::PluginMixins::NormalizeConfigSupport

attr_reader :clients_pool
include MonitorMixin
attr_reader :shared_client

##
# @override to handle proxy => '' as if none was set
@@ -159,8 +161,6 @@ def self.validate_value(value, validator)
end

def register
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
if File.zero?(@query_template)
@@ -352,7 +352,9 @@ def new_client
end

def get_client
@clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client })
@shared_client || synchronize do
@shared_client ||= new_client
end
end

# get an array of path elements from a path reference
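A minimal standalone sketch of the double-checked lazy-initialization pattern `get_client` now uses (the class and builder names here are illustrative): the unsynchronized read is the fast path, and `MonitorMixin#synchronize` ensures only one thread ever constructs the shared client.

```ruby
require "monitor"

# Illustrative holder: the fast path reads @shared_client without locking; the
# slow path takes the monitor so the client is built exactly once across threads.
class SharedClientHolder
  include MonitorMixin

  def initialize(&builder)
    super()          # runs MonitorMixin's initializer
    @builder = builder
    @shared_client = nil
  end

  def get_client
    @shared_client || synchronize { @shared_client ||= @builder.call }
  end
end

# Usage: every thread sees the same instance.
holder  = SharedClientHolder.new { Object.new }
clients = 10.times.map { Thread.new { holder.get_client } }.map(&:value)
raise "expected a single shared client" unless clients.uniq.size == 1
```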
3 changes: 3 additions & 0 deletions lib/logstash/filters/elasticsearch/client.rb
@@ -22,6 +22,9 @@ def initialize(logger, hosts, options = {})
transport_options[:headers].merge!(setup_api_key(api_key))
transport_options[:headers].merge!({ 'user-agent' => "#{user_agent}" })

transport_options[:pool_max] = 1000
transport_options[:pool_max_per_route] = 100
Comment on lines +25 to +26 — Contributor Author:
@edmocosta to avoid adding config options in a patch release, I have added these with the same default values that we have in the referenced Elasticsearch Output plugin, which should allow us to completely eliminate connection contention on pipelines with fewer than 100 workers and 10 hosts. We can make this configurable if/when necessary.


logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
transport_options[:proxy] = proxy.to_s if proxy && !proxy.eql?('')

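Illustratively, this is roughly how the Manticore-specific pool settings discussed in the comment above reach the client. The require paths, host, and transport class follow the elasticsearch-transport gem as used by this plugin on JRuby, but treat the exact wiring as an assumption rather than the plugin's literal code:

```ruby
require "elasticsearch"
require "elasticsearch/transport/transport/http/manticore" # JRuby/Manticore adapter

client = Elasticsearch::Client.new(
  hosts: ["http://localhost:9200"],                        # assumed host for the example
  transport_class: Elasticsearch::Transport::Transport::HTTP::Manticore,
  transport_options: {
    pool_max: 1000,          # total connections held by the adapter
    pool_max_per_route: 100  # connections per host:port route
  }
)
```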
2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.15.2'
s.version = '3.15.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
45 changes: 28 additions & 17 deletions spec/filters/elasticsearch_spec.rb
@@ -91,21 +91,6 @@
Thread.current[:filter_elasticsearch_client] = nil
end

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get error with the connection pool
# we have decided to create a new instance per worker thread which
# will be lazy created on the first call to `#filter`
#
# I am adding a simple test case for future changes
it "uses a different connection object per thread wait" do
expect(plugin.clients_pool.size).to eq(0)

Thread.new { plugin.filter(event) }.join
Thread.new { plugin.filter(event) }.join

expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
@@ -466,6 +451,32 @@ def wait_receive_request
Thread.current[:filter_elasticsearch_client] = nil
end

it 'uses a threadsafe transport adapter' do
client = plugin.send(:get_client).client
# we currently rely on the threadsafety guarantees provided by Manticore
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
transport_class = extract_transport(client).options.fetch(:transport_class)
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
end

it 'uses a client with sufficient connection pool size' do
client = plugin.send(:get_client).client
transport_options = extract_transport(client).options.fetch(:transport_options)
# pool_max and pool_max_per_route are manticore-specific transport options
expect(transport_options).to include(:pool_max => 1000, :pool_max_per_route => 100)
end

it 'uses a single shared client across threads' do
q = Queue.new
10.times.map do
Thread.new(plugin) { |instance| q.push instance.send(:get_client) }
end.map(&:join)

first = q.pop
expect(q.pop).to be(first) until q.empty?
end

describe "cloud.id" do
let(:valid_cloud_id) do
'sample:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvJGFjMzFlYmI5MDI0MTc3MzE1NzA0M2MzNGZkMjZmZDQ2OjkyNDMkYTRjMDYyMzBlNDhjOGZjZTdiZTg4YTA3NGEzYmIzZTA6OTI0NA=='
@@ -712,9 +723,9 @@ def wait_receive_request
end

it "should read and send non-ascii query" do
expect(client).to receive(:search).with(
expect(client).to receive(:search).with({
Contributor Author review note: RSpec's message expectations differentiate between bare keyword arguments and a hash in Ruby 3.x (and so JRuby 9.4.x, from Logstash 8.10), so making this expect a hash fixes the build (see the illustrative spec after this diff).

:body => { "query" => { "terms" => { "lock" => [ "잠금", "uzávěr" ] } } },
:index => "")
:index => ""})

plugin.filter(LogStash::Event.new)
end
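A self-contained spec sketching the behavior the review note above describes (assuming RSpec 3.x on Ruby 3.x / JRuby 9.4; the double and payload are made up for the example):

```ruby
require "rspec/autorun"

RSpec.describe "message expectations with a Hash argument" do
  it "matches when the expectation uses an explicit Hash literal" do
    client = double("client")
    # On Ruby 3.x, bare `key => value` pairs in `with(...)` are treated as
    # keyword arguments and may no longer match a call that passes a Hash;
    # wrapping the expectation's argument in braces keeps it a Hash.
    expect(client).to receive(:search).with({ :index => "", :body => { "q" => 1 } })
    client.search({ :index => "", :body => { "q" => 1 } })
  end
end
```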