
When listeners are getting crashed, it is failing to recover by itself. #126

Open
Naren1997 opened this issue May 12, 2020 · 19 comments

Naren1997 commented May 12, 2020

When the Kafka server is down, the listeners crash and fail to recover by themselves, giving me the following error:

{:message=>"Listener crashed, waiting 19.22s (undefined method join' for nil:NilClass)", :listener_id=>"438526", :retry_count=>4, :waiting_time=>19.22, :exception_class=>"NoMethodError", :exception_message=>"undefined method join' for nil:NilClass", :backtrace=>["/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/ruby-kafka-0.7.5/lib/kafka/fetcher.rb:66:in stop'", "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/ruby-kafka-0.7.5/lib/kafka/consumer.rb:113:in stop"

Naren1997 changed the title from "When listeners is getting crashed, it is failing to recover by itself." to "When listeners are getting crashed, it is failing to recover by itself." on May 12, 2020

dorner commented May 12, 2020

@Naren1997 can you post the full backtrace? These errors seem to be in the underlying RubyKafka library rather than Phobos.

Naren1997 commented:

@dorner Here is the full backtrace,

ERROR: {:message=>"Listener crashed, waiting 23.78s (undefined method `join' for nil:NilClass)",
 :listener_id=>"bcb1bc",
 :retry_count=>4,
 :waiting_time=>23.78,
 :exception_class=>"NoMethodError",
 :exception_message=>"undefined method `join' for nil:NilClass",
 :backtrace=>["/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/ruby-kafka-0.7.5/lib/kafka/fetcher.rb:66:in `stop'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/ruby-kafka-0.7.5/lib/kafka/consumer.rb:113:in `stop'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/listener.rb:83:in `block in start'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/instrumentation.rb:19:in `block in instrument'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/activesupport-5.2.1/lib/active_support/notifications.rb:170:in `instrument'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/instrumentation.rb:18:in `instrument'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/listener.rb:80:in `ensure in start'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/listener.rb:80:in `start'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/executor.rb:79:in `run_listener'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/phobos-1.8.0/lib/phobos/executor.rb:45:in `block (2 levels) in start'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:348:in `run_task'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:337:in `block (3 levels) in create_worker'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:320:in `loop'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:320:in `block (2 levels) in create_worker'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:319:in `catch'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/concurrent-ruby-1.1.3/lib/concurrent/executor/ruby_thread_pool_executor.rb:319:in `block in create_worker'",
 "/opt/IBM/cobalt/sla.25.4/messenger/vendor/bundle/ruby/2.4.0/gems/logging-2.2.2/lib/logging/diagnostic_context.rb:474:in `block in create_with_logging_context'"]}

dorner commented May 13, 2020

@Naren1997 yeah I can't see anything in Phobos that would cause the fetcher to suddenly lose its thread. Maybe try posting this issue in https://github.com/zendesk/ruby-kafka ?

Naren1997 commented:

@dorner Okay, I'll check with ruby-kafka. What would be an ideal time interval between listener restarts? I currently have 10 seconds; could that be the cause?

dorner commented May 13, 2020

It would totally depend on your broker and consumer settings, and would also depend on why you're crashing in the first place. :)
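
For reference, the wait between listener restarts comes from the backoff settings in phobos.yml. The values below are only the sample defaults from the Phobos README, not a recommendation; they would need tuning to your broker and consumer settings:

backoff:
  min_ms: 1000
  max_ms: 60000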

austinmoore commented Oct 21, 2020

We've been having this problem sporadically as well when a kafka broker restarts. I looked into the code of phobos and ruby-kafka and I have a theory of why it occurs. In short, I think the underlying problem is a race condition. I'm just not sure if ruby-kafka is supporting the threading case that phobos is performing.

Here's what I think occurs in this order in these threads:

  1. (Thread1) phobos runner starts the phobos executor
  2. (Thread1) phobos executor iterates over the listeners and uses the thread pool to start each listener in its own thread
  3. (Thread2) phobos executor runs the listener in its own thread via the thread pool
  4. (Thread2) phobos listener calls ruby-kafka consume_each_* method
  5. (Thread2) ruby-kafka consumer starts the ruby-kafka fetcher
  6. (Thread2) ruby-kafka fetcher sets @running = true. It has not initialized @thread yet, though! (See the toy reproduction after this list.)
  7. (Thread1) phobos executor is given the signal to stop, so it iterates over all listeners and tells them to stop. I'm not exactly sure why this occurs in the case of a Kafka broker restart, but I'm pretty sure that this or something very similar happens.
  8. (Thread1) phobos listener stops the ruby-kafka consumer
  9. (Thread1) ruby-kafka consumer stops the ruby-kafka fetcher. Note: we started the ruby-kafka fetcher in Thread2, but we are stopping it in Thread1.
  10. (Thread1) ruby-kafka fetcher checks that it is @running. It is running, but remember that @thread has not been initialized yet!
  11. (Thread1) ruby-kafka fetcher attempts to @thread.join, but since @thread is nil, it raises undefined method 'join' for nil:NilClass.
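
To make the ordering concrete, here is a minimal, self-contained toy that reproduces the suspected race. It is not ruby-kafka code, only the same ordering of @running and @thread described in steps 6, 10 and 11:

class ToyFetcher
  def start
    @running = true                     # step 6: "running", but @thread is not assigned yet
    sleep 0.01                          # widen the race window for the demo
    @thread = Thread.new { sleep 0.05 } # stands in for the fetcher's poll loop
  end

  def stop
    return unless @running              # step 10: passes, because @running is already true
    @thread.join                        # step 11: NoMethodError when @thread is still nil
  end
end

fetcher = ToyFetcher.new
starter = Thread.new { fetcher.start }  # "Thread2" from the list above
sleep 0.005                             # "Thread1" stops the fetcher mid-start
begin
  fetcher.stop
rescue NoMethodError => e
  puts e.message                        # e.g. undefined method `join' for nil:NilClass
end
starter.join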

I'm not 100% sure this is what is happening, but I think it is very likely. A "cheap" solution to this problem is for the ruby-kafka fetcher to call @thread&.join instead of @thread.join; then it won't raise an error if @thread is nil. I tried this out in the following monkey patch and it seemed to work. However, I was not able to reproduce the race condition locally without the monkey patch.

require 'kafka/fetcher'
module KafkaFetcherExt
  # @see https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/fetcher.rb#L66
  def stop
    return unless @running
    @commands << [:stop, []]
    @thread&.join # Here is the fix. @thread is sometimes, somehow nil.
  end
end
Kafka::Fetcher.prepend(KafkaFetcherExt)

https://github.com/zendesk/ruby-kafka#thread-safety

Furthermore, you should avoid using threads in a consumer unless you're very careful about waiting for all work to complete before returning from the #each_message or #each_batch block.

I'm not sure the "cheap" fix from above is the "correct" fix though. This is why I said, I'm "not sure if ruby-kafka is supporting the threading case that phobos is performing". Is it ok for phobos to start the ruby-kafka fetcher in one thread and stop it in another? In other words, is it ok for the phobos executor to stop the listeners explicitly or would it be (thread) safer to send a signal to the threads to stop which will in turn stop the listener and therefore in turn stop the ruby-kafka consumer & fetcher.

To implement such a solution I think the following would need to be done:

As a side note, I think it's great that Phobos uses threads instead of a separate process per consumer. In our project we have lots of little consumers that run sporadically, and letting them run in a single process is much more memory efficient in our case. A big thanks to the maintainers of this gem. We enjoy using it and can definitely recommend it.

dorner commented Oct 21, 2020

Hey @austinmoore!

I think you might be misreading the situation in terms of the quote you added from the ruby-kafka docs. That's talking about creating threads within your consumer, which we don't do (we have separate consumers per thread). It also says you shouldn't share a Kafka client between threads, which again we don't do.

I think probably the best solution would be to move the @running = true line from before the thread is initialized to after it's initialized. That would avoid these crashes as I don't think there are any other adverse effects.
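
Sketched on the same kind of toy fetcher as above, that reordering would look like this (illustrative only; the real Kafka::Fetcher#start does more work inside its thread):

class ToyFetcher
  def start
    @thread = Thread.new { sleep 0.05 } # assign the thread first...
    @running = true                     # ...then mark the fetcher as running
  end

  def stop
    return unless @running              # can no longer be true while @thread is nil
    @thread.join
  end
end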

dorner commented Oct 21, 2020

I do see your point about stopping the listeners from the main thread. I'm not sure what the ramifications are of that, but I can say we've been using Phobos in production for about three years now and have yet to see any real issues coming out of it.

reachlin commented:

is this problem fixed?

dorner commented Sep 10, 2021

I don't think so. I'm not sure if anyone has actually opened an issue on the ruby-kafka library though, as that's where the problem lies.

yin-ym commented Sep 10, 2021

I don't agree that this is purely a ruby-kafka issue; I think there is something we can do here in the retry logic: https://github.com/phobos/phobos/blob/master/lib/phobos/executor.rb#L71. It keeps using the original listener object for every retry, but if there is a fundamental problem in that listener object, retrying won't fix it, and the retry is useless: an endless loop repeating the previous error, just like what is reported here.
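
Roughly the shape of the behaviour being described, as an illustrative sketch rather than the actual Phobos executor code; the broken listener stub and the retry cap are made up for the demo (the real loop has no cap):

listener = Object.new.tap do |l|
  def l.start
    # stands in for Phobos::Listener#start hitting the fetcher's nil @thread
    raise NoMethodError, "undefined method `join' for nil:NilClass"
  end
end

retry_count = 0
loop do
  begin
    listener.start                 # the same object every time; nothing is rebuilt
    break
  rescue StandardError => e
    retry_count += 1
    wait = [2**retry_count, 60].min
    puts "Listener crashed, waiting #{wait}s (#{e.message}), retry_count=#{retry_count}"
    break if retry_count >= 3      # demo only; the real loop keeps retrying forever
    sleep wait
  end
end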

dorner commented Sep 10, 2021

It's not easy to try and recreate the listener from scratch unfortunately. If you have any ideas or proposals, I'd be happy to review a PR for them.

reachlin commented Dec 17, 2021

Endless retrying (118 retries in this case), even though the Kafka server is fine. If we can fix the retry, then I think we won't see this problem on stop.

[2021-12-17T00:45:57:708+0000Z] ERROR -- Phobos : <Hash> {:message=>"Listener crashed 
 waiting 61.95s (undefined method `join' for nil:NilClass)" 
 :listener_id=>"586eef" 
 :retry_count=>118 
 :waiting_time=>61.95 
 :exception_class=>"NoMethodError" 
 :exception_message=>"undefined method `join' for nil:NilClass" 
 :backtrace=>["/usr/local/bundle/gems/ruby-kafka-1.3.0/lib/kafka/fetcher.rb:69:in `stop'" 
 "/usr/local/bundle/gems/ruby-kafka-1.3.0/lib/kafka/consumer.rb:130:in `stop'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/listener.rb:113:in `block in stop_listener'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/instrumentation.rb:21:in `block in instrument'" 
 "/usr/local/bundle/gems/activesupport-6.0.4.1/lib/active_support/notifications.rb:180:in `block in instrument'" 
 "/usr/local/bundle/gems/activesupport-6.0.4.1/lib/active_support/notifications/instrumenter.rb:24:in `instrument'" 
 "/usr/local/bundle/gems/activesupport-6.0.4.1/lib/active_support/notifications.rb:180:in `instrument'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/instrumentation.rb:20:in `instrument'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/listener.rb:110:in `stop_listener'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/listener.rb:60:in `ensure in start'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/listener.rb:60:in `start'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/executor.rb:72:in `run_listener'" 
 "/usr/local/bundle/gems/phobos-2.1.0/lib/phobos/executor.rb:31:in `block (2 levels) in start'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:363:in `run_task'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:352:in `block (3 levels) in create_worker'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:335:in `loop'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:335:in `block (2 levels) in create_worker'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:334:in `catch'" 
 "/usr/local/bundle/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:334:in `block in create_worker'" 
 "/usr/local/bundle/gems/logging-2.3.0/lib/logging/diagnostic_context.rb:474:in `block in create_with_logging_context'"]}

reachlin commented:

@dorner can we add an upper limit on retries? Most people will run this in a container; if the container stops, it will be started again. But if Phobos keeps these retry failures to itself, the container will never be rebooted.
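
A hedged sketch of that proposal; MAX_RETRIES is hypothetical and not an existing Phobos option. The idea is that once the cap is exceeded the error escapes, the process exits non-zero, and the container orchestrator restarts it with a fresh process:

MAX_RETRIES = 5
attempts = 0

begin
  attempts += 1
  # stand-in for listener.start crashing the same way every time (backoff/sleep omitted)
  raise NoMethodError, "undefined method `join' for nil:NilClass"
rescue NoMethodError
  retry if attempts <= MAX_RETRIES # bounded retries instead of retrying forever
  raise                            # give up: crash the process so the container restarts it
end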

reachlin commented:

Proposed PR: #144

dorner commented Dec 17, 2021

Hmm... I'm not sure this is the right approach. If one of your listeners crashes, does that mean you have to shut down the entire container? In particular it means that any listeners that are still live won't be able to shut down properly, meaning you're likely to get into a big rebalancing loop for a period of time until you can be stable again.

I am still convinced that the right solution is to fix the underlying ruby-kafka library, and I'm not sure why, out of everyone commenting on this thread and suggesting fixes, no one (as far as I can tell) has attempted to do so. 🤔 From what I can tell, we should at least be able to stop the crash by adding a simple null check.

reachlin commented:

@dorner thanks. How about now? It recreates the client when retrying: https://github.com/phobos/phobos/pull/144/files

dorner commented Dec 20, 2021

It's a better approach, but I think it will take more than recreating the client. The consumer was created with the old client, and I'm not sure whether just changing the reference to a new client will fix anything or make things worse.

milosivanovic commented:

@reachlin do you know why your PR was closed? Also, were you able to test your fix to see if it was able to handle the retry better?
