Skip to content

Commit

Permalink
add to some client docstrings, fix some typos
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Apr 8, 2011
1 parent ffd81f5 commit 4d5f89a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 9 deletions.
28 changes: 26 additions & 2 deletions IPython/zmq/parallel/client.py
Expand Up @@ -14,6 +14,7 @@

import os
import time
from getpass import getpass
from pprint import pprint

import zmq
Expand Down Expand Up @@ -86,7 +87,29 @@ def remote_function(f):
#--------------------------------------------------------------------------

class RemoteFunction(object):
"""Turn an existing function into a remote function"""
"""Turn an existing function into a remote function.
Parameters
----------
client : Client instance
The client to be used to connect to engines
f : callable
The function to be wrapped into a remote function
bound : bool [default: False]
Whether the affect the remote namespace when called
block : bool [default: None]
Whether to wait for results or not. The default behavior is
to use the current `block` attribute of `client`
targets : valid target list [default: all]
The targets on which to execute.
"""

client = None # the remote connection
func = None # the wrapped function
block = None # whether to block
bound = None # whether to affect the namespace
targets = None # where to execute

def __init__(self, client, f, bound=False, block=None, targets=None):
self.client = client
Expand All @@ -106,6 +129,7 @@ def __init__(self, msg_id):
self.msg_id = msg_id

class ControllerError(Exception):
"""Exception Class for errors in the controller (not the Engine)."""
def __init__(self, etype, evalue, tb):
self.etype = etype
self.evalue = evalue
Expand Down Expand Up @@ -795,7 +819,7 @@ def pull(self, keys, targets=None, block=True):
"""Pull objects from `target`'s namespace by `keys`"""
if isinstance(keys, str):
pass
elif isistance(keys, (list,tuple,set)):
elif isinstance(keys, (list,tuple,set)):
for key in keys:
if not isinstance(key, str):
raise TypeError
Expand Down
13 changes: 7 additions & 6 deletions IPython/zmq/parallel/controller.py
Expand Up @@ -495,29 +495,30 @@ def save_task_request(self, idents, msg):
client_id = idents[0]

try:
msg = self.session.unpack_message(msg, content=False, copy=False)
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::client %r sent invalid task message: %s"%(
client_id, msg), exc_info=True)
return
rec = init_record(msg)
record = init_record(msg)
if MongoDB is not None and isinstance(self.db, MongoDB):
record['buffers'] = map(Binary, record['buffers'])
rec['client_uuid'] = client_id
rec['queue'] = 'task'
record['client_uuid'] = client_id
record['queue'] = 'task'
header = msg['header']
msg_id = header['msg_id']
self.pending.add(msg_id)
self.db.add_record(msg_id, rec)
self.db.add_record(msg_id, record)

def save_task_result(self, idents, msg):
"""save the result of a completed task."""
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False, copy=False)
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::invalid task result message send to %r: %s"%(
client_id, msg))
raise
return

parent = msg['parent_header']
Expand Down
2 changes: 1 addition & 1 deletion IPython/zmq/parallel/streamsession.py
Expand Up @@ -65,7 +65,7 @@ def wrap_exception():
tb = traceback.format_exception(etype, evalue, tb)
exc_content = {
'status' : 'error',
'traceback' : tb.encode('utf8'),
'traceback' : [ line.encode('utf8') for line in tb ],
'etype' : etype.encode('utf8'),
'evalue' : evalue.encode('utf8')
}
Expand Down

0 comments on commit 4d5f89a

Please sign in to comment.