Skip to content

Commit

Permalink
fix: free resources on close
Browse files Browse the repository at this point in the history
Our transport class Manticore is explicitly threadsafe and by default creates
a pool with up to 50 connections; therefore we do not need an individual
instance of Manticore per worker thread. Using a shared instance resolves a
memory leak that was caused by using a strong reference to the worker threads
themselves as a key in the routing to per-thread clients, which prevented
those threads from being garbage-collected.

Additionally, Manticore gives us the opportunity to explicitly close the
client to free its resources in advance of garbage collection, so we should
propagate `LogStash::Filters::Base#close` to the shared client if it exists.
  • Loading branch information
yaauie committed Aug 15, 2023
1 parent b716fd4 commit 2cdb7de
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.15.3
- Fixes a memory leak that occurs when a pipeline containing this filter terminates, which could become significant if the pipeline is cycled repeatedly.

## 3.15.2
- Added checking for `query` and `query_template`. [#171](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/171)

Expand Down
17 changes: 13 additions & 4 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "logstash/json"
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
require "logstash/plugin_mixins/normalize_config_support"
require "monitor"

require_relative "elasticsearch/client"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
Expand Down Expand Up @@ -139,7 +140,8 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base

include LogStash::PluginMixins::NormalizeConfigSupport

attr_reader :clients_pool
include MonitorMixin
attr_reader :shared_client

##
# @override to handle proxy => '' as if none was set
Expand All @@ -159,8 +161,6 @@ def self.validate_value(value, validator)
end

def register
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
if File.zero?(@query_template)
Expand Down Expand Up @@ -244,6 +244,13 @@ def filter(event)
end
end # def filter

def close
synchronize do
@shared_client&.close
end
super
end

# public only to be reuse in testing
def prepare_user_agent
os_name = java.lang.System.getProperty('os.name')
Expand Down Expand Up @@ -352,7 +359,9 @@ def new_client
end

def get_client
@clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client })
@shared_client || synchronize do
@shared_client ||= new_client
end
end

# get an array of path elements from a path reference
Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.15.2'
s.version = '3.15.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
33 changes: 18 additions & 15 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,6 @@
Thread.current[:filter_elasticsearch_client] = nil
end

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get error with the connection pool
# we have decided to create a new instance per worker thread which
# will be lazy created on the first call to `#filter`
#
# I am adding a simple test case for future changes
it "uses a different connection object per thread wait" do
expect(plugin.clients_pool.size).to eq(0)

Thread.new { plugin.filter(event) }.join
Thread.new { plugin.filter(event) }.join

expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
Expand Down Expand Up @@ -466,6 +451,24 @@ def wait_receive_request
Thread.current[:filter_elasticsearch_client] = nil
end

it 'uses a threadsafe transport adapter' do
client = plugin.send(:get_client).client
# we currently rely on the threadsafety guarantees provided by Manticore
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
expect(extract_transport(client).to_s).to include('Manticore')
end

it 'uses a single shared client across threads' do
q = Queue.new
10.times.map do
Thread.new(plugin) { |instance| q.push instance.send(:get_client) }
end.map(&:join)

first = q.pop
expect(q.pop).to be(first) until q.empty?
end

describe "cloud.id" do
let(:valid_cloud_id) do
'sample:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvJGFjMzFlYmI5MDI0MTc3MzE1NzA0M2MzNGZkMjZmZDQ2OjkyNDMkYTRjMDYyMzBlNDhjOGZjZTdiZTg4YTA3NGEzYmIzZTA6OTI0NA=='
Expand Down

0 comments on commit 2cdb7de

Please sign in to comment.