Skip to content

Commit

Permalink
Fixes to the heartbeat channel
Browse files Browse the repository at this point in the history
* The heartbeat channel had some erroneous zeromq logic and entirely false comments (as described in #967). These have been fixed.

* KernelManager.is_alive() now checks whether the hb_channel is beating when
the kernel is not owned by the manager, rather than always returning True.

* BlockingKM's hb_channel has been relaxed to 1s polling, because replies do not
reliably arrive much faster than that; there are occasional >0.5s outlier responses.
  • Loading branch information
minrk committed Nov 29, 2011
1 parent 34b30bb commit 804dd6f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 69 deletions.
8 changes: 5 additions & 3 deletions IPython/zmq/blockingkernelmanager.py
Expand Up @@ -134,11 +134,13 @@ def msg_ready(self):

class BlockingHBSocketChannel(HBSocketChannel):

# This kernel needs rapid monitoring capabilities
time_to_dead = 0.2
# This kernel needs quicker monitoring, shorten to 1 sec.
# less than 0.5s is unreliable, and will get occasional
# false reports of missed beats.
time_to_dead = 1.

def call_handlers(self, since_last_heartbeat):
#io.rprint('[[Heart]]', since_last_heartbeat) # dbg
"""pause beating on missed heartbeat"""
pass


Expand Down
140 changes: 74 additions & 66 deletions IPython/zmq/kernelmanager.py
Expand Up @@ -471,83 +471,89 @@ class HBSocketChannel(ZMQSocketChannel):
poller = None
_running = None
_pause = None
_beating = None

def __init__(self, context, session, address):
super(HBSocketChannel, self).__init__(context, session, address)
self._running = False
self._pause = True
self._pause =True
self.poller = zmq.Poller()

def _create_socket(self):
if self.socket is not None:
# close previous socket, before opening a new one
self.poller.unregister(self.socket)
self.socket.close()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.connect('tcp://%s:%i' % self.address)
self.poller = zmq.Poller()

self.poller.register(self.socket, zmq.POLLIN)

def _poll(self, start_time):
"""poll for heartbeat replies until we reach self.time_to_dead
Ignores interrupts, and returns the result of poll(), which
will be an empty list if no messages arrived before the timeout,
or the event tuple if there is a message to receive.
"""

until_dead = self.time_to_dead - (time.time() - start_time)
# ensure poll at least once
until_dead = max(until_dead, 1e-3)
events = []
while True:
try:
events = self.poller.poll(1000 * until_dead)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
# ignore interrupts during heartbeat
# this may never actually happen
until_dead = self.time_to_dead - (time.time() - start_time)
until_dead = max(until_dead, 1e-3)
pass
else:
raise
else:
break
return events

def run(self):
"""The thread's main activity. Call start() instead."""
self._create_socket()
self._running = True
self._beating = True

while self._running:
if self._pause:
# just sleep, and skip the rest of the loop
time.sleep(self.time_to_dead)
continue

since_last_heartbeat = 0.0
# io.rprint('Ping from HB channel') # dbg
# no need to catch EFSM here, because the previous event was
# either a recv or connect, which cannot be followed by EFSM
self.socket.send(b'ping')
request_time = time.time()
ready = self._poll(request_time)
if ready:
self._beating = True
# the poll above guarantees we have something to recv
self.socket.recv()
# sleep the remainder of the cycle
remainder = self.time_to_dead - (time.time() - request_time)
if remainder > 0:
time.sleep(remainder)
continue
else:
since_last_heartbeat = 0.0
request_time = time.time()
try:
#io.rprint('Ping from HB channel') # dbg
self.socket.send(b'ping')
except zmq.ZMQError, e:
#io.rprint('*** HB Error:', e) # dbg
if e.errno == zmq.EFSM:
#io.rprint('sleep...', self.time_to_dead) # dbg
time.sleep(self.time_to_dead)
self._create_socket()
else:
raise
else:
while True:
try:
self.socket.recv(zmq.NOBLOCK)
except zmq.ZMQError, e:
#io.rprint('*** HB Error 2:', e) # dbg
if e.errno == zmq.EAGAIN:
before_poll = time.time()
until_dead = self.time_to_dead - (before_poll -
request_time)

# When the return value of poll() is an empty
# list, that is when things have gone wrong
# (zeromq bug). As long as it is not an empty
# list, poll is working correctly even if it
# returns quickly. Note: poll timeout is in
# milliseconds.
if until_dead > 0.0:
while True:
try:
self.poller.poll(1000 * until_dead)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
continue
else:
raise
else:
break

since_last_heartbeat = time.time()-request_time
if since_last_heartbeat > self.time_to_dead:
self.call_handlers(since_last_heartbeat)
break
else:
# FIXME: We should probably log this instead.
raise
else:
until_dead = self.time_to_dead - (time.time() -
request_time)
if until_dead > 0.0:
#io.rprint('sleep...', self.time_to_dead) # dbg
time.sleep(until_dead)
break
# nothing was received within the time limit, signal heart failure
self._beating = False
since_last_heartbeat = time.time() - request_time
self.call_handlers(since_last_heartbeat)
# and close/reopen the socket, because the REQ/REP cycle has been broken
self._create_socket()
continue

def pause(self):
"""Pause the heartbeat."""
Expand All @@ -558,8 +564,8 @@ def unpause(self):
self._pause = False

def is_beating(self):
"""Is the heartbeat running and not paused."""
if self.is_alive() and not self._pause:
"""Is the heartbeat running and responsive (and not paused)."""
if self.is_alive() and not self._pause and self._beating:
return True
else:
return False
Expand All @@ -573,7 +579,7 @@ def call_handlers(self, since_last_heartbeat):
Subclasses should override this method to handle incoming messages.
It is important to remember that this method is called in the thread
so that some logic must be done to ensure that the application leve
so that some logic must be done to ensure that the application level
handlers are called in the application thread.
"""
raise NotImplementedError('call_handlers must be defined in a subclass.')
Expand Down Expand Up @@ -900,16 +906,18 @@ def signal_kernel(self, signum):
@property
def is_alive(self):
"""Is the kernel process still running?"""
# FIXME: not using a heartbeat means this method is broken for any
# remote kernel, it's only capable of handling local kernels.
if self.has_kernel:
if self.kernel.poll() is None:
return True
else:
return False
elif self._hb_channel is not None:
# We didn't start the kernel with this KernelManager so we
# use the heartbeat.
return self._hb_channel.is_beating()
else:
# We didn't start the kernel with this KernelManager so we don't
# know if it is running. We should use a heartbeat for this case.
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True

#--------------------------------------------------------------------------
Expand Down

0 comments on commit 804dd6f

Please sign in to comment.