
Request marshaling error should not corrupt a channel #991

Merged
merged 6 commits into pika:master from pika-990 on Apr 27, 2018

Conversation

lukebakken
Member

@lukebakken lukebakken commented Mar 7, 2018

Fixes #990 and #912

@lukebakken lukebakken added this to the 1.0.0 milestone Mar 7, 2018
@vitaly-krugl
Member

@lukebakken, I haven't forgotten - will catch up in the next couple of days.

@lukebakken
Member Author

@vitaly-krugl no hurry at all! Thanks again.

Member

@vitaly-krugl vitaly-krugl left a comment


Something doesn't seem quite right to me with the fix. It might be treating one symptom with a work-around, but not the entire problem. I would expect RabbitMQ to react to the bad queue name in ch.queue_declare(queue=[1, 2, 3]) by sending Channel.Close to the client, in which case the channel should have been removed from Connection and BlockingConnection, and the final connection.close() call should not be sending Channel.Close on that channel at all during connection closing.

Does ch.queue_declare(queue=[1, 2, 3]) actually get sent to the broker, and does it elicit a Channel.Close response from the broker in this case?

@lukebakken
Member Author

lukebakken commented Mar 13, 2018

@vitaly-krugl the queue.declare method never makes it to RabbitMQ. An AssertionError is thrown here which gums up the works when the with clause tries to exit.

@lukebakken
Member Author

I'll try that out.

@vitaly-krugl
Member

Don't try it out just yet; I missed something there (and the fix in the PR did, too, I think).

@vitaly-krugl
Member

vitaly-krugl commented Mar 13, 2018

@lukebakken:

  1. The if acceptable_replies: block sets self._blocking and also registers a number of callbacks. If self._send_method(method) fails while encoding frames (as in this case), we really don't want any of those registered callbacks to remain registered either.

    Since a successful self._send_method(method) call will ultimately just enqueue some data on the output write buffer, it should be possible to move the if acceptable_replies: block after self._send_method(method). This way, if any of the marshaling in self._send_method(method) fails, then nothing needs to be undone.

  2. Furthermore, to ensure that an incomplete message doesn't get placed in the output buffer (due to a marshaling failure of one of its subframes), Connection._send_message() needs to be modified to pre-marshal all of its parts and append them to the outbound buffer deque only after all marshaling is done, updating the stats and finally calling self._flush_outbound() and self._detect_backpressure() like Connection._send_frame() does. To this end, Connection._send_message() and Connection._send_frame() should share a method (e.g., a Connection._output_marshaled_frame() that updates self.bytes_sent and self.frames_sent and appends the marshaled frame data to self.outbound_buffer).

And code comments should provide some insight into each of these, so that a future contribution won't refactor or reorder things in a way that's incompatible with the solution.
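
A minimal, self-contained sketch of the reordering described in point 1. The names here (FakeChannel, MarshalError, rpc) are illustrative stand-ins, not pika's actual classes: the point is that marshaling happens first, so a failure leaves the blocking flag unset and no callbacks registered.

```python
# Illustrative sketch only -- FakeChannel, MarshalError, and rpc() are
# hypothetical stand-ins, not pika's actual implementation.

class MarshalError(Exception):
    pass


class FakeChannel:
    def __init__(self):
        self._blocking = None       # method we are currently blocked on
        self._callbacks = []        # reply callbacks registered by rpc()
        self.outbound_buffer = []   # marshaled data awaiting the socket

    @staticmethod
    def _marshal(method):
        # Stand-in for frame marshaling; fails on non-string input the
        # way a bad queue name fails in queue_declare(queue=[1, 2, 3]).
        if not isinstance(method, str):
            raise MarshalError('cannot marshal %r' % (method,))
        return method.encode('utf-8')

    def _send_method(self, method):
        # Marshal completely before touching the output buffer, so a
        # failure leaves the buffer untouched.
        self.outbound_buffer.append(self._marshal(method))

    def rpc(self, method, callback=None, acceptable_replies=None):
        # The fix: send (marshal) FIRST. If it raises, the lines below
        # never run, so _blocking stays unset and no callbacks linger.
        self._send_method(method)
        if acceptable_replies:
            self._blocking = method
            if callback is not None:
                self._callbacks.append(callback)


ch = FakeChannel()
try:
    ch.rpc([1, 2, 3], callback=lambda frame: None, acceptable_replies=['ok'])
except MarshalError:
    pass

# Channel state is intact: nothing blocked, registered, or buffered.
assert ch._blocking is None
assert not ch._callbacks
assert not ch.outbound_buffer
```

With the old ordering, the failed call would have left _blocking set and the callbacks registered, which is exactly the deadlock described in the issues.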

@lukebakken
Member Author

@vitaly-krugl - I have merged in the tests you provided and this is ready for another review. Thanks!

Member

@vitaly-krugl vitaly-krugl left a comment


Thanks @lukebakken, please see my feedback.

pika/channel.py Outdated
@@ -1327,9 +1327,10 @@ def _on_synchronous_complete(self, _method_frame_unused):
while self._blocked and self._blocking is None:
self._rpc(*self._blocked.popleft())

def _drain_blocked_methods_on_remote_close(self):
Member


The change in this method (including its name) is not necessary for fixing the issue and breaks normal functionality; please revert the change in this method entirely, including the name. Here is why: the user was, and should be, able to make a legitimate blocking request (or a series of them) followed by Channel.Close and not have those legitimate requests preempted. Preempting them would be overly opinionated :). If we think we need an "emergency channel-close" method that purges queued-up requests (I don't think we do), then we should either have a different, appropriately-named method or an optional arg that makes that explicit.

Member Author


If we think we need an "emergency channel-close" method that purges queued-up requests (I don't think we do

Draining blocked methods on a broker-initiated close was introduced in #957 - please check that PR out again. I still think it's necessary.

pika/channel.py Outdated
@@ -1397,6 +1398,13 @@ def _rpc(self, method, callback=None, acceptable_replies=None):
if self.is_closed:
self._raise_if_not_open()

if self.is_closing and self._blocking:
Member


We don't want/need this if self.is_closing and self._blocking code block. It prevents the user from being able to queue up several blocking requests followed by channel.close() and having those requests complete normally.

pika/channel.py Outdated
# thrown
try:
self._send_method(method)
except Exception as err:
Member


I don't think the except Exception as err: block adds any value. The user should be able to glean the same information from the traceback produced by the failed self._send_method().

pika/channel.py Outdated
except Exception as err:
if self._blocking:
LOGGER.error(
"send_method failed for blocking method %s with %s, will discard",
Member


It probably doesn't matter given my preceding comment, but if self._blocking was True, we wouldn't have made it this far.

pika/channel.py Outdated
@@ -1405,6 +1413,19 @@ def _rpc(self, method, callback=None, acceptable_replies=None):
self._blocked.append([method, callback, acceptable_replies])
return

# Note: _send_method can throw exceptions if there are framing errors
# or invalid data passed in. Call it here, before the acceptable_replies
# block to prevent callbacks from being registered if an exception is
Member


and even more importantly prevent self._blocking from being erroneously set too (which was causing the deadlock).

frame_header = frame.Header(channel_number, length, content[0])
frame_method_marshaled = frame_method.marshal()
frame_header_marshaled = frame_header.marshal()
self._output_marshaled_frame(frame_method_marshaled)
Member


This sequence of calls (as well as the one below) still permits a partial request to be placed on the output buffer, thus corrupting the output stream. What needs to happen is all the marshaled frames need to be appended to a list. Only when the entire message has been successfully marshaled, we go ahead and place the marshaled frames on the output buffer.

Also, I find that the single-use intermediate variables as used here detract from code readability in Python. They have their place when the value being computed would make an all-in-one statement difficult to read, but I don't think it's the case here. Same in _send_frame(). Here is an example of what I have in mind:

marshaled_frames = []
marshaled_frames.append(
    frame.Method(channel_number, method_frame).marshal())
marshaled_frames.append(
    frame.Header(channel_number, length, content[0]).marshal())

# And below...
if content[1]:
    # ...
    for chunk in xrange(0, chunks):
        # ...
        marshaled_frames.append(
            frame.Body(channel_number, content[1][start:end]).marshal())

self._output_marshaled_frames(marshaled_frames)

Member


Oh yeah, as you can see from my code snippet above, I now think it's better to have an _output_marshaled_frames() (note the plural) method instead of _output_marshaled_frame(), in order to avoid _detect_backpressure() calls on a per-frame basis. _send_frame() can just wrap its single frame in a list when calling _output_marshaled_frames().

@@ -2266,8 +2261,15 @@ def _send_message(self, channel_number, method_frame, content):

Member


Incidentally, the method_frame parameter to _send_message() is misnamed in this legacy code. What's passed in should be called method. The method_frame is created inside the function when it constructs frame.Method.

pika/channel.py Outdated
# Note: This is normally called when a synchronous command is
# completed. It will undo the blocking state and send all the
# frames that stacked up while we were in the blocking state.
self._on_synchronous_complete(None)
Member


@lukebakken, we really don't want this. If there is a blocking command, then Channel.Close will get appended to self._blocked just like any other blocked command. When the command(s) that blocked complete, the _on_synchronous_complete that is separately registered as a callback in _rpc() will allow the Channel.Close to do its work. This self._on_synchronous_complete(None) call here breaks the synchronous RPC workflow by allowing a new RPC method to be sent before the previous one completes.

Member


The problem we're trying to fix is "Request marshaling error should not corrupt a channel". The only reason the channel becomes corrupted is the marshaling issue, which the rest of this PR addresses.

@@ -2266,8 +2261,14 @@ def _send_message(self, channel_number, method_frame, content):

"""
length = len(content[1])
self._send_frame(frame.Method(channel_number, method_frame))
self._send_frame(frame.Header(channel_number, length, content[0]))
marshaled_body_frames = collections.deque()
Member


A specialized deque class isn't needed here; a simple list will do. The marshaled_body_frames.popleft() logic in the loop at the bottom adds no value and is less efficient than simple loop iteration. Keep in mind that when the list goes out of scope, Python will automatically deref the list and all its items inside C code, versus popping them here one at a time.

marshaled_body_frames.append(frame_body.marshal())

while marshaled_body_frames:
self._output_marshaled_frame(marshaled_body_frames.popleft())
Member


Per my previous comment, the popping is unnecessary and adds inefficiency. The following is preferred:

for chunk in marshaled_body_frames:
    self._output_marshaled_frame(chunk)

However, it's better to do this instead in order to avoid unnecessary per-chunk overhead inside self._output_marshaled_frame(), namely self._flush_outbound() and self._detect_backpressure():

self._output_marshaled_frames(marshaled_body_frames)

_output_marshaled_frames() should append all the chunks to self.outbound_buffer while updating self.bytes_sent and self.frames_sent (again no need to pop, see the for chunk in marshaled_body_frames: example above), and then perform self._flush_outbound() and if self.params.backpressure_detection: self._detect_backpressure() just once outside the loop at the end of the _output_marshaled_frames() method.
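
A rough sketch of what _output_marshaled_frames() could look like under that description. FakeConnection is a hypothetical stand-in for pika's Connection (the counter attribute names mirror the discussion above); the real method would also call _detect_backpressure() when enabled.

```python
# Hypothetical sketch of the suggested _output_marshaled_frames() shape;
# FakeConnection is a stand-in, not pika's actual Connection class.

class FakeConnection:
    def __init__(self):
        self.outbound_buffer = []
        self.bytes_sent = 0
        self.frames_sent = 0
        self.flush_count = 0

    def _flush_outbound(self):
        self.flush_count += 1  # real code would kick the I/O loop here

    def _output_marshaled_frames(self, marshaled_frames):
        # Append every pre-marshaled frame while updating the stats...
        for marshaled_frame in marshaled_frames:
            self.bytes_sent += len(marshaled_frame)
            self.frames_sent += 1
            self.outbound_buffer.append(marshaled_frame)
        # ...then flush (and, in real pika, check backpressure) just once
        # per message instead of once per frame.
        self._flush_outbound()


conn = FakeConnection()
conn._output_marshaled_frames([b'method', b'header', b'body-1', b'body-2'])
assert conn.frames_sent == 4
assert conn.bytes_sent == 24
assert conn.flush_count == 1
```

A _send_frame() built on top of this would simply pass a one-element list.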

@vitaly-krugl vitaly-krugl changed the title from "Blocked method should not prevent closing a channel" to "Request marshaling error should not corrupt a channel" Apr 10, 2018
@vitaly-krugl
Member

@lukebakken, I renamed this PR "Request marshaling error should not corrupt a channel", which reflects issues #990 and #912 more accurately.

@vitaly-krugl
Member

vitaly-krugl commented Apr 10, 2018 via email

@vitaly-krugl
Member

@lukebakken, is this PR ready for re-review?

Member

@vitaly-krugl vitaly-krugl left a comment


Let's add a regression test in blocking connection acceptance tests from issue #990, making sure that this raises the expected exception:

with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=[1, 2, 3])

@lukebakken
Member Author

@vitaly-krugl if I re-select your name in the "Reviewers" dropdown, the status icon changes back to an orange disk ... do you not get a new email saying I re-requested a review? I assumed that you did. If you don't get an email, I can @-mention you in a comment. Thanks for the re-re-reviews 😄

@vitaly-krugl
Member

vitaly-krugl commented Apr 16, 2018 via email

Member

@vitaly-krugl vitaly-krugl left a comment


@lukebakken, could you please add a regression test in blocking connection acceptance tests from issue #990, making sure that this raises the expected exception from the bad channel.queue_declare()?

with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=[1, 2, 3])

@lukebakken
Member Author

@vitaly-krugl thanks! Sorry I missed the previous comment about that test.

Member

@vitaly-krugl vitaly-krugl left a comment


Please see my feedback

@@ -271,6 +270,16 @@ def test(self):
self.assertFalse(ch._impl._consumers)


class TestUsingInvalidQueueArgument(BlockingTestCaseBase):
def test(self):
"""BlockingConnection raises expected connection when invalid queue parameter is used
Member


"expected connection" ==> "expected exception"

"""
connection = self._connect()
ch = connection.channel()
with self.assertRaises(AssertionError):
Member


Surprising that the underlying implementation raises AssertionError instead of TypeError/ValueError. But no need to fix that.
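
For context, a toy reproduction of why the failure surfaces as AssertionError: marshaling helpers of this era commonly guarded input types with assert. The encode_short_string below is a hypothetical sketch, not pika's actual code.

```python
# Hypothetical marshaling helper -- illustrates how a type-checking assert
# in frame encoding turns a bad queue name into an AssertionError.
import struct

def encode_short_string(pieces, value):
    # AMQP short strings must be str/bytes; a list such as [1, 2, 3]
    # trips this assertion before anything reaches the wire.
    assert isinstance(value, (str, bytes)), value
    if isinstance(value, str):
        value = value.encode('utf-8')
    pieces.append(struct.pack('B', len(value)) + value)

pieces = []
encode_short_string(pieces, 'my-queue')   # a valid queue name encodes fine

caught = False
try:
    encode_short_string(pieces, [1, 2, 3])  # the invalid queue argument
except AssertionError:
    caught = True
assert caught
assert pieces == [b'\x08my-queue']  # the failed call appended nothing
```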

self._send_frame(frame.Header(channel_number, length, content[0]))
marshaled_body_frames = []

# Note: we construct the Method and Header objects, marshal them
Member


... Method, Header, and Content objects, ...

…ds differently on client-requested close

Add test for passing an invalid parameter as the queue name
@lukebakken
Member Author

@vitaly-krugl I made your latest suggested changes and rebased against master. I'm waiting on the builds at the moment. Thanks.

Member

@vitaly-krugl vitaly-krugl left a comment


LGTM - thank you!

@lukebakken
Member Author

Thanks @vitaly-krugl

@lukebakken lukebakken merged commit 7f5a73c into pika:master Apr 27, 2018
@lukebakken lukebakken deleted the pika-990 branch April 27, 2018 19:27