Skip to content

Commit

Permalink
Merge pull request ipython#4074 from minrk/close-on-fail
Browse files Browse the repository at this point in the history
close Client sockets if connection fails

also cleanup Client.close a little bit, allowing setting linger on sockets at shutdown time,
which is helpful for preventing FD growth on connection failure.

Should be considered for backport to 1.1.
  • Loading branch information
minrk committed Sep 3, 2013
2 parents 50cf452 + d888764 commit 2e0134d
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions IPython/parallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=No
}
self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
'apply_reply' : self._handle_apply_reply}
self._connect(sshserver, ssh_kwargs, timeout)

try:
self._connect(sshserver, ssh_kwargs, timeout)
except:
self.close(linger=0)
raise

# last step: setup magics, if we are in IPython:

Expand Down Expand Up @@ -599,7 +604,6 @@ def _connect(self, sshserver, ssh_kwargs, timeout):
self._connected=True

def connect_socket(s, url):
# url = util.disambiguate_url(url, self._config['location'])
if self._ssh:
return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
else:
Expand Down Expand Up @@ -956,14 +960,23 @@ def activate(self, targets='all', suffix=''):
view.activate(suffix)
return view

def close(self):
def close(self, linger=None):
"""Close my zmq Sockets
If `linger`, set the zmq LINGER socket option,
which allows discarding of messages.
"""
if self._closed:
return
self.stop_spin_thread()
snames = filter(lambda n: n.endswith('socket'), dir(self))
for socket in map(lambda name: getattr(self, name), snames):
if isinstance(socket, zmq.Socket) and not socket.closed:
socket.close()
snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
for name in snames:
socket = getattr(self, name)
if socket is not None and not socket.closed:
if linger is not None:
socket.close(linger=linger)
else:
socket.close()
self._closed = True

def _spin_every(self, interval=1):
Expand Down

0 comments on commit 2e0134d

Please sign in to comment.