Permalink
Browse files

adapt kernel/error.py to zmq, improve error propagation.

  • Loading branch information...
1 parent 65bfa60 commit b0d94c76944aa7bce95b820f8c78028bfbf4e8f7 @minrk minrk committed Jan 21, 2011
@@ -27,6 +27,7 @@
# from remotenamespace import RemoteNamespace
from view import DirectView, LoadBalancedView
from dependency import Dependency, depend, require
+import error
def _push(ns):
globals().update(ns)
@@ -128,13 +129,14 @@ class AbortedTask(object):
def __init__(self, msg_id):
self.msg_id = msg_id
-class ControllerError(Exception):
- """Exception Class for errors in the controller (not the Engine)."""
- def __init__(self, etype, evalue, tb):
- self.etype = etype
- self.evalue = evalue
- self.traceback=tb
-
+class ResultDict(dict):
+ """A subclass of dict that raises errors if it has them."""
+ def __getitem__(self, key):
+ res = dict.__getitem__(self, key)
+ if isinstance(res, error.KernelError):
+ raise res
+ return res
+
class Client(object):
"""A semi-synchronous client to the IPython ZMQ controller
@@ -402,12 +404,18 @@ def _handle_apply_reply(self, msg):
if content['status'] == 'ok':
self.results[msg_id] = ss.unserialize_object(msg['buffers'])
elif content['status'] == 'aborted':
- self.results[msg_id] = AbortedTask(msg_id)
+ self.results[msg_id] = error.AbortedTask(msg_id)
elif content['status'] == 'resubmitted':
# TODO: handle resubmission
pass
else:
- self.results[msg_id] = ss.unwrap_exception(content)
+ e = ss.unwrap_exception(content)
+ e_uuid = e.engine_info['engineid']
+ for k,v in self._engines.iteritems():
+ if v == e_uuid:
+ e.engine_info['engineid'] = k
+ break
+ self.results[msg_id] = e
def _flush_notifications(self):
"""Flush notifications of engine registrations waiting
@@ -649,6 +657,13 @@ def run(self, code, block=None):
result = self.apply(execute, (code,), targets=None, block=block, bound=False)
return result
+ def _maybe_raise(self, result):
+ """wrapper for maybe raising an exception if apply failed."""
+ if isinstance(result, error.RemoteError):
+ raise result
+
+ return result
+
def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
after=None, follow=None):
"""Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
@@ -758,7 +773,7 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
self.history.append(msg_id)
if block:
self.barrier(msg_id)
- return self.results[msg_id]
+ return self._maybe_raise(self.results[msg_id])
else:
return msg_id
@@ -795,12 +810,12 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
else:
return msg_ids
if len(msg_ids) == 1:
- return self.results[msg_ids[0]]
+ return self._maybe_raise(self.results[msg_ids[0]])
else:
result = {}
for target,mid in zip(targets, msg_ids):
result[target] = self.results[mid]
- return result
+ return error.collect_exceptions(result, f.__name__)
#--------------------------------------------------------------------------
# Data movement
@@ -0,0 +1,276 @@
+# encoding: utf-8
+
+"""Classes and functions for kernel related errors and exceptions."""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+# Tell nose to skip this module
+__test__ = {}
+
+#-------------------------------------------------------------------------------
+# Copyright (C) 2008 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-------------------------------------------------------------------------------
+
+#-------------------------------------------------------------------------------
+# Error classes
+#-------------------------------------------------------------------------------
+class IPythonError(Exception):
+ """Base exception that all of our exceptions inherit from.
+
+ This can be raised by code that doesn't have any more specific
+ information."""
+
+ pass
+
+# Exceptions associated with the controller objects
+class ControllerError(IPythonError): pass
+
+class ControllerCreationError(ControllerError): pass
+
+
+# Exceptions associated with the Engines
+class EngineError(IPythonError): pass
+
+class EngineCreationError(EngineError): pass
+
+class KernelError(IPythonError):
+ pass
+
+class NotDefined(KernelError):
+ def __init__(self, name):
+ self.name = name
+ self.args = (name,)
+
+ def __repr__(self):
+ return '<NotDefined: %s>' % self.name
+
+ __str__ = __repr__
+
+
+class QueueCleared(KernelError):
+ pass
+
+
+class IdInUse(KernelError):
+ pass
+
+
+class ProtocolError(KernelError):
+ pass
+
+
+class ConnectionError(KernelError):
+ pass
+
+
+class InvalidEngineID(KernelError):
+ pass
+
+
+class NoEnginesRegistered(KernelError):
+ pass
+
+
+class InvalidClientID(KernelError):
+ pass
+
+
+class InvalidDeferredID(KernelError):
+ pass
+
+
+class SerializationError(KernelError):
+ pass
+
+
+class MessageSizeError(KernelError):
+ pass
+
+
+class PBMessageSizeError(MessageSizeError):
+ pass
+
+
+class ResultNotCompleted(KernelError):
+ pass
+
+
+class ResultAlreadyRetrieved(KernelError):
+ pass
+
+class ClientError(KernelError):
+ pass
+
+
+class TaskAborted(KernelError):
+ pass
+
+
+class TaskTimeout(KernelError):
+ pass
+
+
+class NotAPendingResult(KernelError):
+ pass
+
+
+class UnpickleableException(KernelError):
+ pass
+
+
+class AbortedPendingDeferredError(KernelError):
+ pass
+
+
+class InvalidProperty(KernelError):
+ pass
+
+
+class MissingBlockArgument(KernelError):
+ pass
+
+
+class StopLocalExecution(KernelError):
+ pass
+
+
+class SecurityError(KernelError):
+ pass
+
+
+class FileTimeoutError(KernelError):
+ pass
+
+class RemoteError(KernelError):
+ """Error raised elsewhere"""
+ ename=None
+ evalue=None
+ traceback=None
+ engine_info=None
+
+ def __init__(self, ename, evalue, traceback, engine_info=None):
+ self.ename=ename
+ self.evalue=evalue
+ self.traceback=traceback
+ self.engine_info=engine_info or {}
+ self.args=(ename, evalue)
+
+ def __repr__(self):
+ engineid = self.engine_info.get('engineid', ' ')
+ return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
+
+ def __str__(self):
+ sig = "%s(%s)"%(self.ename, self.evalue)
+ if self.traceback:
+ return sig + '\n' + self.traceback
+ else:
+ return sig
+
+
+class TaskRejectError(KernelError):
+ """Exception to raise when a task should be rejected by an engine.
+
+ This exception can be used to allow a task running on an engine to test
+ if the engine (or the user's namespace on the engine) has the needed
+ task dependencies. If not, the task should raise this exception. For
+ the task to be retried on another engine, the task should be created
+ with the `retries` argument > 1.
+
+ The advantage of this approach over our older properties system is that
+ tasks have full access to the user's namespace on the engines and the
+ properties don't have to be managed or tested by the controller.
+ """
+
+
+class CompositeError(KernelError):
+ """Error for representing possibly multiple errors on engines"""
+ def __init__(self, message, elist):
+ Exception.__init__(self, *(message, elist))
+ # Don't use pack_exception because it will conflict with the .message
+ # attribute that is being deprecated in 2.6 and beyond.
+ self.msg = message
+ self.elist = elist
+ self.args = [ e[0] for e in elist ]
+
+ def _get_engine_str(self, ei):
+ if not ei:
+ return '[Engine Exception]'
+ else:
+ return '[%i:%s]: ' % (ei['engineid'], ei['method'])
+
+ def _get_traceback(self, ev):
+ try:
+ tb = ev._ipython_traceback_text
+ except AttributeError:
+ return 'No traceback available'
+ else:
+ return tb
+
+ def __str__(self):
+ s = str(self.msg)
+ for en, ev, etb, ei in self.elist:
+ engine_str = self._get_engine_str(ei)
+ s = s + '\n' + engine_str + en + ': ' + str(ev)
+ return s
+
+ def __repr__(self):
+ return "CompositeError(%i)"%len(self.elist)
+
+ def print_tracebacks(self, excid=None):
+ if excid is None:
+ for (en,ev,etb,ei) in self.elist:
+ print (self._get_engine_str(ei))
+ print (etb or 'No traceback available')
+ print ()
+ else:
+ try:
+ en,ev,etb,ei = self.elist[excid]
+ except:
+ raise IndexError("an exception with index %i does not exist"%excid)
+ else:
+ print (self._get_engine_str(ei))
+ print (etb or 'No traceback available')
+
+ def raise_exception(self, excid=0):
+ try:
+ en,ev,etb,ei = self.elist[excid]
+ except:
+ raise IndexError("an exception with index %i does not exist"%excid)
+ else:
+ try:
+ raise RemoteError(en, ev, etb, ei)
+ except:
+ et,ev,tb = sys.exc_info()
+
+
+def collect_exceptions(rdict, method):
+ """check a result dict for errors, and raise CompositeError if any exist.
+ Passthrough otherwise."""
+ elist = []
+ for r in rdict.values():
+ if isinstance(r, RemoteError):
+ en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
+ # Sometimes we could have CompositeError in our list. Just take
+ # the errors out of them and put them in our new list. This
+ # has the effect of flattening lists of CompositeErrors into one
+ # CompositeError
+ if en=='CompositeError':
+ for e in ev.elist:
+ elist.append(e)
+ else:
+ elist.append((en, ev, etb, ei))
+ if len(elist)==0:
+ return rdict
+ else:
+ msg = "one or more exceptions from call to method: %s" % (method)
+ # This silliness is needed so the debugger has access to the exception
+ # instance (e in this case)
+ try:
+ raise CompositeError(msg, elist)
+ except CompositeError, e:
+ raise e
+
Oops, something went wrong.

0 comments on commit b0d94c7

Please sign in to comment.