write starvation fixes #545

Closed
wants to merge 9 commits
65 changes: 52 additions & 13 deletions pika/adapters/base_connection.py
@@ -66,6 +66,7 @@ def __init__(self,
self.socket = None
self.stop_ioloop_on_close = stop_ioloop_on_close
self.write_buffer = None
self.buffer_hist = None
super(BaseConnection, self).__init__(parameters,
on_open_callback,
on_open_error_callback,
@@ -105,7 +106,7 @@ def remove_timeout(self, timeout_id):
self.ioloop.remove_timeout(timeout_id)

def _adapter_connect(self):
"""Connect to the RabbitMQ broker, returning True if connected
"""Connect to the RabbitMQ broker, returning True if connected.

:rtype: bool

@@ -123,6 +124,8 @@ def _adapter_connect(self):
for sock_addr in addresses:
error = self._create_and_connect_to_socket(sock_addr)
if not error:
# Make the socket non-blocking after the connect
self.socket.setblocking(0)
Member

@wjps, what impact will this change have on the SSL connection?

Contributor Author

The whole connect sequence, including the SSL handshake, is still done with a blocking socket. I have a separate sequence of patches that makes the whole lot non-blocking (which I think is cleaner), but it requires quite a lot of fixing to the Select and Asyncore Connections.
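For context, the blocking connect path amounts to roughly the following (the variable names are illustrative, not pika's exact internals); the new setblocking(0) call only happens once this has completed:

import socket
import ssl

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(socket_timeout)   # blocking connect, bounded by a timeout
sock.connect(sock_addr)
if use_ssl:
    sock = ssl.wrap_socket(sock)  # SSL handshake also runs on the blocking socket
sock.setblocking(0)               # only now is the socket made non-blocking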

return None
# Failed to connect
return error
@@ -239,7 +242,10 @@ def _get_error_code(self, error_value):
return None

def _flush_outbound(self):
"""Call the state manager who will figure out that we need to write."""
"""write early, if the socket will take the data why not get it out
there asap.
"""
self._handle_write()
Member

I think I now understand better what you are trying to accomplish with the placement of _handle_write() call here: try to write immediately, and if the socket buffer fills up leaving some frames unwritten, then have the ioloop trigger writing of the remaining frames when the socket becomes writable. If that's the case, we have to be careful to avoid a lot of unnecessary costly kernel calls. Let's say that an app is sending a large number of messages back-to-back and the socket buffer fills up; then, many subsequent calls to _flush_outbound() will result in many unnecessary, expensive kernel calls from unproductive _handle_write() attempts. These unnecessary kernel calls can be avoided by checking if the connection was already waiting to write before making the _handle_write() call here. I think this could be something like:

if not self.event_state & self.WRITE:
    self._handle_write()

self._manage_event_state()

What do you think?

Contributor Author

Yeah, you could do, though I'd be surprised if that overhead would really be noticeable compared to everything else we're doing in Python. Also, if we haven't dropped back into the ioloop in a while, the kernel may have cleared some space in the buffer in the meantime, and in cases where we're consuming a full queue of small messages we can go quite a long time before dropping into the ioloop (it's what led me to take a look at this in the first place). So I'd prefer to keep it as is; it might not be the absolute most efficient, but it will keep the output buffer full and avoid stalling the pipeline.

Member

@wjps, I agree that 100% non-blocking would be best. From my experience in developing 100% non-blocking I/O integration with OpenSSL a few years ago (in C), I recall that there were a number of gotchas. At the time, I captured the gotchas that I discovered in code comments in https://github.com/openwebos/libpalmsocket/blob/master/src/psl_channel_fsm_crypto.c.

Contributor Author

@vitaly-krugl, thanks. Though I think a lot of your gotchas will have had to be handled at the Python wrapper layer; the interface exposed to Python is reasonably straightforward. We'll see how we go.

Member

When tornado ioloop calls BaseConnection._handle_events and READ is indicated, it calls BaseConnection._handle_read, which will attempt to read by default at most pika.spec.FRAME_MAX_SIZE = 131072 bytes from the socket and pass that to Connection._on_data_available. The pika overhead of processing this data should be pretty minimal, so processing time of incoming messages largely depends on processing by user code.

Contributor Author

Yup, sure; I'm not casting any aspersions on pika's _handle_read overhead.

Perhaps my original comment was too flippant. I am actually extremely interested in the performance of pika, but more than that, I'm interested in getting behaviour that is reliable, consistent and what a user might reasonably expect. I think this PR gets us a step closer to that goal, AND it is actually significantly more efficient than the existing code.

Let's keep in mind that we're only talking about a situation where the publisher is consistently generating messages faster than the network can send them; in that situation we would actually be pushing back using the backpressure detection, if it worked (I have a commit for that too). Taken to the extreme, that means swapping and potentially memory exhaustion, so IMO a small additional overhead (1.4us* on my laptop) is probably not the first thing to worry about.

Furthermore, scenarios such as the following would continue to break down...

Imagine you have an application that is reading a large amount of data from a non-RabbitMQ source (say a large XML document retrieved over HTTP, data from some sort of DB, or even data generated internally) and publishes a large number of messages as it processes that data. In this situation the code might well not drop into the ioloop until it has completed processing all of the data, as in the sketch below. On pika/master it would never attempt to send a single message until processing of the full document was complete, whereas this PR keeps putting messages in the socket buffer whenever space is available. With your suggestion we would fill the socket buffer once and then never send another message until we finished processing the document.
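Something like this hypothetical publisher (parse_records() and the exchange/routing-key names are purely illustrative):

def publish_document(channel, document):
    for record in parse_records(document):   # potentially millions of records
        # Each publish appends frames to pika's outbound buffer. With this PR
        # they are also pushed onto the socket straight away whenever it has
        # room; on master (or once the suggested buffer-full check trips)
        # nothing more hits the wire until control returns to the ioloop
        # after this loop completes.
        channel.basic_publish(exchange='events',
                              routing_key='record.created',
                              body=record)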

My aim with this PR is to get the data on the wire ASAP, to minimise latency and maximise opportunities for parallelisation.

*1.4us is the overhead on my laptop of attempting a 1-byte write on a blocked socket in Python, including catching and handling the resulting exception.
0.5us is the overhead of an equivalent write on an unblocked socket.
4.3us is the overhead of dropping into the tornado ioloop and getting triggered when the socket becomes writable, which the existing code pays individually for EVERY frame sent.
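For reference, figures like these can be reproduced with something along the lines of the following sketch (the socketpair setup and iteration count are illustrative, not the exact benchmark used):

import errno
import socket
import timeit

a, b = socket.socketpair()
a.setblocking(0)

# Fill the kernel send buffer so that further sends would block.
try:
    while True:
        a.send(b'x' * 65536)
except socket.error as exc:
    assert exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK)

def attempt_write():
    # The "blocked socket" case: send() raises EWOULDBLOCK, which we catch
    # and handle, much as _handle_write() requeues the unwritten frame.
    try:
        a.send(b'x')
    except socket.error as exc:
        if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise

n = 100000
print("per-attempt overhead: %.2fus" % (timeit.timeit(attempt_write, number=n) / n * 1e6))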

Member

@wjps, thanks for the explanation/analysis

self._manage_event_state()

def _handle_disconnect(self):
@@ -301,6 +307,12 @@ def _handle_error(self, error_value):
# Disconnect from our IOLoop and let Connection know what's up
self._handle_disconnect()

def _handle_timeout(self):
"""Handle a socket timeout in read or write.
We don't do anything in the non-blocking handlers because we
only have the socket in a blocking state during connect."""
pass

def _handle_events(self, fd, events, error=None, write_only=False):
"""Handle IO/Event loop events, processing them.

@@ -337,9 +349,19 @@ def _handle_read(self):
data = self.socket.read(self._buffer_size)
else:
data = self.socket.recv(self._buffer_size)

except socket.timeout:
raise
self._handle_timeout()
return 0

except ssl.SSLWantReadError:
# ssl wants more data but there is nothing currently
# available in the socket, wait for it to become readable.
return 0

except socket.error as error:
if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
return 0
return self._handle_error(error)

# Empty data, should disconnect
@@ -352,18 +374,35 @@ def _handle_read(self):
return len(data)

def _handle_write(self):
"""Handle any outbound buffer writes that need to take place."""
"""Try and write as much as we can, if we get blocked requeue
what's left"""
bytes_written = 0
if self.outbound_buffer:
frame = self.outbound_buffer.popleft()
try:
self.socket.sendall(frame)
bytes_written = len(frame)
except socket.timeout:
raise
except socket.error as error:
try:
while self.outbound_buffer:
frame = self.outbound_buffer.popleft()
bw = self.socket.send(frame)
bytes_written += bw
if bw < len(frame):
LOGGER.debug("Partial write, requeing remaining data")
self.outbound_buffer.appendleft(frame[bw:])
break

except socket.timeout:
# Will only come here if the socket is blocking
LOGGER.debug("socket timeout, requeuing frame")
self.outbound_buffer.appendleft(frame)
Member

Previously, socket.timeout was re-raised in both _handle_read() and _handle_write(). This PR makes them inconsistent: re-raised in _handle_read(), but suppressed in _handle_write().

Member

If the socket.timeout exception is going to be suppressed in _handle_write(), that necessitates clean-up in BlockingConnection._flush_outbound(), which still expects to handle socket.timeout from BaseConnection._handle_write(); its socket.timeout handling becomes dead code with this change.

Then there are also the BlockingConnection._handle_timeout() and BlockingConnection._socket_timeouts logic bits that need to be considered. I think some of that logic becomes inconsistent as a result of this PR.

Finally, this changes the semantics of pika.connection.Properties.socket_timeout, at least as far as BlockingConnection is concerned.

Member

Personally, I like the proposed change in _handle_write(), but I also wonder what the pika author's intention was behind pika.connection.Properties.socket_timeout and the corresponding handling and re-raising of socket.timeout exceptions in the original read and write paths. Was pika's author perhaps concerned about stalled connections, relying on socket.timeout exceptions to detect stalls and abort stalled connections? @gmr, would you mind chiming in on this? Thx!

self._handle_timeout()

except socket.error as error:
if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
LOGGER.debug("Would block, requeuing frame")
self.outbound_buffer.appendleft(frame)
else:
return self._handle_error(error)
return bytes_written

#LOGGER.debug("wrote %s bytes", bytes_written)
return bytes_written


def _init_connection_state(self):
"""Initialize or reset all of our internal state variables for a given
9 changes: 2 additions & 7 deletions pika/adapters/blocking_connection.py
@@ -241,8 +241,6 @@ def process_data_events(self):
self._socket_timeouts = 0
except AttributeError:
raise exceptions.ConnectionClosed()
except socket.timeout:
self._handle_timeout()
self._flush_outbound()
self.process_timeouts()

@@ -373,11 +371,8 @@ def _check_state_on_disconnect(self):
def _flush_outbound(self):
"""Flush the outbound socket buffer."""
if self.outbound_buffer:
try:
if self._handle_write():
self._socket_timeouts = 0
except socket.timeout:
return self._handle_timeout()
if self._handle_write():
self._socket_timeouts = 0

def _on_connection_closed(self, method_frame, from_adapter=False):
"""Called when the connection is closed remotely. The from_adapter value
5 changes: 4 additions & 1 deletion pika/adapters/tornado_connection.py
@@ -2,6 +2,8 @@
from tornado import ioloop
import logging
import time
import socket
import errno

from pika.adapters import base_connection

@@ -47,6 +49,7 @@ def __init__(self, parameters=None,
"""
self.sleep_counter = 0
self.ioloop = custom_ioloop or ioloop.IOLoop.instance()

super(TornadoConnection, self).__init__(parameters,
on_open_callback,
on_open_error_callback,
@@ -56,7 +59,7 @@ def __init__(self, parameters=None,

def _adapter_connect(self):
"""Connect to the remote socket, adding the socket to the IOLoop if
connected
connected.

:rtype: bool
