Skip to content

Commit

Permalink
Socket close/shutdown cleanup
Browse files Browse the repository at this point in the history
This is directly taken from #505 but I could not cleanly merge it due to cleanup and some post-PR refactoring. Thanks @	kherrala
  • Loading branch information
gmr committed Apr 29, 2015
1 parent 5289125 commit 1c921c1
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
34 changes: 23 additions & 11 deletions pika/adapters/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def __init__(self,
raise RuntimeError("SSL specified but it is not available")
self.base_events = self.READ | self.ERROR
self.event_state = self.base_events
self.fd = None
self.ioloop = ioloop
self.socket = None
self.stop_ioloop_on_close = stop_ioloop_on_close
Expand Down Expand Up @@ -125,15 +124,15 @@ def _adapter_connect(self):
error = self._create_and_connect_to_socket(sock_addr)
if not error:
return None
self._cleanup_socket()

# Failed to connect
return error

def _adapter_disconnect(self):
"""Invoked if the connection is being told to disconnect"""
self._remove_heartbeat()
if self.socket:
self.socket.close()
self.socket = None
self._cleanup_socket()
self._check_state_on_disconnect()
self._handle_ioloop_stop()
self._init_connection_state()
Expand Down Expand Up @@ -162,6 +161,19 @@ def _check_state_on_disconnect(self):
LOGGER.warning('Unknown state on disconnect: %i',
self.connection_state)

def _cleanup_socket(self):
"""Close the socket cleanly"""
if self.socket:
try:
if hasattr('socket', self.socket):
self.socket.socket.shutdown(socket.SHUT_RDWR)

This comment has been minimized.

Copy link
@vitaly-krugl

vitaly-krugl May 4, 2015

Member

@gmr, what's the point of accessing self.socket.socket for the shutdown call? Thanks, just curious.

else:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self.socket.close()
self.socket = None

def _create_and_connect_to_socket(self, sock_addr_tuple):
"""Create socket and connect to it, using SSL if enabled."""
self.socket = socket.socket(sock_addr_tuple[0], socket.SOCK_STREAM, 0)
Expand Down Expand Up @@ -311,23 +323,24 @@ def _handle_events(self, fd, events, error=None, write_only=False):
:param bool write_only: Only handle write events
"""
if not fd:
LOGGER.error('Received events on closed socket: %d', fd)
if not self.socket:
LOGGER.error('Received events on closed socket: %r', fd)
return

if events & self.WRITE:
if self.socket and (events & self.WRITE):
self._handle_write()
self._manage_event_state()

if not write_only and (events & self.READ):
if self.socket and not write_only and (events & self.READ):
self._handle_read()

if write_only and (events & self.READ) and (events & self.ERROR):
if (self.socket and write_only and (events & self.READ) and
(events & self.ERROR)):
LOGGER.error('BAD libc: Write-Only but Read+Error. '
'Assume socket disconnected.')
self._handle_disconnect()

if events & self.ERROR:
if self.socket and (events & self.ERROR):
LOGGER.error('Error event %r, %r', events, error)
self._handle_error(error)

Expand Down Expand Up @@ -376,7 +389,6 @@ def _init_connection_state(self):
"""
super(BaseConnection, self)._init_connection_state()
self.fd = None
self.base_events = self.READ | self.ERROR
self.event_state = self.base_events
self.socket = None
Expand Down
5 changes: 2 additions & 3 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,8 @@ def _adapter_connect(self):

def _adapter_disconnect(self):
"""Called if the connection is being requested to disconnect."""
if self.socket:
self.socket.close()
self.socket = None
self._remove_heartbeat()
self._cleanup_socket()
self._check_state_on_disconnect()
self._init_connection_state()

Expand Down
2 changes: 1 addition & 1 deletion pika/adapters/twisted_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def _adapter_connect(self):
def _adapter_disconnect(self):
"""Called when the adapter should disconnect"""
self.ioloop.remove_handler(None)
self.socket.close()
self._cleanup_socket()

def _handle_disconnect(self):
"""Do not stop the reactor, this would cause the entire process to exit,
Expand Down

0 comments on commit 1c921c1

Please sign in to comment.