Skip to content

Commit

Permalink
Merge pull request #578 from wjps/write-starvation-fixes3
Browse files Browse the repository at this point in the history
A fix the write starvation problem that we see with tornado and pika
  • Loading branch information
gmr committed May 18, 2015
2 parents 5e19543 + cd8c9b0 commit c7a72cd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 43 deletions.
66 changes: 50 additions & 16 deletions pika/adapters/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def remove_timeout(self, timeout_id):
self.ioloop.remove_timeout(timeout_id)

def _adapter_connect(self):
"""Connect to the RabbitMQ broker, returning True if connected
"""Connect to the RabbitMQ broker, returning True if connected.
:returns: error string or exception instance on error; None on success
Expand All @@ -124,6 +124,8 @@ def _adapter_connect(self):
for sock_addr in addresses:
error = self._create_and_connect_to_socket(sock_addr)
if not error:
# Make the socket non-blocking after the connect
self.socket.setblocking(0)
return None
self._cleanup_socket()

Expand Down Expand Up @@ -259,7 +261,10 @@ def _get_error_code(error_value):
return None

def _flush_outbound(self):
"""Call the state manager who will figure out that we need to write."""
"""write early, if the socket will take the data why not get it out
there asap.
"""
self._handle_write()
self._manage_event_state()

def _handle_disconnect(self):
Expand Down Expand Up @@ -317,6 +322,12 @@ def _handle_error(self, error_value):
# Disconnect from our IOLoop and let Connection know what's up
self._handle_disconnect()

def _handle_timeout(self):
"""Handle a socket timeout in read or write.
We don't do anything in the non-blocking handlers because we
only have the socket in a blocking state during connect."""
pass

def _handle_events(self, fd, events, error=None, write_only=False):
"""Handle IO/Event loop events, processing them.
Expand Down Expand Up @@ -354,9 +365,19 @@ def _handle_read(self):
data = self.socket.read(self._buffer_size)
else:
data = self.socket.recv(self._buffer_size)

except socket.timeout:
raise
self._handle_timeout()
return 0

except ssl.SSLWantReadError:
# ssl wants more data but there is nothing currently
# available in the socket, wait for it to become readable.
return 0

except socket.error as error:
if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
return 0
return self._handle_error(error)

# Empty data, should disconnect
Expand All @@ -369,22 +390,35 @@ def _handle_read(self):
return len(data)

def _handle_write(self):
"""Handle any outbound buffer writes that need to take place."""
if self.outbound_buffer:
frame = self.outbound_buffer.popleft()
try:
bytes_written = self.socket.send(frame)
if bytes_written < len(frame):
LOGGER.debug("Could not write the full frame")
self.outbound_buffer.appendleft(frame[bytes_written:])
return bytes_written
except socket.timeout:
self.outbound_buffer.appendleft(frame)
raise
except socket.error as error:
"""Try and write as much as we can, if we get blocked requeue
what's left"""
bytes_written = 0
try:
while self.outbound_buffer:
frame = self.outbound_buffer.popleft()
bw = self.socket.send(frame)
bytes_written += bw
if bw < len(frame):
LOGGER.debug("Partial write, requeing remaining data")
self.outbound_buffer.appendleft(frame[bw:])
break

except socket.timeout:
# Will only come here if the socket is blocking
LOGGER.debug("socket timeout, requeuing frame")
self.outbound_buffer.appendleft(frame)
self._handle_timeout()

except socket.error as error:
if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
LOGGER.debug("Would block, requeuing frame")
self.outbound_buffer.appendleft(frame)
else:
return self._handle_error(error)

return bytes_written


def _init_connection_state(self):
"""Initialize or reset all of our internal state variables for a given
connection. If we disconnect and reconnect, all of our state needs to
Expand Down
7 changes: 2 additions & 5 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,8 @@ def _handle_timeout(self):
def _flush_outbound(self):
"""Flush the outbound socket buffer."""
if self.outbound_buffer:
try:
if self._handle_write():
self._socket_timeouts = 0
except socket.timeout:
return self._handle_timeout()
if self._handle_write():
self._socket_timeouts = 0

def _on_connection_closed(self, method_frame, from_adapter=False):
"""Called when the connection is closed remotely. The from_adapter value
Expand Down
21 changes: 0 additions & 21 deletions pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,6 @@ def _adapter_disconnect(self):
self.ioloop.remove_handler(self.socket.fileno())
super(SelectConnection, self)._adapter_disconnect()

def _flush_outbound(self):
"""Call the state manager who will figure out that we need to write
then call the poller's poll function to force it to process events.
"""
self._manage_event_state()
# Force our poller to come up for air, but in write only mode
# write only mode prevents messages from coming in and kicking off
# events through the consumer
self.ioloop.poll(write_only=True)


class IOLoop(object):
"""Singlton wrapper that decides which type of poller to use, creates an
Expand Down Expand Up @@ -378,17 +368,6 @@ def _process_fd_events(self, fd_event_map, write_only):
handler = self._fd_handlers[fileno]
handler(fileno, events, write_only=write_only)

def _flush_outbound(self):
"""Call the state manager who will figure out that we need to write
then call the poller's poll function to force it to process events.
"""
self._manage_event_state()
# Force our poller to come up for air, but in write only mode
# write only mode prevents messages from coming in and kicking off
# events through the consumer
self.poll(write_only=True)


class KQueuePoller(SelectPoller):
"""KQueuePoller works on BSD based systems and is faster than select"""

Expand Down
2 changes: 1 addition & 1 deletion pika/adapters/tornado_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self,

def _adapter_connect(self):
"""Connect to the remote socket, adding the socket to the IOLoop if
connected
connected.
:rtype: bool
Expand Down

0 comments on commit c7a72cd

Please sign in to comment.