fix: free resources on close #173
Conversation
Our transport class Manticore is explicitly threadsafe and by default creates a pool with up to 50 connections; therefore we do not need an individual instance of Manticore per worker thread. Using a shared instance resolves a memory leak that was caused by using a strong reference to the worker threads themselves as a key in the routing to per-thread clients, which prevented those threads from being garbage-collected.
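A minimal sketch of the leak described above and the shared-instance fix. The names here (`FakeClient`, `leaky_clients`, `build_shared`) are illustrative stand-ins, not the plugin's actual code: the point is that a hash keyed by the `Thread` object itself strongly retains every worker thread (and its client), while a single shared, thread-safe client avoids the per-thread map entirely.

```ruby
FakeClient = Struct.new(:id)

# Leaky pattern (illustration): one client per worker thread, keyed by the
# Thread object. The hash keeps a strong reference to every thread that
# ever ran, so neither the threads nor their clients can be GC'd.
leaky_clients = {}
build_leaky = -> { leaky_clients[Thread.current] ||= FakeClient.new(Thread.current.object_id) }

# Fixed pattern: one thread-safe client shared by all workers (Manticore
# pools connections internally, so sharing a single instance is safe).
mutex  = Mutex.new
shared = nil
build_shared = -> { mutex.synchronize { shared ||= FakeClient.new(:shared) } }

threads = 4.times.map { Thread.new { build_leaky.call; build_shared.call } }
threads.each(&:join)

# The leaky map retains one entry (and one Thread key) per worker;
# the shared pattern built exactly one client.
leaky_clients.size
```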
force-pushed from 2cdb7de to 25a9389
It looks good to me, but I'm wondering if the default pool size of 50 connections would be enough for all use cases. We could maybe allow users to tune it like we do on the ES output plugin (pool_max and pool_max_per_route), so users with higher connection requirements would have the option to change it and wouldn't need to downgrade the plugin. WDYT?
transport_options[:pool_max] = 1000
transport_options[:pool_max_per_route] = 100
@edmocosta to avoid adding config options in a patch release, I have added these with the same default values that we have in the referenced Elasticsearch Output plugin, which should allow us to completely eliminate connection contention on pipelines with fewer than 100 workers and 10 hosts. We can make this configurable if/when necessary.
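The arithmetic behind that claim can be checked directly: `pool_max_per_route` caps connections per host and `pool_max` caps the total across all hosts, and each worker holds at most one connection at a time. A small sketch (variable names illustrative, not plugin API):

```ruby
# Defaults added in this PR, matching the Elasticsearch Output plugin:
pool_max           = 1000  # total connections across all routes (hosts)
pool_max_per_route = 100   # connections per individual host

# Workload bounds from the comment above:
hosts   = 10
workers = 100

# Each worker needs at most one connection at a time, so per-host demand
# is bounded by the worker count, and total demand by hosts * per-route cap.
per_host_ok = workers <= pool_max_per_route
total_ok    = hosts * pool_max_per_route <= pool_max

per_host_ok && total_ok  # no contention under these bounds
```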
@@ -712,9 +723,9 @@ def wait_receive_request
   end

   it "should read and send non-ascii query" do
-    expect(client).to receive(:search).with(
+    expect(client).to receive(:search).with({
review note: rspec's message expectations differentiate between bare keyword arguments and a hash in Ruby 3.x (and therefore in JRuby 9.4.x, from Logstash 8.10 onward), so making this expectation an explicit hash fixes the build.
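The underlying Ruby 3 behavior can be shown without rspec: keyword arguments were separated from positional hashes, so a trailing hash no longer auto-converts into keywords. Method names below are illustrative.

```ruby
# Ruby 3.x keyword/hash separation in plain Ruby.
def keywords_only(**opts)
  opts
end

def positional_hash(arg)
  arg
end

params = { q: "caf\u00e9", size: 10 }

keywords_only(**params)  # explicit double-splat: accepted
positional_hash(params)  # plain hash as a positional argument: accepted

# In Ruby 3.x (and JRuby 9.4.x), a bare hash argument is positional and
# no longer converts into keywords, so this raises ArgumentError:
separated = begin
  keywords_only(params)
  false
rescue ArgumentError
  true
end
```

This is why `with(key: value)` and `with({key: value})` are distinct shapes to rspec's argument matcher on Ruby 3, and why passing an explicit hash literal makes the expectation match the hash the code actually sends.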
LGTM!
Before merging, please add the PR link on the CHANGELOG :)
Additionally, Manticore gives us the opportunity to explicitly close the client to free its resources in advance of garbage collection, so we should propagate LogStash::Filters::Base#close to the shared client if it exists [EDIT: the Elasticsearch client doesn't propagate close to its transport].

Supersedes #167
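A hedged sketch of the close-propagation idea: a holder that lazily builds one shared client and tears it down idempotently from the filter's close hook. `SharedClient`, `factory`, and the stub below are illustrative, not the plugin's actual classes; the real PR has to reach past the Elasticsearch client to its transport, since (per the edit above) the client does not propagate close itself.

```ruby
# Illustrative holder for a single shared client with explicit teardown.
class SharedClient
  def initialize(factory)
    @mutex   = Mutex.new
    @factory = factory
    @client  = nil
  end

  # Lazily build the single shared client under a lock.
  def get
    @mutex.synchronize { @client ||= @factory.call }
  end

  # Close and drop the client so its resources are freed ahead of GC.
  # Safe to call repeatedly, and a no-op if the client was never built.
  def close
    @mutex.synchronize do
      @client&.close
      @client = nil
    end
  end
end

# Exercise it with a stub client that records close calls.
closed = []
stub_class = Struct.new(:log) do
  def close
    log << :closed
  end
end

holder = SharedClient.new(-> { stub_class.new(closed) })
holder.get          # builds the client
holder.close        # closes it exactly once
holder.close        # second call is a no-op
```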