UNEXPECTED_FRAME for BlockingConnection #344

Open
fat-crocodile opened this Issue · 7 comments

3 participants

@fat-crocodile

Hi!

BlockingConnection._send_frame looks like this:

def _send_frame(self, frame_value):
....super(BlockingConnection, self)._send_frame(frame_value)
....self._frames_written_without_read += 1
....if self._frames_written_without_read == self.WRITE_TO_READ_RATIO:
........self._frames_written_without_read = 0
........self.process_data_events()

self._frames_written_without_read may become equal to self.WRITE_TO_READ_RATIO in the middle of message A.
If self.process_data_events() then receives some frames, it will call their handlers.
If a handler sends any messages (publish or ack), their frames are sent in the middle of message A,

and the RabbitMQ server returns UNEXPECTED_FRAME.
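
The interleaving described above can be reproduced with a toy model. This is only a sketch: `ToyConnection`, its `wire` list, and the `pending_reply` flag are hypothetical stand-ins and do not use pika. It assumes one inbound delivery is waiting and that its handler publishes a one-body-frame reply.

```python
# Toy model only: hypothetical class, not pika's implementation.
class ToyConnection:
    WRITE_TO_READ_RATIO = 4  # small value so the effect is easy to see

    def __init__(self):
        self.wire = []             # frames in the order they reach the socket
        self.written = 0           # frames written since the last read
        self.pending_reply = True  # one inbound delivery is waiting

    def send_frame(self, frame):
        self.wire.append(frame)
        self.written += 1
        # Same shape as the check in _send_frame: it can fire mid-message.
        if self.written == self.WRITE_TO_READ_RATIO:
            self.written = 0
            self.process_data_events()

    def process_data_events(self):
        # Pretend a delivery arrived and its handler publishes a reply.
        if self.pending_reply:
            self.pending_reply = False
            self.send_message("B", body_frames=1)

    def send_message(self, name, body_frames):
        self.send_frame((name, "method"))
        self.send_frame((name, "header"))
        for _ in range(body_frames):
            self.send_frame((name, "body"))


conn = ToyConnection()
conn.send_message("A", body_frames=4)
for frame in conn.wire:
    print(frame)
```

In this model the fourth frame of message A triggers the read, so all three frames of B land between A's second and third body frames.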

Strangely, I get it not only on big messages that contain more than 10 frames. I don't know why :(

Ubuntu 12.04, pika 0.9.13.

Is there any reason to keep the lines

....if self._frames_written_without_read == self.WRITE_TO_READ_RATIO:
........self._frames_written_without_read = 0
........self.process_data_events()

in _send_frame? Maybe move them into BlockingConnection._send_method, like this:

def _send_method(self, channel_number, method_frame, content=None):
....super(BlockingConnection, self)._send_method(channel_number, method_frame, content)
....if self._frames_written_without_read >= self.WRITE_TO_READ_RATIO:
........self._frames_written_without_read = 0
........self.process_data_events()

@fat-crocodile

A workaround that works:

class MyConnection(BlockingConnection):
....def __init__(self, *args, **nargs):
........BlockingConnection.__init__(self, *args, **nargs)

....def _send_method(self, channel_number, method_frame, content=None):
........BaseConnection._send_method(self, channel_number, method_frame, content)
........if self._frames_written_without_read >= self.WRITE_TO_READ_RATIO:
............self._frames_written_without_read = 0
............self.process_data_events()

....def _send_frame(self, frame_value):
........BaseConnection._send_frame(self, frame_value)
........self._frames_written_without_read += 1

@gmr
Owner

Do you mind testing against the current HEAD in master to see if the problem goes away? A problem was identified where a frame was not being fully sent.

@fat-crocodile

Do you mean abf9fa8?
No, that patch does not fix my problem.

And in any case, the server says that it got an unexpected frame, not a bad frame.

My tests:

======= Send: ==========

#!/usr/bin/env python
import pika

input_exchange_name = "test.inputexchange.123456"
input_queue_name = "test.inputqueue.123456"

connection = pika.BlockingConnection()
c = connection.channel()

c.exchange_declare(exchange=input_exchange_name, type='fanout')
c.queue_declare(queue = input_queue_name)
c.queue_bind(exchange=input_exchange_name, queue=input_queue_name, routing_key='')

for x in range(1000):
....c.basic_publish(exchange=input_exchange_name, routing_key='', body="%d" % x)

======= Receive: ============

#!/usr/bin/env python
from pika import BlockingConnection, BaseConnection
import logging

logger = logging.getLogger(__name__)
logging.getLogger('pika').addHandler(logging.StreamHandler())

output_exchange_name = "test.outputexchange.123456"
output_queue_name = "test.outputqueue.123456"
input_queue_name = "test.inputqueue.123456"

connection = BlockingConnection()
c = connection.channel()

c.exchange_declare(exchange=output_exchange_name, type='fanout')
c.queue_declare(queue = output_queue_name)
c.queue_bind(exchange=output_exchange_name, queue=output_queue_name, routing_key='')

def callback(ch, method, properties, body):
....c.basic_publish(exchange=output_exchange_name, routing_key='', body='xxx')
....c.basic_ack(method.delivery_tag)

c.basic_consume(callback, queue=input_queue_name)
c.start_consuming()

I run "send", wait until it has finished, and then run "recv".

@fat-crocodile

A little simpler: the bug reproduces without acking. My server is RabbitMQ 3.1.1.

From the AMQP 0.9.1 specification (section 4.2.6, page 35):

Content frames on a specific channel are strictly sequential. That is, they may be mixed with frames for
other channels, but no two content frames from the same channel may be mixed or overlapped, nor may
content frames for a single content be mixed with method frames on the same channel.
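
The rule quoted above can be expressed as a small per-channel state check. This is a simplified sketch, not broker code: the `(kind, channel, value)` tuple layout and the function name `first_unexpected_frame` are assumptions made for illustration. For a method frame, `value` says whether content follows; for a header, it is the body size; for a body frame, it is the payload length.

```python
def first_unexpected_frame(frames):
    """Return the index of the first frame that breaks the rule, or None."""
    # Per channel: "idle", "header" (header expected), or remaining body bytes.
    state = {}
    for index, (kind, channel, value) in enumerate(frames):
        expecting = state.get(channel, "idle")
        if kind == "method":
            # A method frame may not arrive while content is still in flight.
            if expecting != "idle":
                return index
            state[channel] = "header" if value else "idle"
        elif kind == "header":
            if expecting != "header":
                return index
            state[channel] = value if value > 0 else "idle"
        elif kind == "body":
            if not isinstance(expecting, int) or value > expecting:
                return index
            remaining = expecting - value
            state[channel] = remaining if remaining > 0 else "idle"
    return None


# Frames for another channel may be mixed in between content frames.
mixed_channels = [
    ("method", 1, True), ("header", 1, 6), ("body", 1, 3),
    ("method", 2, False),
    ("body", 1, 3),
]
# A method frame on the same channel in the middle of a content is not allowed.
same_channel = [
    ("method", 1, True), ("header", 1, 6), ("body", 1, 3),
    ("method", 1, True),
    ("body", 1, 3),
]
print(first_unexpected_frame(mixed_channels))
print(first_unexpected_frame(same_channel))
```

The first sequence passes because the extra method frame is on a different channel. The second fails at the method frame that interrupts channel 1's content.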

So...

@gmr
Owner

I am aware of the AMQP spec. I'll look into why you're running into this issue shortly. There is a reason for the write-to-read ratio: the logic there is trying to force reads for apps that write without reading from the stack.

@fat-crocodile

The previous workaround causes problems with too-deep recursion. My next attempt seems to work:

https://github.com/fat-crocodile/heap/blob/master/sync_connection.py

Maybe it is interesting for you. Or maybe you will find some bugs that I've missed.
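
For readers following the recursion problem: a handler called from process_data_events can publish, and that publish can call process_data_events again. One possible way to bound this, shown as a toy sketch and not necessarily what the linked file does, is to check the ratio only after a whole message has been written and to skip the check while a read is already in progress. `GuardedToyConnection`, `inbound`, and `reading` are hypothetical names. None of this uses pika.

```python
# Toy model only: hypothetical class, not pika's implementation.
class GuardedToyConnection:
    WRITE_TO_READ_RATIO = 4

    def __init__(self, inbound):
        self.wire = []                # frames in the order they reach the socket
        self.written = 0              # frames written since the last read
        self.inbound = list(inbound)  # deliveries waiting to be handled
        self.reading = False          # True while process_data_events runs

    def _send_frame(self, frame):
        # Only count here; never read in the middle of a message.
        self.wire.append(frame)
        self.written += 1

    def send_message(self, name, body_frames):
        self._send_frame((name, "method"))
        self._send_frame((name, "header"))
        for _ in range(body_frames):
            self._send_frame((name, "body"))
        # The message is complete, so a read is safe, unless one is running.
        if self.written >= self.WRITE_TO_READ_RATIO and not self.reading:
            self.written = 0
            self.process_data_events()

    def process_data_events(self):
        self.reading = True
        try:
            # Handlers publish replies; the guard keeps them from re-entering.
            while self.inbound:
                delivery = self.inbound.pop(0)
                self.send_message("reply-to-" + delivery, body_frames=3)
        finally:
            self.reading = False


conn = GuardedToyConnection(inbound=["x", "y", "z"])
conn.send_message("A", body_frames=4)

# Collapse consecutive frames of the same message to see the send order.
order = []
for name, _kind in conn.wire:
    if not order or order[-1] != name:
        order.append(name)
print(order)
```

In this model each message's frames stay contiguous, and the replies are sent from a loop instead of from nested calls.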

@samirbaaz

I'm facing the same issue. Has there been any progress on this?

I hit the same condition in the _send_frame function, where self._frames_written_without_read ends up equaling the WRITE_TO_READ_RATIO in the middle of a message. Pika then throws a socket error and closes the connection, and the message never gets sent. My current workaround is to just set the WRITE_TO_READ_RATIO to 2000 instead of 1000.

The reason we're hitting this limit is that one consuming channel generates messages to send to another queue. Depending on the message, it can create a pretty high volume of messages to send to the other queue.

What I've now done is move the check against the ratio into the basic_publish function and give appropriate access to the variables that are needed. I know it's been a while since you've looked at this, but do you have any comments or ideas about this?

def _send_frame(self, frame_value):
....super(BlockingConnection, self)._send_frame(frame_value)
....self._frames_written_without_read += 1
....if self._frames_written_without_read == self.WRITE_TO_READ_RATIO:
........self._frames_written_without_read = 0
........self.process_data_events()
