Fixed-size thread pool leaks on connection recovery #97

Closed
ivoanjo opened this issue Sep 19, 2016 · 14 comments · Fixed by #101

Comments

@ivoanjo
Contributor

ivoanjo commented Sep 19, 2016

When using the thread_pool_size option or specifying an executor_factory, march hare creates a new pool every time it connects to rabbitmq.

This is the right and expected behavior when the first connection is made, but when the connection is lost and march hare tries to recover it, the Session#automatically_recover method calls either Session#new_uri_connection_impl or Session#new_connection_impl, both of which create a new thread pool and a new connection.

This means that the previous thread pool is never shut down, so the system slowly leaks threads until it cannot allocate any more whenever intermittent network issues make the recovery behavior kick in regularly.

A simple example that shows this is:

require 'march_hare'

connection = MarchHare.connect(
  uri:                'amqp://guest:guest@localhost:5672/%2F',
  thread_pool_size:   20,
  automatic_recovery: true,
)

exchange_name = 'footest'
queue_name = 'foo-consumer'

channel = connection.create_channel
_exchange = channel.topic(exchange_name, auto_delete: false, durable: true)
channel.close

20.times do
  channel = connection.create_channel

  queue = channel.queue(queue_name, durable: true, auto_delete: false)
  queue.bind(exchange_name, routing_key: queue_name)

  queue.subscribe(block: false, manual_ack: true) { }
end

puts "Listening to queue. Now stop and restart the rabbitmq server to see thread leaks"

while true
  puts "Thread count is #{java::lang::Thread.get_all_stack_traces.key_set.size}"
  sleep 2
end

By executing this script and just quitting and restarting rabbitmq I see:

Listening to queue. Now stop and restart the rabbitmq server to see thread leaks
Thread count is 27
...
Thread count is 26 # <-- connection lost
...
Thread count is 47 # <-- connection reopened
...
Thread count is 46
Thread count is 45 # <-- connection lost (not sure why two threads)
...
Thread count is 67 # <-- connection reopened

I believe the fix would be to call maybe_shut_down_executor after the sleep on line https://github.com/ruby-amqp/march_hare/blob/master/lib/march_hare/session.rb#L249, but I still need to investigate a bit more before being sure.
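
For illustration, here is a minimal model of the pattern I mean (plain JRuby, not march_hare's actual code; the class and method names below are made up):

require 'java'
java_import java.util.concurrent.Executors

class RecoveringSession
  def initialize(pool_size)
    @pool_size = pool_size
    @executor  = new_pool              # pool created on first connect
  end

  def automatically_recover
    @executor.shutdown if @executor    # <-- the missing step: release the old pool's threads
    @executor = new_pool               # recovery then builds a fresh pool, as march_hare does today
  end

  private

  def new_pool
    Executors.new_fixed_thread_pool(@pool_size)
  end
end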

@michaelklishin
Member

Feel free to submit a PR once you are more certain :)

@ivoanjo
Contributor Author

ivoanjo commented Sep 19, 2016

Will do! I actually ended up going with a workaround where I pass a custom executor_factory during connect:

connection = MarchHare.connect(..., executor_factory: CachedWorkaroundFactory.new(size))

and its implementation:

class CachedWorkaroundFactory
  def initialize(size)
    @pool = MarchHare::ThreadPools.fixed_of_size(size)
  end

  def call
    @pool
  end
end

This tricks march hare into reusing the same thread pool instead of shutting down a pool and then booting up another, which is generally faster and more efficient (shutting down the old pool does have one consequence: already-submitted tasks may still take some time to finish on it, so you may temporarily have double the number of threads you expected processing work concurrently).

I am a bit unsure why exactly march hare doesn't do this by default and instead always creates a new pool. The code for this was added in 74ef8c7 in early 2014, so I'm guessing that at that time the pool was only ever created once (as there was no connection recovery) and that this decision was never revisited when connection recovery was added.

Is that what happened, or am I missing some important detail in how the pool is used that means that it should not be reused in such a way?

@michaelklishin
Member

Reusing a pool is not generally safe.

@michaelklishin
Member

Using a cached executor service is a better option than what you have: it won't run into most issues with thread pool reuse, and inactive threads will be cleaned up after 60 seconds of inactivity.
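
For example, something along these lines (just a sketch; it relies only on the JDK's Executors.newCachedThreadPool and the executor_factory option already discussed above):

require 'march_hare'
java_import java.util.concurrent.Executors

# Each (re)connection gets a cached pool; its idle threads are reclaimed after
# ~60 seconds, so a pool that is never explicitly shut down eventually drains
# itself instead of pinning a fixed number of threads forever.
connection = MarchHare.connect(
  uri:                'amqp://guest:guest@localhost:5672/%2F',
  automatic_recovery: true,
  executor_factory:   -> { Executors.new_cached_thread_pool }
)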

@michaelklishin
Member

My guess is that we need to make sure the consumer work pool is shut down before performing recovery. The problem is that there is no easy way to pick a timeout for the currently running threads to finish before forcing shutdown with ExecutorService#shutdownNow.
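
The usual idiom would be something like the following (a sketch; the grace period is exactly the arbitrary choice in question):

require 'java'
java_import java.util.concurrent.TimeUnit

# Graceful-then-forced shutdown of a consumer work pool.
def shut_down_work_pool(pool, grace_seconds = 5)
  pool.shutdown                                                   # stop accepting new tasks
  return if pool.await_termination(grace_seconds, TimeUnit::SECONDS)
  pool.shutdown_now                                               # interrupt whatever is still running
end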

@ivoanjo
Contributor Author

ivoanjo commented Sep 20, 2016

I'm not sure reusing a pool is a problem in this case. According to https://www.rabbitmq.com/api-guide.html, in the section "Consumer thread pool" they state that:

When the connection is closed a default ExecutorService will be shutdown(), but a user-supplied ExecutorService (like es above) will not be shutdown(). Clients that supply a custom ExecutorService must ensure it is shutdown eventually (by calling its shutdown() method), or else the pool’s threads may prevent JVM termination.

So this means that the rabbitmq Java client will not "harm" or make the pool unusable in any way.
Furthermore, they say that:

The same executor service may be shared between multiple connections, or serially re-used on re-connection but it cannot be used after it is shutdown().

Since the pool is created by march hare using the factory, unless someone implements a factory that leaks the pool in other ways, march hare is the sole user of that pool. This is what led me to the "cached pool" workaround I suggested.

@michaelklishin
Member

See above. Both in the Java client and March Hare, it is not generally safe to reuse a pool because there can be things that are waiting to shut it down. This may or may not be a common occurrence in practice, which is why my comment above says "generally".

Anyhow, the only solution I see at the moment is listed above.

@ivoanjo
Contributor Author

ivoanjo commented Sep 27, 2016

I'm still a bit unconvinced about the unsafety of reusing pools, so I've been looking at both the java rabbitmq implementation and march_hare in an attempt to confirm or disprove my thesis. This is my attempt to document my findings.

So starting with the java rabbitmq client:

How can a thread pool be made invalid?

Looking at the ExecutorService documentation, the answer seems to be: only by calling shutdown() or shutdownNow() on it.

Where are pools shut down?

The two lines of code above are all the shutdown() calls I could find in the entire codebase, and I could find no shutdownNow() at all. So this seems to me to be consistent with what is documented at https://www.rabbitmq.com/api-guide.html: the java rabbitmq client only shuts down its own pool, and preserves existing pools that it receives.

An important detail here is that any threads already working on work items will not observe that the connection is gone and will keep working on those tasks. Note that this only applies to in-flight work: new work will not be picked up, because dispatch happens in WorkPoolRunnable, and as soon as a connection is shut down the WorkPool is emptied (in https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java#L50), so any runnables still queued in the thread pool (if the pool has queueing capabilities) will wake up to find an empty work pool and immediately mark the task as complete.

So, as far as the Java rabbitmq library goes, I found no evidence to suggest that any client directly using it would run into issues with reusing thread pools, other than the detail that, whenever a thread pool is reused, some of its threads may still be busy with tasks from the previous connection, but they become available as soon as those tasks finish.

So I am led to believe that if pools are made unusable, this happens inside march_hare's codebase and not here. The next step in my investigation is to find where that happens, and then decide how hard it would be to change that and make reuse a safe operation versus changing march_hare to always shut down the pool and start anew.

(So, stay tuned for the next episode ;) )

@michaelklishin
Member

Thanks for the fairly detailed investigation. I certainly don't expect the issue to be in the Java client.

The goal should not be ExecutorService reuse. The goal is to close those ES'es that are no longer used (e.g. because we know for sure their connection is closing).
#101 is a good attempt at doing that.

@ivoanjo
Contributor Author

ivoanjo commented Sep 27, 2016

I'm a bit puzzled: can you expand on why the goal is not to reuse? A single march hare connection would still have a single executor; it's just that the multiple underlying java rabbitmq connections being wrapped would share the same executor. Different march hare connections should of course have different executors, but conceptually having multiple executors for the same march hare connection seems unnecessary.

Are you really against that approach? If I did the work to make sure it is safe, and submitted a PR, would you still not like it? I'm trying to understand the underlying design decision here, as I'm still a bit confused about this choice.

@nbekirov

Still confused about all this...

If I have thread_pool_size configured, I'm unable to properly control automatic_recovery or my program's termination in general.

Please tell me, is this intentional:

require 'march_hare'

con = MarchHare.connect(
    automatic_recovery: false,
    uri: 'amqp://guest:guest@localhost',
    thread_pool_size: 5 # this line prevents JVM termination
)

con.create_channel # note: I need this channel to be created in order to replicate
  • Expected result
    The program should terminate when the connection is broken.

  • Actual result
    The program stays alive indefinitely.

@michaelklishin
Member

Consumer dispatch pool size is orthogonal to connection recovery. If you provide a custom executor to the Java client, you are responsible for shutting it down so that the JVM can terminate cleanly, but that's the only relatively non-obvious thing I can think of.

When automatic recovery isn't used, a connection termination should not necessarily result in a JVM termination. Use VisualVM or jstack or similar to take a thread dump and see what threads are alive and running.

Discussions belong to the mailing list (rabbitmq-users is fine).

@ivoanjo
Contributor Author

ivoanjo commented May 18, 2017

@nbekirov your problem may be because march hare's thread pool uses non-daemon threads.

On the JVM, threads can be daemon or non-daemon. The only difference is that the JVM terminates when all non-daemon threads have finished.

So this means that as long as march hare's threads are alive, your app will not exit.
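
A tiny JRuby illustration of the difference (nothing march hare specific):

require 'java'

# A non-daemon thread keeps the JVM alive until it finishes...
t = java.lang.Thread.new { sleep 60 }
t.daemon = false
t.start
# ...whereas if you set t.daemon = true before calling start, the JVM is free
# to exit even while the thread is still running.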

@michaelklishin
Member

The Java client certainly allows you to provide a custom ExecutorService (thread pool) and even a thread factory. I'm not sure how much of this is exposed via March Hare and documented, because in many cases it's not something the user has to think about much.

I will take a look at the state of those settings. IIRC you can even provide a custom ConnectionFactory, which means you control everything there is ;)
