Skip to content
Browse files

more graceful handling of dying engines

  • Loading branch information...
1 parent 5e3faa6 commit cfbd77bcca876aa5147667abb7fbd2ffa1f271ea @minrk minrk committed Mar 14, 2011
Showing with 116 additions and 62 deletions.
  1. +65 −24 IPython/zmq/parallel/client.py
  2. +51 −38 IPython/zmq/parallel/hub.py
View
89 IPython/zmq/parallel/client.py
@@ -252,13 +252,14 @@ class Client(HasTraits):
block = Bool(False)
- outstanding=Set()
- results = Dict()
- metadata = Dict()
+ outstanding = Set()
+ results = Instance('collections.defaultdict', (dict,))
+ metadata = Instance('collections.defaultdict', (Metadata,))
history = List()
debug = Bool(False)
profile=CUnicode('default')
+ _outstanding_dict = Instance('collections.defaultdict', (set,))
_ids = List()
_connected=Bool(False)
_ssh=Bool(False)
@@ -498,23 +499,6 @@ def _unwrap_exception(self, content):
e.engine_info['engine_id'] = eid
return e
- def _register_engine(self, msg):
- """Register a new engine, and update our connection info."""
- content = msg['content']
- eid = content['id']
- d = {eid : content['queue']}
- self._update_engines(d)
-
- def _unregister_engine(self, msg):
- """Unregister an engine that has died."""
- content = msg['content']
- eid = int(content['id'])
- if eid in self._ids:
- self._ids.remove(eid)
- self._engines.pop(eid)
- if self._task_socket and self._task_scheme == 'pure':
- self._stop_scheduling_tasks()
-
def _extract_metadata(self, header, parent, content):
md = {'msg_id' : parent['msg_id'],
'received' : datetime.now(),
@@ -535,6 +519,54 @@ def _extract_metadata(self, header, parent, content):
md['completed'] = datetime.strptime(header['date'], util.ISO8601)
return md
+ def _register_engine(self, msg):
+ """Register a new engine, and update our connection info."""
+ content = msg['content']
+ eid = content['id']
+ d = {eid : content['queue']}
+ self._update_engines(d)
+
+ def _unregister_engine(self, msg):
+ """Unregister an engine that has died."""
+ content = msg['content']
+ eid = int(content['id'])
+ if eid in self._ids:
+ self._ids.remove(eid)
+ uuid = self._engines.pop(eid)
+
+ self._handle_stranded_msgs(eid, uuid)
+
+ if self._task_socket and self._task_scheme == 'pure':
+ self._stop_scheduling_tasks()
+
+ def _handle_stranded_msgs(self, eid, uuid):
+ """Handle messages known to be on an engine when the engine unregisters.
+
+ It is possible that this will fire prematurely - that is, an engine will
+ go down after completing a result, and the client will be notified
+ of the unregistration and later receive the successful result.
+ """
+
+ outstanding = self._outstanding_dict[uuid]
+
+ for msg_id in list(outstanding):
+ print msg_id
+ if msg_id in self.results:
+ # we already have the result; nothing to do
+ continue
+ try:
+ raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
+ except:
+ content = error.wrap_exception()
+ # build a fake message:
+ parent = {}
+ header = {}
+ parent['msg_id'] = msg_id
+ header['engine'] = uuid
+ header['date'] = datetime.now().strftime(util.ISO8601)
+ msg = dict(parent_header=parent, header=header, content=content)
+ self._handle_apply_reply(msg)
+
def _handle_execute_reply(self, msg):
"""Save the reply to an execute_request into our results.
@@ -569,10 +601,15 @@ def _handle_apply_reply(self, msg):
header = msg['header']
# construct metadata:
- md = self.metadata.setdefault(msg_id, Metadata())
+ md = self.metadata[msg_id]
md.update(self._extract_metadata(header, parent, content))
+ # is this redundant?
self.metadata[msg_id] = md
+ e_outstanding = self._outstanding_dict[md['engine_uuid']]
+ if msg_id in e_outstanding:
+ e_outstanding.remove(msg_id)
+
# construct result:
if content['status'] == 'ok':
self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
@@ -642,7 +679,7 @@ def _flush_iopub(self, sock):
msg_type = msg['msg_type']
# init metadata:
- md = self.metadata.setdefault(msg_id, Metadata())
+ md = self.metadata[msg_id]
if msg_type == 'stream':
name = content['name']
@@ -653,6 +690,7 @@ def _flush_iopub(self, sock):
else:
md.update({msg_type : content['data']})
+ # redundant?
self.metadata[msg_id] = md
msg = self.session.recv(sock, mode=zmq.NOBLOCK)
@@ -1067,6 +1105,8 @@ def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
msg_id = msg['msg_id']
self.outstanding.add(msg_id)
self.history.append(msg_id)
+ self.metadata[msg_id]['submitted'] = datetime.now()
+
ar = AsyncResult(self, [msg_id], fname=f.__name__)
if block:
try:
@@ -1099,6 +1139,7 @@ def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
content=content, buffers=bufs, ident=ident, subheader=subheader)
msg_id = msg['msg_id']
self.outstanding.add(msg_id)
+ self._outstanding_dict[ident].add(msg_id)
self.history.append(msg_id)
msg_ids.append(msg_id)
ar = AsyncResult(self, msg_ids, fname=f.__name__)
@@ -1345,7 +1386,7 @@ def result_status(self, msg_ids, status_only=True):
is False, then completed results will be keyed by their `msg_id`.
"""
if not isinstance(msg_ids, (list,tuple)):
- indices_or_msg_ids = [msg_ids]
+ msg_ids = [msg_ids]
theids = []
for msg_id in msg_ids:
@@ -1398,7 +1439,7 @@ def result_status(self, msg_ids, status_only=True):
if isinstance(rcontent, str):
rcontent = self.session.unpack(rcontent)
- md = self.metadata.setdefault(msg_id, Metadata())
+ md = self.metadata[msg_id]
md.update(self._extract_metadata(header, parent, rcontent))
md.update(iodict)
View
89 IPython/zmq/parallel/hub.py
@@ -231,7 +231,6 @@ def construct_hub(self):
# connect the db
self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
cdir = self.config.Global.cluster_dir
- print (cdir)
self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
time.sleep(.25)
@@ -415,24 +414,6 @@ def _validate_targets(self, targets):
raise IndexError("No Engines Registered")
return targets
- def _validate_client_msg(self, msg):
- """validates and unpacks headers of a message. Returns False if invalid,
- (ident, header, parent, content)"""
- client_id = msg[0]
- try:
- msg = self.session.unpack_message(msg[1:], content=True)
- except:
- self.log.error("client::Invalid Message %s"%msg, exc_info=True)
- return False
-
- msg_type = msg.get('msg_type', None)
- if msg_type is None:
- return False
- header = msg.get('header')
- # session doesn't handle split content for now:
- return client_id, msg
-
-
#-----------------------------------------------------------------------------
# dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
@@ -598,22 +579,27 @@ def save_queue_result(self, idents, msg):
self.all_completed.add(msg_id)
self.queues[eid].remove(msg_id)
self.completed[eid].append(msg_id)
- rheader = msg['header']
- completed = datetime.strptime(rheader['date'], ISO8601)
- started = rheader.get('started', None)
- if started is not None:
- started = datetime.strptime(started, ISO8601)
- result = {
- 'result_header' : rheader,
- 'result_content': msg['content'],
- 'started' : started,
- 'completed' : completed
- }
+ elif msg_id not in self.all_completed:
+ # it could be a result from an engine that died before delivering the
+ # result
+ self.log.warn("queue:: unknown msg finished %s"%msg_id)
+ return
+ # update record anyway, because the unregistration could have been premature
+ rheader = msg['header']
+ completed = datetime.strptime(rheader['date'], ISO8601)
+ started = rheader.get('started', None)
+ if started is not None:
+ started = datetime.strptime(started, ISO8601)
+ result = {
+ 'result_header' : rheader,
+ 'result_content': msg['content'],
+ 'started' : started,
+ 'completed' : completed
+ }
- result['result_buffers'] = msg['buffers']
- self.db.update_record(msg_id, result)
- else:
- self.log.debug("queue:: unknown msg finished %s"%msg_id)
+ result['result_buffers'] = msg['buffers']
+ self.db.update_record(msg_id, result)
+
#--------------------- Task Queue Traffic ------------------------------
@@ -841,20 +827,46 @@ def unregister_engine(self, ident, msg):
self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
return
self.log.info("registration::unregister_engine(%s)"%eid)
+ # print (eid)
content=dict(id=eid, queue=self.engines[eid].queue)
self.ids.remove(eid)
- self.keytable.pop(eid)
+ uuid = self.keytable.pop(eid)
ec = self.engines.pop(eid)
self.hearts.pop(ec.heartbeat)
self.by_ident.pop(ec.queue)
self.completed.pop(eid)
- for msg_id in self.queues.pop(eid):
- msg = self.pending.remove(msg_id)
+ self._handle_stranded_msgs(eid, uuid)
############## TODO: HANDLE IT ################
if self.notifier:
self.session.send(self.notifier, "unregistration_notification", content=content)
+ def _handle_stranded_msgs(self, eid, uuid):
+ """Handle messages known to be on an engine when the engine unregisters.
+
+ It is possible that this will fire prematurely - that is, an engine will
+ go down after completing a result, and the client will be notified
+ that the result failed and later receive the actual result.
+ """
+
+ outstanding = self.queues.pop(eid)
+
+ for msg_id in outstanding:
+ self.pending.remove(msg_id)
+ self.all_completed.add(msg_id)
+ try:
+ raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
+ except:
+ content = error.wrap_exception()
+ # build a fake header:
+ header = {}
+ header['engine'] = uuid
+ header['date'] = datetime.now().strftime(ISO8601)
+ rec = dict(result_content=content, result_header=header, result_buffers=[])
+ rec['completed'] = header['date']
+ rec['engine_uuid'] = uuid
+ self.db.update_record(msg_id, rec)
+
def finish_registration(self, heart):
"""Second half of engine registration, called after our HeartMonitor
has received a beat from the Engine's Heart."""
@@ -1029,7 +1041,8 @@ def get_results(self, client_id, msg):
'result_header' : rec['result_header'],
'io' : io_dict,
}
- buffers.extend(map(str, rec['result_buffers']))
+ if rec['result_buffers']:
+ buffers.extend(map(str, rec['result_buffers']))
else:
try:
raise KeyError('No such message: '+msg_id)

0 comments on commit cfbd77b

Please sign in to comment.
Something went wrong with that request. Please try again.