
BlockingConnection doesn't tell publisher about closed connection #457

Closed
jamutton opened this issue Apr 8, 2014 · 5 comments

Comments

@jamutton
Contributor

jamutton commented Apr 8, 2014

OS X Mavericks + Python 2.7.5 (also seen on multiple Linux variants)

Issue with BlockingConnection where a publisher is allowed to do a basic_publish and is not told that the message has been lost. To reproduce, set up the simple RabbitMQ "hello world" tutorial example. In the sender, call basic_publish multiple times, requiring a keypress in between each call. Launch both the receiver and the sender and observe the first message going through. Now restart the RabbitMQ server; this terminates the connection. The receiver immediately notices with a select.error, but the sender does not. Unblock the sender: it will be allowed to fire another message without knowing that the message has failed. Unblock the sender a third time and it will finally throw a select.error when attempting to close the connection.

The problem this creates is that if you're trying to use a single connection, you may have lost the message you were trying to send without knowing until later that it was lost. This complicates the state management of the client. IMHO, a better solution is to raise an exception in the blocking connection class so that the sender can detect the error and respond, either by retrying the connection immediately or by passing the error up the stack.

Consider the following example, simple receiver:

#!/usr/bin/env python
import pika

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

if __name__ == "__main__":
    cparams = pika.ConnectionParameters(host = 'localhost')
    connection = pika.BlockingConnection(cparams)
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    print ' [*] Waiting for messages. To exit press CTRL+C'
    channel.basic_consume(callback, queue='hello', no_ack=True)
    channel.start_consuming()

And a sender that sends multiple messages on the same channel with pauses in between to give you a chance to restart the AMQP server:

#!/usr/bin/env python
import pika

if __name__ == "__main__":
    cparams = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(cparams)
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print " [x] Sent 'Hello World!'"
    _ = raw_input("Stop or Restart RabbitMQ and press enter...")
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World AGAIN!')
    print " [x] Sent 'Hello World AGAIN!'"
    _ = raw_input("other things may happen here and at this point the client believes the message was sent.  Press enter...")
    connection.close()  # this will raise an error because the socket is already closed

Now launch both. At the first pause from the sender, restart the rabbitmq server. The receiver will immediately crash:

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
Traceback (most recent call last):
File "rec.py", line 14, in <module>
channel.start_consuming()
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 946, in start_consuming
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 240, in process_data_events
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 347, in _handle_read
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 43, in inner
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 89, in ready
select.error: (9, 'Bad file descriptor')

Then hit enter on the sender to allow it to advance. You'll notice that it gets to the next pause, effectively allowing the message to be lost without telling the sender. If you turn up the logging you'll see that pika logs that the connection is closed, but otherwise hides the fact from the sender. Unblock the sender again and it will crash while trying to close the connection:

ttys008:bunny :) python send.py
[x] Sent 'Hello World!'
Stop or Restart RabbitMQ and press enter...
[x] Sent 'Hello World AGAIN!'
other things may happen here and at this point the client believes the message was sent. Press enter...
Traceback (most recent call last):
File "send.py", line 15, in <module>
connection.close() # this will raise an error because the socket is already closed
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 218, in close
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 240, in process_data_events
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 347, in _handle_read
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 43, in inner
File "build/bdist.macosx-10.9-intel/egg/pika/adapters/blocking_connection.py", line 89, in ready
select.error: (9, 'Bad file descriptor')

@jamutton
Contributor Author

jamutton commented Apr 8, 2014

Proposed solution is in PR #450.

Add a log entry to base_connection._handle_error to note this specific case:

        elif error_code == errno.EPIPE:
            # Broken pipe, happens when connection reset
            LOGGER.error("Socket connection was broken")

and put in some blocking_connection-specific handling, because I'm not certain it makes sense in the other connection types:

    def _check_state_on_disconnect(self):
        """Checks closing corner cases to see why we were disconnected and if we should
        raise exceptions for the anticipated exception types.
        """
        super(BlockingConnection, self)._check_state_on_disconnect()
        if self.is_open:
            # already logged a warning in the base class, now fire an exception
            raise exceptions.ConnectionClosed()

@eandersson
Contributor

You can add channel.confirm_delivery(). This way pika will confirm that the message has been published before moving on.
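For illustration, the confirm_delivery() approach might look like this on the sender side (a sketch only: publish_confirmed is a hypothetical helper, and it assumes the pika 0.9.x BlockingChannel behavior where, once confirm_delivery() has been enabled, basic_publish returns a truthy value only when the broker confirmed the message):

```python
# Sketch: detect a lost message at publish time via publisher confirms.
# Assumes the pika 0.9.x BlockingChannel behavior where, after
# channel.confirm_delivery(), basic_publish returns True only once the
# broker has confirmed the message.  publish_confirmed is a
# hypothetical helper, not part of pika.
def publish_confirmed(channel, routing_key, body):
    """Publish body and raise if the broker never confirmed it."""
    confirmed = channel.basic_publish(exchange='',
                                      routing_key=routing_key,
                                      body=body)
    if not confirmed:
        raise RuntimeError('message %r was not confirmed' % (body,))
    return True
```

The sender would call channel.confirm_delivery() once after opening the channel and then route every publish through a check like this, trading one broker round-trip per message for immediate failure detection.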

@jamutton
Contributor Author

jamutton commented Apr 8, 2014

channel.confirm_delivery() will force a select.error on publish by wrapping _send_method in _rpc and adding callbacks that trigger a read from the socket, which indirectly fires the select.error. It works, but in a very round-about way, and the approach feels backwards: hiding the error by default unless asked to show it just opens the door for implementation bugs.

Mostly, though, I'd assert that it's not consistent with other behaviors in the module to swallow the error like it does now. If the intention is to swallow all errors, then why allow the select.error to propagate when connection.close() is called or when the receiver is waiting?

The BlockingConnection class already follows an error path, resets its state and sets its socket to None, yet it tells the caller everything was fine. It continues to do so if you keep trying to publish, even though the object knows nothing is going through. If the PR is applied, the caller can duplicate the current behavior of swallowing the error by putting basic_publish in a try/except, which requires them to acknowledge and handle the error. Much less likely to inadvertently drop/lose messages.
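The try/except pattern described above could be sketched as a small retry wrapper (illustrative only: connect and exc are injected so the logic is adapter-agnostic and needs no broker; with the PR applied, exc would be pika.exceptions.ConnectionClosed, and routing_key='hello' just follows the example above):

```python
# Sketch of the caller-side handling described above: wrap the publish
# in try/except, reconnect, and resend.  connect() returns a fresh
# channel-like object exposing basic_publish; exc is whatever exception
# the adapter raises on a dead socket (with the PR applied, that would
# be pika.exceptions.ConnectionClosed).  All names are illustrative.
def publish_with_retry(connect, body, exc=Exception, attempts=2):
    """Publish body, reconnecting and retrying if the connection died."""
    last_error = None
    for _ in range(attempts):
        channel = connect()  # fresh channel per attempt
        try:
            channel.basic_publish(exchange='', routing_key='hello',
                                  body=body)
            return True
        except exc as err:   # connection dropped mid-publish
            last_error = err
    raise last_error
```

This keeps the swallow-or-retry decision with the caller instead of inside the adapter, which is the behavior the PR argues for.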

@vitaly-krugl
Member

This should be fixed in master branch now. Please re-test against master.

@vitaly-krugl
Member

Closing due to inactivity. This should be fixed in the current release 0.10.0.
