Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: free resources on close #173

Merged
merged 3 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.15.3
- Fixes a memory leak that occurs when a pipeline containing this filter terminates, which could become significant if the pipeline is cycled repeatedly.
yaauie marked this conversation as resolved.
Show resolved Hide resolved

## 3.15.2
- Added checking for `query` and `query_template`. [#171](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/171)

Expand Down
10 changes: 6 additions & 4 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "logstash/json"
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
require "logstash/plugin_mixins/normalize_config_support"
require "monitor"

require_relative "elasticsearch/client"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
Expand Down Expand Up @@ -139,7 +140,8 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base

include LogStash::PluginMixins::NormalizeConfigSupport

attr_reader :clients_pool
include MonitorMixin
attr_reader :shared_client

##
# @override to handle proxy => '' as if none was set
Expand All @@ -159,8 +161,6 @@ def self.validate_value(value, validator)
end

def register
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
if File.zero?(@query_template)
Expand Down Expand Up @@ -352,7 +352,9 @@ def new_client
end

def get_client
@clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client })
@shared_client || synchronize do
@shared_client ||= new_client
end
end

# get an array of path elements from a path reference
Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.15.2'
s.version = '3.15.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
33 changes: 18 additions & 15 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,6 @@
Thread.current[:filter_elasticsearch_client] = nil
end

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get error with the connection pool
# we have decided to create a new instance per worker thread which
# will be lazy created on the first call to `#filter`
#
# I am adding a simple test case for future changes
it "uses a different connection object per thread wait" do
expect(plugin.clients_pool.size).to eq(0)

Thread.new { plugin.filter(event) }.join
Thread.new { plugin.filter(event) }.join

expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
Expand Down Expand Up @@ -466,6 +451,24 @@ def wait_receive_request
Thread.current[:filter_elasticsearch_client] = nil
end

it 'uses a threadsafe transport adapter' do
client = plugin.send(:get_client).client
# we currently rely on the threadsafety guarantees provided by Manticore
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
expect(extract_transport(client).to_s).to include('Manticore')
end

it 'uses a single shared client across threads' do
q = Queue.new
10.times.map do
Thread.new(plugin) { |instance| q.push instance.send(:get_client) }
end.map(&:join)

first = q.pop
expect(q.pop).to be(first) until q.empty?
end

describe "cloud.id" do
let(:valid_cloud_id) do
'sample:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvJGFjMzFlYmI5MDI0MTc3MzE1NzA0M2MzNGZkMjZmZDQ2OjkyNDMkYTRjMDYyMzBlNDhjOGZjZTdiZTg4YTA3NGEzYmIzZTA6OTI0NA=='
Expand Down