diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 30ddee20942..7353d97b846 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -14,6 +14,7 @@ import os import time +from getpass import getpass from pprint import pprint import zmq @@ -86,7 +87,29 @@ def remote_function(f): #-------------------------------------------------------------------------- class RemoteFunction(object): - """Turn an existing function into a remote function""" + """Turn an existing function into a remote function. + + Parameters + ---------- + + client : Client instance + The client to be used to connect to engines + f : callable + The function to be wrapped into a remote function + bound : bool [default: False] + Whether the affect the remote namespace when called + block : bool [default: None] + Whether to wait for results or not. The default behavior is + to use the current `block` attribute of `client` + targets : valid target list [default: all] + The targets on which to execute. + """ + + client = None # the remote connection + func = None # the wrapped function + block = None # whether to block + bound = None # whether to affect the namespace + targets = None # where to execute def __init__(self, client, f, bound=False, block=None, targets=None): self.client = client @@ -106,6 +129,7 @@ def __init__(self, msg_id): self.msg_id = msg_id class ControllerError(Exception): + """Exception Class for errors in the controller (not the Engine).""" def __init__(self, etype, evalue, tb): self.etype = etype self.evalue = evalue @@ -795,7 +819,7 @@ def pull(self, keys, targets=None, block=True): """Pull objects from `target`'s namespace by `keys`""" if isinstance(keys, str): pass - elif isistance(keys, (list,tuple,set)): + elif isinstance(keys, (list,tuple,set)): for key in keys: if not isinstance(key, str): raise TypeError diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index a7da3649f44..ab0fba37736 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -495,29 +495,30 @@ def save_task_request(self, idents, msg): client_id = idents[0] try: - msg = self.session.unpack_message(msg, content=False, copy=False) + msg = self.session.unpack_message(msg, content=False) except: logger.error("task::client %r sent invalid task message: %s"%( client_id, msg), exc_info=True) return - rec = init_record(msg) + record = init_record(msg) if MongoDB is not None and isinstance(self.db, MongoDB): record['buffers'] = map(Binary, record['buffers']) - rec['client_uuid'] = client_id - rec['queue'] = 'task' + record['client_uuid'] = client_id + record['queue'] = 'task' header = msg['header'] msg_id = header['msg_id'] self.pending.add(msg_id) - self.db.add_record(msg_id, rec) + self.db.add_record(msg_id, record) def save_task_result(self, idents, msg): """save the result of a completed task.""" client_id = idents[0] try: - msg = self.session.unpack_message(msg, content=False, copy=False) + msg = self.session.unpack_message(msg, content=False) except: logger.error("task::invalid task result message send to %r: %s"%( client_id, msg)) + raise return parent = msg['parent_header'] diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py index 2c6ee85cf02..dd8768af35e 100644 --- a/IPython/zmq/parallel/streamsession.py +++ b/IPython/zmq/parallel/streamsession.py @@ -65,7 +65,7 @@ def wrap_exception(): tb = traceback.format_exception(etype, evalue, tb) exc_content = { 'status' : 'error', - 'traceback' : tb.encode('utf8'), + 'traceback' : [ line.encode('utf8') for line in tb ], 'etype' : etype.encode('utf8'), 'evalue' : evalue.encode('utf8') }