write starvation fixes #545
By default (in base_connection) Pika buffers all writes and only attempts to send them the next time it drops into the ioloop and detects the socket as writable. This causes odd behaviour in a number of cases:
1. A process generating large numbers of messages will not actually send them until it finishes processing and drops into the ioloop.
2. A process that is consuming a large queue will only send messages when its read buffer is empty. If the messages are small, this means it may end up consuming thousands of messages for every one it manages to publish.
This behaviour stalls pipelines of processes. This patch tries to send the data on the socket as soon as it's generated. To avoid the timeouts that sendall() might generate, it uses send() and handles partial writes by requeuing the data.
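The send-and-requeue idea described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the function name flush_outbound and the FakeSocket helper are hypothetical, and the only assumption about the socket is that send() returns the number of bytes it accepted.

```python
import collections
import socket


def flush_outbound(sock, outbound_buffer):
    """Try to send every queued frame; requeue whatever the socket refuses.

    Returns the number of bytes actually written.
    """
    bytes_written = 0
    while outbound_buffer:
        frame = outbound_buffer.popleft()
        try:
            sent = sock.send(frame)
        except socket.timeout:
            # Blocking socket with a timeout: put the frame back, retry later.
            outbound_buffer.appendleft(frame)
            break
        bytes_written += sent
        if sent < len(frame):
            # Partial write: keep the unsent tail at the front of the queue
            # so frame ordering is preserved on the next attempt.
            outbound_buffer.appendleft(frame[sent:])
            break
    return bytes_written


class FakeSocket:
    """Hypothetical stand-in that accepts at most `limit` bytes per send()."""

    def __init__(self, limit):
        self.limit = limit
        self.received = b""

    def send(self, data):
        chunk = data[:self.limit]
        self.received += chunk
        return len(chunk)
```

Requeuing with appendleft() rather than append() matters here: the unsent tail has to go out before any frame queued after it, otherwise the byte stream on the wire would be corrupted.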
Move the _handle_write changes from tornado_connection to base_connection. They're equally applicable to all connections and this fixes a bug in master where BlockingConnections wouldn't send messages until close was called.
bytes_written += bw

if frame:
    LOGGER.warning("Partial write, requeing remaining data")
@wjps, a partial write in this scenario is not abnormal at all, so let's not log it as a warning. debug level would be more appropriate, if it's necessary to log it at all.
Yup, fair enough.
If I am not mistaken, the proposed change in |
except socket.timeout:
    # Will only come here if the socket is blocking
    LOGGER.warning("socket timeout, requeuing frame")
    self.outbound_buffer.appendleft(frame)
Previously, socket.timeout was re-raised in both _handle_read() and _handle_write(). This PR makes them inconsistent: re-raised in _handle_read(), but suppressed in _handle_write().
If the socket.timeout exception is going to be suppressed in _handle_write(), it necessitates clean-up in BlockingConnection._flush_outbound(), which still expects to handle socket.timeout from BaseConnection._handle_write(); with this change, BlockingConnection._flush_outbound's handling of socket.timeout becomes dead code.
Then there are also the BlockingConnection._handle_timeout() and BlockingConnection._socket_timeouts logic bits that need to be considered. I think that some of that logic becomes inconsistent as a result of this PR.
Finally, this changes the semantics of pika.connection.Properties.socket_timeout, at least as far as BlockingConnection is concerned.
Personally, I like the proposed change in _handle_write(), but I also wonder what the pika author's intention was behind pika.connection.Properties.socket_timeout and the corresponding handling and re-raising of the socket.timeout exceptions in the original read and write paths. Was pika's author perhaps concerned about stalled connections, relying on socket.timeout exceptions to detect stalls and abort stalled connections? @gmr, would you mind chiming in on this? Thx!
Yup, it will fix #538 and various other intermittent failures that people who publish large volumes of messages will see. The existing code was basically giving up on a socket.timeout on write, when this is actually very possible if you're generating messages faster than the network can send.
I'll take a look at this ASAP.
remove unused import, dodgy error handling
There are quite a few changes in this PR. Please rebase down to a single commit, and reopen in a new PR referencing this one.
Hi, this contains some changes to the way writes are sent in pika that address some issues we were seeing when using the tornado adapter. There's more detail in the individual commit messages, but the basic premise is that we should get the data out on the wire as quickly as possible. Unfortunately the use of sendall() and timeouts isn't really suited to that way of working, so the socket has been changed to non-blocking (after the connect sequence) and we simply handle the write errors when the socket buffer is full.
The changes seem to be generally applicable to other connection adapters and in fact there seems to be a bug with BlockingConnections on master that is fixed by applying these changes.
I've load tested the various adapters and in general I see much more consistent and reliable behaviour now (no random timeouts, pipeline stalls etc).
Cheers,
Will
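For readers unfamiliar with the non-blocking approach mentioned above, here is a rough, standalone sketch of handling a full socket buffer. It is an illustration under assumptions, not pika's code: send_nonblocking is a hypothetical helper, and the socket is assumed to have already been switched to non-blocking mode with setblocking(False) once the connect sequence has finished.

```python
import socket


def send_nonblocking(sock, data):
    """Send as much of `data` as the kernel buffer accepts.

    Returns the unsent tail (empty bytes if everything was written).
    """
    view = memoryview(data)
    while view:
        try:
            sent = sock.send(view)
        except BlockingIOError:
            # Socket buffer is full (EAGAIN/EWOULDBLOCK): stop here and let
            # the caller retry once the ioloop reports the socket as writable.
            break
        view = view[sent:]
    return bytes(view)
```

The caller would keep the returned tail at the front of its outbound buffer and try again on the next writable event, so a full buffer delays the data instead of raising a timeout.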