This repository has been archived by the owner on Sep 11, 2022. It is now read-only.

Heartbeats broken for long running tasks (Fix included) #43

Closed
sadowski opened this issue Apr 15, 2011 · 18 comments

@sadowski

Right now the init_heartbeat method (lib/amqp/client.rb) looks something like this:

def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    if connected?
      if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
        log "Reconnecting due to missing server heartbeats"
        reconnect(true)
      else
        send AMQP::Frame::Heartbeat.new
      end
    end
  end
end

However, when you send the heartbeat frame you don't update the @last_server_heartbeat variable. That causes delivery tags for long-running jobs to be reset, and all sorts of other problems. You can fix this by updating the @last_server_heartbeat time when you send the heartbeat frame, like so:

def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    if connected?
      if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
        log "Reconnecting due to missing server heartbeats"
        reconnect(true)
      else
        @last_server_heartbeat = Time.now
        send AMQP::Frame::Heartbeat.new
      end
    end
  end
end
@michaelklishin
Member

Thanks. I want to finish with 0.8.0 implementation first (likely to go to ruby-amq/amq-client) and then pull down your changes for 0.7.

@ghost ghost assigned michaelklishin Apr 17, 2011
@botanicus
Contributor

I already did, see https://github.com/ruby-amqp/amqp/tree/0.7.2 (specs are OK), so I guess we can push 0.7.2 to RubyGems.org?

@jpr5
Contributor

jpr5 commented Apr 19, 2011

I don't think this is correct, but I don't understand the issue about delivery tags. Can you elaborate more?

I think the only place @last_server_heartbeat should be set after initialization is when the server acknowledges the Heartbeat Frame. Since the server's view of the world is the relevant one (nothing matters if it doesn't accept our messages), I don't think the library and its consumers should be fooled into a false sense of correctness (heartbeat interval satisfied) just because the Frame was sent.
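
To make that argument concrete, here is a minimal sketch using a hypothetical HeartbeatMonitor class (not the gem's actual code, and with plain numbers standing in for Time values). The timestamp moves only when a frame arrives from the server, so the "server is missing" check keeps its meaning no matter how many heartbeats the client sends:

```ruby
# Hypothetical model, not the amqp gem's code: tracks when the server was last heard from.
class HeartbeatMonitor
  attr_reader :last_server_heartbeat

  def initialize(interval, now)
    @interval = interval
    @last_server_heartbeat = now
  end

  # Call only when a frame actually arrives from the server.
  def frame_received(now)
    @last_server_heartbeat = now
  end

  # Sending a heartbeat says nothing about the server, so the timestamp is left alone.
  def heartbeat_sent(_now); end

  # True once the server has been silent for more than two intervals.
  def server_missing?(now)
    @last_server_heartbeat < now - (@interval * 2)
  end
end

monitor = HeartbeatMonitor.new(1, 0) # 1-second interval, clock starts at 0
monitor.heartbeat_sent(1)
monitor.heartbeat_sent(2)
monitor.heartbeat_sent(3)
puts monitor.server_missing?(3)      # server silent since t=0
monitor.frame_received(3)
puts monitor.server_missing?(4)      # server heard from at t=3
```

If heartbeat_sent updated the timestamp instead, server_missing? could never become true while the timer keeps firing, which is the false sense of correctness described above.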

For example, if commands are out of sync, or a previously sent Frame is already occupying the connection/Channel, the RabbitMQ server isn't going to recognize/acknowledge the Heartbeat frame, and it will still drop the connection on us anyway. Better that connection reacquisition happens on our own terms than as a reaction to an unexpected event, which has other potentially bad ramifications -- see #40 (comment)

In addition, unless the subscribe block is itself non-blocking, the entire EM reactor will block on completion of the closure. Thus, for what I perceive to be the common case:

queue.subscribe do |h, msg|
    do_stuff_that_takes_longer_than_heartbeat_times_2()
end

this change may not matter because the PeriodicTimer won't fire until the EM is released.
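
The starvation can be illustrated without EventMachine at all. The following is a rough sketch under stated assumptions: ToyReactor is a hypothetical single-threaded loop with a virtual clock, and a handler's "cost" stands in for how long a blocking subscribe block holds the loop. It is not how EM is implemented; it only shows that a timer checked between handlers cannot fire while a handler runs.

```ruby
# Toy single-threaded reactor with a virtual clock (hypothetical; not EventMachine).
class ToyReactor
  attr_reader :now, :timer_fired_at

  def initialize(timer_interval)
    @now = 0
    @interval = timer_interval
    @next_fire = timer_interval
    @timer_fired_at = []
    @handlers = []
  end

  # Queue a handler; `cost` is how long it blocks the loop.
  def on_message(cost)
    @handlers << cost
  end

  def run
    until @handlers.empty?
      @now += @handlers.shift # the handler runs to completion; nothing else can run
      fire_due_timer          # timers are only checked between handlers
    end
  end

  private

  def fire_due_timer
    return if @now < @next_fire

    @timer_fired_at << @now
    @next_fire = @now + @interval
  end
end

reactor = ToyReactor.new(1) # "heartbeat" timer due every 1 time unit
reactor.on_message(5)       # a handler that blocks for 5 units
reactor.on_message(1)
reactor.run
p reactor.timer_fired_at    # => [5, 6]
```

The timer was due at t=1 but first fires at t=5, well past the 2 * interval window after which a peer would consider the connection dead.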

I have a patch for something that sounds similar on the surface, though. I'll put together a branch and send a pull request now.

@sadowski
Author

Basically, what I am seeing is that after the first heartbeat or so, the client issues a reconnect(true). If the client had received any messages before the reconnect and ack'ed them after the reconnect, an error would be raised. For example:

Using Ruby 1.8.7
message 1 started
message 1 completed
message 2 started
message 2 completed
message 3 started
Sending Heartbeat...
message 3 completed
message 4 started
message 4 completed
message 5 started
Reconnecting!
message 5 completed
COMMAND_INVALID - unknown delivery tag 5 in AMQP::Protocol::Basic::Ack
Sending Heartbeat...

and the test code I used to generate this:

require 'rubygems'
require 'eventmachine'
require 'amqp'
puts "Using Ruby #{RUBY_VERSION}"

conf={
    :user => 'guest',
    :pass => 'guest',
    :host => '127.0.0.1',
    :port => 5672,
    :heartbeat => 1
}

n=1

AMQP.start(conf) do 
    work_channel = AMQP::Channel.new
    work_channel.prefetch(1)
    
    work_q = work_channel.queue('test1')
    work_q.purge

    # Non-blocking code that does 30.times{ |n| work_q.publish("message #{n}") }
    pub = Proc.new do
        work_q.publish("message #{n}")
        EM.next_tick(pub) if n < 30
        n+=1
    end
    EM.next_tick(pub)

    
    work_q.subscribe(:ack => true) do |header, message|
        op = proc{ puts "#{message} started"; sleep(rand()); puts "#{message} completed" }
        cb = proc{ header.ack }
        
        EM.defer(op, cb)
    end
    
end

(I also made a quick modification to the AMQP::Client module to output the "Sending Heartbeat..." and "Reconnecting!" text.)

Requiring acknowledgment, setting a heartbeat interval, and having a random delay between jobs all seem to be necessary to make this bug appear. I have no idea why the random delay matters, but I can't reproduce it with a constant delay. Perhaps you could give this code a try and see if you are able to produce the same results?

@jpr5
Contributor

jpr5 commented Apr 19, 2011

Well, off the cuff, I don't see why you'd get that result. I ran it several times without any problem. The heartbeat interval is pretty short, but the server and client will both wait 2 * heartbeat interval before dropping the connection.

What version of ruby-amqp/amqp are you using (I'm using something equivalent to 0.7.2)? What version of RabbitMQ is being used (locally I'm using 2.3.1, and in production we use 1.7.2)?

When I debug AMQP problems like this, I enable logging + a monkeypatch to inspect what's crossing the wire. Add this between n=1 and AMQP.start():

conf[:logging] = true
module AMQP::Client; def log(*args) warn Time.now.strftime("%Y-%m-%d %H:%M:%S ") + args.inspect end end
class AMQP::Channel; def log(*args) end end

I'd be curious to know if you can see anything amiss. Maybe pop the output into a gist? Might be worthy of a separate support issue.

@michaelklishin
Member

I think that setting the @last_server_heartbeat time when you send the heartbeat frame just sweeps the real issue (the event loop being blocked) under the rug. We need to find a way to ensure heartbeat delivery even if application code tries hard to block the event loop.

@botanicus
Contributor

> I think that setting the @last_server_heartbeat time when you send the heartbeat frame just sweeps the real issue (the event loop being blocked) under the rug. We need to find a way to ensure heartbeat delivery even if application code tries hard to block the event loop.

I agree. Theoretically running another EM loop in a separate thread might help(?)

@jpr5
Contributor

jpr5 commented Apr 19, 2011

This is something I have spent a lot of time on, and it is why I know so much about RMQ's parallelism capability (or lack thereof).

All deferred EM operations get run on an internal pool of threads, but the main EM::run loop invocations themselves run on the same thread that PeriodicTimers are invoked from. So unless you use EM::defer in your subscribe blocks, there is no chance for the PeriodicTimer to fire while your subscribe block is active.
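
The shape of that pattern can be sketched with the standard library alone. This is an assumption-laden stand-in, not EM.defer itself: run_loop_with_deferred_work is a hypothetical helper, a plain Thread plays the role of EM's thread pool, and a Queue hands the result back to the loop thread, where the callback (and a heartbeat timer) would run.

```ruby
# Hypothetical stand-in for the EM.defer pattern using only the standard library.
def run_loop_with_deferred_work(work_seconds, tick_seconds)
  callbacks = Queue.new # results handed back to the loop thread
  ticks = 0
  done = false

  # The "op": blocking work runs on a worker thread, off the loop thread.
  Thread.new do
    sleep(work_seconds)
    callbacks << :work_finished
  end

  # The "loop thread": keeps ticking (where a heartbeat timer would fire)
  # and handles the callback once the worker reports back.
  until done
    sleep(tick_seconds)
    ticks += 1
    done = true if !callbacks.empty? && callbacks.pop == :work_finished
  end
  ticks
end

ticks = run_loop_with_deferred_work(0.3, 0.05)
puts "loop ticked #{ticks} times while the work ran"
```

Because the slow work never runs on the loop thread, the loop keeps ticking for the whole duration of the job. Running the same sleep directly inside the loop would give no ticks until the work finished.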

I myself settled on a 'subscribe do ... end' block design that set up callback/errback pairs only. That was sufficient to get the Heartbeats to fire in parallel and make everything more non-blocking in general BUT this is how I discovered that RMQ doesn't seem to respond to multiple commands at the same time. I specifically observed: after the server delivers a message to the client for consumption, while the client is operating on the message, and before the client responds with an ACK, the server will not respond to any other commands, Heartbeat or otherwise.

The first change I made was that "Heartbeat on Channel 0" patch, to see if the server would multiplex Frame processing across different Channels. This did not work - the server didn't respond to the Heartbeat on Channel 0, and still dropped the connection when 2*heartbeat_interval expired. Empirically, this is part of what leads me to believe that RMQ doesn't actually handle out-of-order packets.

So, I haven't been able to make this work any way other than subscribe/consume blocks that make specific use of EM::defer. And still, it doesn't seem to matter because the Heartbeat frames aren't processed OOB.

@michaelklishin
Member

"the main EM::run loop invocations themselves run on the same thread that PeriodicTimers are invoked from": too bad. While running multiple event loops in a process is technically possible, doing so brings us back to shared state: we have to share the connection object we send heartbeats for. (Yes, technically there is no data to share if we only send frames, but remember that we detect TCP connection loss in Client. That alone uses at least two ivars that are not currently synchronized, and it also affects the connection object by resetting it when the TCP connection fails.)

All this is not an option for Ruby (which lacks the java.util.concurrent safety net) and the Ruby community (a large part of which lacks experience with shared-state concurrency), especially combined with the widespread belief that "if I run an event loop, there can never be any threading hazards".

Because this issue affects a small percentage of library users, and event loop blocking is a well-known downside of evented libraries, I think the best we can do is document the event loop blocking issue (create a separate guide on this subject). EventMachine 1.0 is going to feature EventMachine::Iterator, which makes it easy to make the following common kind of processing concurrent:

list.each { |item| exchange.send(worker.handle(item)) }
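
The underlying idea, splitting a long iteration into small steps so the loop can run other callbacks in between, can be sketched without EventMachine. Everything below is hypothetical illustration: ToyLoop and each_per_tick are made-up names, and this is not EventMachine::Iterator or its API.

```ruby
# Toy run loop: a queue of callables run one at a time (hypothetical; not EventMachine).
class ToyLoop
  def initialize
    @ticks = []
  end

  def next_tick(&block)
    @ticks << block
  end

  def run
    @ticks.shift.call until @ticks.empty?
  end
end

# Handle one item per tick, then reschedule, so other queued work can run in between.
def each_per_tick(toy_loop, items, &handler)
  remaining = items.dup
  step = lambda do
    next if remaining.empty?

    handler.call(remaining.shift)
    toy_loop.next_tick(&step)
  end
  toy_loop.next_tick(&step)
end

log = []
toy = ToyLoop.new
each_per_tick(toy, %w[a b c]) { |item| log << "handled #{item}" }
toy.next_tick { log << 'heartbeat' } # stands in for a timer callback
toy.run
p log # => ["handled a", "heartbeat", "handled b", "handled c"]
```

With a plain list.each the heartbeat callback would only run after all three items; here it runs after the first one, because each step returns control to the loop.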

Basically, "no solution at the library level" sounds like a better idea to me than hiding problems under the rug by altering @last_server_heartbeat on every outgoing frame. Although I can change my mind if the topic starter can explain why it is so difficult to avoid severe chronic event loop blocking (in his particular case, that is).

@michaelklishin
Member

My remarks about shared state between two event loops are for 0.8.x. 0.7.x is different, but I am sure something somewhere will be mutated indirectly from both event loop threads and voilà, obscure issues in production, this time without all the Java debugging and thread inspection tools ;)

@botanicus
Contributor

Jordan, thanks a lot for your input, I learnt a lot. After I thought it through, I have to agree with Michael that we unfortunately can't do much about it at this point, unless someone comes up with some brilliant idea.

@jpr5
Contributor

jpr5 commented Apr 19, 2011

I agree with Michael's sentiments, and like the suggestion about creating a separate guide on this subject.

Regarding EM and the general perception that it is a magical panacea for all things: clearly part of the "problem" stems from user ignorance and false assumptions about how EM works (even we made that mistake at some point on this particular issue). From my own experience with the AMQP library, I know that I was initially heavily influenced by the bundled examples and the few but well-referenced blog posts that demonstrate how quick and easy it is to get up and running with Ruby AMQP. If docs + examples espoused the EM::defer technique more frequently, I'm sure more folks would embrace it.

On the more specific topic of Heartbeats, doc-wise I think the key point that needs to be made is: They are only really useful as network keepalives (i.e. through firewalls/NATs), with the caveat that any subscribe block that hasn't ACKed will (effectively) prevent the Heartbeat from being seen by the server. IIRC most of the +1's for the feature were having firewall/keepalive-related problems on Amazon, and network traffic is network traffic, even if the server still ends up dropping the connection. Limited value, but not zero.

Do either of you know anyone working on/at RabbitMQ? Would be useful to get their perspective on the Heartbeat issue.

@sadowski
Author

Sorry for the delay, and thank you guys for all your help and effort!

I am using RabbitMQ 1.7.2 locally and RabbitMQ 1.7.0 in production with ruby-amqp 0.7.1 on both.

I took your suggestion and ran the monkey patch to see what there was to see. It looks like after the reconnect an old (pre-disconnect) message is trying to ack, and (as I understand it) the server releases all the sent but not ack'ed messages after a disconnect. I suppose if the client were trying to ack a message that was no longer assigned to it, then I would see this type of error? Here is a gist of the log: https://gist.github.com/928919
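
That reading matches how delivery tags work in AMQP: they are scoped to the channel they were issued on, so a tag from before the reconnect means nothing afterwards. A toy model of the sequence, assuming a hypothetical ToyBroker class (not RabbitMQ's or the gem's code), looks like this:

```ruby
# Toy model of channel-scoped delivery tags (hypothetical; not RabbitMQ or the amqp gem).
class ToyBroker
  def initialize(messages)
    @queue = messages.dup
    @unacked = {} # delivery_tag => message, for the current channel only
    @next_tag = 0
  end

  # Deliver the next message and return [delivery_tag, message].
  def deliver
    message = @queue.shift
    @next_tag += 1
    @unacked[@next_tag] = message
    [@next_tag, message]
  end

  def ack(tag)
    raise ArgumentError, "unknown delivery tag #{tag}" unless @unacked.key?(tag)

    @unacked.delete(tag)
  end

  # Connection lost: unacked messages go back on the queue and tags start over.
  def reconnect
    @queue = @unacked.values + @queue
    @unacked.clear
    @next_tag = 0
  end

  def pending
    @queue.dup
  end
end

broker = ToyBroker.new(['message 5', 'message 6'])
tag, _message = broker.deliver
broker.reconnect # happens while "message 5" is still being processed
begin
  broker.ack(tag) # the late ack refers to a tag from the old channel
rescue ArgumentError => e
  puts e.message
end
```

The message itself is not lost in this model: it is back on the queue and will be redelivered with a fresh tag, which is why a worker in this situation should drop the stale ack rather than retry it.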

I see now that setting the last_server_heartbeat when the heartbeat frame is sent is a bad idea and just hiding another problem, so I'll withdraw my request for that particular code change.

@botanicus
Contributor

Jordan, I'm on contract for the VMware / RabbitMQ team, I'll ask ...

sadowski: RabbitMQ 1.7 is a really old version. The current version is 2.4.1, and I'd recommend upgrading. If you want to use AMQP gem 0.8, you'll definitely have to, because RabbitMQ has supported AMQP 0.9.1 since version 2, whereas RabbitMQ 1.7 supports only AMQP 0.8 (which is OK for AMQP gem 0.7, as it doesn't support 0.9.1 either, but AMQP gem 0.8 is AMQP 0.9.1 only).

@michaelklishin
Member

@jpr5, just post your questions on rabbitmq-discuss; the RabbitMQ developers are there and usually respond quickly.

@michaelklishin
Member

@sadowski: note that upgrading RabbitMQ doesn't mean you have to give up the amqp gem version you are running. RabbitMQ supports AMQP 0.8.0 as well as 0.9.1.

@botanicus
Contributor

@jpr5 oh yeah, that'd be better, I'm not sure what exactly to ask about.

@jpr5
Contributor

jpr5 commented Apr 20, 2011

Well, time-wise, testing with the new library is a higher priority for me, but FWIW:

The issue they should be asked about is whether or not the server is supposed to respond to different channels in parallel; e.g. shouldn't Heartbeats on Channel 0 be acknowledged by the server mid-Deliver/Ack on Channel 1?

michaelklishin added a commit that referenced this issue Jul 18, 2011