intermittent error in TestBasic.test_close #34

Closed
schmir opened this Issue Dec 18, 2012 · 15 comments


@schmir

I see intermittent errors in test_basic. I've seen this in test_close and test_basic_qos, when the code is waiting on the client.close() promise:

=============================================================== FAILURES ================================================================
_________________________________________________________ TestBasic.test_close __________________________________________________________

self = <test_basic.TestBasic testMethod=test_close>

    def test_close(self):
        client = puka.Client(self.amqp_url)
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(queue=self.name)
        client.wait(promise)

        promise = client.basic_publish(exchange='', routing_key=self.name,
                                       body=self.msg)
        client.wait(promise)

        consume_promise = client.basic_consume(queue=self.name)
        msg_result = client.wait(consume_promise)

        promise = client.queue_delete(self.name)
        client.wait(promise)

        promise = client.close()
>       client.wait(promise)

client     = <puka.client.Client object at 0x2eb0fd0>
consume_promise = 5
msg_result = {'body': '0.479241924429', 'exchange': '', 'consumer_tag': 'amq.ctag-wjqFqhJU4...'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
promise    = 1
self       = <test_basic.TestBasic testMethod=test_close>

tests/test_basic.py:428:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>, promise_numbers = set([1]), timeout = None, raise_errors = True

    def wait(self, promise_numbers, timeout=None, raise_errors=True):
        '''
        Wait for selected promises. Exit after promise runs a callback.
        '''
        if timeout is not None:
            t1 = time.time() + timeout
        else:
            td = None

        if isinstance(promise_numbers, int):
            promise_numbers = [promise_numbers]
        promise_numbers = set(promise_numbers)

        # Try flushing the write buffer before entering the loop, we
        # may as well return soon, and the user has no way to figure
        # out if the write buffer was flushed or not - (ie: did the
        # wait run select() or not)
        #
        # This is problem is especially painful with regard to
        # async messages, like basic_ack. See #3.
        #
        # Additionally, during the first round trip on windows - when
        # the connection is being established, the socket may not yet
        # be in the connected state - swallow an error in that case.
        try:
            self.on_write()
        except socket.error, e:
            if e.errno != errno.ENOTCONN:
                raise

        while True:
            while True:
                ready = promise_numbers & self.promises.ready
                if not ready:
                    break
                promise_number = ready.pop()
                return self.promises.run_callback(promise_number,
                                                  raise_errors=raise_errors)

            if timeout is not None:
                t0 = time.time()
                td = t1 - t0
                if td < 0:
                    break

            r, w, e = select.select([self],
                                    [self] if self.needs_write() else [],
                                    [self],
                                    td)
            if r or e:
>               self.on_read()

e          = []
promise_numbers = set([1])
r          = [<puka.client.Client object at 0x2eb0fd0>]
raise_errors = True
ready      = set([])
self       = <puka.client.Client object at 0x2eb0fd0>
td         = None
timeout    = None
w          = []

puka/connection.py:256:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>

    def on_read(self):
        try:
            r = self.sd.recv(131072)
        except socket.error, e:
            if e.errno == errno.EAGAIN:
                return
            else:
                raise

        if len(r) == 0:
            # a = self.sd.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            self._shutdown(exceptions.mark_frame(spec.Frame(),
                                                 exceptions.ConnectionBroken()))

        self.recv_buf.write(r)

        if len(self.recv_buf) >= self.recv_need:
            data = self.recv_buf.read()
            offset = 0
            while len(data) - offset >= self.recv_need:
                offset, self.recv_need = \
>                   self._handle_read(data, offset)

data       = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
offset     = 12
r          = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
self       = <puka.client.Client object at 0x2eb0fd0>

puka/connection.py:112:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>
data = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
start_offset = 12

    def _handle_frame_read(self, data, start_offset):
        offset = start_offset
        if len(data)-start_offset < 8:
            return start_offset, 8
        frame_type, channel, payload_size = \
            struct.unpack_from('!BHI', data, offset)
        offset += 7
        if len(data)-start_offset < 8+payload_size:
            return start_offset, 8+payload_size
        assert data[offset+payload_size] == '\xCE'

        if frame_type == 0x01: # Method frame
            method_id, = struct.unpack_from('!I', data, offset)
            offset += 4
            frame, offset = spec.METHODS[method_id](data, offset)
>           self.channels.channels[channel].inbound_method(frame)

channel    = 2
data       = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
frame      = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
frame_type = 1
method_id  = 3932191
offset     = 55
payload_size = 36
self       = <puka.client.Client object at 0x2eb0fd0>
start_offset = 12

puka/connection.py:141:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.channel.Channel object at 0x2eb0490>, frame = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def inbound_method(self, frame):
        if frame.has_content:
            self.method_frame = frame
        else:
>           self._handle_inbound(frame)

frame      = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.channel.Channel object at 0x2eb0490>

puka/channel.py:76:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.channel.Channel object at 0x2eb0490>, result = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def _handle_inbound(self, result):
>       self.promise.recv_method(result)

result     = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.channel.Channel object at 0x2eb0490>

puka/channel.py:102:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.promise.Promise object at 0x2eb0810>, result = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def recv_method(self, result):
        # log.debug('#%i recv_method %r', self.number, result)
        # In this order, to allow callback to re-register to the same method.
>       callback = self.methods[result.method_id]
E       KeyError: 3932191

result     = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.promise.Promise object at 0x2eb0810>

puka/promise.py:87: KeyError
================================================== 1 failed, 60 passed in 6.06 seconds ==================================================
@majek
Owner

Any idea how I can reliably reproduce that? (Or how to fix it / where the error lies?)

@schmir

I don't know how to reliably reproduce it. It feels like it fails 1 out of 30 times. The traceback above contains the values of the local variables. Doesn't that give you a hint? (I know basically nothing about the wire protocol, so I don't even have a good guess at what the problem might be.)

@schmir

I've added some print statements.

diff --git a/puka/promise.py b/puka/promise.py
index 95d3d55..9e4b4e3 100644
--- a/puka/promise.py
+++ b/puka/promise.py
@@ -82,6 +82,7 @@ class Promise(object):
         self.done(result)
 
     def recv_method(self, result):
+        print "RECV:", self.number, type(result), result
         # log.debug('#%i recv_method %r', self.number, result)
         # In this order, to allow callback to re-register to the same method.
         callback = self.methods[result.method_id]
diff --git a/tests/test_basic.py b/tests/test_basic.py
index 1ed5fdd..d4bbb07 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -419,11 +419,16 @@ class TestBasic(base.TestCase):
         client.wait(promise)
 
         consume_promise = client.basic_consume(queue=self.name)
+        print "CONSUME_PROMISE:", consume_promise
         msg_result = client.wait(consume_promise)
 
         promise = client.queue_delete(self.name)
+        # print "PROMISE:", promise
         client.wait(promise)
 
+        # client.wait(consume_promise)
+
+        print "QUEUE DELETED"
         promise = client.close()
         client.wait(promise)

With a successful run of test_basic I get the following output:

RECV: 1 <class 'puka.spec.FrameConnectionStart'> {'server_properties': {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2012 VMware, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'consumer_cancel_notify': True, 'publisher_confirms': True, 'basic.nack': True}, 'platform': 'Erlang/OTP', 'version': '2.8.6'}, 'version_minor': 9, 'mechanisms': 'PLAIN AMQPLAIN', 'locales': 'en_US', 'version_major': 0}
RECV: 1 <class 'puka.spec.FrameConnectionTune'> {'frame_max': 131072, 'channel_max': 0, 'heartbeat': 0}
RECV: 1 <class 'puka.spec.FrameConnectionOpenOk'> {'known_hosts': ''}
RECV: 2 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 3 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 2 <class 'puka.spec.FrameConfirmSelectOk'> {}
RECV: 3 <class 'puka.spec.FrameQueueDeclareOk'> {'queue': 'test0.297503201551', 'message_count': 0, 'consumer_count': 0}
RECV: 2 <class 'puka.spec.FrameBasicAck'> {'multiple': False, 'delivery_tag': 1}
CONSUME_PROMISE: 5
RECV: 5 <class 'puka.spec.FrameBasicQosOk'> {}
RECV: 5 <class 'puka.spec.FrameBasicConsumeOk'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5'}
RECV: 5 <class 'puka.spec.FrameBasicDeliver'> {'body': '0.22912621845', 'exchange': '', 'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5', 'routing_key': 'test0.297503201551', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
RECV: 6 <class 'puka.spec.FrameChannelOpenOk'> {'channel_id': ''}
RECV: 6 <class 'puka.spec.FrameQueueDeleteOk'> {'message_count': 0}
RECV: 5 <class 'puka.spec.FrameBasicCancel'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5', 'nowait': True}
QUEUE DELETED
RECV: 5 <class 'puka.spec.FrameBasicCancelOk'> {'consumer_tag': 'amq.ctag-gwAvLTEtzSr6sn878VmNB5'}
RECV: 1 <class 'puka.spec.FrameConnectionCloseOk'> {}

On errors I get

...
QUEUE DELETED
RECV: 1 <class 'puka.spec.FrameConnectionCloseOk'> {}
RECV: 5 <class 'puka.spec.FrameBasicCancelOk'> {'consumer_tag': 'amq.ctag-wgB5BfsORMGmQVZrhfsECx'}

i.e. the client receives the FrameBasicCancelOk (for the
consume_promise) after the FrameConnectionCloseOk.

Handling of FrameConnectionCloseOk involves calling the
Connection._shutdown method, which marks the consume_promise as done.

Later on it can't handle the FrameBasicCancelOk frame...
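To make the failure concrete, here is a minimal sketch (hypothetical names, not puka's actual classes) of the dispatch pattern that raises the KeyError: each promise keeps a dict mapping the AMQP method ids it still expects to callbacks, handling connection.close-ok drops those entries, and a late frame then has no callback to dispatch to. Incidentally, 3932191 is 0x003C001F, i.e. AMQP class 60 (basic), method 31 (cancel-ok).

```python
# Hypothetical sketch of the promise dispatch, not puka's real code.
# 3932191 == 0x003C001F: AMQP class 60 (basic), method 31 (cancel-ok).
METHOD_BASIC_CANCEL_OK = 0x003C001F

class PromiseSketch(object):
    def __init__(self):
        self.methods = {}   # method_id -> callback

    def expect(self, method_id, callback):
        self.methods[method_id] = callback

    def shutdown(self):
        # connection.close-ok handling marks the promise as done and
        # forgets what it was still waiting for
        self.methods.clear()

    def recv_method(self, method_id, result):
        # raises KeyError when an unexpected frame arrives, as in the
        # traceback above
        callback = self.methods[method_id]
        return callback(result)

p = PromiseSketch()
p.expect(METHOD_BASIC_CANCEL_OK, lambda r: r['consumer_tag'])
p.shutdown()    # connection.close-ok handled first...
try:
    p.recv_method(METHOD_BASIC_CANCEL_OK, {'consumer_tag': 'amq.ctag-x'})
except KeyError:
    pass        # ...so the late basic.cancel-ok fails with KeyError: 3932191
```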

@majek
Owner

Interesting. So you're saying there is a race. Does it also happen when you add basic_cancel for the basic_consume promise?

I still can't reproduce this. Which RMQ do you use?

@majek
Owner

i.e. the client receives the FrameBasicCancelOk (for the
consume_promise) after the FrameConnectionCloseOk.

How is that even possible?

Do I get it right: on the network layer, RMQ first sends the connection_close_ok frame and only later sends the basic_cancel_ok frame?

@schmir

RabbitMQ 2.8.6 on 64-bit Linux. The tests run on the same machine, and it looks like I need to put a bit of load on it. I've got something like os.walk implemented via RabbitMQ, which I run at the same time. I run the following shell command:

while true; do py.test --pdb -s -k test_close tests/test_basic.py; done

My guess is that FrameBasicCancelOk is received after FrameConnectionCloseOk because RabbitMQ sends them in exactly that order (and then closes the connection). :)
Is that a bug in RabbitMQ?

If I basic_cancel the consume_promise beforehand and wait for it, all should be fine, since then RabbitMQ doesn't have anything sensible to tell me. I wouldn't expect a problem... but if you insist, I can test it...

@majek
Owner

Yes, I would say that after the "connection close ok" frame, nothing should be sent by RMQ on the TCP/IP layer. I'm not sure that's really what happens. If you can prove it, it would be nice to ask the rabbitmq-discuss mailing list whether they consider it a bug.

Should we clear the receive buffer when handling the connection_close_ok frame?

(Or even better: set a flag to ignore all future reads from the network.)
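The flag approach might look roughly like this (illustrative names only, not puka's actual reader loop): once connection.close-ok has been handled, any frames still sitting in the receive buffer are dropped instead of dispatched.

```python
# Illustrative sketch of the "ignore reads after close" flag; the names
# are made up and do not match puka's actual Connection class.
class ConnectionSketch(object):
    def __init__(self):
        self._closed = False
        self.handled = []

    def _handle_frame(self, frame):
        if frame == 'connection.close-ok':
            self._closed = True
        self.handled.append(frame)

    def on_read(self, frames):
        for frame in frames:
            if self._closed:
                # late frames such as basic.cancel-ok are silently dropped
                break
            self._handle_frame(frame)

c = ConnectionSketch()
c.on_read(['connection.close-ok', 'basic.cancel-ok'])
# only close-ok was dispatched; the stray cancel-ok never reached a promise
```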

@schmir

Setting a flag looks reasonable. Another option may be to throw an exception and catch it in the calling code...

I would have written a test for it, but I'm still waiting for your opinion on py.test.

Can you reproduce the issue with https://gist.github.com/4344132 ?
It's basically just a loop over the code in question. I can easily reproduce the error on an otherwise unloaded machine with two instances of this program running...

@majek
Owner

Thanks for the code, now I can reproduce the issue :)

Try this:

diff --git a/puka/connection.py b/puka/connection.py
index ae51c07..ec6ea61 100644
--- a/puka/connection.py
+++ b/puka/connection.py
@@ -107,7 +107,7 @@ class Connection(object):
         if len(self.recv_buf) >= self.recv_need:
             data = self.recv_buf.read()
             offset = 0
-            while len(data) - offset >= self.recv_need:
+            while len(data) - offset >= self.recv_need and self.sd:
                 offset, self.recv_need = \
                     self._handle_read(data, offset)
             self.recv_buf.consume(offset)
@simonmacmullen

If you're happy building RabbitMQ from source, you can try branch bug25360, which I believe fixes the behaviour you're seeing in the server.

@schmir

I tried it and I can't reproduce the issue when using that branch! Thanks!

@majek
Owner

@schmir unless you object, I'll apply the patch mentioned in #34 (comment)

@schmir

@majek sure, go ahead. That also seems to work.

@schmir

Probably better: "self.sd is not None".
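A toy illustration of the difference (assuming, as the patch implies, that _shutdown() sets self.sd to None): the truthiness test "while ... and self.sd" happens to work because socket objects are truthy and None is falsy, but "self.sd is not None" states the intent explicitly and would survive self.sd ever becoming a falsy-but-valid object.

```python
# Toy illustration; Conn stands in for puka's Connection and assumes,
# as the patch implies, that _shutdown() sets self.sd to None.
class Conn(object):
    def __init__(self):
        self.sd = object()      # placeholder for a connected socket

    def _shutdown(self):
        self.sd = None          # socket is gone after connection teardown

conn = Conn()
assert bool(conn.sd) and (conn.sd is not None)      # loop keeps running
conn._shutdown()
assert not conn.sd and (conn.sd is None)            # both spellings stop it
```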

@majek majek added a commit that closed this issue Dec 20, 2012
@majek Fix #34 - check that the previously handled frame didn't close the connection
That may happen with some RabbitMQ versions - it was possible for RMQ to answer basic.consume after sending connection.close-ok. With that check we avoid a crash in that situation.
ffdbdbc
@majek majek closed this in ffdbdbc Dec 20, 2012