
Use a shared ExecutorService for all AMQP connections / plugin instances #93

Open
micoq opened this Issue Jan 5, 2017 · 18 comments

micoq commented Jan 5, 2017

Currently, for each AMQP connection, an ExecutorService is created with a number of threads equal to twice the number of CPUs/cores. These threads always seem to be in a sleeping/blocking state (checked with VisualVM).
On a system with many cores and many AMQP connections this can waste a large amount of memory and sometimes slow down the whole pipeline.

Example on my environment:

  • Logstash version: 5.1.1 (but the behavior is the same with older versions)
  • RabbitMQ: 3.6.5
  • OS: RHEL 6
  • System: 2 sockets / 6 cores per CPU / hyperthreading (= 24 hardware threads)
  • 40 queues on each of 4 RabbitMQ instances (160 queues in total, one connection each)

Total number of sleeping threads: 7680 (160 connections × 24 cores × 2), for about 2 GB of thread stacks.
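The arithmetic behind that total can be written as a small Java sketch. It assumes one ConsumerWorkService pool per connection and that availableProcessors() reports 24 on this host. DispatchThreadBudget and its methods are illustrative names, not part of any library.

```java
/** Illustrative arithmetic for consumer dispatch threads (hypothetical helper). */
final class DispatchThreadBudget {
    private DispatchThreadBudget() {}

    /** Default pool size the Java client uses when no executor is supplied (per the snippet below). */
    static int defaultPoolSize(int availableProcessors) {
        return availableProcessors * 2;
    }

    /** Each connection owns a private pool, so the thread count scales with connections. */
    static long perConnectionThreads(int connections, int threadsPerPool) {
        return (long) connections * threadsPerPool;
    }
}
```

With these numbers, perConnectionThreads(160, defaultPoolSize(24)) is 7680. Sizing each private pool at availableProcessors() would still give 160 × 24 = 3840. A single pool shared by all connections, by contrast, stays at its own size no matter how many connections there are.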

Looking at the code, the RabbitMQ Java client creates a fixed thread pool with 2 × cores threads in the constructor of com.rabbitmq.client.impl.ConsumerWorkService when no executor is provided:

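```java
// com.rabbitmq.client.impl.ConsumerWorkService (RabbitMQ Java client)
private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;

// In the constructor: fall back to a private fixed pool when no executor is supplied
this.executor = (executor == null ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor);
```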

Ideally, the March Hare library (which wraps the RabbitMQ Java client) should provide this pool to the connection factory (com.rabbitmq.client.ConnectionFactory).

Here is a quick-and-dirty workaround that creates a pool with only one thread per connection by patching March Hare (/usr/share/logstash/vendor/bundle/jruby/1.9/gems/march_hare-2.20.0-java/lib/march_hare/session.rb). Add this line to self.connect:

```ruby
cf.setSharedExecutor(Executors.newSingleThreadExecutor)
```

and the import line at the beginning of the file:

```ruby
java_import java.util.concurrent.Executors
```

On my setup:

  • before: 7739 threads for a single Logstash process
  • after: 2130 threads

We can even use a single thread for all connections without any loss of throughput (about 4000 events/s with filters). In that case the thread is running but not heavily used; it seems to handle consumer delivery dispatch inside the RabbitMQ client library.

michaelklishin Jan 5, 2017

Contributor

March Hare allows you to use any executor service you want.

michaelklishin Jan 5, 2017

Contributor

I'm -1 on using a single-threaded executor by default. The default could be Runtime.getRuntime().availableProcessors() instead of Runtime.getRuntime().availableProcessors() * 2, for example.

micoq Jan 5, 2017

Hello,

Yes, it was a very dirty workaround :) How can I do that properly? By using the :thread_pool_size and/or the :executor_factory parameter of the self.connect method?

michaelklishin Jan 5, 2017

Contributor

@micoq Correct. The former (:thread_pool_size) is for cases where you only want to specify the pool size and nothing else (the executor has a fixed size). For Logstash specifically, I'd recommend a pool sized at Runtime.getRuntime().availableProcessors() over a dynamically growing and shrinking executor.
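To make the fixed-versus-dynamic distinction concrete, compare the two standard JDK executors. Executors.newFixedThreadPool(n) is documented as a ThreadPoolExecutor with core and maximum size n and an unbounded LinkedBlockingQueue. Executors.newCachedThreadPool() starts with zero threads, grows without bound and retires threads that have been idle for 60 seconds. Below is a sketch of the fixed variant sized per processor. The ConsumerPools name, the thread-name prefix and the choice of daemon threads are assumptions, not what March Hare does.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper building fixed-size consumer dispatch pools. */
final class ConsumerPools {
    private ConsumerPools() {}

    /** Same shape as Executors.newFixedThreadPool(size, factory), with named daemon threads. */
    static ThreadPoolExecutor fixed(int size, String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, namePrefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // assumption: pool threads should not keep the JVM alive
            return t;
        };
        // core == max and an unbounded queue: the pool never grows past `size` threads
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    /** The sizing recommended above: one thread per available processor. */
    static ThreadPoolExecutor fixedPerProcessor(String namePrefix) {
        return fixed(Runtime.getRuntime().availableProcessors(), namePrefix);
    }
}
```

Threads in such a pool are started lazily, one per submitted task, until the core size is reached. After that the count stays constant however bursty the deliveries are.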

michaelklishin Jan 5, 2017

Contributor

Also worth pointing out that the setting is per ConnectionFactory, and most projects only ever have one (or a few). March Hare tries to provide an API that hides the factory from the user (some Ruby users experience culture shock when they see a class with Factory in its name), which effectively makes the number of factories equal to the number of connections.

I'd consider an alternative API for March Hare that would, e.g., accept a connection factory. The next release will be 3.0.0, so it's a good opportunity to consider it.

micoq Jan 6, 2017

So, we could create a single ConnectionFactory for the whole Logstash process with only one thread pool for the consumer work service executor (with something like this: Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())).

Just one more question: I don't really understand the exact purpose of the consumer work service executor. Does it handle consumer I/O, AMQP frame serialization, acks...? In VisualVM, the threads from this pool (pool-XXX-thread-X) are almost always in the Park state and very rarely Running (at about 4000 messages/s).
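The single process-wide pool proposed at the start of this comment can be sketched with the initialization-on-demand holder idiom. This creates the pool exactly once, lazily and thread-safely, on first use. SharedConsumerExecutor is a hypothetical name. The commented-out lines show where the pool would be handed to ConnectionFactory.setSharedExecutor(ExecutorService), the same method the workaround earlier in this issue calls; that part needs the RabbitMQ client and is not compiled here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical consumer dispatch pool shared by every connection in the JVM. */
final class SharedConsumerExecutor {
    private SharedConsumerExecutor() {}

    /** The JVM initializes Holder (and therefore POOL) once, on the first call to get(). */
    private static final class Holder {
        private static final ThreadFactory DAEMON = r -> {
            Thread t = new Thread(r, "amqp-consumer-dispatch");
            t.setDaemon(true); // assumption: do not block JVM shutdown
            return t;
        };
        static final ExecutorService POOL = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors(), DAEMON);
    }

    static ExecutorService get() {
        return Holder.POOL;
    }

    // For each connection (RabbitMQ Java client, not compiled in this sketch):
    //   ConnectionFactory cf = new ConnectionFactory();
    //   cf.setSharedExecutor(SharedConsumerExecutor.get());
}
```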

michaelklishin Jan 6, 2017

Contributor

@micoq correct.

ConsumerWorkService handles dispatch of deliveries and other consumer methods (e.g. a server-sent basic.cancel) to consumers. There are two major requirements that shape its design:

  • We must provide a per-channel delivery ordering guarantee
  • Consumer methods, e.g. handleDelivery, must be able to invoke synchronous protocol operations (e.g. queue.declare) without causing deadlocks

I think you can find enough comments in the Java client code, as well as test examples, to learn more.
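The first requirement, per-channel ordering on a shared pool, can be illustrated with a simplified sketch of per-key serial dispatch. Tasks submitted under the same key (think: channel) run one at a time in submission order, while different keys can run in parallel on the pool's threads. This is not the Java client's code, which uses its own work pool inside ConsumerWorkService. The class name and the batch size are assumptions. The sketch does not model the second requirement (deadlock freedom); that requirement is what motivates running consumer callbacks on pool threads in the first place.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical sketch: serial execution per key on top of a shared executor. */
final class KeyedSerialExecutor<K> {
    /** Tasks run per turn before yielding the pool thread (arbitrary choice). */
    private static final int BATCH = 16;

    private final Executor pool;
    /** A key is present exactly while one drainer for it is scheduled or running. */
    private final Map<K, Queue<Runnable>> pending = new HashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    void submit(K key, Runnable task) {
        boolean startDrainer = false;
        synchronized (pending) {
            Queue<Runnable> queue = pending.get(key);
            if (queue == null) {
                queue = new ArrayDeque<>();
                pending.put(key, queue);
                startDrainer = true; // no drainer for this key yet
            }
            queue.add(task);
        }
        if (startDrainer) {
            pool.execute(() -> drain(key));
        }
    }

    private void drain(K key) {
        for (int i = 0; i < BATCH; i++) {
            Runnable next;
            synchronized (pending) {
                next = pending.get(key).poll();
                if (next == null) {
                    pending.remove(key); // idle: the next submit() starts a new drainer
                    return;
                }
            }
            try {
                next.run(); // outside the lock, so other keys are not blocked
            } catch (RuntimeException e) {
                // A real dispatcher would report this; the sketch just moves on.
            }
        }
        // Batch used up: requeue this key so other keys get a turn on the pool.
        pool.execute(() -> drain(key));
    }
}
```

With, say, four pool threads and three keys, at most three drainers run at once (one per key), and each key's tasks are observed in the order they were submitted.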

michaelklishin Jan 6, 2017

Contributor

@micoq Well, change your workload to do something more intensive or time-consuming and you will see a different picture of pool thread utilization.

micoq Jan 6, 2017

@michaelklishin Thank you for the explanation. I could use the Java client directly in a custom application to put load on the consumer threads. In Logstash, however, these threads never do CPU-intensive work: they just put the payload into a queue. The queue may block, but in that case the bottleneck is either the worker threads that run the filters and outputs (where all the CPU-intensive tasks happen: grok, field manipulation...) or the input threads themselves (payload deserialization, codecs...) just after the queue, unless I'm mistaken.

Anyway, the availableProcessors() pool size is a good compromise.
(I forgot to mention that thread stacks are pretty heavy on JRuby, with a default size of 2048 KB.)
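The hand-off described in this comment can be sketched with a bounded BlockingQueue. The consumer callback does almost no CPU work, but put() parks the dispatch thread whenever the queue is full. The thread therefore stays occupied for as long as the downstream threads are behind, which is time-consuming work rather than CPU-intensive work. DeliveryHandoff, its methods and the capacity are hypothetical stand-ins for the plugin's internal queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the queue between the consumer callback and the input thread. */
final class DeliveryHandoff {
    private final BlockingQueue<byte[]> internalQueue;

    DeliveryHandoff(int capacity) {
        internalQueue = new ArrayBlockingQueue<>(capacity);
    }

    /** Per-delivery callback work: cheap, but blocks the dispatch thread while the queue is full. */
    void onDelivery(byte[] body) throws InterruptedException {
        internalQueue.put(body);
    }

    /** Bounded-wait variant, handy for observing back-pressure. */
    boolean tryDeliver(byte[] body, long timeoutMillis) throws InterruptedException {
        return internalQueue.offer(body, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Input-thread side: take the next payload (decoding would follow). */
    byte[] nextPayload() throws InterruptedException {
        return internalQueue.take();
    }
}
```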

michaelklishin Jan 7, 2017

Contributor

@micoq I didn't say "CPU intensive", I said "time consuming."

I'll look into what a suitable connection method would look like in March Hare soon.

micoq Jan 8, 2017

@michaelklishin Yes, I only meant "CPU-intensive" relative to the work done by the worker threads in Logstash.

andrewvc Feb 9, 2017

Contributor

@micoq thanks for the excellent sleuthing. We definitely should fix this issue.

@michaelklishin I would love to see that option added. We'd use a single global executor for all of Logstash, with perhaps #cpu_cores slots allocated across all rabbitmq input instances. WDYT of that idea?

micoq Feb 12, 2017

Without touching the march_hare code, would it be possible to share a single connection among multiple consumers (when all the rabbitmq input instances point to the same server/port/credentials), e.g. by caching the connection across input instances?

In the ideal case, the input thread and the consumer thread could also be merged into a single thread (so the internal_queue between the two threads could be removed). In practice, however, the consumer is declared as a callback and the run() method can't return before Logstash stops.

Meanwhile, I discovered an issue in Logstash 5.1.1 where too many threads can severely slow down the whole pipeline (see here). If monitoring can't be disabled, I think it would be good to reuse/share threads as much as possible across the JVM.

andrewvc Feb 13, 2017

Contributor

@micoq I think that kind of connection sharing makes a ton of sense. In the long run I'd like to make it explicit via new additions to the logstash grammar, but in the meantime I think it makes sense for plugins to detect this.

I would say the best way to do this would be for the rabbitmq_connection mixin to set class variables in a special namespace as singletons and manage that pool.

I'm glad to tackle that, but given some other priorities that change would be at least a month away. Do you have any interest in tackling that patch? I'm glad to give speedy PR reviews.

michaelklishin Feb 13, 2017

Contributor

In case it would be necessary (or more convenient) to extend March Hare to expose the new Java client settings, let me know. I plan to work on a March Hare release based on the 4.x Java client quite soon.

micoq Feb 17, 2017

@andrewvc I made a small patch to the rabbitmq_connection mixin.
I don't know if it's the best solution but I hope it can help a bit :)

There's a new parameter, connection_pool_name, that provides a custom name: all rabbitmq input/output instances with the same name are grouped into a single connection (when possible). By default this parameter is nil, so there is no grouping (the legacy behavior). Each thread has its own channel, following the recommendation in the March Hare documentation.

After some tests, the best setup seems to be a single connection for the inputs and another connection for the outputs (on a single RabbitMQ). I think the consumers can be slowed down when the whole connection is throttled (in flow state).

@michaelklishin I didn't have to touch March Hare for this, but following our earlier discussion it could be a good idea to let the user pass an already initialized executor to the self.connect method (and by default, to use an executor with Runtime.getRuntime().availableProcessors() threads).
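The grouping rule behind connection_pool_name can be sketched as a reference-counted registry keyed by the pool name. This is a Java illustration of the idea, not the Ruby patch itself. SharedConnectionRegistry, acquire and release are hypothetical names, and each plugin instance would still open its own channel on the connection it gets back. Using distinct names such as "inputs" and "outputs" gives the layout described above: one connection for the inputs, another for the outputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical registry: plugin instances that use the same pool name share one
 * connection; a null name means no grouping (one private connection each).
 */
final class SharedConnectionRegistry<C extends AutoCloseable> {
    private static final class Entry<T> {
        final T connection;
        int users;

        Entry(T connection) {
            this.connection = connection;
        }
    }

    private final Map<String, Entry<C>> byName = new HashMap<>();

    /** Returns the shared connection for poolName, opening it on first use. */
    synchronized C acquire(String poolName, Supplier<C> opener) {
        if (poolName == null) {
            return opener.get(); // legacy behavior: a private connection
        }
        Entry<C> entry = byName.computeIfAbsent(poolName, name -> new Entry<>(opener.get()));
        entry.users++;
        return entry.connection;
    }

    /** Drops one user; the last user of a shared connection closes it. */
    synchronized void release(String poolName, C connection) throws Exception {
        if (poolName == null) {
            connection.close();
            return;
        }
        Entry<C> entry = byName.get(poolName);
        if (entry == null || entry.connection != connection) {
            throw new IllegalStateException("connection not registered under " + poolName);
        }
        if (--entry.users == 0) {
            byName.remove(poolName);
            entry.connection.close();
        }
    }
}
```

The sketch opens the connection while holding the registry lock, which keeps it simple. A real implementation might prefer not to block other plugins during a slow connect.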

jakauppila Apr 11, 2017

I was curious where this was left off; we also connect to a number of RabbitMQ instances from our indexers, which would likely benefit from having this in place.

michaelklishin Apr 11, 2017

Contributor

@jakauppila I haven't looked at a new March Hare version but would definitely find some time to review/guide a PR that adds support for the new Java client options. Integrating them into the Logstash plugins then should be pretty straightforward.
